Skip to content

Commit

Permalink
analysis: Implement VariantStatsMongoDBLocalAnalysisExecutor. #1376
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Oct 16, 2019
1 parent b4674f3 commit 61df855
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 1 deletion.
@@ -0,0 +1,36 @@
package org.opencb.opencga.storage.mongodb.variant.analysis;

import org.apache.commons.lang3.StringUtils;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.opencga.storage.core.StorageEngineFactory;
import org.opencb.opencga.storage.mongodb.variant.MongoDBVariantStorageEngine;
import org.opencb.oskar.analysis.exceptions.AnalysisExecutorException;

/**
* Helper interface to be used by opencga mongodb analysis executors.
*/
public interface MongoDBAnalysisExecutor {

ObjectMap getExecutorParams();

default MongoDBVariantStorageEngine getMongoDBVariantStorageEngine() throws AnalysisExecutorException {
ObjectMap executorParams = getExecutorParams();
String storageEngine = executorParams.getString("storageEngineId");
if (StringUtils.isEmpty(storageEngine)) {
throw new AnalysisExecutorException("Missing arguments!");
} else {
if (!storageEngine.equals(MongoDBVariantStorageEngine.STORAGE_ENGINE_ID)) {
throw new AnalysisExecutorException("Unable to use executor '" + getClass() + "' with storageEngine " + storageEngine);
}
String dbName = executorParams.getString("dbName");
try {
return (MongoDBVariantStorageEngine) StorageEngineFactory.get()
.getVariantStorageEngine(MongoDBVariantStorageEngine.STORAGE_ENGINE_ID, dbName);
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
throw new AnalysisExecutorException(e);
}

}

}
}
@@ -0,0 +1,109 @@
package org.opencb.opencga.storage.mongodb.variant.analysis.stats;

import com.mongodb.client.MongoCursor;
import org.apache.commons.io.FileUtils;
import org.bson.Document;
import org.opencb.biodata.models.variant.StudyEntry;
import org.opencb.biodata.models.variant.Variant;
import org.opencb.biodata.tools.variant.stats.writer.VariantStatsTsvExporter;
import org.opencb.commons.ProgressLogger;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.io.DataReader;
import org.opencb.commons.run.ParallelTaskRunner;
import org.opencb.commons.run.Task;
import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.variant.adaptors.VariantField;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryException;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.opencga.storage.core.variant.stats.VariantStatsWrapper;
import org.opencb.opencga.storage.mongodb.variant.MongoDBVariantStorageEngine;
import org.opencb.opencga.storage.mongodb.variant.analysis.MongoDBAnalysisExecutor;
import org.opencb.opencga.storage.mongodb.variant.stats.MongoDBVariantStatsCalculator;
import org.opencb.oskar.analysis.exceptions.AnalysisException;
import org.opencb.oskar.analysis.exceptions.AnalysisExecutorException;
import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysis;
import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysisExecutor;
import org.opencb.oskar.core.annotations.AnalysisExecutor;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

@AnalysisExecutor(id="mongodb-local", analysis= VariantStatsAnalysis.ID,
framework = AnalysisExecutor.Framework.ITERATOR,
source = AnalysisExecutor.Source.MONGODB)
public class VariantStatsMongoDBLocalAnalysisExecutor extends VariantStatsAnalysisExecutor implements MongoDBAnalysisExecutor {
@Override
public void exec() throws AnalysisException {
MongoDBVariantStorageEngine engine = getMongoDBVariantStorageEngine();

Query query = new Query(getVariantsQuery())
.append(VariantQueryParam.STUDY.key(), getStudy())
.append(VariantQueryParam.INCLUDE_SAMPLE.key(), getSamples());

QueryOptions queryOptions = new QueryOptions(QueryOptions.EXCLUDE,
Arrays.asList(VariantField.STUDIES_STATS, VariantField.ANNOTATION));
VariantStorageMetadataManager metadataManager = engine.getMetadataManager();
StudyMetadata studyMetadata = metadataManager.getStudyMetadata(getStudy());
List<Integer> sampleIds = new ArrayList<>(getSamples().size());
for (String sample : getSamples()) {
Integer sampleId = metadataManager.getSampleId(studyMetadata.getId(), sample);
if (sampleId == null) {
throw VariantQueryException.sampleNotFound(sample, getStudy());
}
sampleIds.add(sampleId);
}

String cohort = getCohort();

try (MongoCursor<Document> cursor = engine.getDBAdaptor().nativeIterator(query, queryOptions, true);
OutputStream os = new BufferedOutputStream(FileUtils.openOutputStream(getOutputFile().toFile()))) {
// reader
DataReader<Document> reader = i -> {
List<Document> documents = new ArrayList<>(i);
while (cursor.hasNext() && i-- > 0) {
documents.add(cursor.next());
}
return documents;
};

List<CohortMetadata> cohorts = Collections.singletonList(new CohortMetadata(studyMetadata.getId(), -1, cohort, sampleIds));
MongoDBVariantStatsCalculator calculator = new MongoDBVariantStatsCalculator(studyMetadata, cohorts, "0/0");

ProgressLogger progressLogger = new ProgressLogger("Variants processed:");
Task<Document, Variant> task = calculator.then((Task<VariantStatsWrapper, Variant>) batch -> {
progressLogger.increment(batch.size());
List<Variant> variants = new ArrayList<>(batch.size());
for (VariantStatsWrapper s : batch) {
Variant variant = s.toVariant();
StudyEntry studyEntry = new StudyEntry(getStudy());
studyEntry.setStats(s.getCohortStats());
variant.setStudies(Collections.singletonList(studyEntry));
variants.add(variant);
}
return variants;
});


VariantStatsTsvExporter writer = new VariantStatsTsvExporter(os, getStudy(), Collections.singletonList(cohort));

ParallelTaskRunner.Config config = ParallelTaskRunner.Config.builder().build();
ParallelTaskRunner<Document, Variant> ptr = new ParallelTaskRunner<>(reader, task, writer, config);

ptr.run();

arm.updateResult(analysisResult ->
analysisResult.getAttributes().put("numVariantStats", writer.getWrittenVariants()));
} catch (ExecutionException | IOException e) {
throw new AnalysisExecutorException(e);
}
}
}
Expand Up @@ -120,6 +120,6 @@ public VariantStatsWrapper calculateStats(Variant variant, Document study) {


private void addGt(Map<String, Integer> gtStrCount, String gt, int num) {
gtStrCount.compute(gt, (key, value) -> value == null ? num : value + num);
gtStrCount.merge(gt, num, Integer::sum);
}
}

0 comments on commit 61df855

Please sign in to comment.