Add optional wrapper for the underlying SeekableByteChannel #775

Merged
merged 3 commits into from Jan 6, 2017
@@ -39,9 +39,11 @@
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
+import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.function.Function;
/**
* Describes a SAM-like resource, including its data (where the records are), and optionally an index.
@@ -89,7 +91,15 @@ public String toString() {
public static SamInputResource of(final File file) { return new SamInputResource(new FileInputResource(file)); }
/** Creates a {@link SamInputResource} reading from the provided resource, with no index. */
- public static SamInputResource of(final Path path) { return new SamInputResource(new PathInputResource(path)); }
@lbergelson

lbergelson Jan 4, 2017

Contributor

we should probably keep original overload for backwards compatibility reasons.

@lbergelson

lbergelson Jan 4, 2017

Contributor

Could you add javadoc explaining what the wrapper is for on all these constructors that take one?

@jean-philippe-martin

jean-philippe-martin Jan 5, 2017

Contributor

Kept overload, added comment here. Which other ctors would you like to add a comment to? I think this one's the only public one so it would be the right place for the comment.

@lbergelson

lbergelson Jan 5, 2017

Contributor

I thought it might be worth it to add an explanation to the PathInputResource constructor in case someone wanders into the code there and wants to know why path inputs have this special thing that the others don't.

+ public static SamInputResource of(final Path path) {
+ return new SamInputResource(new PathInputResource(path));
+ }
+
+ /** Creates a {@link SamInputResource} reading from the provided resource, with no index,
+ * and with a wrapper to apply to the SeekableByteChannel for custom prefetching/buffering. */
+ public static SamInputResource of(final Path path, Function<SeekableByteChannel, SeekableByteChannel> wrapper) {
+ return new SamInputResource(new PathInputResource(path, wrapper));
+ }
/** Creates a {@link SamInputResource} reading from the provided resource, with no index. */
public static SamInputResource of(final InputStream inputStream) { return new SamInputResource(new InputStreamInputResource(inputStream)); }
@@ -121,7 +131,7 @@ public SamInputResource index(final File file) {
/** Updates the index to point at the provided resource, then returns itself. */
public SamInputResource index(final Path path) {
- this.index = new PathInputResource(path);
+ this.index = new PathInputResource(path, Function.identity());
return this;
}
@@ -268,11 +278,12 @@ public SRAAccession asSRAAccession() {
class PathInputResource extends InputResource {
final Path pathResource;
+ final Function<SeekableByteChannel, SeekableByteChannel> wrapper;
final Lazy<SeekableStream> lazySeekableStream = new Lazy<SeekableStream>(new Lazy.LazyInitializer<SeekableStream>() {
@Override
public SeekableStream make() {
try {
- return new SeekablePathStream(pathResource);
+ return new SeekablePathStream(pathResource, wrapper);
} catch (final IOException e) {
throw new RuntimeIOException(e);
}
@@ -281,8 +292,14 @@ public SeekableStream make() {
PathInputResource(final Path pathResource) {
+ this(pathResource, Function.identity());
+ }
+
+ // wrapper applies to the SeekableByteChannel for custom prefetching/buffering.
+ PathInputResource(final Path pathResource, Function<SeekableByteChannel, SeekableByteChannel> wrapper) {
super(Type.PATH);
this.pathResource = pathResource;
+ this.wrapper = wrapper;
}
@Override
@@ -33,9 +33,11 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.function.Function;
import java.util.zip.GZIPInputStream;
/**
@@ -74,11 +76,13 @@
public abstract class SamReaderFactory {
private static ValidationStringency defaultValidationStringency = ValidationStringency.DEFAULT_STRINGENCY;
-
+
+ private Function<SeekableByteChannel, SeekableByteChannel> pathWrapper = Function.identity();
+
abstract public SamReader open(final File file);
public SamReader open(final Path path) {
- final SamInputResource r = SamInputResource.of(path);
+ final SamInputResource r = SamInputResource.of(path, getPathWrapper());
final Path indexMaybe = SamFiles.findIndex(path);
if (indexMaybe != null) r.index(indexMaybe);
@lbergelson

lbergelson Jan 4, 2017

Contributor

I just noticed, this isn't going to wrap the index. That seems like a problem.

We might actually want the ability to treat the index and the main path separately? Do you think that's necessary? I.e. download the index in it's entirety and store it in on disk vs cache chunks bam in memory as we stream over them.

@jean-philippe-martin

jean-philippe-martin Jan 5, 2017

Contributor

Not wrapping the index is exactly the right thing to do. The index is small, we already read the whole thing into memory at startup anyways.
We could optionally add a way for the user to specify their own wrapper for the index, if we see a need for it later.

@lbergelson

lbergelson Jan 5, 2017 edited

Contributor

You're certain the index is always read into memory and cached? I could have sworn @kcibul was having issues where there was slowdown due to repeatedly accessing the index over the network. He ended up copying the index files locally. It looks like there's an option CACHE_FILE_BASED_INDEXES, maybe he wasn't using that while you are?

@jean-philippe-martin

jean-philippe-martin Jan 5, 2017

Contributor

It's up to user code; reading the whole thing is the right thing to do so I expect performance-oriented code will do that already (by "we" earlier I meant my own code).

return open(r);
@@ -102,6 +106,25 @@ public SamReader open(final Path path) {
/** Sets a specific Option to a boolean value. * */
abstract public SamReaderFactory setOption(final Option option, boolean value);
+ /** Sets a wrapper to modify the SeekableByteChannel from an opened Path, e.g. to add
+ * buffering or prefetching. This only works on Path inputs since we need a SeekableByteChannel.
+ *
+ * @param wrapper how to modify the SeekableByteChannel (Function.identity to unset)
+ * @return this
+ */
+ public SamReaderFactory setPathWrapper(Function<SeekableByteChannel, SeekableByteChannel> wrapper) {
+ this.pathWrapper = wrapper;
+ return this;
+ }
+
+ /** Gets the wrapper previously set via setPathWrapper.
+ *
+ * @return the wrapper.
+ */
+ public Function<SeekableByteChannel, SeekableByteChannel> getPathWrapper() {
+ return pathWrapper;
+ }
+
/** Sets the specified reference sequence * */
abstract public SamReaderFactory referenceSequence(File referenceSequence);
@@ -138,8 +161,8 @@ public static SamReaderFactory makeDefault() {
}
/**
- * Creates an "empty" factory with no enabled {@link Option}s, {@link ValidationStringency#DEFAULT_STRINGENCY}, and
- * {@link htsjdk.samtools.DefaultSAMRecordFactory}.
+ * Creates an "empty" factory with no enabled {@link Option}s, {@link ValidationStringency#DEFAULT_STRINGENCY},
+ * no path wrapper, and {@link htsjdk.samtools.DefaultSAMRecordFactory}.
*/
public static SamReaderFactory make() {
return new SamReaderFactoryImpl(EnumSet.noneOf(Option.class), ValidationStringency.DEFAULT_STRINGENCY, DefaultSAMRecordFactory.getInstance());
@@ -155,10 +178,15 @@ public static SamReaderFactory make() {
private CRAMReferenceSource referenceSource;
private SamReaderFactoryImpl(final EnumSet<Option> enabledOptions, final ValidationStringency validationStringency, final SAMRecordFactory samRecordFactory) {
+ this(enabledOptions, validationStringency, samRecordFactory, Function.identity());
+ }
+
+ private SamReaderFactoryImpl(final EnumSet<Option> enabledOptions, final ValidationStringency validationStringency, final SAMRecordFactory samRecordFactory, final Function<SeekableByteChannel, SeekableByteChannel> wrapper) {
this.enabledOptions = EnumSet.copyOf(enabledOptions);
this.samRecordFactory = samRecordFactory;
this.validationStringency = validationStringency;
this.customReaderFactory = CustomReaderFactory.getInstance();
+ setPathWrapper(wrapper);
}
@Override
@@ -273,6 +301,7 @@ public SamReader open(final SamInputResource resource) {
// TODO: Throw an exception here? An index _may_ have been provided, but we're ignoring it
bufferedIndexStream = null;
}
+
primitiveSamReader = new BAMFileReader(
IOUtil.maybeBufferedSeekableStream(data.asUnbufferedSeekableStream()),
bufferedIndexStream,
@@ -388,7 +417,7 @@ private boolean isSra(final File sourceFile) {
}
public static SamReaderFactory copyOf(final SamReaderFactoryImpl target) {
- return new SamReaderFactoryImpl(target.enabledOptions, target.validationStringency, target.samRecordFactory);
+ return new SamReaderFactoryImpl(target.enabledOptions, target.validationStringency, target.samRecordFactory, target.getPathWrapper());
}
}
@@ -9,6 +9,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.function.Function;
/**
* An implementation of {@link SeekableStream} for {@link Path}.
@@ -29,8 +30,16 @@
private final ByteBuffer oneByteBuf = ByteBuffer.allocate(1);
public SeekablePathStream(final Path path) throws IOException {
+ this(path, null);
+ }
+
+ public SeekablePathStream(final Path path, Function<SeekableByteChannel, SeekableByteChannel> wrapper) throws IOException {
this.path = path;
- this.sbc = Files.newByteChannel(path);
+ if (null==wrapper) {
+ this.sbc = Files.newByteChannel(path);
+ } else {
+ this.sbc = wrapper.apply(Files.newByteChannel(path));
+ }
ALL_INSTANCES.add(this);
@lbergelson

lbergelson Jan 4, 2017

Contributor

Should this constructor one just delegate to the new one with Function.identity()?

@jean-philippe-martin

jean-philippe-martin Jan 5, 2017

Contributor

Yes, done (though with null since we also want to accept that)

}
@@ -6,6 +6,10 @@
import htsjdk.samtools.seekablestream.SeekableHTTPStream;
import htsjdk.samtools.seekablestream.SeekableStreamFactory;
import htsjdk.samtools.util.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Paths;
+import java.util.function.Function;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -59,6 +63,41 @@ private int countRecords(final SamReader reader) {
return count;
}
+ private static SeekableByteChannel addHeader(SeekableByteChannel input) {
+ try {
+ int total = (int)input.size();
+ final String comment = "@HD\tVN:1.0 SO:unsorted\n" +
+ "@SQ\tSN:chr1\tLN:101\n" +
+ "@SQ\tSN:chr2\tLN:101\n" +
+ "@SQ\tSN:chr3\tLN:101\n" +
+ "@RG\tID:0\tSM:JP was here\n";
+
+ byte[] commentBuf = comment.getBytes();
+ ByteBuffer buf = ByteBuffer.allocate(total + commentBuf.length);
+ buf.put(commentBuf);
+ input.position(0);
+ while (input.read(buf)>0) {
+ // read until EOF
+ }
+ buf.flip();
+ return new SeekableByteChannelFromBuffer(buf);
+ } catch (IOException x) {
+ throw new RuntimeException(x);
+ }
+ }
+
+ @Test
+ public void testWrap() throws IOException {
+ final Path input = Paths.get(TEST_DATA_DIR.getPath(), "noheader.sam");
+ final SamReader wrappedReader =
+ SamReaderFactory
+ .makeDefault()
+ .setPathWrapper(SamReaderFactoryTest::addHeader)
+ .open(input);
+ int records = countRecords(wrappedReader);
+ Assert.assertEquals(10, records);
+ }
+
// See https://github.com/samtools/htsjdk/issues/76
@Test(dataProvider = "queryIntervalIssue76TestCases")
public void queryIntervalIssue76(final String sequenceName, final int start, final int end, final int expectedCount) throws IOException {
@@ -194,7 +233,7 @@ private InputResource composeInputResourceForType(final InputResource.Type type,
case FILE:
return new FileInputResource(f);
case PATH:
- return new PathInputResource(f.toPath());
+ return new PathInputResource(f.toPath(), Function.identity());
case URL:
return new UrlInputResource(url);
case SEEKABLE_STREAM:
@@ -0,0 +1,85 @@
+package htsjdk.samtools;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * A buffer-backed SeekableByteChannel, for testing.
+ */
+public class SeekableByteChannelFromBuffer implements SeekableByteChannel {
+
+ private ByteBuffer buf;
+ private boolean open = true;
+
+ public SeekableByteChannelFromBuffer(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (buf.position() == buf.limit()) {
+ // signal EOF
+ return -1;
+ }
+ int before = dst.position();
+ dst.put(buf);
+ return dst.position() - before;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new IOException("read-only channel");
+ }
+
+ @Override
+ public long position() throws IOException {
+ checkOpen();
+ return buf.position();
+ }
+
+ @Override
+ public SeekableByteChannel position(long newPosition) throws IOException {
+ checkOpen();
+ buf.position((int)newPosition);
+ return this;
+ }
+
+ @Override
+ public long size() throws IOException {
+ checkOpen();
+ return buf.limit();
+ }
+
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ checkOpen();
+ if (size <0) {
+ throw new IllegalArgumentException("negative size");
+ }
+ if (size > buf.limit()) {
+ throw new IllegalArgumentException("size larger than current");
+ }
+ buf.limit((int)size);
+ return null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public void close() throws IOException {
+ open = false;
+ }
+
+ private void checkOpen() throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+ }
+}
@@ -0,0 +1,10 @@
+A 73 chr2 1 255 10M * 0 0 CAACAGAAGC )'.*.+2,)) RG:Z:0
+A 133 * 0 0 * chr2 1 0 CAACAGAAGC )'.*.+2,)) RG:Z:0
+B 99 chr1 1 255 10M = 26 35 CAACAGAAGC )'.*.+2,)) RG:Z:0
+B 147 chr1 26 255 10M = 1 -35 CAACAGAAGC )'.*.+2,)) RG:Z:0
+C 99 chr2 1 255 10M = 26 35 CAACAGAAGC )'.*.+2,)) RG:Z:0
+C 147 chr2 26 255 10M = 1 -35 CAACAGAAGC )'.*.+2,)) RG:Z:0
+D 99 chr3 1 255 10M = 25 35 CAACAGAAGC )'.*.+2,)) RG:Z:0
+D 147 chr3 26 255 10M = 1 -35 CAACAGAAGC )'.*.+2,)) RG:Z:0
+E 99 chr1 2 255 10M = 15 30 CAACAGAAGC )'.*.+2,)) RG:Z:0
+E 147 chr1 15 255 10M = 2 -30 CAACAGAAGC )'.*.+2,)) RG:Z:0