Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallelization of bgzf compression #17

Closed
KHanghoj opened this issue May 17, 2021 · 21 comments
Closed

parallelization of bgzf compression #17

KHanghoj opened this issue May 17, 2021 · 21 comments
Assignees
Labels
bgzf enhancement New feature or request

Comments

@KHanghoj
Copy link

hi,

I used noodles to modify and filter bam files with the primary focus of learning rust. Is it possible to parallelize the compression part when writing to a bam file.

Great package :)

Kristian

@jamestwebber
Copy link
Contributor

Hah! I was just thinking about this as well. Unfortunately I think it would be rather complicated to implement--you'd want to parallelize the compression step but preserve the original ordering when writing out the records. It would be super cool though!

@zaeleus zaeleus added the bgzf label May 18, 2021
@KHanghoj
Copy link
Author

ha.

Given my current rust skills im not of much help but some of the multi-threading in htslib can be found here: https://github.com/samtools/htslib/blob/515f6df8f7f7dab6c80d0e7aede6e60826ef5374/bgzf.c#L1722 i believe.

Dont know if that is of any help.

This is relevant for BAM/bcf format ( i think) as the threads are only used form compression

@jamestwebber
Copy link
Contributor

Note: I'm not @zaeleus so I can't speak for their plans, but I just started implementing some things using the library and parallel writes were on my mind.

I'm a little confused about whether the htslib implementation preserves read order. It looks like they have a threadpool to do the compression and they push the results to a queue, and it seems like that might reorder the reads slightly. Maybe that's accepted behavior, I couldn't find any documentation on this.

I was trying to find a way to preserve ordering but it's a tricky problem, e.g. see this Rayon issue which has been open for quite a while. But I think the rayon folks are looking for an extremely general solution, while the heap-queue thing proposed in that issue would probably work great for BAM processing (maybe with a little parameter tweaking).

I'm not sure I'm gonna try it, but if you want to try implementing it as a way to learn a ton of Rust all at once, I'd love to see it. 😁

@jkbonfield
Copy link

The htslib thread pool is complicated!

The pool has the notion of input (things to do) and output (completed jobs) queues. The output queue can be read from either as they arrive or in strict order of submission. Clearly with BGZF it is critical to get the ordering correct, so that's how it utilises the thread pool.

If we're reading and writing data (eg BAM to BAM, BAM to CRAM, etc) then we have one shared thread pool with multiple input and output queues attached to cover the decode and encode tasks. The tricky bit was stopping deadlocks when a queue fills up, while limiting the number of items (decoding is vastly faster than encoding so you don't want to buffer up most of the uncompressed file because it's running ahead of the encoder tasks).

Lots of "fun" to be had here! :-)

@jamestwebber
Copy link
Contributor

The htslib thread pool is complicated!

My very sparse C skills are definitely not helping. 😁 So the ordering is preserved there, that's cool! I didn't see how that happened.

If we're reading and writing data (eg BAM to BAM, BAM to CRAM, etc) then we have one shared thread pool with multiple input and output queues attached to cover the decode and encode tasks. The tricky bit was stopping deadlocks when a queue fills up, while limiting the number of items (decoding is vastly faster than encoding so you don't want to buffer up most of the uncompressed file because it's running ahead of the encoder tasks).

I was wondering about the use-case of "read a BAM, do something to each record, write a new BAM" (because that's what I want to do). Would the ordering be preserved there? Maybe that's an implementation detail that depends on the application.

@jkbonfield
Copy link

I was wondering about the use-case of "read a BAM, do something to each record, write a new BAM" (because that's what I want to do). Would the ordering be preserved there? Maybe that's an implementation detail that depends on the application.

Yes, for BGZF all ordering is preserved. It needs to be too because BGZF has no guarantees that a gzip block boundary lies on a sequence boundary (and indeed long sequences may need many blocks to cover a single record!). There isn't even a guarantee that if a sequence doesn't fit in a block that it'll start a new BGZF block. It does in our implementation, but it's not required by the specification. That makes random jumping around a bit of a pain in the neck.

@zaeleus
Copy link
Owner

zaeleus commented May 21, 2021

As others have mentioned, I don't think this is trivial. I hope to explore parallel block decompression after #13 and then compression. It will likely be a good opportunity to introduce async interfaces (#9).


One thing that should be noted is that noodles-bgzf uses flate2, which is a frontend to different general-purpose zlib-compatible libraries. The default backend is the Rust implementation miniz_oxide, but you can override it in your own application (Cargo.toml) with other supported implementations, e.g., the more optimized zlib-ng:

flate2 = { version = "1.0.20", default-features = false, features = ["zlib-ng-compat"] }

This improves both decoding and encoding performance but requires building and linking to a C library. Specialized libraries like libdeflate are not supported.

@zaeleus zaeleus added the enhancement New feature or request label May 21, 2021
@d-cameron
Copy link

d-cameron commented Jul 15, 2021

I consider multi-threaded read/write for SAM/BAM/CRAM an essential feature for a high performance htslib-style library. Having written tools using java/htsjdk, I had to port key sections of my program to C/htslib purely because htsjdk doesn't support multithreaded SAM parsing (so a bwa | tool bottlenecks SAM parsing for multithreaded bwa), and BAM parsing didn't scale well enough. I wrote a PR to fix this (samtools/htsjdk#1249) but the Broad never merged it so I have to maintain my own htsjdk fork just for the performance gains from multithreaded BAM reading.

Unfortunately I think it would be rather complicated to implement--you'd want to parallelize the compression step but preserve the original ordering when writing out the records.

I'm not sure why the consensus in this thread is that adding multithread capability is complicated. Speaking from my htsjdk PR experience, I can say that's it's not too bad if you're willing to accept intermediate buffers & an extra data copy. Essentially, there are two steps in SAM.bgz/BAM/VCF.bgz/BCF I/O: compression/decompression and serialisation/parsing.

in-memory format <-> serialised format <-> compressed format

For async multithreaded support, it's just a matter of adding buffers between each of these steps and kicking off the async jobs when it's full. The high-level design of my htsjdk implement goes along the lines of:

stream of input records -> convert to serialised format -> [CP] chuck records together until 64Kb bgz limit -> compress block -> [CP] write to disk

[CP] are the checkpoints where you need to ensure the records are presented to the next step in-order. I implemented this with an incrementing u64 on the input and logic to buffer if recordindex != expectedindex. I used an unbounded threadpool for the actual disk I/O, and a #threads=#cores threadpool for the computational steps (parsing & compression). Since both are purely computational steps a single thread-pool was fine. You can prevent unbounded async queue size through backpressure on the calling thread (for writing) and limiting the amount of read-ahead (for reading).

There's probably a more elegant rust-like design that noodles could take but I'm relatively new to rust so I won't presume to recommend a design.

Downsides of this approach:

  • Need to control read-ahead when doing random seeks. There's no point in pre-loading a MB of records if you're not doing in-order traversal
  • Potential extra data copying as you can't send your serialised records directly to your compression buffer - you need to wait for 64Kb of data before sending off as a compression job. Similarly, you can't write to disk without first checking that there are no earlier blocks that haven't been written yet (i.e. they're still getting compressed on another thread).
  • Higher latency. I buffered records before performing async parsing. Not a big concern for the vast majority of async I/O use cases as they almost exclusely want throughput and don't care about latency.

I've recently taken up rust for my latest tool so, if you're open to it, I'm happy to contribute both code and design inputs. I deal mostly in structural variation analysis and contribute to both the SAM and VCF specifications. htsjdk has very poor VCF SV support so it'd be nice to have a library that properly supports these variants (and handles the upcoming VCFv4.4 SV changes).

@jkbonfield
Copy link

jkbonfield commented Jul 15, 2021

Firstly, making SAM MT and speeding up the BAM threading was my work, so thanks for the kind comments about Htslib. :-)

Some of my own comments.

  • Just having 1 read and 1 write thread to implement asynchronous I/O is often a big win by itself. It typically costs no extra CPU, but removes latency as you're prepping ahead of your main algorithm.

  • Threading is relatively easy provided you're willing to control the number of decode and encode threads independently. Eg +1 thread for decompression and +5 for compression.

  • If you want to share a single thread pool, it gets complex. This is what I did with Htslib (actually it was just stolen from my Scramble implementation), but it's also where the majority of the initial bugs were. Given the speed of decompression over compression, it's too easy to just build up a huge queue of decoded blocks and start sluriping the entire file into memory. You need to have a finite queue length. You also need to queues of job types so you can make sure there's always room for different types of jobs, otherwise you can get deadlocks. In short, it's complex, but maybe there are premade task queuing systems to use.

  • If you have the ability to do things like index generation on-the-fly, it can get tricky with BAM due to the nature of undoing the last partial record and punting it to the next block (assuming the encoder attempts to start BAM blocks on a read boundary where possible).

  • Seeking really is a pain. In theory if you know all the region offsets up-front then the threaded decoder can seek to the next region and be populating the decoded block list it needs ahead of time, but that's complex! We didn't do that, yet - instead we just track the end point of current region and/or discard stuff on seek.

All that aside, a naive threading system with X threads for encode and Y for decode is quite simple, so I'd start off with the basics.

@jamestwebber
Copy link
Contributor

I think the main question is not "can this be done" (of course) but rather "who has the time to do it properly." As I understand it, this isn't exactly @zaeleus's day job.

@zaeleus
Copy link
Owner

zaeleus commented Jul 16, 2021

Thanks for the comments, @d-cameron and @jkbonfield. I'll take them into consideration while working on this.

I'm focusing on simplicity for the the first iteration, i.e., async I/O in the async branch. The implementation for the async BGZF reader uses a bounded subscheduler to decompress blocks in parallel (depending on your async executor). I'm happy with how clean it turned out, and it should be a good foundation for further optimization.

@zaeleus zaeleus self-assigned this Jul 23, 2021
@zaeleus
Copy link
Owner

zaeleus commented Aug 2, 2021

I want to mention that both multi-task BGZF block compression (writing) and decompression (reading) are available in the async branch. There are also async BAM, BAI, and VCF readers and writers that use the new implementations. Because it's behind a feature flag, I will likely merge into the main branch soon to allow for easier testing.

For discussion, below are some casual benchmarks comparing noodles-bgzf (5885fcc) readers and writers against bgzip 1.13. The input is a fruit fly reference genome. It can be prepared by recompressing the file using bgzip.

$ wget https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/001/215/GCF_000001215.4_Release_6_plus_ISO1_MT/GCF_000001215.4_Release_6_plus_ISO1_MT_genomic.fna.gz
$ gzip -d GCF_000001215.4_Release_6_plus_ISO1_MT_genomic.fna.gz
$ bgzip -c GCF_000001215.4_Release_6_plus_ISO1_MT_genomic.fna > GCF_000001215.4_Release_6_plus_ISO1_MT_genomic.fna.gz

The uncompressed file is 139 MiB; and the bgzipped-compressed file, 44 MiB. Benchmarks are measured using hyperfine 1.11.0 with 3 warmup runs.

Example: Read BGZF

Benchmark Mean [ms] Min [ms] Max [ms] Relative
bgzip -dc@8 (1.13; zlib 1.2.11) 90.8 ± 2.2 85.9 96.0 1.00
noodles (async; zlib-ng 2.0.2; 8 workers) 96.4 ± 1.8 93.9 101.7 1.06 ± 0.03
noodles (async; zlib 1.2.11; 8 workers) 98.7 ± 2.0 93.8 102.4 1.09 ± 0.03
noodles (async; miniz_oxide 0.4.4; 8 workers) 133.0 ± 2.1 129.0 136.4 1.46 ± 0.04
noodles (sync; zlib-ng 2.0.2) 334.0 ± 6.7 327.3 349.5 3.68 ± 0.12
noodles (sync; zlib 1.2.11) 371.9 ± 4.5 367.5 380.1 4.09 ± 0.11
bgzip -dc (1.13; zlib 1.2.11) 385.5 ± 5.4 380.0 394.4 4.24 ± 0.12
noodles (sync; miniz_oxide 0.4.4) 553.2 ± 5.0 546.8 558.8 6.09 ± 0.16

The pure Rust implementation (noodles + miniz_oxide) isn't quite able to keep up the decompression speeds of zlib, particularly in the sync version. Sync noodles + zlib(-ng) can outperform single-threaded bgzip, but multithreaded bgzip outperforms noodles + zlib(-ng).

Example: Write BGZF (compression level = 6)

Benchmark Mean [s] Min [s] Max [s] Relative
noodles (async; zlib-ng 2.0.2; 8 workers) 1.290 ± 0.012 1.276 1.311 1.00
noodles (async; miniz_oxide 0.4.4; 8 workers) 3.506 ± 0.059 3.397 3.574 2.72 ± 0.05
bgzip -c@8 (1.13; zlib 1.2.11) 4.338 ± 0.059 4.218 4.394 3.36 ± 0.06
noodles (async; zlib 1.2.11; 8 workers) 4.424 ± 0.085 4.337 4.620 3.43 ± 0.07
noodles (sync; zlib-ng 2.0.2) 5.664 ± 0.021 5.634 5.711 4.39 ± 0.05
noodles (sync; miniz_oxide 0.4.4) 14.409 ± 0.026 14.359 14.454 11.17 ± 0.11
noodles (sync; zlib 1.2.11) 20.992 ± 0.054 20.952 21.137 16.28 ± 0.16
bgzip -c (1.13; zlib 1.2.11) 21.018 ± 0.063 20.932 21.101 16.30 ± 0.16

The write benchmarks are surprising. So much so that I'll confirm that the output is correct:

$ cargo run --release --features async --example bgzf_write_async GCF_000001215.4_Release_6_plus_ISO1_MT_genomic.fna > dm6.fna.gz
$ gzip -dc dm6.fna.gz | sha256sum - GCF_000001215.4_Release_6_plus_ISO1_MT_genomic.fna
4e14dbd8ea213a19ebf27057ab62a57ca83951e79ae0c2501ae4c460b7c3d415  -
4e14dbd8ea213a19ebf27057ab62a57ca83951e79ae0c2501ae4c460b7c3d415  GCF_000001215.4_Release_6_plus_ISO1_MT_genomic.fna

noodles + miniz_oxide has better write characteristics than zlib; however, zlib-ng shows that it provides significant performance improvements, particularly for being a drop-in replacement. Curiously, noodles + zlib and bgzip perform nearly the same in both contexts.

@jkbonfield
Copy link

Zlib-ng appears to have come on well then. Last I tried it wasn't a huge leap above zlib. Are the sizes comparable?

If you're curious, you may also wish to try bgzip with libdeflate. This isn't a drop-in replacement, but htslib has bindings for it. It's also a very fast modern implementation of deflate, but it also has a faster decompression too. So it's our recommended method for htslib.

There are some fun benchmarks at https://quixdb.github.io/squash-benchmark/unstable/

@brainstorm
Copy link

brainstorm commented Aug 3, 2021

Fantastic benchmarking Michael, kudos!

Also nice compression resource James! I recently came across this one, perhaps you knew about it already?:

http://www.mattmahoney.net/dc/text.html

I suppose that there's no urge to implement NN-based compression (looking at the nr1 place), quite impressive :-!

Other than that, I wanted to comment that, if possible, those C-bindings should be entirely optional? One of the main reasons I started using Noodles was to avoid C dependencies (after dealing with rust-htslib).

@jkbonfield
Copy link

Yep, I've been a long follower of Matt's work. His sibling page on http://mattmahoney.net/dc/dce.html is also a useful resource. I also regularly look at the text compression page (as well as encode.su). I find the really slow tools can still be a useful research guide. For example, if we can get within a few percent of them for a specific type of data (eg read names in CRAM) then I know I can stop trying for better compression - close is "good enough" given the CPU time is vastly superior.

There are a few serial winners which always perform amazingly for the speed / size tradeoffs - bsc and mcm. I've considered both for CRAM 4, but as yet they're not in there.

Agreed that yes all bindings ought to be optional, permitting a nice simple pure-rust implementation while offering potential gains for those who want the hassle of a more complicated setup. For the same reason, htslib can build against standard zlib without needing libdeflate.

@zaeleus
Copy link
Owner

zaeleus commented Aug 4, 2021

Are the sizes comparable?

Yes, all the outputs are close: 44.3 MiB +/- 0.07 MiB.

Other than that, I wanted to comment that, if possible, those C-bindings should be entirely optional?

The C libraries (zlib, zlib-ng, etc.) are definitely optional. The Rust implementation miniz_oxide is the default used in noodles. I chose to show different backends since encoding/decoding tend to be the bottleneck. They're easy to change via feature flags [1] [2] but do require their specific toolchains (e.g., zlib-ng needs cmake to build).

@zaeleus
Copy link
Owner

zaeleus commented Aug 12, 2021

The async branch was merged into the main branch. It currently includes BAM (R/W), BAI (R/W), BCF (R), BGZF (R/W), tabix (R/W), and VCF (R/W). It is part of noodles 0.5.0 release.

As for the initial use case of this issue to write filtered BAM files, here are some notable changes:

  • Async I/O is behind the async feature flag. It is disabled by default and must be explicitly enabled, e.g.,

    noodles = { version = "0.5.0", features = ["bam"] }
    noodles-bam = { version = "0.3.0", features = ["async"] }
  • Use the async equivalent of different I/O objects: std::io::File => tokio::fs::File, bam::Reader => bam::AsyncReader, bam::Writer => bam::AsyncWriter, etc.

  • Async{Reader,Writer} have builders to control the number of workers (and compression level), e.g.,

    let file = File::create("sample.bam").await?;
    let mut writer = bam::AsyncWriter::builder(file)
        .set_worker_count(4)
        .set_compression_level(Compression::fast())
        .build();

    The default worker count is the number of available logical CPUs; and compression level, 6.

  • bam::AsyncReader::records et al. returns a Stream, not an Iterator. There is no syntactic sugar applied to streams used in for loops, but one alternative is to use while let and StreamExt::next/TryStreamExt::try_next.

  • Rust does not have async destructors (i.e., an async version of Drop). When working with a BGZF async writer (e.g., BAM, BGZF, tabix, etc.), an appropriate shutdown method must be called to finalize the output stream with the last flush and writing the EOF block.

See noodles-bam/examples/bam_reheader_async.rs for an example of reading from and writing to the BAM format using async I/O. Many of the other examples were also rewritten using async/await.

@brainstorm
Copy link

This is fantastic Michael!!!

We were discussing with @mmalenic about htsget-rs async port (soon to be merged) and couldn't help notice that CRAM and CSI are not async (yet?).

What roadblocks did you find for those two? Are they significantly more complex to implement or is it just a matter of time?

Please don't take this as criticism, your implementation/fixing speed is admirable :)

@jamestwebber
Copy link
Contributor

This is awesome, thanks for all your work @zaeleus! I am going to try this very soon.

@zaeleus
Copy link
Owner

zaeleus commented Aug 12, 2021

What roadblocks did you find for those two? Are they significantly more complex to implement or is it just a matter of time?

No blockers (that I know of :)). They're on the roadmap and will be in a (near) future release.

@andy-thomason
Copy link

Very nice...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bgzf enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

7 participants