Permalink
Browse files

Fullseq fixes and updates (#339)

* This fixes misbehaving getFeature method of Clone object.

* Number of threads in assembleContigs reduced to 1, added exception for higher value.

* Minor corrections.

* Seed script for tests on big datasets.

* Test data download integrated into travis pipeline.
  • Loading branch information...
dbolotin committed Feb 7, 2018
1 parent f2f137e commit d265cecaad9cdc7c453657ef15aa9b70e3081fa1
@@ -7,3 +7,4 @@ doc/_build
.flooignore
out
test_target
src/test/resources/sequences/big
@@ -13,6 +13,7 @@ before_install:
- cd repseqio/milib && mvn clean install -DskipTests -B && cd ..
- mvn clean install -DskipTests -B && cd ..
- mv ./repseqio/.cache .cache
- ./ensure-test-data.sh
script:
- mvn test -B && ./itests.sh test
@@ -28,3 +29,4 @@ cache:
directories:
- $HOME/.m2
- .cache
- src/test/resources/sequences/big
@@ -0,0 +1,58 @@
#!/bin/bash
set -e
set -o pipefail
# Linux readlink -f alternative for Mac OS X
function readlinkUniversal() {
targetFile=$1
cd `dirname $targetFile`
targetFile=`basename $targetFile`
# iterate down a (possible) chain of symlinks
while [ -L "$targetFile" ]
do
targetFile=`readlink $targetFile`
cd `dirname $targetFile`
targetFile=`basename $targetFile`
done
# compute the canonicalized name by finding the physical path
# for the directory we're in and appending the target file.
phys_dir=`pwd -P`
result=$phys_dir/$targetFile
echo $result
}
os=`uname`
dir=""
case $os in
Darwin)
dir=$(dirname "$(readlinkUniversal "$0")")
;;
Linux)
dir="$(dirname "$(readlink -f "$0")")"
;;
FreeBSD)
dir=$(dirname "$(readlinkUniversal "$0")")
;;
*)
echo "Unknown OS."
exit 1
;;
esac
mkdir -p $dir/src/test/resources/sequences/big/
cd $dir/src/test/resources/sequences/big/
if [[ ! -f CD4M1_test_R1.fastq.gz ]]; then
wget https://s3.amazonaws.com/files.milaboratory.com/test-data/CD4M1_test_R1.fastq.gz
fi
if [[ ! -f CD4M1_test_R2.fastq.gz ]]; then
wget https://s3.amazonaws.com/files.milaboratory.com/test-data/CD4M1_test_R2.fastq.gz
fi
@@ -55,7 +55,7 @@
final int jOffset;
/** end of alignment of V gene in the global coordinate grid */
final int rightAssemblingFeatureBound;
/** splitting rehion in global coordinates */
/** splitting region in global coordinates */
final Range splitRegion;
/** parameters */
FullSeqAssemblerParameters parameters;
@@ -192,13 +192,19 @@ public FullSeqAssemblerReport getReport() {
if (report != null)
report.onVariantsCreated(branches);
// VariantBranch[] branchesBeforeClustering = branches.stream().map(VariantBranch::clone).toArray(VariantBranch[]::new);
clusterizeBranches(data.points, branches);
Clone[] result = branches.stream()
.map(branch -> buildClone(branch.count, assembleBranchSequences(data.points, branch)))
.toArray(Clone[]::new);
assert result.length >= 1;
if (report != null)
report.afterVariantsClustered(clone, result);
return result;
}
@@ -273,6 +279,10 @@ VariantBranch addVariant(Variant variant, int sumSignificant) {
newStates[newStates.length - 1] = variant.variantInfo;
return new VariantBranch(count * variant.nSignificant / sumSignificant, newStates, variant.reads);
}
protected VariantBranch clone() {
return new VariantBranch(count, pointStates, reads);
}
}
BranchSequences assembleBranchSequences(int[] points, VariantBranch branch) {
@@ -829,6 +839,8 @@ else if (point.sequence.size() == 1)
}
}
assert nAlignments > 0;
long[] forSort = new long[coverage.size()];
TIntIntIterator iterator = coverage.iterator();
int i = 0;
@@ -48,6 +48,7 @@
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -64,6 +65,9 @@
// Index data
final long firstClonePosition;
// Index contain two additional records:
// - first = position of alignment block with cloneIndex == -1
// - last = position of the last alignments block end
final long[] index;
final long[] counts;
final long totalAlignmentsCount;
@@ -165,10 +169,7 @@ public VDJCAlignerParameters getAlignerParameters() {
* Returns number of clones in the file
*/
public int numberOfClones() {
// Index contain two additional records:
// - first = position of alignment block with cloneIndex == -1
// - last = position of the last alignments block end
return index.length - 2;
return numberOfClones;
}
public List<VDJCGene> getGenes() {
@@ -276,15 +277,16 @@ public CloneAlignmentsPort clonesAndAlignments() throws IOException {
public final class CloneAlignmentsPort
implements OutputPort<CloneAlignments>, CanReportProgress {
private int cloneIndex = 0;
final PipeDataInputReader<Clone> clones;
boolean isFinished = false;
AtomicLong processedAlignments = new AtomicLong();
private final AtomicInteger cloneIndex = new AtomicInteger();
private final AtomicLong processedAlignments = new AtomicLong();
private final PipeDataInputReader<Clone> clones;
volatile boolean isFinished = false;
CloneAlignmentsPort() throws IOException {
PrimitivI input = new PrimitivI(new InputDataStream(firstClonePosition, index[0]));
IOUtil.registerGeneReferences(input, genes, alignedFeatures);
this.clones = new PipeDataInputReader<>(Clone.class, input, numberOfClones());
this.clones = new PipeDataInputReader<>(Clone.class, input, numberOfClones);
}
@Override
@@ -294,9 +296,8 @@ public CloneAlignments take() {
isFinished = true;
return null;
}
CloneAlignments result = new CloneAlignments(clone, cloneIndex);
CloneAlignments result = new CloneAlignments(clone, cloneIndex.getAndIncrement());
processedAlignments.addAndGet(result.alignmentsCount);
++cloneIndex;
return result;
}
@@ -88,14 +88,6 @@ public NSequenceWithQuality getConcatenatedClonalSequence() {
return SequencesUtils.concatenate(seqs);
}
@Override
public NSequenceWithQuality getFeature(GeneFeature geneFeature) {
for (int i = 0; i < assemblingFeatures.length; ++i)
if (geneFeature.equals(assemblingFeatures[i]))
return targets[i];
return super.getFeature(geneFeature);
}
public int getId() {
return id;
}
@@ -105,7 +105,7 @@ private static String join(String[] strs) {
public void writeSuperReport(ReportHelper helper) {
if (!helper.isStdout())
helper.writeField("Analysis Date", getDate())
helper.writeField("Analysis date", getDate())
.writeField("Input file(s)", join(getInputFiles()))
.writeField("Output file(s)", join(getOutputFiles()))
.writeField("Version", getVersion());
@@ -36,6 +36,10 @@
@Override
public void go(ActionHelper helper) throws Exception {
//TODO FIX!!!!!!!!!!!!!
if (parameters.threads > 1)
throw new ParameterException("Multithreaded processing is not supported yet.");
long beginTimestamp = System.currentTimeMillis();
FullSeqAssemblerParameters p = FullSeqAssemblerParameters.getByName("default");
if (!parameters.overrides.isEmpty()) {
@@ -57,12 +61,13 @@ public void go(ActionHelper helper) throws Exception {
alignerParameters = reader.getAlignerParameters();
genes = reader.getGenes();
assemblingFeatures = reader.getAssemblingFeatures();
IOUtil.registerGeneReferences(tmpOut, genes, alignerParameters);
ClnAReader.CloneAlignmentsPort port = reader.clonesAndAlignments();
SmartProgressReporter.startProgressReport("Assembling", port);
OutputPort<Clone[]> parallelProcessor = new ParallelProcessor<>(port, cloneAlignments -> {
assemblingFeatures = reader.getAssemblingFeatures();
ClnAReader.CloneAlignmentsPort cloneAlignmentsPort = reader.clonesAndAlignments();
SmartProgressReporter.startProgressReport("Assembling", cloneAlignmentsPort);
OutputPort<Clone[]> parallelProcessor = new ParallelProcessor<>(cloneAlignmentsPort, cloneAlignments -> {
FullSeqAssembler fullSeqAssembler = new FullSeqAssembler(assemblerParameters, cloneAlignments.clone, alignerParameters);
fullSeqAssembler.setReport(report);
@@ -81,8 +86,13 @@ public void go(ActionHelper helper) throws Exception {
for (Clone cl : clones)
tmpOut.writeObject(cl);
}
assert report.getInitialCloneCount() == reader.numberOfClones();
}
assert report.getFinalCloneCount() == totalClonesCount;
assert report.getFinalCloneCount() >= report.getInitialCloneCount();
Clone[] clones = new Clone[totalClonesCount];
try (PrimitivI tmpIn = new PrimitivI(new BufferedInputStream(new FileInputStream(parameters.getOutputFileName())))) {
IOUtil.registerGeneReferences(tmpIn, genes, alignerParameters);
@@ -137,7 +147,7 @@ public FullSeqParameters params() {
@Parameter(description = "Processing threads",
names = {"-t", "--threads"}, validateWith = PositiveInteger.class)
public int threads = 2;
public int threads = 1;
@DynamicParameter(names = "-O", description = "Overrides default parameter values.")
public Map<String, String> overrides = new HashMap<>();

0 comments on commit d265cec

Please sign in to comment.