Async BAM decompression #576

Merged
merged 2 commits into from Jan 9, 2017

Contributor

d-cameron commented Apr 18, 2016

No description provided.

Contributor

d-cameron commented Apr 18, 2016

An additional feature in this PR, not in #482, is async buffer reuse. This also makes mOperations less awkward, as the values actually mean something now.

@akiezun akiezun and 1 other commented on an outdated diff Apr 18, 2016

.../htsjdk/samtools/util/BlockCompressedInputStream.java
+ try {
+ while (true) {
+ byte[] buffer = mOperations.take();
+ // snapshot where to output to. If seek() is called
+ // on the input stream, then under no circumstances
+ // do we want to write our result to the post-seek
+ // results
+ BlockingQueue<DecompressedBlock> result = mResult;
+ if (result != null) {
+ DecompressedBlock decompressed = nextBlock(buffer);
+ result.put(decompressed);
+ }
+ }
+ } catch (InterruptedException e) {
+ // Reset interrupt status
+ Thread.interrupted();
@akiezun

akiezun Apr 18, 2016 edited

Contributor

why is this necessary - wouldn't the interrupted flag be reset by whoever threw the exception? Is it necessary because the main thread may be calling interrupt() on the worker after the InterruptedException is thrown but before it is caught (?)

@d-cameron

d-cameron Apr 18, 2016

Contributor

flushDecompressionThread() interrupts the thread (for early abort of unnecessary decompression). This requires the interrupt status to be cleared so processing can continue once the seek() on the foreground thread has completed.
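A minimal, hypothetical sketch of the pattern described here (not the actual htsjdk code): the foreground thread interrupts the worker to abort read-ahead, and the worker must clear its interrupt status before looping again, or every subsequent blocking call would fail immediately.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InterruptClearDemo {
    // One drain step of a worker loop: take a block, or report an abort.
    static String drainOnce(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            // Reset interrupt status so the next take() does not throw
            // immediately once processing resumes after the seek().
            Thread.interrupted();
            return "aborted";
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("block-1");
        System.out.println(drainOnce(queue));               // prints block-1
        Thread.currentThread().interrupt();                 // simulate early abort
        System.out.println(drainOnce(queue));               // prints aborted
        System.out.println(Thread.currentThread().isInterrupted()); // false: status cleared
    }
}
```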

Contributor

akiezun commented Apr 19, 2016 edited

@d-cameron I've tried to plug this PR's code into gatk4 and run the tests (which use asyncIO). However, everything deadlocks when I run the tests. See the attached jstack dump. I also see this in the console

Exception in thread "DecompressBGZF" htsjdk.samtools.SAMFormatException: Did not inflate expected amount

testDeadlock2.txt

Can you help me figure out what's causing this? It may be that my test suite is behaving badly, or there may be a bug in the new code.

akiezun self-assigned this Apr 19, 2016

Contributor

akiezun commented Apr 19, 2016

Another stacktrace in the logs:

Exception in thread "DecompressBGZF" htsjdk.samtools.util.RuntimeIOException: java.util.zip.DataFormatException: invalid distance too far back
        at htsjdk.samtools.util.BlockGunzipper.unzipBlock(BlockGunzipper.java:112)
        at htsjdk.samtools.util.BlockCompressedInputStream.inflateBlock(BlockCompressedInputStream.java:490)
        at htsjdk.samtools.util.BlockCompressedInputStream.nextBlock(BlockCompressedInputStream.java:473)
        at htsjdk.samtools.util.BlockCompressedInputStream.access$700(BlockCompressedInputStream.java:56)
        at htsjdk.samtools.util.BlockCompressedInputStream$DecompressionRunnable.decompressBlocks(BlockCompressedInputStream.java:749)
        at htsjdk.samtools.util.BlockCompressedInputStream$DecompressionRunnable.run(BlockCompressedInputStream.java:762)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.util.zip.DataFormatException: invalid distance too far back
        at java.util.zip.Inflater.inflateBytes(Native Method)
        at java.util.zip.Inflater.inflate(Inflater.java:259)
        at htsjdk.samtools.util.BlockGunzipper.unzipBlock(BlockGunzipper.java:96
Contributor

akiezun commented Apr 19, 2016

I cleaned up my reader closing a bit and now the jstack dump is like this (again, tests are stalled):
testDeadlock3.txt

I think it essentially boils down to the test runner thread being stuck at

        at htsjdk.samtools.util.BlockCompressedInputStream$DecompressionRunnable.takeNextDecompressedBlock(BlockCompressedInputStream.java:734)
Contributor

akiezun commented Apr 19, 2016

If the worker thread just dies due to an exception, the main thread may never find out if it's blocking on the take(), right?
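One common remedy for the hazard raised here, sketched below with hypothetical names (the Result type is not htsjdk API): instead of letting the worker die silently, it hands any failure to the consumer through the same queue, so a blocked take() always wakes up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WorkerFailureDemo {
    static final class Result {
        final byte[] block;    // decompressed data, or null on failure
        final Throwable error; // non-null if the worker failed
        Result(byte[] block, Throwable error) { this.block = block; this.error = error; }
    }

    // The consumer is guaranteed to receive either a success or a failure.
    static byte[] takeNext(BlockingQueue<Result> results) throws Exception {
        Result r = results.take();
        if (r.error != null) {
            throw new RuntimeException("Decompression failed", r.error);
        }
        return r.block;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Result> results = new ArrayBlockingQueue<>(2);
        Thread worker = new Thread(() -> {
            try {
                throw new IllegalStateException("Did not inflate expected amount");
            } catch (Throwable t) {
                // Report the failure instead of dying silently.
                results.offer(new Result(null, t));
            }
        });
        worker.start();
        worker.join();
        try {
            takeNext(results);
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```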

akiezun referenced this pull request in broadinstitute/gatk Apr 19, 2016

Merged

fix resource leaks in MarkDuplicates #1729

Contributor

d-cameron commented Apr 19, 2016 edited

Is your test harness currently leaking thousands of threads?

I'll refactor the code to use a thread pool tonight and add an extra constructor so I can incorporate tests into htsjdk. This should make testing easier, the class instantiation more lightweight, and remove the thread leak penalty of code that doesn't call close().

Contributor

akiezun commented Apr 19, 2016

@d-cameron It did leak threads - I fixed the thread leakage now, at least in MarkDuplicates.

Contributor

akiezun commented Apr 19, 2016

@d-cameron do let me know when the PR is ready for re-review and re-test

Contributor

d-cameron commented Apr 19, 2016

Ready to go from my perspective. The test case picked up a few bugs in my implementation during development so I'm happy that it is actually testing the async code in a meaningful way.

This async code is shorter and has a simpler API, so reasoning about its correctness should be easier.

Contributor

akiezun commented Apr 20, 2016 edited

@d-cameron I plugged in this new code to gatk4 and ran the test suite. I see this in the log (it does not happen on master htsjdk)

Caused by: java.io.IOException: Invalid file pointer: 1049
    at htsjdk.samtools.util.BlockCompressedInputStream.seek(BlockCompressedInputStream.java:331)
    at org.seqdoop.hadoop_bam.BAMRecordReader.initialize(BAMRecordReader.java:153)
    at org.seqdoop.hadoop_bam.BAMInputFormat.createRecordReader(BAMInputFormat.java:127)
    at org.seqdoop.hadoop_bam.AnySAMInputFormat.createRecordReader(AnySAMInputFormat.java:187)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
Contributor

akiezun commented Apr 20, 2016

another exception on a rerun (not sure how easy it would be to diagnose from this one because there's no htsjdk on the stack). These must be due to the async reading, because I changed nothing else and those tests run reliably on travis on https://github.com/broadinstitute/gatk (and we do use async IO in the tests all the time).

java.lang.ArrayIndexOutOfBoundsException
    at org.seqdoop.hadoop_bam.util.SeekableArrayStream.read(SeekableArrayStream.java:51)
    at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:199)
    at org.seqdoop.hadoop_bam.BAMSplitGuesser.guessNextBGZFPos(BAMSplitGuesser.java:236)
    at org.seqdoop.hadoop_bam.BAMSplitGuesser.guessNextBAMRecordStart(BAMSplitGuesser.java:140)
    at org.seqdoop.hadoop_bam.BAMInputFormat.addProbabilisticSplits(BAMInputFormat.java:246)
    at org.seqdoop.hadoop_bam.BAMInputFormat.getSplits(BAMInputFormat.java:161)
    at org.seqdoop.hadoop_bam.AnySAMInputFormat.getSplits(AnySAMInputFormat.java:245)
Contributor

d-cameron commented Apr 20, 2016

I had a look at org.seqdoop.hadoop_bam.BAMRecordReader and it does seem very much like the Hadoop-BAM code is making assumptions about the synchronization between the position exposed by the BlockCompressedInputStream and the position of the underlying stream.

Digging around a bit I found the following comment in org.seqdoop.hadoop_bam.util.BGZFSplitCompressionInputStream:

"This implementation requires that the underlying BGZF input stream can advertise when it reaches a BGZF block boundary, which is why a modified version of htsjdk's {@code BlockCompressedInputStream} is used."

I presume this was the endOfBlock() function that got added.

Looking at the API https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/io/compress/SplitCompressionInputStream.html I see references to the underlying stream being passed to a base class that has a seek() method as well as to BlockCompressedInputStream. It looks like Hadoop-BAM is not compatible with any sort of read-ahead on the stream.

Contributor

akiezun commented Apr 20, 2016

@tomwhite can you comment on whether Hadoop-BAM can be made to work with async reading? If not, we may just have to turn off async IO for Hadoop-BAM.

Contributor

d-cameron commented Apr 20, 2016 edited

I suspect you'll find it is the read-ahead of the underlying stream that's the issue, not the async itself.
For testing purposes, it's probably worth creating a SynchronousExecutorService that performs immediate read-ahead in the current thread. You'd get deterministic errors and be able to identify the root cause much more easily.

Edit: replacing threadpool.execute(this); with run(); would be an even simpler approach as a temporary testing measure.
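The same-thread executor idea suggested here can be sketched in a few lines (a debugging aid under stated assumptions, not htsjdk code): an Executor that runs each task immediately on the calling thread, making the "asynchronous" read-ahead deterministic.

```java
import java.util.concurrent.Executor;

public class SameThreadExecutorDemo {
    // An Executor that executes every task inline on the submitting thread.
    static final Executor SAME_THREAD = Runnable::run;

    public static void main(String[] args) {
        final String caller = Thread.currentThread().getName();
        final StringBuilder ranOn = new StringBuilder();
        SAME_THREAD.execute(() -> ranOn.append(Thread.currentThread().getName()));
        // The task ran on the caller's own thread, so errors surface deterministically.
        System.out.println(caller.equals(ranOn.toString())); // true
    }
}
```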

Contributor

d-cameron commented Apr 20, 2016 edited

Changing the BlockCompressedInputStream constructors to pass async=false in the Hadoop-BAM codebase would be straightforward - there are only a handful of references to the class.

Contributor

tomwhite commented Apr 21, 2016

@akiezun @d-cameron I expected readahead to be a problem with Hadoop-BAM, and to have it disabled for the reasons you say. Even if async is disabled by default (which I believe it is), Hadoop-BAM should explicitly use the non-async constructor.

Contributor

akiezun commented Apr 22, 2016 edited

@d-cameron @tomwhite how should we proceed? we can't update hadoop-bam until the new constructor is in and I can't properly test gatk until hadoop bam is updated. @d-cameron can you move the code that checks Defaults.USE_ASYNC_IO_FOR_SAMTOOLS outside of BlockCompressedInputStream (ie move to the caller) and make it synchronous by default? That way, hadoop-bam would not run in async mode.

Contributor

d-cameron commented Apr 22, 2016

If I'm to do that then it would make more sense to refactor it into a separate AsyncBlockCompressedInputStream class. Any objections?

Contributor

akiezun commented Apr 22, 2016

no objections from me. that may indeed be better

Contributor

d-cameron commented Apr 26, 2016

Back to you @akiezun

Contributor

akiezun commented Apr 26, 2016

thanks @d-cameron , i'm on it

@akiezun akiezun commented on an outdated diff Apr 26, 2016

src/java/htsjdk/tribble/TabixFeatureReader.java
@@ -126,7 +128,8 @@ public boolean hasIndex(){
}
public CloseableTribbleIterator<T> iterator() throws IOException {
- final InputStream is = new BlockCompressedInputStream(ParsingUtils.openInputStream(path));
+ final InputStream compressedStream = ParsingUtils.openInputStream(path);
+ final InputStream is = Defaults.USE_ASYNC_IO_FOR_TRIBBLE ? new AsyncBlockCompressedInputStream(compressedStream) : new BlockCompressedInputStream(compressedStream);
@akiezun

akiezun Apr 26, 2016

Contributor

Can we limit this PR to BAMs (as per title)? Tribble is a whole new can of worms and let's keep it as is for now.

@akiezun akiezun commented on an outdated diff Apr 26, 2016

src/java/htsjdk/tribble/index/IndexFactory.java
@@ -183,7 +184,7 @@ private static InputStream indexFileInputStream(final String indexFile) throws I
return new GZIPInputStream(inputStreamInitial);
}
else if (indexFile.endsWith(TabixUtils.STANDARD_INDEX_EXTENSION)) {
- return new BlockCompressedInputStream(inputStreamInitial);
+ return Defaults.USE_ASYNC_IO_FOR_TRIBBLE ? new AsyncBlockCompressedInputStream(inputStreamInitial) : new BlockCompressedInputStream(inputStreamInitial);
@akiezun

akiezun Apr 26, 2016

Contributor

here too - please leave as-is for this PR

@akiezun akiezun commented on an outdated diff Apr 26, 2016

src/java/htsjdk/samtools/SamReaderFactory.java
@@ -261,7 +262,7 @@ public SamReader open(final SamInputResource resource) {
primitiveSamReader = new BAMFileReader(sourceFile, indexFile, false, validationStringency, this.samRecordFactory);
}
} else if (BlockCompressedInputStream.isValidFile(bufferedStream)) {
- primitiveSamReader = new SAMTextReader(new BlockCompressedInputStream(bufferedStream), validationStringency, this.samRecordFactory);
+ primitiveSamReader = new SAMTextReader(Defaults.USE_ASYNC_IO_FOR_SAMTOOLS ? new AsyncBlockCompressedInputStream(bufferedStream) : new BlockCompressedInputStream(bufferedStream), validationStringency, this.samRecordFactory);
@akiezun

akiezun Apr 26, 2016

Contributor

the selection of which stream to take is done in the reader class for BAMs - why is it in the reader factory for SAMs? Also, I find it a bit puzzling that it's the stream that is asynchronous and not the reader (on the writing side, we have asynchronous writers and not streams).

@akiezun akiezun commented on an outdated diff Apr 26, 2016

...dk/samtools/util/AsyncBlockCompressedInputStream.java
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Asynchronous read-ahead implementation of {@link htsjdk.samtools.util.BlockCompressedInputStream}.
+ *
+ * Note that this implementation is not synchronized. If multiple threads access an instance concurrently, it must be synchronized externally.
+ */
+public class AsyncBlockCompressedInputStream extends BlockCompressedInputStream {
+ private static final int READ_AHEAD_BUFFERS = (int)Math.ceil(Defaults.NON_ZERO_BUFFER_SIZE / BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
+ private static final Executor threadpool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ private final BlockingQueue<DecompressedBlock> mResult = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
+ private final BlockingQueue<byte[]> freeBuffers = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
@akiezun

akiezun Apr 26, 2016

Contributor

whitespace issues

@akiezun akiezun commented on an outdated diff Apr 26, 2016

...dk/samtools/util/AsyncBlockCompressedInputStream.java
+
+ public AsyncBlockCompressedInputStream(final SeekableStream strm) {
+ super(strm);
+ }
+
+ protected DecompressedBlock nextBlock(byte[] bufferAvailableForReuse) {
+ if (bufferAvailableForReuse != null) {
+ freeBuffers.offer(bufferAvailableForReuse);
+ }
+ return nextBlockSync();
+ }
+
+ @Override
+ protected void prepareForSeek() {
+ flushReadAhead();
+ super.prepareForSeek();
@akiezun

akiezun Apr 26, 2016 edited

Contributor

the super call is not needed because the super does nothing (and it will be documented as doing nothing)

@akiezun akiezun and 1 other commented on an outdated diff Apr 26, 2016

...dk/samtools/util/AsyncBlockCompressedInputStream.java
+import java.io.InputStream;
+import java.net.URL;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Asynchronous read-ahead implementation of {@link htsjdk.samtools.util.BlockCompressedInputStream}.
+ *
+ * Note that this implementation is not synchronized. If multiple threads access an instance concurrently, it must be synchronized externally.
+ */
+public class AsyncBlockCompressedInputStream extends BlockCompressedInputStream {
+ private static final int READ_AHEAD_BUFFERS = (int)Math.ceil(Defaults.NON_ZERO_BUFFER_SIZE / BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
+ private static final Executor threadpool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@akiezun

akiezun Apr 26, 2016

Contributor

If there is more than 1 thread here, how are we guaranteeing the order of reading of the blocks?

@d-cameron

d-cameron Apr 27, 2016 edited

Contributor

There's only 1 task for any given stream in the pool at any given time.

@akiezun akiezun commented on an outdated diff Apr 26, 2016

.../htsjdk/samtools/util/BlockCompressedInputStream.java
- byte[] buffer = mCurrentBlock;
- mCurrentBlock = null;
- if (buffer == null || buffer.length != uncompressedLength) {
- try {
- buffer = new byte[uncompressedLength];
- } catch (final NegativeArraySizeException e) {
- throw new RuntimeIOException("BGZF file has invalid uncompressedLength: " + uncompressedLength, e);
- }
+ private byte[] inflateBlock(final byte[] compressedBlock, final int compressedLength,
+ final byte[] bufferAvailableForReuse) throws IOException {
+ final int uncompressedLength = unpackInt32(compressedBlock, compressedLength - 4);
+ if (uncompressedLength < 0) {
+ throw new RuntimeIOException("BGZF file has invalid uncompressedLength: " + uncompressedLength);
+ }
+ byte[] buffer = bufferAvailableForReuse;
+ if (buffer == null || uncompressedLength != buffer.length) {
@akiezun

akiezun Apr 26, 2016

Contributor

add a comment to the effect that we must allocate a new buffer because the one that is available is no good

@akiezun akiezun and 1 other commented on an outdated diff Apr 26, 2016

.../htsjdk/samtools/util/BlockCompressedInputStream.java
}
- private void readBlock()
- throws IOException {
-
+ private void readBlock() throws IOException {
+ mCurrentBlock = nextBlock(getBufferForReuse(mCurrentBlock));
+ mCurrentOffset = 0;
+ checkAndRethrowDecompressionException();
+ }
+ /**
+ * Reads and decompresses the next block
+ * @param bufferAvailableForReuse decompression buffer available for reuse
+ * @return next block in the decompressed stream
+ */
+ protected DecompressedBlock nextBlock(byte[] bufferAvailableForReuse) {
@akiezun

akiezun Apr 26, 2016

Contributor

what's the difference between nextBlock(byte[] bufferAvailableForReuse) and processNextBlock(byte[] bufferAvailableForReuse)?

@d-cameron

d-cameron Apr 27, 2016

Contributor

processNextBlock() actually does the reading and decompression, nextBlock() is the method that is called when the next block is required (which in the base class, is just a pass-through to processNextBlock() to do the work).
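The split described here is a template-method pattern; a simplified sketch (not the actual htsjdk classes, names abbreviated for illustration):

```java
import java.util.ArrayDeque;

public class TemplateDemo {
    static class SyncReader {
        // Does the real reading and decompression work.
        protected String processNextBlock() { return "decompressed-block"; }
        // Hook called when the next block is required; base class is a pass-through.
        protected String nextBlock() { return processNextBlock(); }
    }

    static class AsyncReader extends SyncReader {
        private final ArrayDeque<String> readAhead = new ArrayDeque<>();
        // The async subclass overrides only the hook, serving from a read-ahead queue.
        @Override protected String nextBlock() {
            if (readAhead.isEmpty()) readAhead.add(processNextBlock());
            return readAhead.poll();
        }
    }

    public static void main(String[] args) {
        System.out.println(new SyncReader().nextBlock());  // prints decompressed-block
        System.out.println(new AsyncReader().nextBlock()); // prints decompressed-block
    }
}
```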

@akiezun akiezun and 1 other commented on an outdated diff Apr 26, 2016

.../htsjdk/samtools/util/BlockCompressedInputStream.java
*
* c.f. http://samtools.sourceforge.net/SAM1.pdf for details of BGZF format
+ *
+ *
*/
public class BlockCompressedInputStream extends InputStream implements LocationAware {
@akiezun

akiezun Apr 26, 2016 edited

Contributor

hadoop-bam is still failing for me when I use this PR's code. The stream is synchronized, and it does not fail on the htsjdk 2.2.2 code. I trigger this by running the gatk4 test suite after replacing htsjdk with a jar made from this PR.

Caused by: java.io.IOException: Invalid file pointer: 1049
    at htsjdk.samtools.util.BlockCompressedInputStream.seek(BlockCompressedInputStream.java:308)
    at org.seqdoop.hadoop_bam.BAMRecordReader.initialize(BAMRecordReader.java:153)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
@d-cameron

d-cameron Apr 27, 2016 edited

Contributor

Any chance of being able to reproduce in an isolated test case I'm able to run?

Contributor

akiezun commented Apr 26, 2016 edited

I added a few comments. Thanks for your ongoing work on it. Back to you @d-cameron

Two major points:

  • gatk4 tests are still failing on this code (seek blows up in Hadoop-BAM even though sync IO is used, as far as I can tell)
  • an asynchronous stream is a bit strange to have. I think readers or writers (and not the streams) should be asynchronous - for consistency at least. What do you think? @droazen @tfenne any opinion on readers vs streams being sync/async?
Contributor

d-cameron commented Apr 27, 2016

This PR is really an async block decompression PR. Performing async stream decompression as well as async iteration (using AsyncBufferedIterator) increases throughput by more than 100% over a single-threaded synchronous iterator for a moderate amount of computation on each record. This is only possible by buffering both the stream and the iterator. Processing time per record is:
Single thread: Read + Decompress + Parse + Process
Async stream only: max(Read + Decompress, Parse + Process)
Async iterator only: max(Read + Decompress + Parse, Process)
Both async: max(Read+Decompress, Parse, Process)

Quite a number of picard tools workloads benefit from having both async.
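Purely illustrative arithmetic with made-up timings (Read+Decompress = 10ms, Parse = 5ms, Process = 8ms per record), showing how the per-record formulas above work out:

```java
public class PipelineTimingDemo {
    public static void main(String[] args) {
        int readDecompress = 10, parse = 5, process = 8;
        // Single thread: every stage is sequential.
        System.out.println(readDecompress + parse + process);                   // 23
        // Async stream only: decompression overlaps parse+process.
        System.out.println(Math.max(readDecompress, parse + process));          // 13
        // Async iterator only: read+decompress+parse overlaps processing.
        System.out.println(Math.max(readDecompress + parse, process));          // 15
        // Both async: all three stages overlap; the slowest one dominates.
        System.out.println(Math.max(readDecompress, Math.max(parse, process))); // 10
    }
}
```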

Contributor

akiezun commented Apr 27, 2016

hmm, nevermind, hadoop-bam still uses the async stream, allocated like this

      at htsjdk.samtools.util.AsyncBlockCompressedInputStream.<init>(AsyncBlockCompressedInputStream.java:56)
        at htsjdk.samtools.BAMFileReader.<init>(BAMFileReader.java:106)
        at htsjdk.samtools.SamReaderFactory$SamReaderFactoryImpl.open(SamReaderFactory.java:259)
        at org.seqdoop.hadoop_bam.util.SAMHeaderReader.readSAMHeaderFrom(SAMHeaderReader.java:73)
        at org.seqdoop.hadoop_bam.util.SAMHeaderReader.readSAMHeaderFrom(SAMHeaderReader.java:52)
        at org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource.getHeader(ReadsSparkSource.java:205)

and others, all sharing SAMHeaderReader.readSAMHeaderFrom

Contributor

akiezun commented Apr 27, 2016

@tomwhite what do you think can be done here to enforce the use of the sync stream in Hadoop-BAM? The SAMHeaderReader in hadoop-bam is using the SamReaderFactory to read the header, but it does not close the underlying reader or the wrapped stream (which is an Async stream that wraps FSDataInputStream) - this leaves the background thread running and blows up in seek on the wrapped FSDataInputStream a few lines down.

(BTW, I feel like all of these ultimately stem from the way sync/async is specified - a single global setting in Defaults - which leaves so little control to the clients.)

Contributor

tomwhite commented Apr 27, 2016

@akiezun Perhaps use SAMHeaderReader#readSAMHeaderFrom() that takes a Path and does close the stream. Would that help?

Contributor

akiezun commented May 10, 2016

As discussed with @tomwhite , I'll attempt to add an option to the reader factory to enable overriding Defaults - for now, just the async.

akiezun referenced this pull request May 10, 2016

Merged

adding a useAsynchronousIO flag to SamReaderFactory #601

Contributor

akiezun commented May 11, 2016

@d-cameron can you rebase and use the new API in the SamReaderFactory (see #601) to not rely on Defaults but on the flag passed to the factory? Thanks

@d-cameron d-cameron added a commit to d-cameron/htsjdk that referenced this pull request May 12, 2016

@d-cameron d-cameron Reverting tribble async code as per #576 @akiezun request b2fdb56
Contributor

d-cameron commented May 12, 2016

Back to @akiezun

Contributor

tomwhite commented May 12, 2016

I've tried this and an updated Hadoop-BAM in GATK and the tests are still not passing. Looking into why now...

Contributor

tomwhite commented May 12, 2016

Investigated a bit more and the issue I'm seeing is not due to this change, but another one in Hadoop-BAM (HadoopGenomics/Hadoop-BAM#80). When I remove that change all the GATK tests pass for me when running with the change in this issue (#576), and updates in Hadoop-BAM to disable async IO. So +1 from me.

tomwhite referenced this pull request in HadoopGenomics/Hadoop-BAM May 12, 2016

Closed

Release Hadoop-BAM 7.5.0 #88

Contributor

akiezun commented May 12, 2016

thanks @tomwhite. Can Hadoop-BAM switch to the sync reader for BAMSplitGuesser.java? BTW, does the guessNextBAMRecordStart method close the reader?

Contributor

tomwhite commented May 13, 2016

@akiezun here's the PR to switch Hadoop-BAM to use synchronous streams: HadoopGenomics/Hadoop-BAM#94. The guessNextBAMRecordStart method does not close the stream, which means it can be used by subsequent calls to the same method.

BTW I've also opened broadinstitute/gatk#1817 to address the other issue I was seeing, which was due to sharded BAM files not having a .bam extension.

Contributor

akiezun commented May 20, 2016

The GATK tests now work on this branch which is great.

But the threads in the pool are non-daemon, so the VM does not exit even when the main thread is dead.

Run this for synch IO

java -Dsamjdk.use_async_io_samtools=false -cp dist/htsjdk-2.2.4.jar htsjdk.samtools.example.PrintReadsExample ./testdata/htsjdk/samtools/BAMFileIndexTest/index_test.bam false

and this for async IO

java -Dsamjdk.use_async_io_samtools=true -cp dist/htsjdk-2.2.4.jar htsjdk.samtools.example.PrintReadsExample ./testdata/htsjdk/samtools/BAMFileIndexTest/index_test.bam false

The latter never terminates and there is a bunch of threads from the pool that are still alive:


"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fdfde834800 nid=0x5b03 waiting on condition [0x0000000128187000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076b087fd0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

back to @d-cameron

Contributor

d-cameron commented May 23, 2016

Should be good to go now @akiezun.

Contributor

akiezun commented May 23, 2016

Thanks. that is working now. I'll test it on gatk and picard next

Contributor

akiezun commented May 23, 2016

BTW, perf gain is still the same, running on a 1.6GB file (reading only + eager decode).

(I had those runs alternated, printing consecutive here for clarity)

Sync
real 0m29.688s
real 0m28.671s
real 0m28.322s
real 0m28.525s
real 0m28.399s

Async
real 0m16.459s
real 0m16.100s
real 0m16.792s
real 0m16.560s
real 0m15.964s

@akiezun akiezun and 1 other commented on an outdated diff May 24, 2016

...dk/samtools/util/AsyncBlockCompressedInputStream.java
+ * Asynchronous read-ahead implementation of {@link htsjdk.samtools.util.BlockCompressedInputStream}.
+ *
+ * Note that this implementation is not synchronized. If multiple threads access an instance concurrently, it must be synchronized externally.
+ */
+public class AsyncBlockCompressedInputStream extends BlockCompressedInputStream {
+ private static final int READ_AHEAD_BUFFERS = (int)Math.ceil(Defaults.NON_ZERO_BUFFER_SIZE / BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
+ private static final Executor threadpool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ private final BlockingQueue<DecompressedBlock> mResult = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
+ private final BlockingQueue<byte[]> freeBuffers = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
+ private final Semaphore running = new Semaphore(1);
@akiezun

akiezun May 24, 2016

Contributor

why do we need a semaphore and a volatile boolean here? - they both seem to indicate similar things. What's the semaphore for? I'm concerned that there are 2 blocking queues, a semaphore and a volatile boolean to communicate between the threads. That seems like a lot of shared data

@d-cameron

d-cameron May 25, 2016 edited

Contributor

It does seem like a lot, but they're all there for a reason.

  • mResult is the expected results buffer
  • freeBuffers is used to reduce memory. This is just a synchronized queue
  • running is required for the threadpool implementation, because unlike a dedicated backing thread, there are no guarantees that the background task is actually running, nor on which thread it is running.
  • mAbort is used as a safe early exit strategy to ensure read-ahead is stopped cleanly. Could be omitted if we were willing to unnecessarily fill the read-ahead buffer before we could seek()
  • runnable isn't needed so I've dropped it

@akiezun akiezun commented on an outdated diff May 24, 2016

...dk/samtools/util/AsyncBlockCompressedInputStream.java
+ return nextBlockSync();
+ }
+
+ @Override
+ protected void prepareForSeek() {
+ flushReadAhead();
+ super.prepareForSeek();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Suppress interrupts while we close.
+ final boolean isInterrupted = Thread.interrupted();
+ mAbort = true;
+ flushReadAhead();
+ super.close();
@akiezun

akiezun May 24, 2016

Contributor

what if this close blows up (as it can by the IOException in the signature)? the interrupt status will not be restored, right?
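The fix implied by this comment can be sketched as follows (a hypothetical helper, not the htsjdk method): restore the suppressed interrupt status in a finally block so it survives even when close() throws.

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseInterruptDemo {
    static void closeInterruptSafe(Closeable resource) throws IOException {
        // Suppress interrupts while we close, remembering the prior status.
        final boolean wasInterrupted = Thread.interrupted();
        try {
            resource.close();
        } finally {
            // Restore the interrupt status even if close() threw.
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        try {
            closeInterruptSafe(() -> { throw new IOException("boom"); });
        } catch (IOException expected) {
            // close() failed, but the interrupt flag must still be restored
        }
        System.out.println(Thread.interrupted()); // true: flag was restored
    }
}
```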

@akiezun akiezun and 1 other commented on an outdated diff May 24, 2016

...dk/samtools/util/AsyncBlockCompressedInputStream.java
+ private void flushReadAhead() {
+ final boolean abortStatus = mAbort;
+ mAbort = true;
+ try {
+ // block until the thread pool operation has completed
+ running.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted waiting for decompression thread", e);
+ }
+ // flush any read-ahead results
+ mResult.clear();
+ mAbort = abortStatus;
+ running.release();
+ }
+ private void ensureReadAhead() {
+ if (running.tryAcquire()) {
@akiezun

akiezun May 24, 2016

Contributor

so tryAcquire means that ensureReadAhead may sometimes not ensure that read ahead happened?

@d-cameron

d-cameron May 25, 2016

Contributor

It ensures a read ahead is scheduled to happen/is happening. If read-ahead is already in progress, nothing needs to be done.
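A sketch of this tryAcquire() pattern under simplified assumptions (the decompression work is replaced by a counter): the semaphore has one permit, so at most one read-ahead task per stream is ever in flight; tryAcquire() succeeding means "schedule a new task", and failing means one is already running, so there is nothing to do.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class EnsureReadAheadDemo {
    static final Semaphore running = new Semaphore(1);
    static final AtomicInteger scheduled = new AtomicInteger();

    static void ensureReadAhead(ExecutorService pool) {
        if (running.tryAcquire()) {               // only schedule if no task in flight
            pool.execute(() -> {
                try {
                    scheduled.incrementAndGet();  // stand-in for block decompression
                } finally {
                    running.release();            // allow the next task to be scheduled
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        running.acquire();                        // simulate a task already running
        ensureReadAhead(pool);                    // no-op: permit unavailable
        running.release();
        ensureReadAhead(pool);                    // schedules exactly one task
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(scheduled.get());      // 1
    }
}
```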

@akiezun akiezun commented on an outdated diff May 24, 2016

...dk/samtools/util/AsyncBlockCompressedInputStream.java
+ }
+
+ public AsyncBlockCompressedInputStream(final File file)
+ throws IOException {
+ super(file);
+ }
+
+ public AsyncBlockCompressedInputStream(final URL url) {
+ super(url);
+ }
+
+ public AsyncBlockCompressedInputStream(final SeekableStream strm) {
+ super(strm);
+ }
+
+ protected DecompressedBlock nextBlock(byte[] bufferAvailableForReuse) {
@akiezun

akiezun May 24, 2016

Contributor

add @Override

Contributor

d-cameron commented May 25, 2016

Back to you @akiezun

@d-cameron d-cameron added a commit to PapenfussLab/gridss that referenced this pull request May 29, 2016

@d-cameron d-cameron #30: reverting htsjdk performance optimisation in favour of waiting f…
…or the thread pool version samtools/htsjdk#576 to be integrated into htsjdk.
725c6ac
Contributor

akiezun commented Jun 8, 2016 edited

performance testing update: on my use cases (HaplotypeCaller and BQSR in GATK4) the improvements are somewhat less pronounced because these tools don't do as much reading as a simple "print reads" example.

Example (all tested on an 8-core MacOSX machine):
BQSR (BaseRecalibrator - noBAQ = most read-heavy option): 16% speedup over a 1min test run
BQSR (BaseRecalibrator - withBAQ = less read-heavy option): 6% speedup over a 1min test run
HaplotypeCaller - less than 1% improvement over a 2 min test run

akiezun removed their assignment Jun 8, 2016

Contributor

akiezun commented Jun 8, 2016

Correctness update: with this change on, tests pass on both https://github.com/broadinstitute/gatk and https://github.com/broadinstitute/gatk-protected test suites (both use async IO everywhere) so I don't have any problems with merging it.

However, given that BQSR runs on 1 core in our production, there's no speedup for HaplotypeCaller, and complex multi-threaded code like this always carries risks, I feel like I can't champion this PR. I will unassign myself accordingly as a reviewer. Thanks @d-cameron for your work on this PR.

What do others in the community think? Should this be merged given the benefits and the risks?

Contributor

droazen commented Jun 8, 2016

@akiezun Perhaps we should provide a way to turn async on/off separately for reading and writing bams, so that we can insulate ourselves from the risks you mention while turning it on where we want/need it. These global properties in Defaults are somewhat problematic, since they're not granular enough -- ideally async would be turned on/off via the reader/writer factories exclusively.

Owner

tfenne commented Jun 8, 2016

I agree that control of read vs. write separately is important - I often use async write because the performance benefit is large, but for read I don't see a huge benefit. That said @droazen I disagree that the reader/writer factories should be the exclusive place to control this, as that means that every single command line tool has to expose (hopefully consistent) parameters for these. Having a toolkit that is configured via system properties is very convenient.

Contributor

droazen commented Jun 8, 2016 edited

@tfenne I'd argue that other possible solutions to that would be to create a common base class for your tools to handle exposing settings for reader/writer creation, or create shared utility methods for reader/writer creation called by convention. Centralizing reader/writer creation in your toolkit has other benefits as well (e.g., you can make sure that CRAM is properly supported across all tools).

That said, I think I'd be satisfied as long as the factories have a mechanism to control async directly, and we're not compelled to rely on the system properties.

Owner

tfenne commented Jun 8, 2016

@droazen And if I have a dozen independent toolkits that all need the same functionality? That have to remain separate for licensing/IP issues?

Contributor

droazen commented Jun 8, 2016

@tfenne Writing one shared utility method or base class per toolkit (not per tool) to configure your readers/writers in a consistent fashion still seems better than dealing with global configuration via properties, which is never sufficiently granular for everyone's needs, and only covers a subset of the total configuration options available anyway.

Contributor

d-cameron commented Jun 9, 2016

Async read performance improvements only become significant when computation per read is relatively low. Have you benchmarked any of the read-intensive Picard tools? I expect you'll get a lot more than 16% out of Collect*Metrics and the likes.

Contributor

akiezun commented Jun 9, 2016 edited

FYI I looked at 2 simple metrics - speedup is indeed decent:
CollectBaseDistributionByCycle file 32% speedup
CollectInsertSizeMetrics 34% speedup

@droazen @tfenne how about splitting samjdk.use_async_io_samtools into 2 options so that everyone can pick the level of exposure they want (reader factory itself also supports this setting):
samjdk.use_async_io_samtools_read
samjdk.use_async_io_samtools_write

and we'll do similarly for tribble once async reading is turned back on (it's deleted now, see #583).

Contributor

droazen commented Jun 10, 2016

@akiezun I'm happy with anything as long as it's possible to configure/override at the factory level.

Contributor

droazen commented Jun 10, 2016

Re-assigning to @akiezun now that he's gotten the requested feedback

akiezun was assigned by droazen Jun 10, 2016

This was referenced Jun 11, 2016

Contributor

akiezun commented Jun 17, 2016

I think this should wait until #641 is merged

Contributor

akiezun commented Jun 21, 2016

@d-cameron can you rebase on top of master (now contains #641) and use the USE_ASYNC_IO_READ_FOR_SAMTOOLS flag for your async reader? when that's in, i'll do a final review/tests and we can merge it (we want to make this opt-in to lower the risk while the code is still new)

Coverage Status

Coverage increased (+0.05%) to 68.466% when pulling acd5bb1 on d-cameron:asyncread into 2e26fe8 on samtools:master.

Contributor

d-cameron commented Jun 23, 2016

First attempt at merging trashed symlinks since it was done on a Windows machine. Redid the merge from scratch on a Linux machine, so it should be good to go now @akiezun

@akiezun akiezun commented on an outdated diff Jun 24, 2016

src/main/java/htsjdk/samtools/SamReaderFactory.java
+ abstract public SamReaderFactory referenceSequence(File referenceSequence);
+
+ /** Sets the specified reference sequence * */
+ abstract public SamReaderFactory referenceSource(CRAMReferenceSource referenceSequence);
+
+ /** Utility method to open the file get the header and close the file */
+ abstract public SAMFileHeader getFileHeader(File samFile);
+
+ /** Reapplies any changed options to the reader * */
+ abstract public void reapplyOptions(SamReader reader);
+
+ /** Set this factory's {@link ValidationStringency} to the provided one, then returns itself. */
+ abstract public SamReaderFactory validationStringency(final ValidationStringency validationStringency);
+
+ /** Set whether readers created by this factory will use asynchronous IO.
+ * If this methods is not called, this flag will default to the value of {@link Defaults#USE_ASYNC_IO_FOR_SAMTOOLS}.
@akiezun

akiezun Jun 24, 2016

Contributor

update to new constants

Coverage Status

Coverage increased (+0.06%) to 68.467% when pulling f562f65 on d-cameron:asyncread into 2e26fe8 on samtools:master.

Contributor

akiezun commented Jul 1, 2016

@jabbarish can you help review this PR?

Contributor

akiezun commented Jul 12, 2016

I tested on gatk and picard. GATK test suite works (both sync and async reading).

Picard test suite fails with 5 strange errors:

java.lang.NoClassDefFoundError: org/apache/commons/compress/utils/CountingOutputStream
    at htsjdk.samtools.cram.build.CramIO.writeContainerForSamFileHeader(CramIO.java:269)
    at htsjdk.samtools.cram.build.CramIO.writeCramHeader(CramIO.java:183)
    at htsjdk.samtools.cram.build.CramIO.writeHeader(CramIO.java:118)
    at htsjdk.samtools.CRAMContainerStreamWriter.writeHeader(CRAMContainerStreamWriter.java:116)
    at htsjdk.samtools.CRAMFileWriter.writeHeader(CRAMFileWriter.java:119)
    at htsjdk.samtools.SAMFileWriterImpl.setHeader(SAMFileWriterImpl.java:142)
    at htsjdk.samtools.CRAMFileWriter.<init>(CRAMFileWriter.java:105)
    at htsjdk.samtools.SAMFileWriterFactory.createCRAMWriterWithSettings(SAMFileWriterFactory.java:485)
    at htsjdk.samtools.SAMFileWriterFactory.makeCRAMWriter(SAMFileWriterFactory.java:435)
    at htsjdk.samtools.SAMFileWriterFactory.makeWriter(SAMFileWriterFactory.java:381)
    at picard.sam.RevertSam$RevertSamWriter.<init>(RevertSam.java:556)
    at picard.sam.RevertSam.doWork(RevertSam.java:222)
    at picard.cmdline.CommandLineProgram.instanceMain(CommandLineProgram.java:208)
    at picard.cmdline.PicardCommandLine.instanceMain(PicardCommandLine.java:95)
    at picard.cmdline.PicardCommandLine.instanceMain(PicardCommandLine.java:100)
    at picard.cmdline.CommandLineProgramTest.runPicardCommandLine(CommandLineProgramTest.java:48)
    at picard.sam.RevertSamTest.testOutputByReadGroupWithOutputMap(RevertSamTest.java:172)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:86)
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:643)
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:820)
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1128)
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
    at org.testng.TestRunner.privateRun(TestRunner.java:782)
    at org.testng.TestRunner.run(TestRunner.java:632)
    at org.testng.SuiteRunner.runTest(SuiteRunner.java:366)
    at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:361)
    at org.testng.SuiteRunner.privateRun(SuiteRunner.java:319)
    at org.testng.SuiteRunner.run(SuiteRunner.java:268)
    at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
    at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
    at org.testng.TestNG.runSuitesSequentially(TestNG.java:1244)
    at org.testng.TestNG.runSuitesLocally(TestNG.java:1169)
    at org.testng.TestNG.run(TestNG.java:1064)
    at org.gradle.api.internal.tasks.testing.testng.TestNGTestClassProcessor.runTests(TestNGTestClassProcessor.java:133)
    at org.gradle.api.internal.tasks.testing.testng.TestNGTestClassProcessor.stop(TestNGTestClassProcessor.java:83)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:120)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:364)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
    at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.compress.utils.CountingOutputStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 63 more

(and it does work fine on master picard)
I'm not sure what's causing it yet. @lbergelson have you seen this?

Contributor

ZLyanov commented Jul 12, 2016

@akiezun Sorry for delay. Sure, we will review this PR.

akiezun removed their assignment Jul 26, 2016

Contributor

akiezun commented Jul 26, 2016

@jabbarish Any update on the review? I'm going to un-assign myself because I wont be able to work on this for now.

Contributor

lbergelson commented Jul 26, 2016

@akiezun I just noticed you mentioned me here. I haven't seen that error in picard. It looks like either a packaging error in htsjdk or in picard. Do you see the same error again if you try with the current picard master? It's possible it was some messed up intermediate build that's fixed now?

Contributor

ZLyanov commented Jul 26, 2016

@akiezun Sorry for the delay, we will come back with detailed reviews of several issues soon. Quick answer: this exception occurs when you use an incomplete jar. Try the one created with shadowJar; it should work.

Contributor

yfarjoun commented Aug 9, 2016

@jabbarish just a quick note to say that we are still waiting for the review. Please let us know if you are still planning on reviewing it. Thanks.

@lbergelson lbergelson commented on an outdated diff Aug 23, 2016

src/main/java/htsjdk/samtools/SamReaderFactory.java
@@ -1,528 +1,528 @@
-package htsjdk.samtools;
@lbergelson

lbergelson Aug 23, 2016

Contributor

The file permissions here got changed making it look like a huge diff in this file. Lets put it back to how it was.

Contributor

yfarjoun commented Aug 23, 2016

@jacarey We need some multithreaded expertise to review this PR. Would you be able to take another final look?

Contributor

yfarjoun commented Aug 24, 2016

@jacarey, I just heard via a private channel that there might be a competing PR being prepared that has (supposedly) better performance. It would be best if we could do a comparative review somehow...

Contributor

droazen commented Sep 29, 2016

@jabbarish have you had time to look at this? You are the reviewer for this branch as nominated by @akiezun, and @d-cameron was recently asking about the status of this.

Contributor

ZLyanov commented Sep 29, 2016

In short: we propose to introduce a centralized thread management tool to control CPU load in all metrics (we will make a separate PR), we think the proposed changes should be implemented using Java 8 features, and finally there is more room for improvement here when using threads. I think we should make another PR with our solution rather than commenting on this one.

Contributor

droazen commented Sep 29, 2016

@jabbarish Since this PR has already undergone several review passes and is now pretty mature, it seems worth merging this work while waiting for your proposed more generalized solution to materialize. Are you able to take a look at the code here and offer specific suggestions/improvements?

Contributor

ZLyanov commented Sep 29, 2016

Sure. I'll make it by Monday.

@ZLyanov

Finally, the consequence of the above-mentioned decisions is the intensive and complicated usage of methods like ensureReadAhead and tryQueueTask in both foreground and background threads.

We propose to consider a classic single-threaded producer-consumer solution with minimal sync primitives involved, based on a loop in the background thread and communication via the existing bounded blocking queue (mResult).

Thanks.

+ * Note that this implementation is not synchronized. If multiple threads access an instance concurrently, it must be synchronized externally.
+ */
+public class AsyncBlockCompressedInputStream extends BlockCompressedInputStream {
+ private static final int READ_AHEAD_BUFFERS = (int)Math.ceil(Defaults.NON_ZERO_BUFFER_SIZE / BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
@ZLyanov

ZLyanov Oct 3, 2016

Contributor

Will READ_AHEAD_BUFFERS always be greater than zero?

@d-cameron

d-cameron Oct 4, 2016 edited

Contributor

Yes. Math.ceil and the upstream guarantee of a non-zero Defaults.NON_ZERO_BUFFER_SIZE ensure this.
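As an aside, the Math.ceil idiom only rounds up when the division is done in floating point; with two int operands the quotient is truncated before Math.ceil ever sees it. A minimal sketch (the sizes below are illustrative placeholders, not the actual htsjdk constants):

```java
public class CeilIntDivision {
    public static void main(String[] args) {
        final int bufferSize = 100_000; // hypothetical NON_ZERO_BUFFER_SIZE
        final int blockSize = 65_536;   // hypothetical MAX_COMPRESSED_BLOCK_SIZE

        // int/int truncates first, so Math.ceil receives 1.0 and cannot round up
        final int truncated = (int) Math.ceil(bufferSize / blockSize);
        // casting one operand to double keeps the fraction, so ceil can round up
        final int roundedUp = (int) Math.ceil((double) bufferSize / blockSize);

        System.out.println(truncated + " " + roundedUp); // prints "1 2"
    }
}
```

So the non-zero guarantee holds only while the buffer size is at least one block size: a buffer smaller than a block would truncate to zero under integer division before the ceil.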

+ t.setDaemon(true);
+ return t;
+ }
+ });
@ZLyanov

ZLyanov Oct 3, 2016 edited

Contributor

this object will always use no more than one thread, so why create a thread pool with all available cores? What if we used this as part of some pipeline on an AWS instance with 128 or more cores? Even if we had several AsyncBlockCompressedInputStreams in one app, one dedicated thread per AsyncBlockCompressedInputStream would be enough.

@d-cameron

d-cameron Oct 4, 2016 edited

Contributor

The use of a thread pool ensures that when reading from many BAM files at once, the number of threads spawned is limited to the number of CPUs available. The scenario the thread pool protects against is the concurrent opening of many (hundreds of) BAM files, a scenario I have personally encountered.
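The shared-pool pattern described here can be sketched as follows (a hedged illustration only; SharedDecompressPool is a placeholder name, not an actual htsjdk identifier):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One process-wide pool, capped at the CPU count, shared by every stream.
// Daemon threads ensure the pool never keeps the JVM alive on its own.
final class SharedDecompressPool {
    static final ExecutorService POOL = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            runnable -> {
                final Thread t = new Thread(runnable, "DecompressBGZF");
                t.setDaemon(true);
                return t;
            });

    private SharedDecompressPool() {}
}
```

Opening hundreds of streams then costs queue entries rather than threads, which is exactly the failure mode a thread-per-stream design runs into.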

+ * Indicates whether a read-ahead task has been scheduled to run. Only one read-ahead task
+ * per stream can be scheduled at any one time.
+ */
+ private final Semaphore running = new Semaphore(1);
@ZLyanov

ZLyanov Oct 3, 2016

Contributor

Due to the decision to use a static multi-threaded pool, you have to enforce the sequential nature of block reading via the semaphore. It provides some sort of critical-section support too. It would not be necessary if you used a single-threaded executor: it is sequential by nature.

@ZLyanov

ZLyanov Oct 3, 2016

Contributor

Sorry, some comments need to be reviewed, temporarily removed.

@d-cameron

d-cameron Oct 4, 2016

Contributor

The executor is static and shared across all input streams. A single-threaded executor is not appropriate.
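The semaphore-plus-shared-pool arrangement under discussion can be sketched roughly like this (ReadAheadGate is an illustrative name; the real class interleaves this with buffer recycling and seek() handling):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

// At most one read-ahead task per stream may be in flight on the shared pool.
class ReadAheadGate {
    private final Semaphore running = new Semaphore(1);
    private final ExecutorService sharedPool;

    ReadAheadGate(ExecutorService sharedPool) {
        this.sharedPool = sharedPool;
    }

    /** Schedules a read-ahead task unless one is already scheduled or running. */
    void ensureReadAhead(Runnable readAhead) {
        if (running.tryAcquire()) { // we now own the single in-flight slot
            sharedPool.execute(() -> {
                try {
                    readAhead.run();
                } finally {
                    running.release(); // free the slot for the next schedule
                }
            });
        }
    }
}
```

The tryAcquire makes scheduling idempotent: a caller that loses the race simply observes that a read-ahead is already pending, which is the behaviour d-cameron describes above.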

Contributor

ZLyanov commented Oct 3, 2016

What we also must keep in mind is that this approach makes it difficult to provide further improvements in BlockCompressedInputStream. For example, reading from file and decompressing a block could also be done in separate background threads.

Contributor

ZLyanov commented Oct 3, 2016

One more thought: we should not consider this approach, or ours mentioned above, as a final solution for fast reading. For example, we can concurrently load SAMRecords via the iterator and process them. Here are my marks on different launches of CollectAlignmentSummaryMetrics with combinations of Async Block Compressed Read and Async Load/Process (time to process 20 million records):
Original: 00:02:09
This branch: 00:01:48
Async Load/Process: 00:01:44
Combination of both: 00:01:33

Contributor

d-cameron commented Oct 4, 2016 edited

As for performance, do you have an idea of why yours differs from the earlier benchmarking of this PR (see #482)? That parent PR also includes an async iterator which allows for eager SAMRecord parsing which, when combined with async decompression, resulted in processing times of less than half that of the single-threaded version (of course this is dependent on the amount of processing that is done per record).

Contributor

d-cameron commented Oct 4, 2016 edited

An early version of this PR did indeed use the classic single-threaded producer-consumer solution with a dedicated background thread, but it was found that in use cases such as merging many BAM files (as occurs in the final stage of a sort), a thread-per-BAM model was not acceptable. In my use case, not only was the memory overhead of thousands of threads unacceptable, but I ran into the per-user OS limits. A typical Linux distribution has a process limit at least an order of magnitude lower than the file-handle limit.

This PR uses a shared executor (with the associated additional complications in the synchronisation logic) because the producer-consumer implementation resulted in unacceptable per-BAM overhead.

Contributor

d-cameron commented Oct 4, 2016

You are correct that async decompression and file reading would provide further improvements, but this design does not preclude such an optimisation, as eager file reading could be provided by a wrapper around the stream passed to the BlockCompressedInputStream. For example, an eager-loading version of BufferedInputStream could be implemented and passed to the BlockCompressedInputStream. Such a design would improve multi-BAM BlockCompressedInputStream performance, since background threads may block on the underlying IO (causing CPU under-utilisation) while the total number of background threads is limited.

Contributor

ZLyanov commented Oct 4, 2016

@d-cameron,
I used my laptop with an SSD for benchmarking, which I think is why the results differ.
As for the common pool, I understand the reasons, but the implementation is difficult to both extend and read. I believe we could find another solution. For example, separating the concrete logic of the block decompression calls from the machinery that runs tasks sequentially on a background thread. I saw several solutions of varying complexity; here is one: the serial executor from the Executor interface docs: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
It looks very similar to what you did. Could we try to use it?
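For context, the SerialExecutor from the java.util.concurrent.Executor javadoc linked above looks roughly like this (reproduced from the JDK documentation example; it serialises one instance's tasks while delegating actual execution to a shared underlying executor — the combination this thread is discussing):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Runs this instance's tasks one at a time, in submission order, on top of
// whatever (possibly shared, multi-threaded) executor it is given.
class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<>();
    final Executor executor;
    Runnable active;

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(final Runnable r) {
        tasks.add(() -> {
            try {
                r.run();
            } finally {
                scheduleNext(); // hand off to the next queued task, if any
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}
```

Each stream would own one SerialExecutor over the shared pool, giving per-stream ordering without a dedicated thread per stream.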

Contributor

d-cameron commented Oct 4, 2016 edited

@jabbarish If there's a simple design pattern that covers the concurrency requirements I'm all for that. The pattern needs to be:

  • Light-weight per class instance
  • Able to limit total concurrency across all instances
  • Able to queue multiple background tasks per instance in serial (eager decompression of multiple blocks)
  • Able to cancel all outstanding tasks for a given instance (due to seek() calls).
  • Able to avoid blocking a background thread when the read-ahead buffer is full

The implementation complexity in this PR is due to the seek() cancelling of all outstanding work, the delaying of the eager look-ahead until data is requested (to remove any overhead from the stream open -> seek use case), and the serial scheduling of multiple background tasks. Separating out some of these concurrency concerns by replacing some of the concurrency constructs (e.g. swapping a semaphore for an Executor instance) may make reasoning about correctness simpler.

You mentioned a larger concurrency/thread management PR. What time-frame are we talking about for such a design change, and is there a design pattern that this would integrate with, or would this PR get replaced wholesale by the larger PR?

Edit: additional constraint

Contributor

ZLyanov commented Oct 4, 2016

@d-cameron I suppose the coming PR from our team should not replace yours. We only want to propose a convenient common solution for thread management. On the other hand, your PR seems too complicated to both read and extend, as I wrote before. Your current code is correct and fast, so we could accept it, but with immediate effort to improve it. Actually, being easy to read is not as important as being easy to extend.
I propose we try to find a better solution together. Let's try during this week, and if we fail, simply accept this PR as is and try harder later.
What do you think?

Contributor

d-cameron commented Oct 4, 2016

@jabbarish Sounds good. I'm currently on holidays, but I'll see what I can put together.

Contributor

ZLyanov commented Oct 9, 2016

@d-cameron please have a look at https://github.com/jabbarish/htsjdk/blob/zl_async/src/main/java/htsjdk/samtools/util/AsyncBlockCompressedInputStream.java
I've tried to use the SerialExecutor mentioned above combined with CompletableFuture.

Contributor

droazen commented Nov 1, 2016

@ZLyanov @d-cameron How would you like to proceed with this PR?

Contributor

ZLyanov commented Nov 1, 2016

@droazen as I stated above, I think the proposed PR is correct, but it will be harder to extend later. Actually, no problem, we can afford it. I see a direct continuation in moving 'unpack' onto another thread. We can do that (as well as the necessary refactoring) in subsequent PRs.

Contributor

d-cameron commented Nov 3, 2016

@droazen my preference is to merge, then refactor along the lines of @ZLyanov's proposal at a later stage. The details under discussion are entirely internal to the async class so a reimplementation is fairly trivial to push through in another PR.

The code @ZLyanov has linked is a much more elegant design, but it also looks like seek() does not guarantee that no more read-ahead will be performed (since there's no guarantee on thread ordering into synchronized blocks). It also looks vulnerable to a stream-corruption race condition, as a scheduled read-ahead task could be removed from the queues but then run processNextBlock() during a call to seek().

Contributor

droazen commented Nov 7, 2016

@d-cameron Sounds like a good plan. Can you rebase/squash this branch into a single commit, and let us know here when it's ready for merge?

Coverage Status

Coverage increased (+0.04%) to 69.889% when pulling 58dd6a2 on d-cameron:asyncread into 1c66107 on samtools:master.

Contributor

d-cameron commented Nov 8, 2016

Rebase/squash complete and ready for merging.

Contributor

droazen commented Nov 29, 2016

@d-cameron Looks like we need one final rebase -- sorry! We promise to merge this time :)

Coverage Status

Coverage increased (+0.2%) to 70.056% when pulling e9c29a3 on d-cameron:asyncread into 1c66107 on samtools:master.

@d-cameron d-cameron Added async BAM decompression
edbe317

Coverage Status

Coverage increased (+0.04%) to 70.05% when pulling edbe317 on d-cameron:asyncread into 6469969 on samtools:master.

Contributor

d-cameron commented Dec 1, 2016

Resolved merge conflicts. Should be good to go @droazen

Contributor

droazen commented Dec 1, 2016

@d-cameron Had a final look at the branch before merge, and I'm noticing that there are no direct unit tests for BlockCompressedInputStream itself at all, even though that class is modified quite a bit by this PR. Since BlockCompressedInputStream is such a critical class in htsjdk, I'd feel better about merging this PR if you'd add some direct unit tests for that class. Ideally the tests should use a bam file that contains more than one compressed block.

@d-cameron d-cameron Added BlockCompressedInputStream test coverage
30eb537

Coverage Status

Coverage increased (+0.4%) to 70.404% when pulling 30eb537 on d-cameron:asyncread into 6469969 on samtools:master.

Contributor

d-cameron commented Dec 6, 2016

@droazen I've added test coverage based on an input file of random (non-BAM) data which I independently compressed using tabix bgzip.
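The multi-block structure such a test exercises can be illustrated without htsjdk at all, since BGZF is a series of back-to-back gzip members. A self-contained sketch using only java.util.zip (file handling and the real BGZF block headers are omitted; GZIPInputStream transparently continues into the next concatenated member):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class MultiMemberGzipDemo {
    // Compress one "block" into its own complete gzip member.
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Two independent gzip members concatenated, like adjacent BGZF blocks.
        ByteArrayOutputStream concat = new ByteArrayOutputStream();
        concat.write(gzip("block one ".getBytes()));
        concat.write(gzip("block two".getBytes()));

        // Reading the concatenation yields the full decompressed payload.
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(concat.toByteArray()))) {
            System.out.println(new String(in.readAllBytes()));
        }
    }
}
```

A real BGZF test would additionally check block boundaries and virtual file offsets, which plain gzip members do not model.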

Contributor

d-cameron commented Jan 6, 2017

@droazen do you have any ETA on when this will get merged?

Contributor

lbergelson commented Jan 6, 2017

The pull request that lived for ever...

@d-cameron David should be back from Vacation on Monday. I'm going to make sure he takes a look at this first thing.

Contributor

droazen commented Jan 9, 2017

👍 merging -- sorry for the long wait @d-cameron

@droazen droazen merged commit 7dbe733 into samtools:master Jan 9, 2017

2 checks passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
coverage/coveralls Coverage increased (+0.4%) to 70.404%
Details