Skip to content

Commit

Permalink
Add more calls to bgzf_flush_try.
Browse files Browse the repository at this point in the history
This makes bgzipped SAM, VCF, FASTA and FASTQ start blocks on a new
record (except for the case of a single record being too large to fit
in a single block).

It is a companion PR to #1369
  • Loading branch information
jkbonfield authored and daviesrob committed Feb 25, 2022
1 parent 0d83a7b commit d5a00db
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 6 deletions.
75 changes: 73 additions & 2 deletions sam.c
Expand Up @@ -2910,6 +2910,10 @@ ssize_t bam_parse_cigar(const char *in, char **end, bam1_t *b) {
// Number of BAM records (writing)
#define NB 1000

// FIXME: this is too large for ONT data.
// We should have NB as a maximum for allocation purposes, but bail out
// early if it grows beyond NM so we have constant memory usage.

struct SAM_state;

// Output job - a block of BAM records
Expand Down Expand Up @@ -3444,6 +3448,8 @@ static void *sam_dispatcher_write(void *vp) {
i++;

if (fp->is_bgzf) {
if (bgzf_flush_try(fp->fp.bgzf, i-j) < 0)
goto err;
if (bgzf_write(fp->fp.bgzf, &gl->data[j], i-j) != i-j)
goto err;
} else {
Expand Down Expand Up @@ -3483,8 +3489,69 @@ static void *sam_dispatcher_write(void *vp) {
pthread_mutex_unlock(&fd->lines_m);
} else {
if (fp->is_bgzf) {
if (bgzf_write(fp->fp.bgzf, gl->data, gl->data_size) != gl->data_size)
goto err;
// We keep track of how much in the current block we have
// remaining => R. We look for the last newline in input
// [i] to [i+R], backwards => position N.
//
// If we find a newline, we write out bytes i to N.
// We know we cannot fit the next record in this bgzf block,
// so we flush what we have and copy input N to i+R into
// the start of a new block, and recompute a new R for that.
//
// If we don't find a newline (i==N) then we cannot extend
// the current block at all, so flush whatever is in it now
// if it ends on a newline.
// We still copy i(==N) to i+R to the next block and
// continue as before with a new R.
//
// The only exception on the flush is when we run out of
// data in the input. In that case we skip it as we don't
// yet know if the next record will fit.
//
// Both conditions share the same code here:
// - Look for newline (pos N)
// - Write i to N (which maybe 0)
// - Flush if block ends on newline and not end of input
// - write N to i+R

int i = 0;
BGZF *fb = fp->fp.bgzf;
while (i < gl->data_size) {
// remaining space in block
int R = BGZF_BLOCK_SIZE - fb->block_offset;
int eod = 0;
if (R > gl->data_size-i)
R = gl->data_size-i, eod = 1;

// Find last newline in input data
int N = i + R;
while (--N > i) {
if (gl->data[N] == '\n')
break;
}

if (N != i) {
// Found a newline
N++;
if (bgzf_write(fb, &gl->data[i], N-i) != N-i)
goto err;
}

// Flush bgzf block
int b_off = fb->block_offset;
if (!eod && b_off &&
((char *)fb->uncompressed_block)[b_off-1] == '\n')
if (bgzf_flush_try(fb, BGZF_BLOCK_SIZE) < 0)
goto err;

// Copy from N onwards into next block
if (i+R > N)
if (bgzf_write(fb, &gl->data[N], i+R - N)
!= i+R - N)
goto err;

i = i+R;
}
} else {
if (hwrite(fp->fp.hfile, gl->data, gl->data_size) != gl->data_size)
goto err;
Expand Down Expand Up @@ -4348,6 +4415,8 @@ int sam_write1(htsFile *fp, const sam_hdr_t *h, const bam1_t *b)
if (sam_format1(h, b, &fp->line) < 0) return -1;
kputc('\n', &fp->line);
if (fp->is_bgzf) {
if (bgzf_flush_try(fp->fp.bgzf, fp->line.l) < 0)
return -1;
if ( bgzf_write(fp->fp.bgzf, fp->line.s, fp->line.l) != fp->line.l ) return -1;
} else {
if ( hwrite(fp->fp.hfile, fp->line.s, fp->line.l) != fp->line.l ) return -1;
Expand Down Expand Up @@ -4387,6 +4456,8 @@ int sam_write1(htsFile *fp, const sam_hdr_t *h, const bam1_t *b)
if (fastq_format1(fp->state, b, &fp->line) < 0)
return -1;
if (fp->is_bgzf) {
if (bgzf_flush_try(fp->fp.bgzf, fp->line.l) < 0)
return -1;
if (bgzf_write(fp->fp.bgzf, fp->line.s, fp->line.l) != fp->line.l)
return -1;
} else {
Expand Down
Binary file modified test/index.vcf.gz.csi
Binary file not shown.
Binary file modified test/index.vcf.gz.tbi
Binary file not shown.
13 changes: 9 additions & 4 deletions vcf.c
Expand Up @@ -2226,10 +2226,12 @@ int vcf_hdr_write(htsFile *fp, const bcf_hdr_t *h)
}
while (htxt.l && htxt.s[htxt.l-1] == '\0') --htxt.l; // kill trailing zeros
int ret;
if ( fp->format.compression!=no_compression )
if ( fp->format.compression!=no_compression ) {
ret = bgzf_write(fp->fp.bgzf, htxt.s, htxt.l);
else
if (bgzf_flush(fp->fp.bgzf) != 0) return -1;
} else {
ret = hwrite(fp->fp.hfile, htxt.s, htxt.l);
}
free(htxt.s);
return ret<0 ? -1 : 0;
}
Expand Down Expand Up @@ -3401,10 +3403,13 @@ int vcf_write(htsFile *fp, const bcf_hdr_t *h, bcf1_t *v)
fp->line.l = 0;
if (vcf_format1(h, v, &fp->line) != 0)
return -1;
if ( fp->format.compression!=no_compression )
if ( fp->format.compression!=no_compression ) {
if (bgzf_flush_try(fp->fp.bgzf, fp->line.l) < 0)
return -1;
ret = bgzf_write(fp->fp.bgzf, fp->line.s, fp->line.l);
else
} else {
ret = hwrite(fp->fp.hfile, fp->line.s, fp->line.l);
}

if (fp->idx) {
int tid;
Expand Down

0 comments on commit d5a00db

Please sign in to comment.