Skip to content

Commit

Permalink
analysis: Implement VariantStats OpenCGA Analysis #1376
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Oct 15, 2019
1 parent 8d75391 commit ada07e1
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 0 deletions.
@@ -0,0 +1,151 @@
package org.opencb.opencga.analysis.variant.stats;

import org.apache.commons.lang3.StringUtils;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.OpenCgaAnalysis;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.core.models.Cohort;
import org.opencb.opencga.core.models.Sample;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.oskar.analysis.exceptions.AnalysisException;
import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysis;
import org.opencb.oskar.core.annotations.Analysis;

import java.util.List;
import java.util.stream.Collectors;

@Analysis(id = VariantStatsAnalysis.ID, data = Analysis.AnalysisData.VARIANT,
description = "Compute variant stats for any cohort and any set of variants.")
public class VariantStatsOpenCgaAnalysis extends OpenCgaAnalysis {

private String study;
private String cohortName;
private Query samplesQuery;
private Query variantsQuery;
private List<String> sampleNames;

public VariantStatsOpenCgaAnalysis() {
}

/**
* Study of the samples.
* @param study Study id
* @return this
*/
public VariantStatsOpenCgaAnalysis setStudy(String study) {
this.study = study;
return this;
}

/**
* Samples query selecting samples of the cohort.
* Optional if provided {@link #cohortName}.
*
* @param samplesQuery sample query
* @return this
*/
public VariantStatsOpenCgaAnalysis setSamplesQuery(Query samplesQuery) {
this.samplesQuery = samplesQuery;
return this;
}

/**
* Name of the cohort.
* Optional if provided {@link #samplesQuery}.
* When used without {@link #samplesQuery}, the cohort must be defined in catalog.
* It's samples will be used to calculate the variant stats.
* When used together with {@link #samplesQuery}, this name will be just an alias to be used in the output file.
*
* @param cohortName cohort name
* @return this
*/
public VariantStatsOpenCgaAnalysis setCohortName(String cohortName) {
this.cohortName = cohortName;
return this;
}

/**
* Variants query. If not provided, all variants from the study will be used.
* @param variantsQuery variants query.
* @return this
*/
public VariantStatsOpenCgaAnalysis setVariantsQuery(Query variantsQuery) {
this.variantsQuery = variantsQuery;
return this;
}

@Override
protected void check() throws AnalysisException {
super.check();
setUpStorageEngineExecutor(study);

if ((samplesQuery == null || samplesQuery.isEmpty()) && StringUtils.isEmpty(cohortName)) {
throw new AnalysisException("Unspecified cohort or list of samples");
}
if (StringUtils.isEmpty(cohortName)) {
cohortName = "COHORT";
}
try {
study = catalogManager.getStudyManager().get(study, new QueryOptions(QueryOptions.INCLUDE, "fqn"), sessionId).first().getFqn();
} catch (CatalogException e) {
throw new AnalysisException(e);
}

try {
List<Sample> samples;
if (samplesQuery == null || samplesQuery.isEmpty()) {
Cohort cohort = catalogManager.getCohortManager()
.get(study, cohortName, new QueryOptions(), sessionId).first();
samples = cohort.getSamples();
arm.updateResult(analysisResult -> analysisResult.getAttributes().put("cohortName", cohortName));
} else {
samples = catalogManager.getSampleManager()
.get(study, new Query(samplesQuery), new QueryOptions(QueryOptions.INCLUDE, "id"), sessionId).getResult();
}
sampleNames = samples.stream().map(Sample::getId).collect(Collectors.toList());
arm.updateResult(analysisResult -> analysisResult.getAttributes().put("sampleNames", sampleNames));
} catch (CatalogException e) {
throw new AnalysisException(e);
}

if (sampleNames.size() <= 1) {
throw new AnalysisException("Unable to compute variant stats with cohort of size " + sampleNames.size());
}

if (variantsQuery == null) {
variantsQuery = new Query();
}
variantsQuery.putIfAbsent(VariantQueryParam.STUDY.key(), study);

arm.updateResult(r -> r.getAttributes().append("variantsQuery", variantsQuery));

// check read permission
try {

variantStorageManager.checkQueryPermissions(
new Query(variantsQuery)
.append(VariantQueryParam.STUDY.key(), study)
.append(VariantQueryParam.INCLUDE_SAMPLE.key(), sampleNames),
new QueryOptions(),
sessionId);
} catch (CatalogException | StorageEngineException e) {
throw new AnalysisException(e);
}

}

@Override
protected void exec() throws AnalysisException {
VariantStatsAnalysis analysis = new VariantStatsAnalysis(executorParams, outDir);
analysis.setUp(executorParams, outDir, sourceTypes, availableFrameworks);
analysis.setVariantsQuery(variantsQuery)
.setStudy(study)
.setCohort(cohortName)
.setSamples(sampleNames);

analysis.execute(arm);
}

}
@@ -0,0 +1,79 @@
package org.opencb.opencga.analysis.variant.stats;

import org.apache.commons.io.FileUtils;
import org.opencb.biodata.models.variant.StudyEntry;
import org.opencb.biodata.models.variant.Variant;
import org.opencb.biodata.models.variant.stats.VariantStats;
import org.opencb.biodata.tools.variant.stats.VariantStatsCalculator;
import org.opencb.biodata.tools.variant.stats.writer.VariantStatsTsvExporter;
import org.opencb.commons.ProgressLogger;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.run.ParallelTaskRunner;
import org.opencb.commons.run.Task;
import org.opencb.opencga.analysis.OpenCgaAnalysisExecutor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.manager.variant.VariantStorageManager;
import org.opencb.opencga.storage.core.variant.adaptors.VariantField;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.opencga.storage.core.variant.io.db.VariantDBReader;
import org.opencb.oskar.analysis.exceptions.AnalysisException;
import org.opencb.oskar.analysis.exceptions.AnalysisExecutorException;
import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysis;
import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysisExecutor;
import org.opencb.oskar.core.annotations.AnalysisExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

@AnalysisExecutor(id="opencga-local", analysis= VariantStatsAnalysis.ID,
framework = AnalysisExecutor.Framework.ITERATOR,
source = AnalysisExecutor.Source.OPENCGA)
public class VariantStatsOpenCgaAnalysisExecutor extends VariantStatsAnalysisExecutor implements OpenCgaAnalysisExecutor {

private final Logger logger = LoggerFactory.getLogger(VariantStatsOpenCgaAnalysisExecutor.class);

@Override
public void exec() throws AnalysisException {

VariantStorageManager manager = getVariantStorageManager();
Query query = new Query(getVariantsQuery())
.append(VariantQueryParam.STUDY.key(), getStudy())
.append(VariantQueryParam.INCLUDE_SAMPLE.key(), getSamples());

QueryOptions queryOptions = new QueryOptions(QueryOptions.EXCLUDE, Arrays.asList(VariantField.STUDIES_STATS));

try (OutputStream os = new BufferedOutputStream(FileUtils.openOutputStream(getOutputFile().toFile()))) {
VariantDBReader reader = new VariantDBReader(manager.iterator(query, queryOptions, getSessionId()));
String cohort = getCohort();

ProgressLogger progressLogger = new ProgressLogger("Variants processed:");
Task<Variant, Variant> task = Task.forEach(variant -> {
StudyEntry study = variant.getStudies().get(0);
VariantStats stats = VariantStatsCalculator.calculate(variant, study);
study.setStats(Collections.singletonMap(cohort, stats));
progressLogger.increment(1);
return variant;
});

VariantStatsTsvExporter writer = new VariantStatsTsvExporter(os, getStudy(), Collections.singletonList(cohort));

ParallelTaskRunner.Config config = ParallelTaskRunner.Config.builder().build();
ParallelTaskRunner<Variant, Variant> ptr = new ParallelTaskRunner<>(reader, task, writer, config);

ptr.run();

arm.updateResult(analysisResult ->
analysisResult.getAttributes().put("numVariantStats", writer.getWrittenVariants()));
} catch (ExecutionException | IOException | CatalogException | StorageEngineException e) {
throw new AnalysisExecutorException(e);
}
}
}

0 comments on commit ada07e1

Please sign in to comment.