Permalink
Browse files

Alignment compression (#426)

This allows to decrease output file size (vdjca and clna) 4-fold.

Implements block-compression of alignments with LZ4 algorithm.

Uses parallel compression to keep up with massive output from aligner in highly multithreaded environments. 1 compression thread per 8 processing threads in align action.

Fast compression implemented, but not connected to any input options. Also requires additional testing.
  • Loading branch information...
dbolotin committed Sep 28, 2018
1 parent 1021b78 commit afab7a697641adf214b85edc248c7fe687c302e6
View
12 pom.xml
@@ -74,6 +74,12 @@
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
@@ -241,6 +247,12 @@
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.lz4:lz4-java</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>log4j:log4j</artifact>
<includes>
@@ -32,13 +32,10 @@
import cc.redberry.pipe.OutputPortCloseable;
import com.milaboratory.mixcr.basictypes.VDJCAlignments;
import com.milaboratory.mixcr.basictypes.VDJCAlignmentsReader;
import com.milaboratory.util.Factory;
import io.repseq.core.VDJCLibraryRegistry;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
public interface AlignmentsProvider {
OutputPortCloseable<VDJCAlignments> create();
@@ -52,48 +49,19 @@
final class Util {
static AlignmentsProvider createProvider(final byte[] rawData, final VDJCLibraryRegistry geneResolver) {
return new VDJCAlignmentsReaderWrapper(new Factory<VDJCAlignmentsReader>() {
@Override
public VDJCAlignmentsReader create() {
return new VDJCAlignmentsReader(new ByteArrayInputStream(rawData), geneResolver);
}
});
return new VDJCAlignmentsReaderWrapper(() ->
new VDJCAlignmentsReader(new ByteArrayInputStream(rawData), geneResolver, rawData.length, true)
);
}
public static AlignmentsProvider createProvider(final String file, final VDJCLibraryRegistry geneResolver) {
return new VDJCAlignmentsReaderWrapper(new Factory<VDJCAlignmentsReader>() {
@Override
public VDJCAlignmentsReader create() {
try {
return new VDJCAlignmentsReader(file, geneResolver);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
public static AlignmentsProvider createProvider(final File file, final VDJCLibraryRegistry geneResolver) {
return new VDJCAlignmentsReaderWrapper(new Factory<VDJCAlignmentsReader>() {
@Override
public VDJCAlignmentsReader create() {
try {
return new VDJCAlignmentsReader(file, geneResolver);
} catch (IOException e) {
throw new RuntimeException(e);
}
return new VDJCAlignmentsReaderWrapper(() -> {
try {
return new VDJCAlignmentsReader(file, geneResolver, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
public static AlignmentsProvider createProvider(final InputStream file, final VDJCLibraryRegistry geneResolver) {
return new VDJCAlignmentsReaderWrapper(new Factory<VDJCAlignmentsReader>() {
@Override
public VDJCAlignmentsReader create() {
return new VDJCAlignmentsReader(file, geneResolver);
}
});
}
}
}
@@ -34,6 +34,7 @@
import cc.redberry.pipe.blocks.FilteringPort;
import com.milaboratory.mixcr.basictypes.CloneSet;
import com.milaboratory.mixcr.basictypes.VDJCAlignments;
import com.milaboratory.mixcr.basictypes.VDJCAlignmentsReader;
import com.milaboratory.mixcr.vdjaligners.VDJCAlignerParameters;
import com.milaboratory.util.CanReportProgress;
import com.milaboratory.util.CanReportProgressAndStage;
@@ -44,6 +45,7 @@
final int threads;
volatile String stage = "Initialization";
volatile CanReportProgress innerProgress;
volatile VDJCAlignmentsReader alignmentReader = null;
volatile boolean isFinished = false;
public CloneAssemblerRunner(AlignmentsProvider alignmentsProvider, CloneAssembler assembler, int threads) {
@@ -72,6 +74,8 @@ public boolean isFinished() {
public void run() {
//run initial assembler
try (OutputPortCloseable<VDJCAlignments> alignmentsPort = alignmentsProvider.create()) {
if (alignmentsPort instanceof VDJCAlignmentsReaderWrapper.OP)
alignmentReader = ((VDJCAlignmentsReaderWrapper.OP) alignmentsPort).reader;
synchronized (this) {
stage = "Assembling initial clonotypes";
if (alignmentsPort instanceof CanReportProgress)
@@ -82,6 +86,7 @@ public void run() {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
alignmentReader = null;
}
// run mapping if required
if (assembler.beginMapping()) {
@@ -90,6 +95,8 @@ public void run() {
innerProgress = null;
}
try (OutputPortCloseable<VDJCAlignments> alignmentsPort = alignmentsProvider.create()) {
if (alignmentsPort instanceof VDJCAlignmentsReaderWrapper.OP)
alignmentReader = ((VDJCAlignmentsReaderWrapper.OP) alignmentsPort).reader;
synchronized (this) {
stage = "Mapping low quality reads";
if (alignmentsPort instanceof CanReportProgress)
@@ -104,6 +111,7 @@ public void run() {
throw new RuntimeException(e);
}
}
alignmentReader = null;
assembler.endMapping();
}
assembler.preClustering();
@@ -124,6 +132,12 @@ public void run() {
isFinished = true;
}
public int getQueueSize() {
if (alignmentReader == null)
return -1;
return alignmentReader.getQueueSize();
}
public CloneSet getCloneSet(VDJCAlignerParameters alignerParameters) {
return assembler.getCloneSet(alignerParameters);
}
@@ -55,8 +55,8 @@ public long getTotalNumberOfReads() {
return totalNumberOfReads.get();
}
private class OP implements OutputPortCloseable<VDJCAlignments>, CanReportProgress {
final VDJCAlignmentsReader reader;
public class OP implements OutputPortCloseable<VDJCAlignments>, CanReportProgress {
public final VDJCAlignmentsReader reader;
private OP(VDJCAlignmentsReader reader) {
this.reader = reader;
Oops, something went wrong.

0 comments on commit afab7a6

Please sign in to comment.