overloading BlockCompressedInputStream.checkTerminator to support NIO #890

Merged
merged 2 commits into from Jun 8, 2017
Jump to file or symbol
Failed to load files and symbols.
+155 −34
Split
@@ -32,16 +32,15 @@
import htsjdk.samtools.seekablestream.SeekableStream;
import htsjdk.samtools.util.zip.InflaterFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.util.Arrays;
-import java.util.zip.Inflater;
/**
* Utility class for reading BGZF block compressed files. The caller can treat this file like any other InputStream.
@@ -587,41 +586,98 @@ private int unpackInt32(final byte[] buffer, final int offset) {
public enum FileTermination {HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, DEFECTIVE}
+ /**
+ *
+ * @param file the file to check
+ * @return status of the last compressed block
+ * @throws IOException
+ */
public static FileTermination checkTermination(final File file) throws IOException {
- final long fileSize = file.length();
+ return checkTermination(file == null ? null : file.toPath());
@pshapiro4broad

pshapiro4broad Jun 8, 2017

Won't this cause checkTermination(Path) to throw when file == null? If so avoiding the NPE here doesn't seem worthwhile. Maybe you want to return FileTermination.DEFECTIVE in that case?

@lbergelson

lbergelson Jun 8, 2017

Contributor

@pshapiro4broad It absolutely just shifts the NPE to later. I wanted to keep the exact behavior of the original function, which was to throw an NPE somewhere later. It probably does make sense to check for null and throw a nice exception instead of an NPE. Returning DEFECTIVE would be a bad idea for a null file, I think, since it would hide other errors.

+ }
+
+ /**
+ *
+ * @param path to the file to check
+ * @return status of the last compressed block
+ * @throws IOException
+ */
+ public static FileTermination checkTermination(final Path path) throws IOException {
+ try( final SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ) ){
+ return checkTermination(channel);
+ }
+ }
+
+ /**
+ * check the status of the final bgzipped block for the given bgzipped resource
+ *
+ * @param channel an open channel to read from,
+ * the channel will remain open and the initial position will be restored when the operation completes
+ * this makes no guarantee about the state of the channel if an exception is thrown during reading
+ *
+ * @return the status of the last compressed block
+ * @throws IOException
+ */
+ public static FileTermination checkTermination(SeekableByteChannel channel) throws IOException {
+ final long fileSize = channel.size();
if (fileSize < BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length) {
return FileTermination.DEFECTIVE;
}
- final RandomAccessFile raFile = new RandomAccessFile(file, "r");
+ final long initialPosition = channel.position();
+ boolean exceptionThrown = false;
try {
- raFile.seek(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
- byte[] buf = new byte[BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length];
- raFile.readFully(buf);
- if (Arrays.equals(buf, BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) {
+ channel.position(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
+
+ //Check if the end of the file is an empty gzip block which is used as the terminator for a bgzipped file
+ final ByteBuffer lastBlockBuffer = ByteBuffer.allocate(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
+ readFully(channel, lastBlockBuffer);
+ if (Arrays.equals(lastBlockBuffer.array(), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) {
return FileTermination.HAS_TERMINATOR_BLOCK;
}
- final int bufsize = (int)Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
- buf = new byte[bufsize];
- raFile.seek(fileSize - bufsize);
- raFile.read(buf);
- for (int i = buf.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length;
- i >= 0; --i) {
+
+ //if the last block isn't an empty gzip block, check to see if it is a healthy compressed block or if it's corrupted
+ final int bufsize = (int) Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
+ final byte[] bufferArray = new byte[bufsize];
+ channel.position(fileSize - bufsize);
+ readFully(channel, ByteBuffer.wrap(bufferArray));
+ for (int i = bufferArray.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length;
+ i >= 0; --i) {
if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE,
- buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) {
+ bufferArray, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) {
continue;
}
- final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4);
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(bufferArray,
+ i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length,
+ 4);
byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
- final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF;
- if (buf.length - i == totalBlockSizeMinusOne + 1) {
+ final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF;
+ if (bufferArray.length - i == totalBlockSizeMinusOne + 1) {
return FileTermination.HAS_HEALTHY_LAST_BLOCK;
} else {
return FileTermination.DEFECTIVE;
}
}
return FileTermination.DEFECTIVE;
+ } catch (final Throwable e) {
+ exceptionThrown = true;
+ throw e;
} finally {
- raFile.close();
+ //if an exception was thrown we don't want to reset the position because that would be likely to throw again
+ //and suppress the initial exception
+ if(!exceptionThrown) {
@pshapiro4broad

pshapiro4broad Jun 8, 2017

This code is not consistent with its use of spaces around if elements. Here there's no space between if and (. On line 679 there is a space there, but not after the ). On line 654 there are spaces both before and after.

The code should be consistent; putting a space before and after the ()s is the java standard and is what most of the existing code does, so I would recommend formatting it that way.

The try on line 606 is also formatted inconsistently with respect to the surrounding code.

@lbergelson

lbergelson Jun 8, 2017

Contributor

That's a good point. I'm not sure it's worth opening another pr over though since I've already merged this one.

@pshapiro4broad

pshapiro4broad Jun 8, 2017

no worries, something to keep in mind for next time :)

+ channel.position(initialPosition);
+ }
+ }
+ }
+
+ /**
+ * read as many bytes as dst's capacity into dst or throw if that's not possible
+ * @throws EOFException if channel has fewer bytes available than dst's capacity
+ */
+ static void readFully(SeekableByteChannel channel, ByteBuffer dst) throws IOException {
+ final int bytesRead = channel.read(dst);
+ if (bytesRead < dst.capacity()){
+ throw new EOFException();
}
}
@@ -23,38 +23,103 @@
*/
package htsjdk.samtools.util;
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
import htsjdk.HtsjdkTest;
+import htsjdk.samtools.SeekableByteChannelFromBuffer;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.io.EOFException;
import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.FileSystem;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
/**
* @author alecw@broadinstitute.org
*/
public class BlockCompressedTerminatorTest extends HtsjdkTest {
private static final File TEST_DATA_DIR = new File("src/test/resources/htsjdk/samtools/util");
+ private static final File DEFECTIVE = new File(TEST_DATA_DIR, "defective_bgzf.bam");
+ private static final File NO_TERMINATOR = new File(TEST_DATA_DIR, "no_bgzf_terminator.bam");
- @Test
- public void testFileWithTerminator() throws Exception {
+ @DataProvider
+ public Object[][] getFiles() throws IOException {
+ return new Object[][]{
+ {getValidCompressedFile(), BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK},
+ {NO_TERMINATOR, BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK},
+ {DEFECTIVE, BlockCompressedInputStream.FileTermination.DEFECTIVE}
+ };
+ }
+
+ @Test( dataProvider = "getFiles")
+ public void testCheckTerminationForFiles(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
+ Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFile), expected);
+ }
+
+ @Test( dataProvider = "getFiles")
+ public void testCheckTerminationForPaths(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
+ try(FileSystem fs = Jimfs.newFileSystem("test", Configuration.unix())){
+ final Path compressedFileInJimfs = Files.copy(compressedFile.toPath(), fs.getPath("something"));
+ Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFileInJimfs), expected);
+ }
+ }
+
+ @Test( dataProvider = "getFiles")
+ public void testCheckTerminationForSeekableByteChannels(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
+ try(SeekableByteChannel channel = Files.newByteChannel(compressedFile.toPath())){
+ Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected);
+ }
+ }
+
+ @Test(dataProvider = "getFiles")
+ public void testChannelPositionIsRestored(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
+ final long position = 50;
+ try(SeekableByteChannel channel = Files.newByteChannel(compressedFile.toPath())){
+ channel.position(position);
+ Assert.assertEquals(channel.position(), position);
+ Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected);
+ Assert.assertEquals(channel.position(), position);
+ }
+ }
+
+ private static File getValidCompressedFile() throws IOException {
final File tmpCompressedFile = File.createTempFile("test.", ".bgzf");
tmpCompressedFile.deleteOnExit();
final BlockCompressedOutputStream os = new BlockCompressedOutputStream(tmpCompressedFile);
os.write("Hi, Mom!\n".getBytes());
os.close();
- Assert.assertEquals(BlockCompressedInputStream.checkTermination(tmpCompressedFile),
- BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK);
+ return tmpCompressedFile;
}
@Test
- public void testValidFileWithoutTerminator() throws Exception {
- Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "no_bgzf_terminator.bam")),
- BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK);
+ public void testReadFullyReadsBytesCorrectly() throws IOException {
+ try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ Assert.assertTrue(channel.size() > readBuffer.capacity());
+ BlockCompressedInputStream.readFully(channel, readBuffer);
+
+ ByteBuffer expected = ByteBuffer.allocate(10);
+ channel.position(0).read(expected);
+ Assert.assertEquals(readBuffer.array(), expected.array());
+ }
}
- @Test
- public void testDefectiveFile() throws Exception {
- Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "defective_bgzf.bam")),
- BlockCompressedInputStream.FileTermination.DEFECTIVE);
+ @Test(expectedExceptions = EOFException.class)
+ public void testReadFullyThrowWhenItCantReadEnough() throws IOException {
+ try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){
+ final ByteBuffer readBuffer = ByteBuffer.allocate(1000);
+ Assert.assertTrue(channel.size() < readBuffer.capacity());
+ BlockCompressedInputStream.readFully(channel, readBuffer);
+ }
}
+
+
+
}