Async BAM decompression #576

Merged
merged 2 commits into from Jan 9, 2017
Jump to file or symbol
Failed to load files and symbols.
+590 −105
Split
@@ -25,6 +25,7 @@
import htsjdk.samtools.seekablestream.SeekableStream;
+import htsjdk.samtools.util.AsyncBlockCompressedInputStream;
import htsjdk.samtools.util.BinaryCodec;
import htsjdk.samtools.util.BlockCompressedInputStream;
import htsjdk.samtools.util.CloseableIterator;
@@ -67,10 +68,6 @@
// If true, all SAMRecords are fully decoded as they are read.
private boolean eagerDecode;
- // If true, the BAMFileReader will use asynchronous IO.
- // Note: this field currently has no effect (is not hooked up anywhere), but will be in the future. See https://github.com/samtools/htsjdk/pull/576
- private final boolean useAsynchronousIO;
-
// For error-checking.
private ValidationStringency mValidationStringency;
@@ -107,8 +104,7 @@
throws IOException {
mIndexFile = indexFile;
mIsSeekable = false;
- this.useAsynchronousIO = useAsynchronousIO;
- mCompressedInputStream = new BlockCompressedInputStream(stream);
+ mCompressedInputStream = useAsynchronousIO ? new AsyncBlockCompressedInputStream(stream) : new BlockCompressedInputStream(stream);
mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream));
this.eagerDecode = eagerDecode;
this.mValidationStringency = validationStringency;
@@ -129,7 +125,7 @@
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
throws IOException {
- this(new BlockCompressedInputStream(file), indexFile!=null ? indexFile : SamFiles.findIndex(file), eagerDecode, useAsynchronousIO, file.getAbsolutePath(), validationStringency, factory);
+ this(useAsynchronousIO ? new AsyncBlockCompressedInputStream(file) : new BlockCompressedInputStream(file), indexFile!=null ? indexFile : SamFiles.findIndex(file), eagerDecode, useAsynchronousIO, file.getAbsolutePath(), validationStringency, factory);
if (mIndexFile != null && mIndexFile.lastModified() < file.lastModified()) {
System.err.println("WARNING: BAM index file " + mIndexFile.getAbsolutePath() +
" is older than BAM " + file.getAbsolutePath());
@@ -145,7 +141,7 @@
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
throws IOException {
- this(new BlockCompressedInputStream(strm), indexFile, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory);
+ this(useAsynchronousIO ? new AsyncBlockCompressedInputStream(strm) : new BlockCompressedInputStream(strm), indexFile, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory);
}
BAMFileReader(final SeekableStream strm,
@@ -155,7 +151,7 @@
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
throws IOException {
- this(new BlockCompressedInputStream(strm), indexStream, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory);
+ this(useAsynchronousIO ? new AsyncBlockCompressedInputStream(strm) : new BlockCompressedInputStream(strm), indexStream, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory);
}
private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
@@ -171,7 +167,6 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
mCompressedInputStream = compressedInputStream;
mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream));
this.eagerDecode = eagerDecode;
- this.useAsynchronousIO = useAsynchronousIO;
this.mValidationStringency = validationStringency;
this.samRecordFactory = factory;
this.mFileHeader = readHeader(this.mStream, this.mValidationStringency, source);
@@ -191,7 +186,6 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
mCompressedInputStream = compressedInputStream;
mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream));
this.eagerDecode = eagerDecode;
- this.useAsynchronousIO = useAsynchronousIO;
this.mValidationStringency = validationStringency;
this.samRecordFactory = factory;
this.mFileHeader = readHeader(this.mStream, this.mValidationStringency, source);
@@ -118,7 +118,7 @@ public SamReader open(final Path path) {
abstract public SamReaderFactory validationStringency(final ValidationStringency validationStringency);
/** Set whether readers created by this factory will use asynchronous IO.
- * If this methods is not called, this flag will default to the value of {@link Defaults#USE_ASYNC_IO_FOR_SAMTOOLS}.
+ * If this methods is not called, this flag will default to the value of {@link Defaults#USE_ASYNC_IO_READ_FOR_SAMTOOLS}.
* Note that this option may not be applicable to all readers returned from this factory.
* Returns the factory itself. */
abstract public SamReaderFactory setUseAsyncIo(final boolean asynchronousIO);
@@ -0,0 +1,215 @@
+/*
+ * The MIT License
+ *
+ * Copyright (c) 2016 Daniel Cameron
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package htsjdk.samtools.util;
+
+
+import htsjdk.samtools.Defaults;
+import htsjdk.samtools.seekablestream.SeekableStream;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Asynchronous read-ahead implementation of {@link htsjdk.samtools.util.BlockCompressedInputStream}.
+ *
+ * Note that this implementation is not synchronized. If multiple threads access an instance concurrently, it must be synchronized externally.
+ */
+public class AsyncBlockCompressedInputStream extends BlockCompressedInputStream {
+ // Number of blocks decompressed ahead of the consumer.
+ // NOTE: the division must happen in floating point BEFORE Math.ceil — with two
+ // int operands the quotient truncates first (making Math.ceil a no-op), and the
+ // result would be 0 whenever NON_ZERO_BUFFER_SIZE < MAX_COMPRESSED_BLOCK_SIZE,
+ // causing the ArrayBlockingQueue constructors below to throw
+ // IllegalArgumentException. Math.max(1, ...) guards the lower bound explicitly.
+ private static final int READ_AHEAD_BUFFERS = Math.max(1, (int)Math.ceil((double)Defaults.NON_ZERO_BUFFER_SIZE / BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE));
@ZLyanov

ZLyanov Oct 3, 2016

Contributor

Will READ_AHEAD_BUFFERS always be greater than zero?

@d-cameron

d-cameron Oct 4, 2016 edited

Contributor

Yes. Math.ceil and the upstream guarantee of a non-zero Defaults.NON_ZERO_BUFFER_SIZE ensure this.

+ /**
+ * Thread pool shared by ALL AsyncBlockCompressedInputStream instances, sized to the
+ * CPU count so that opening many streams at once cannot spawn an unbounded number of
+ * read-ahead threads. Threads are daemons so an open stream never blocks JVM shutdown.
+ */
+ private static final Executor threadpool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ // Delegate to the default factory, then mark the thread as a daemon.
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
@ZLyanov

ZLyanov Oct 3, 2016 edited

Contributor

this object will always use no more than one thread, so why create thread pool for all available cores? What if we would use this as a part of some pipeline on an AWS instance with 128 or more cores? Even if we would have several AsyncBlockCompressedInputStreams in one app, one dedicated thread per one AsyncBlockCompressedInputStream would be enough.

@d-cameron

d-cameron Oct 4, 2016 edited

Contributor

The use of a thread pool ensure that when reading from many BAM files at once, the number of threads spawned is limited to the number of CPUs available. The scenario the thread pool protects against is the concurrent opening of many (hundreds of) BAM files, a scenario I have personally encountered.

+ /**
+ * Next blocks (in stream order) that have already been decompressed.
+ * Capacity bounds how far the background task may read ahead of the consumer.
+ */
+ private final BlockingQueue<DecompressedBlock> mResult = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
+ /**
+ * Buffers used to decompress previous blocks that are no longer in use.
+ * These buffers are reused if possible.
+ * Note that no blocking occurs on this buffer and a blocking queue is used purely
+ * because it is a base library synchronized queue implementation
+ * (and Collections.synchronizedQueue() does not exist).
+ */
+ private final BlockingQueue<byte[]> freeBuffers = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
+ /**
+ * Indicates whether a read-ahead task has been scheduled to run. Only one read-ahead task
+ * per stream can be scheduled at any one time. The single permit is held by whichever
+ * thread (foreground or pool worker) currently owns read-ahead scheduling for this stream.
+ */
+ private final Semaphore running = new Semaphore(1);
@ZLyanov

ZLyanov Oct 3, 2016

Contributor

Due to the decision to use a static multi-threaded pool, you have to enforce the sequential nature of block reading via the semaphore. It provides some sort of critical-section support too. It would not be necessary if you used a single-threaded executor: that is sequential by nature.

@ZLyanov

ZLyanov Oct 3, 2016

Contributor

Sorry, some comments need to be reviewed, temporarily removed.

@d-cameron

d-cameron Oct 4, 2016

Contributor

The executor is static and shared across all input streams. A single-threaded executor is not appropriate.

+ /**
+ * Indicates whether any scheduled task should abort processing and terminate
+ * as soon as possible since the result will be discarded anyway.
+ */
+ private volatile boolean mAbort = false;
+
+ /** Async decompression over an arbitrary InputStream; the stream is wrapped for buffering by the superclass. */
+ public AsyncBlockCompressedInputStream(final InputStream stream) {
+ super(stream, true);
+ }
+
+ /** Async decompression of a BGZF file on disk. */
+ public AsyncBlockCompressedInputStream(final File file)
+ throws IOException {
+ super(file);
+ }
+
+ /** Async decompression of a BGZF resource identified by URL. */
+ public AsyncBlockCompressedInputStream(final URL url) {
+ super(url);
+ }
+
+ /** Async decompression over a seekable stream; seeking flushes read-ahead state (see prepareForSeek). */
+ public AsyncBlockCompressedInputStream(final SeekableStream strm) {
+ super(strm);
+ }
+
+ @Override
+ protected DecompressedBlock nextBlock(byte[] bufferAvailableForReuse) {
+ // Recycle the caller's spent buffer into the free pool. offer() is
+ // non-blocking: if the pool is already full the buffer is simply dropped
+ // and left to the garbage collector.
+ if (bufferAvailableForReuse != null) freeBuffers.offer(bufferAvailableForReuse);
+ // Hand back the next block from the read-ahead pipeline, blocking if needed.
+ return nextBlockSync();
+ }
+
+ @Override
+ protected void prepareForSeek() {
+ // Abort and drain all read-ahead state before the underlying stream position
+ // changes: blocks decompressed ahead of the old position would be stale.
+ flushReadAhead();
+ super.prepareForSeek();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Suppress interrupts while we close.
+ final boolean isInterrupted = Thread.interrupted();
+ // Signal any in-flight read-ahead task to stop as soon as possible; its
+ // output is about to be discarded.
+ mAbort = true;
+ try {
+ flushReadAhead();
+ super.close();
+ } finally {
+ // Restore the caller's interrupt status that we cleared above.
+ if (isInterrupted) Thread.currentThread().interrupt();
+ }
+ }
+ /**
+ * Foreground thread blocking operation that aborts all read-ahead tasks
+ * and flushes all read-ahead results.
+ *
+ * On return the read-ahead queue is empty, no worker task is scheduled,
+ * and the abort flag has been restored to its value on entry (so close(),
+ * which sets mAbort itself, keeps it set).
+ */
+ private void flushReadAhead() {
+ final boolean abortStatus = mAbort;
+ mAbort = true;
+ try {
+ // block until the thread pool operation has completed
+ running.acquire();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status before translating the checked exception,
+ // so callers up the stack can still observe that an interrupt occurred.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted waiting for decompression thread", e);
+ }
+ // flush any read-ahead results
+ mResult.clear();
+ mAbort = abortStatus;
+ running.release();
+ }
+ /**
+ * Ensures that a read-ahead task for this stream exists in the thread pool.
+ * Non-blocking: if the permit is unavailable, a task already owns read-ahead
+ * scheduling for this stream and nothing needs to be done.
+ */
+ private void ensureReadAhead() {
+ if (running.tryAcquire()) {
+ tryQueueTask();
+ }
+ }
+ /**
+ * Try to queue another read-ahead buffer.
+ * This method should only be invoked by the owner of the running semaphore;
+ * the permit is either released here or handed off to the scheduled task.
+ */
+ private void tryQueueTask() {
+ if (mAbort) {
+ // Potential deadlock between getNextBlock() and flushReadAhead() here
+ // This requires seek()/close() and another method to be called
+ // at the same time. Since the parent class is not thread-safe
+ // this is an acceptable behavior.
+ running.release();
+ return;
+ }
+ if (mResult.remainingCapacity() == 0) {
+ // read-ahead has already filled the results buffer
+ running.release();
+ if (mResult.remainingCapacity() > 0) {
+ // race condition this second check fixes:
+ // - worker thread context switch after checking remaining capacity is zero
+ // - foreground thread calls getNextBlock() repeatedly until blocking
+ // - worker thread switches back in and releases mutex
+ // = foreground blocking on mResult.take(), mutex free, no worker
+ // -> try to take back mutex and start worker
+ // if that fails, then someone else took the lock and would
+ // have started the background worker. (except if flushReadAhead()
+ // took the lock with getNextBlock() still blocking: not thread-safe
+ // so we don't care)
+ ensureReadAhead();
+ return;
+ } else {
+ return;
+ }
+ }
+ // we are able to perform a read-ahead operation
+ // ownership of the running mutex is now with the threadpool task
+ threadpool.execute(new AsyncBlockCompressedInputStreamRunnable());
+ }
+ /**
+ * Foreground thread blocking operation that retrieves the next read-ahead buffer.
+ * Lazy initiation of read-ahead is performed if required.
+ * @return next decompressed block in input stream, or a block carrying the
+ * InterruptedException if the wait was interrupted
+ */
+ private DecompressedBlock nextBlockSync() {
+ // Make sure a worker is (or will be) producing blocks before we wait.
+ ensureReadAhead();
+ DecompressedBlock nextBlock;
+ try {
+ nextBlock = mResult.take();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so the caller can observe the interrupt;
+ // the exception itself is propagated inside the returned block.
+ Thread.currentThread().interrupt();
+ return new DecompressedBlock(0, 0, e);
+ }
+ // Taking a block freed queue capacity; restart read-ahead if it had stopped.
+ ensureReadAhead();
+ return nextBlock;
+ }
+ private class AsyncBlockCompressedInputStreamRunnable implements Runnable {
+ /**
+ * Thread pool operation that fills the read-ahead queue.
+ * Invoked only while this stream holds the running semaphore; the permit is
+ * passed along via tryQueueTask() or released on the error path below.
+ */
+ @Override
+ public void run() {
+ // poll() may return null; processNextBlock then allocates a fresh buffer.
+ final DecompressedBlock decompressed = processNextBlock(freeBuffers.poll());
+ if (!mResult.offer(decompressed)) {
+ // offer should never block since we never queue a task when the results buffer is full
+ running.release(); // safety release to ensure foreground close() does not block indefinitely
+ throw new IllegalStateException("Decompression buffer full");
+ }
+ tryQueueTask();
+ }
+ }
+}
Oops, something went wrong.