From 09b075cb7849067be83bed95b7604310edac1269 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Wed, 2 Mar 2022 09:52:38 +0000 Subject: [PATCH] storage: Add status to SampleIndexConfiguration #1901 --- .../manager/VariantStorageManager.java | 2 +- .../DefaultSampleIndexConfiguration.java | 3 +- .../VariantStorageMetadataManager.java | 20 +- .../core/metadata/models/SampleMetadata.java | 12 +- .../core/metadata/models/StudyMetadata.java | 51 ++-- .../VariantDBAdaptorMultiFileTest.java | 1 + .../variant/HadoopVariantStorageEngine.java | 2 +- .../mr/SampleIndexAnnotationLoaderDriver.java | 16 +- .../sample/SampleIndexAnnotationLoader.java | 241 ++---------------- .../index/sample/SampleIndexDBAdaptor.java | 46 +++- .../index/sample/SampleIndexDriver.java | 8 +- .../index/sample/SampleIndexLoader.java | 1 + .../sample/SampleIndexSchemaFactory.java | 57 ++++- .../variant/index/sample/SampleIndexTest.java | 57 ++++- 14 files changed, 225 insertions(+), 292 deletions(-) diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java index 048098e0948..6bf2ac7bce0 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java @@ -497,7 +497,7 @@ public OpenCGAResult configureSampleIndex(String studyStr, SampleIndexConfi return secureOperation("configure", studyStr, new ObjectMap(), token, engine -> { sampleIndexConfiguration.validate(); String studyFqn = getStudyFqn(studyStr, token); - engine.getMetadataManager().addSampleIndexConfiguration(studyFqn, sampleIndexConfiguration); + engine.getMetadataManager().addSampleIndexConfiguration(studyFqn, sampleIndexConfiguration, true); catalogManager.getStudyManager() .setVariantEngineConfigurationSampleIndex(studyStr, sampleIndexConfiguration, token); diff --git a/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2_1_0/storage/DefaultSampleIndexConfiguration.java b/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2_1_0/storage/DefaultSampleIndexConfiguration.java index 7295eed79b9..301b510266b 100644 --- a/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2_1_0/storage/DefaultSampleIndexConfiguration.java +++ b/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2_1_0/storage/DefaultSampleIndexConfiguration.java @@ -86,7 +86,8 @@ protected void run() throws Exception { if (CollectionUtils.isEmpty(studyMetadata.getSampleIndexConfigurations())) { studyMetadata.setSampleIndexConfigurations(new ArrayList<>()); studyMetadata.getSampleIndexConfigurations().add( - new StudyMetadata.SampleIndexConfigurationVersioned(sampleIndexConfiguration, 1, Date.from(Instant.now()))); + new StudyMetadata.SampleIndexConfigurationVersioned(sampleIndexConfiguration, 1, Date.from(Instant.now()), + StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE)); } else { int size = studyMetadata.getSampleIndexConfigurations().size(); studyMetadata.getSampleIndexConfigurations().get(size - 1).setConfiguration(sampleIndexConfiguration); diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java index 78dba85c50c..b149c192a7a 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java @@ -208,23 +208,33 @@ public StudyMetadata createStudy(String studyName) throws StorageEngineException } public StudyMetadata.SampleIndexConfigurationVersioned addSampleIndexConfiguration( - String study, SampleIndexConfiguration configuration) throws StorageEngineException { + String study, SampleIndexConfiguration configuration, boolean staging) throws StorageEngineException { Integer idOrNull = getStudyIdOrNull(study); + int studyId; if (idOrNull == null) { - createStudy(study); + studyId = createStudy(study).getId(); + } else { + studyId = idOrNull; } - return updateStudyMetadata(study, studyMetadata -> { + StudyMetadata.SampleIndexConfigurationVersioned.Status status = staging + ? StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING + : StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE; + return updateStudyMetadata(studyId, studyMetadata -> { List configurations = studyMetadata.getSampleIndexConfigurations(); if (configurations == null || configurations.isEmpty()) { configurations = new ArrayList<>(2); configurations.add(new StudyMetadata.SampleIndexConfigurationVersioned( SampleIndexConfiguration.defaultConfiguration(), StudyMetadata.DEFAULT_SAMPLE_INDEX_VERSION, - Date.from(Instant.now()))); + Date.from(Instant.now()), StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE)); studyMetadata.setSampleIndexConfigurations(configurations); } int version = studyMetadata.getSampleIndexConfigurationLatest().getVersion() + 1; - configurations.add(new StudyMetadata.SampleIndexConfigurationVersioned(configuration, version, Date.from(Instant.now()))); + configurations.add(new StudyMetadata.SampleIndexConfigurationVersioned( + configuration, + version, + Date.from(Instant.now()), + status)); }).getSampleIndexConfigurationLatest(); } diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/SampleMetadata.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/SampleMetadata.java index 13672ff0b56..0199d194937 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/SampleMetadata.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/SampleMetadata.java @@ -175,11 +175,10 @@ private TaskMetadata.Status getSampleIndexAnnotationStatus() { } @JsonIgnore - public TaskMetadata.Status getSampleIndexAnnotationStatus(int latestSampleIndexVersion) { + public TaskMetadata.Status getSampleIndexAnnotationStatus(int sampleIndexVersion) { TaskMetadata.Status status = getSampleIndexAnnotationStatus(); if (status == TaskMetadata.Status.READY) { - int actualSampleIndexVersion = getSampleIndexAnnotationVersion(); - if (actualSampleIndexVersion != latestSampleIndexVersion) { + if (!getSampleIndexAnnotationVersions().contains(sampleIndexVersion)) { // logger.debug("Sample index annotation version outdated. Actual : " + actualSampleIndexVersion // + " , expected : " + latestSampleIndexVersion); status = TaskMetadata.Status.NONE; @@ -251,13 +250,10 @@ public boolean hasSampleIndexStatus() { } @JsonIgnore - public TaskMetadata.Status getSampleIndexStatus(int latestSampleIndexVersion) { + public TaskMetadata.Status getSampleIndexStatus(int sampleIndexVersion) { TaskMetadata.Status status = getSampleIndexStatus(); if (status == TaskMetadata.Status.READY) { - int actualSampleIndexVersion = getSampleIndexVersion(); - if (actualSampleIndexVersion != latestSampleIndexVersion) { -// logger.debug("Sample index version outdated. Actual : " + actualSampleIndexVersion -// + " , expected : " + latestSampleIndexVersion); + if (!getSampleIndexVersions().contains(sampleIndexVersion)) { status = TaskMetadata.Status.NONE; } } diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/StudyMetadata.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/StudyMetadata.java index 21fe29bba6e..2d8931e4398 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/StudyMetadata.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/models/StudyMetadata.java @@ -31,7 +31,7 @@ public class StudyMetadata { private Long timeStamp; private VariantFileHeader variantHeader; private List variantScores; - private List sampleIndexConfigurations; + private List sampleIndexConfigurations = Collections.emptyList(); private ObjectMap attributes; @@ -136,14 +136,20 @@ public StudyMetadata setVariantScores(List variantScores) } public SampleIndexConfigurationVersioned getSampleIndexConfigurationLatest() { + return getSampleIndexConfigurationLatest(true); + } + + public SampleIndexConfigurationVersioned getSampleIndexConfigurationLatest(boolean includeStagingSchemas) { if (sampleIndexConfigurations == null || sampleIndexConfigurations.isEmpty()) { return new SampleIndexConfigurationVersioned(SampleIndexConfiguration.defaultConfiguration(), - DEFAULT_SAMPLE_INDEX_VERSION, Date.from(Instant.now())); + DEFAULT_SAMPLE_INDEX_VERSION, Date.from(Instant.now()), SampleIndexConfigurationVersioned.Status.ACTIVE); } else { SampleIndexConfigurationVersioned conf = sampleIndexConfigurations.get(0); for (SampleIndexConfigurationVersioned thisConf : sampleIndexConfigurations) { - if (thisConf.getVersion() > conf.getVersion()) { - conf = thisConf; + if (includeStagingSchemas || thisConf.getStatus() != SampleIndexConfigurationVersioned.Status.STAGING) { + if (thisConf.getVersion() > conf.getVersion()) { + conf = thisConf; + } } } return conf; @@ -154,17 +160,18 @@ public List getSampleIndexConfigurations() { return sampleIndexConfigurations; } - public SampleIndexConfiguration getSampleIndexConfiguration(int version) { + public SampleIndexConfigurationVersioned getSampleIndexConfiguration(int version) { if (sampleIndexConfigurations == null || sampleIndexConfigurations.isEmpty()) { if (version == DEFAULT_SAMPLE_INDEX_VERSION) { - return SampleIndexConfiguration.defaultConfiguration(); + return new SampleIndexConfigurationVersioned(SampleIndexConfiguration.defaultConfiguration(), + DEFAULT_SAMPLE_INDEX_VERSION, Date.from(Instant.now()), SampleIndexConfigurationVersioned.Status.ACTIVE); } else { return null; } } else { for (SampleIndexConfigurationVersioned v : sampleIndexConfigurations) { if (v.getVersion() == version) { - return v.getConfiguration(); + return v; } } return null; @@ -243,18 +250,24 @@ public static class SampleIndexConfigurationVersioned { private SampleIndexConfiguration configuration; private int version; private Date date; -// private boolean staging; + private Status status; // private int numSamples; + public enum Status { + STAGING, // Index being built. Not ready. Not to be used. + ACTIVE, // Index ready to be used (if present) + DEPRECATED, // Index marked to be removed. + REMOVED // Index no longer exists. + } public SampleIndexConfigurationVersioned() { } - public SampleIndexConfigurationVersioned(SampleIndexConfiguration configuration, int version, Date date) { + public SampleIndexConfigurationVersioned(SampleIndexConfiguration configuration, int version, Date date, Status status) { this.configuration = configuration; this.version = version; this.date = date; -// this.staging = false; + this.status = status; } public SampleIndexConfiguration getConfiguration() { @@ -284,16 +297,16 @@ public SampleIndexConfigurationVersioned setDate(Date date) { return this; } -// public boolean isStaging() { -// return staging; -// } -// -// public SampleIndexConfigurationVersioned setStaging(boolean staging) { -// this.staging = staging; -// return this; -// } + public Status getStatus() { + return status; + } + + public SampleIndexConfigurationVersioned setStatus(Status status) { + this.status = status; + return this; + } -// public int getNumSamples() { + // public int getNumSamples() { // return numSamples; // } // diff --git a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/adaptors/VariantDBAdaptorMultiFileTest.java b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/adaptors/VariantDBAdaptorMultiFileTest.java index 123f70c0e8c..e24805b7403 100644 --- a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/adaptors/VariantDBAdaptorMultiFileTest.java +++ b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/adaptors/VariantDBAdaptorMultiFileTest.java @@ -273,6 +273,7 @@ public void testGetByStudies() throws Exception { .append(INCLUDE_SAMPLE.key(), ALL); queryResult = query(query, options); VariantQueryResult allVariants = dbAdaptor.get(new Query() + .append(INCLUDE_SAMPLE.key(), ALL) .append(VariantQueryParam.INCLUDE_STUDY.key(), study1), options); assertThat(queryResult, everyResult(allVariants, withStudy(study1))); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java index 29158f8569d..8f647baa0e8 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java @@ -333,7 +333,7 @@ public void sampleIndex(String study, List samples, ObjectMap options) t @Override public void sampleIndexAnnotate(String study, List samples, ObjectMap options) throws StorageEngineException { options = getMergedOptions(options); - new SampleIndexAnnotationLoader(hBaseManager, getTableNameGenerator(), getMetadataManager(), getMRExecutor()) + new SampleIndexAnnotationLoader(getSampleIndexDBAdaptor(), getMRExecutor()) .updateSampleAnnotation(study, samples, options); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/annotation/mr/SampleIndexAnnotationLoaderDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/annotation/mr/SampleIndexAnnotationLoaderDriver.java index 007bffa716c..41826fd4abe 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/annotation/mr/SampleIndexAnnotationLoaderDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/annotation/mr/SampleIndexAnnotationLoaderDriver.java @@ -9,7 +9,6 @@ import org.opencb.biodata.models.core.Region; import org.opencb.commons.datastore.core.ObjectMap; import org.opencb.opencga.core.config.storage.SampleIndexConfiguration; -import org.opencb.opencga.storage.core.exceptions.StorageEngineException; import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; import org.opencb.opencga.storage.core.metadata.models.SampleMetadata; import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; @@ -21,7 +20,6 @@ import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.PhoenixHelper; import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; import org.opencb.opencga.storage.hadoop.variant.converters.HBaseToVariantConverter; -import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexAnnotationLoader; import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexSchema; import org.opencb.opencga.storage.hadoop.variant.mr.VariantAlignedInputFormat; import org.opencb.opencga.storage.hadoop.variant.mr.VariantMapReduceUtil; @@ -40,6 +38,7 @@ public class SampleIndexAnnotationLoaderDriver extends AbstractVariantsTableDriv private static final Logger LOGGER = LoggerFactory.getLogger(SampleIndexAnnotationLoaderDriver.class); public static final String OUTPUT = "output-table"; + public static final String SAMPLE_INDEX_VERSION = "sample-index-version"; public static final String SAMPLES = "samples"; public static final String SAMPLE_IDS = "sampleIds"; @@ -59,6 +58,7 @@ protected Map getParams() { Map params = new HashMap<>(); params.put("--" + SAMPLES, ""); params.put("--" + SAMPLE_IDS, ""); + params.put("--" + SAMPLE_INDEX_VERSION, ""); params.put("--" + OUTPUT, ""); params.put("--" + VariantQueryParam.REGION.key(), ""); return params; @@ -152,11 +152,11 @@ protected Job setupJob(Job job, String archiveTable, String variantTable) throws VariantMapReduceUtil.setNoneReduce(job); + sampleIndexVersion = Integer.parseInt(getParam(SAMPLE_INDEX_VERSION)); StudyMetadata.SampleIndexConfigurationVersioned versioned = getMetadataManager() .getStudyMetadata(getStudyId()) - .getSampleIndexConfigurationLatest(); + .getSampleIndexConfiguration(sampleIndexVersion); SampleIndexConfiguration configuration = versioned.getConfiguration(); - sampleIndexVersion = versioned.getVersion(); VariantMapReduceUtil.setSampleIndexConfiguration(job, configuration, sampleIndexVersion); return job; @@ -167,14 +167,6 @@ protected String getJobOperationName() { return "sample_index_annotation_loader"; } - @Override - protected void postExecution(boolean succeed) throws IOException, StorageEngineException { - super.postExecution(succeed); - if (succeed && StringUtils.isEmpty(region)) { - SampleIndexAnnotationLoader.postAnnotationLoad(getStudyId(), sampleIds, getMetadataManager(), sampleIndexVersion); - } - } - public static void main(String[] args) throws Exception { try { System.exit(new SampleIndexAnnotationLoaderDriver().privateMain(args, null)); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexAnnotationLoader.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexAnnotationLoader.java index 12a13dbd0bb..557b86c25a1 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexAnnotationLoader.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexAnnotationLoader.java @@ -1,11 +1,5 @@ package org.opencb.opencga.storage.hadoop.variant.index.sample; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.Put; -import org.opencb.biodata.models.core.Region; -import org.opencb.biodata.models.variant.Variant; import org.opencb.commons.datastore.core.ObjectMap; import org.opencb.opencga.storage.core.exceptions.StorageEngineException; import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; @@ -13,18 +7,15 @@ import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryException; import org.opencb.opencga.storage.core.variant.query.VariantQueryUtils; import org.opencb.opencga.storage.hadoop.utils.HBaseManager; -import org.opencb.opencga.storage.hadoop.variant.GenomeHelper; import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor; -import org.opencb.opencga.storage.hadoop.variant.index.annotation.AnnotationIndexEntry; -import org.opencb.opencga.storage.hadoop.variant.index.annotation.AnnotationIndexPutBuilder; import org.opencb.opencga.storage.hadoop.variant.index.annotation.mr.SampleIndexAnnotationLoaderDriver; import org.opencb.opencga.storage.hadoop.variant.utils.HBaseVariantTableNameGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; -import java.util.function.Function; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; import static org.opencb.opencga.core.api.ParamConstants.OVERWRITE; import static org.opencb.opencga.storage.core.metadata.models.TaskMetadata.Status; @@ -35,22 +26,25 @@ */ public class SampleIndexAnnotationLoader { - private final HBaseManager hBaseManager; private final HBaseVariantTableNameGenerator tableNameGenerator; private final MRExecutor mrExecutor; private final SampleIndexDBAdaptor sampleDBAdaptor; - private final byte[] family; private final VariantStorageMetadataManager metadataManager; - private Logger logger = LoggerFactory.getLogger(SampleIndexAnnotationLoader.class); + private final Logger logger = LoggerFactory.getLogger(SampleIndexAnnotationLoader.class); public SampleIndexAnnotationLoader(HBaseManager hBaseManager, HBaseVariantTableNameGenerator tableNameGenerator, VariantStorageMetadataManager metadataManager, MRExecutor mrExecutor) { - this.hBaseManager = hBaseManager; this.tableNameGenerator = tableNameGenerator; this.mrExecutor = mrExecutor; this.metadataManager = metadataManager; this.sampleDBAdaptor = new SampleIndexDBAdaptor(hBaseManager, tableNameGenerator, this.metadataManager); - family = GenomeHelper.COLUMN_FAMILY_BYTES; + } + + public SampleIndexAnnotationLoader(SampleIndexDBAdaptor sampleDBAdaptor, MRExecutor mrExecutor) { + this.mrExecutor = mrExecutor; + this.sampleDBAdaptor = sampleDBAdaptor; + this.metadataManager = sampleDBAdaptor.getMetadataManager(); + this.tableNameGenerator = sampleDBAdaptor.getTableNameGenerator(); } public void updateSampleAnnotation(String study, List samples, ObjectMap options) @@ -138,220 +132,28 @@ public void updateSampleAnnotation(int studyId, List samples, ObjectMap for (int i = 0; i < batches; i++) { List subSet = finalSamplesList.subList(i * batchSize, Math.min((i + 1) * batchSize, finalSamplesList.size())); logger.info("Running MapReduce {}/{} over {} samples", i + 1, batches, subSet.size()); - updateSampleAnnotationBatchMapreduce(studyId, subSet, options); + updateSampleAnnotationBatchMapreduce(studyId, subSet, sampleIndexVersion, options); } } else { - updateSampleAnnotationBatchMapreduce(studyId, finalSamplesList, options); + updateSampleAnnotationBatchMapreduce(studyId, finalSamplesList, sampleIndexVersion, options); } + + postAnnotationLoad(studyId, sampleIndexVersion); } - private void updateSampleAnnotationBatchMapreduce(int studyId, List samples, ObjectMap options) + private void updateSampleAnnotationBatchMapreduce(int studyId, List samples, int sampleIndexVersion, ObjectMap options) throws StorageEngineException { options.put(SampleIndexAnnotationLoaderDriver.OUTPUT, sampleDBAdaptor.getSampleIndexTableNameLatest(studyId)); + options.put(SampleIndexAnnotationLoaderDriver.SAMPLE_INDEX_VERSION, sampleIndexVersion); mrExecutor.run(SampleIndexAnnotationLoaderDriver.class, SampleIndexAnnotationLoaderDriver.buildArgs( tableNameGenerator.getArchiveTableName(studyId), tableNameGenerator.getVariantTableName(), studyId, samples, options), "Annotate sample index for " + (samples.size() < 10 ? "samples " + samples : samples.size() + " samples")); - } - - -// private void updateSampleAnnotationBatchMultiThread(int studyId, List samples) throws IOException, StorageEngineException { -// logger.info("Update sample index annotation of " + samples.size() + " samples"); -// -// String sampleIndexTableName = tableNameGenerator.getSampleIndexTableName(studyId); -// -// ProgressLogger progressLogger = new ProgressLogger("Sample index annotation updated variants"); -// -// ParallelTaskRunner, Put> ptr = new ParallelTaskRunner<>( -// new DataReader>() { -// -// private Iterator> iterator = annotationIndexDBAdaptor.iterator(); -// private int initialCapacity = 200000; -// private Pair nextPair = null; -// -// private String chromosome = ""; -// private int start = -1; -// private int end = -1; -// -// @Override -// public List> read(int n) { -// List> annotationMasks = new ArrayList<>(initialCapacity); -// -// // Read next batch -// if (nextPair == null && iterator.hasNext()) { -// nextPair = iterator.next(); -// } -// if (nextPair != null) { -// annotationMasks.add(nextPair); -// Variant firstVariant = nextPair.getKey(); -// chromosome = firstVariant.getChromosome(); -// start = firstVariant.getStart() - (firstVariant.getStart() % SampleIndexSchema.BATCH_SIZE); -// end = start + SampleIndexSchema.BATCH_SIZE; -// nextPair = null; -// } -// while (iterator.hasNext()) { -// Pair pair = iterator.next(); -// Variant variant = pair.getKey(); -// if (variant.getChromosome().equals(chromosome) && variant.getStart() > start && variant.getStart() < end) { -// annotationMasks.add(pair); -// } else { -//// logger.info("Variant " + variant -//// + "(" + variant.getChromosome() + ":" + variant.getStart() + "-" + variant.getEnd() + ")" -//// + " not in batch " + chromosome + ":" + start + "-" + end); -// nextPair = pair; -// break; -// } -// } -// -// return annotationMasks; -// } -// }, -// annotationMasks -> { -// // Ensure is sorted as expected -// annotationMasks.sort(Comparator.comparing(Pair::getKey, -// SampleIndexSchema.INTRA_CHROMOSOME_VARIANT_COMPARATOR)); -// -// Variant firstVariant = annotationMasks.get(0).getKey(); -// String chromosome = firstVariant.getChromosome(); -// int start = firstVariant.getStart() - (firstVariant.getStart() % SampleIndexSchema.BATCH_SIZE); -// int end = start + SampleIndexSchema.BATCH_SIZE; -// -// progressLogger.increment(annotationMasks.size(), () -> "Up to batch " + chromosome + ":" + start + "-" + end); -// List puts = new ArrayList<>(samples.size()); -// -// for (Integer sampleId : samples) { -// Map> map = sampleDBAdaptor.queryByGt(studyId, sampleId, chromosome, start); -// Put put = annotate(chromosome, start, sampleId, map, annotationMasks); -// if (!put.isEmpty()) { -// puts.add(put); -// } -//// else logger.warn("Empty put for sample " + sampleId + " -> " + chromosome + ":" + start + ":" + end); -// } -// -// return puts; -// }, -// new HBaseDataWriter<>(hBaseManager, sampleIndexTableName), -// ParallelTaskRunner.Config.builder().setNumTasks(8).setCapacity(2).build() -// ); -// -// try { -// ptr.run(); -// } catch (ExecutionException e) { -// throw new StorageEngineException("Error", e); -// } -// -// postAnnotationLoad(studyId, samples); -// } - - public void updateSampleAnnotationMultiSampleIterator(int studyId, List samples, - Function>> annotationIndexReader) - throws IOException, StorageEngineException { - int version = sampleDBAdaptor.getSampleIndexConfigurationLatest(studyId).getVersion(); - String sampleIndexTableName = sampleDBAdaptor.getSampleIndexTableName(studyId, version); - Map>>> sampleIterators = new HashMap<>(samples.size()); - SampleIndexSchema schema = sampleDBAdaptor.getSchemaLatest(studyId); - - for (Integer sample : samples) { - sampleIterators.put(sample, sampleDBAdaptor.iteratorByGt(studyId, sample)); - } - - BufferedMutator mutator = hBaseManager.getConnection().getBufferedMutator(TableName.valueOf(sampleIndexTableName)); - - String chromosome = ""; - int start = -1; - int end = -1; - List> annotationEntries = null; - do { - for (Map.Entry>>> sampleIteratorPair : sampleIterators.entrySet()) { - Iterator>> sampleIterator = sampleIteratorPair.getValue(); - Integer sampleId = sampleIteratorPair.getKey(); - if (sampleIterator.hasNext()) { - Map> next = sampleIterator.next(); - - Variant firstVariant = next.values().iterator().next().get(0); - if (annotationEntries == null - || !chromosome.equals(firstVariant.getChromosome()) - || firstVariant.getStart() < start - || firstVariant.getStart() > end) { - chromosome = firstVariant.getChromosome(); - start = firstVariant.getStart() - firstVariant.getStart() % SampleIndexSchema.BATCH_SIZE; - end = start + SampleIndexSchema.BATCH_SIZE; - // FIXME -// annotationEntries = annotationIndexDBAdaptor.get(chromosome, start, end); - annotationEntries = annotationIndexReader.apply(new Region(chromosome, start, end)); - } - - Put put = annotate(chromosome, start, sampleId, next, annotationEntries, schema); - mutator.mutate(put); - } - } - // Remove exhausted iterators - sampleIterators.entrySet().removeIf(e -> !e.getValue().hasNext()); - } while (!sampleIterators.isEmpty()); - - mutator.close(); - - postAnnotationLoad(studyId, samples, version); + postAnnotationBatchLoad(studyId, samples, sampleIndexVersion); } - private Put annotate(String chromosome, int start, Integer sampleId, - Map> sampleIndex, List> annotationMasks, - SampleIndexSchema schema) { - byte[] rk = SampleIndexSchema.toRowKey(sampleId, chromosome, start); - Put put = new Put(rk); - - for (Map.Entry> entry : sampleIndex.entrySet()) { - String gt = entry.getKey(); - List variantsToAnnotate = entry.getValue(); - if (!SampleIndexSchema.isAnnotatedGenotype(gt)) { - continue; - } - - ListIterator> iterator = annotationMasks.listIterator(); - AnnotationIndexPutBuilder builder = new AnnotationIndexPutBuilder(schema, variantsToAnnotate.size()); - int missingVariants = 0; - // Assume both lists are ordered, and "variantsToAnnotate" is fully contained in "annotationMasks" - for (Variant variantToAnnotate : variantsToAnnotate) { - boolean restarted = false; - while (iterator.hasNext()) { - Pair annotationPair = iterator.next(); - if (annotationPair.getKey().sameGenomicVariant(variantToAnnotate)) { - builder.add(annotationPair.getRight()); - break; - } else if (annotationPair.getKey().getStart() > variantToAnnotate.getStart()) { - if (!restarted) { - logger.warn("Missing variant to annotate " + variantToAnnotate + " RESTART ITERATOR"); - while (iterator.hasPrevious()) { - iterator.previous(); - } - restarted = true; - } else { - logger.error("Missing variant to annotate " + variantToAnnotate); - builder.add(AnnotationIndexEntry.empty(schema)); - missingVariants++; - break; - } - } - } - } - if (missingVariants > 0) { - // TODO: What if a variant is not annotated? - String msg = "Error annotating batch. " + missingVariants + " missing variants"; - logger.error(msg); -// throw new IllegalStateException(msg); - } - - builder.buildAndReset(put, gt, family); - } - return put; - } - - private void postAnnotationLoad(int studyId, List samples, int version) throws StorageEngineException { - postAnnotationLoad(studyId, samples, metadataManager, version); - } - - public static void postAnnotationLoad(int studyId, List samples, VariantStorageMetadataManager metadataManager, int version) + public void postAnnotationBatchLoad(int studyId, List samples, int version) throws StorageEngineException { for (Integer sampleId : samples) { metadataManager.updateSampleMetadata(studyId, sampleId, sampleMetadata -> { @@ -360,5 +162,8 @@ public static void postAnnotationLoad(int studyId, List samples, Varian } } - + public void postAnnotationLoad(int studyId, int version) + throws StorageEngineException { + sampleDBAdaptor.updateSampleIndexSchemaStatus(studyId, version); + } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDBAdaptor.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDBAdaptor.java index 9b203722e0f..c1a1cd46e30 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDBAdaptor.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDBAdaptor.java @@ -13,8 +13,11 @@ import org.opencb.commons.datastore.core.ObjectMap; import org.opencb.commons.datastore.core.Query; import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.opencga.storage.core.exceptions.StorageEngineException; import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; +import org.opencb.opencga.storage.core.metadata.models.SampleMetadata; import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; +import org.opencb.opencga.storage.core.metadata.models.TaskMetadata; import org.opencb.opencga.storage.core.utils.iterators.CloseableIterator; import org.opencb.opencga.storage.core.utils.iterators.IntersectMultiKeyIterator; import org.opencb.opencga.storage.core.utils.iterators.UnionMultiKeyIterator; @@ -184,7 +187,7 @@ public String getSampleIndexTableName(SampleIndexQuery query) { } public String getSampleIndexTableNameLatest(int studyId) { - int version = schemaFactory.getSampleIndexConfigurationLatest(studyId).getVersion(); + int version = schemaFactory.getSampleIndexConfigurationLatest(studyId, true).getVersion(); return tableNameGenerator.getSampleIndexTableName(studyId, version); } @@ -445,10 +448,6 @@ public SampleIndexSchema getSchemaLatest(Object study) { return schemaFactory.getSchemaLatest(toStudyId(study)); } - public StudyMetadata.SampleIndexConfigurationVersioned getSampleIndexConfigurationLatest(int studyId) { - return schemaFactory.getSampleIndexConfigurationLatest(studyId); - } - protected int toStudyId(Object study) { int studyId; if (study == null || study instanceof String && ((String) study).isEmpty()) { @@ -789,4 +788,41 @@ public boolean createTableIfNeeded(int studyId, int version, ObjectMap options) } } + public void updateSampleIndexSchemaStatus(int studyId, int version) throws StorageEngineException { + StudyMetadata studyMetadata = metadataManager.getStudyMetadata(studyId); + if (studyMetadata.getSampleIndexConfiguration(version).getStatus() + != StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING) { + // Update only if status is STAGING + return; + } + Iterator it = metadataManager.sampleMetadataIterator(studyId); + boolean allSamplesWithThisVersion = true; + while (it.hasNext()) { + SampleMetadata sampleMetadata = it.next(); + if (sampleMetadata.getIndexStatus() == TaskMetadata.Status.READY) { + // Only check indexed samples + if (sampleMetadata.getSampleIndexStatus(version) != TaskMetadata.Status.READY + && sampleMetadata.getSampleIndexAnnotationStatus(version) != TaskMetadata.Status.READY) { + allSamplesWithThisVersion = false; + break; + } + } + } + if (allSamplesWithThisVersion) { + metadataManager.updateStudyMetadata(studyId, sm -> { + sm.getSampleIndexConfiguration(version) + .setStatus(StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE); + }); + } else { + logger.info("Not all samples had the sample index version {} on GENOTYPES and ANNOTATION", version); + } + } + +// public void deprecateSampleIndexSchemas(int studyId) throws StorageEngineException { +// +// +// +// } + + } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDriver.java index caab6dacf60..66b9bbf970b 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexDriver.java @@ -18,6 +18,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.opencb.biodata.models.core.Region; import org.opencb.biodata.models.variant.Variant; +import org.opencb.opencga.core.config.storage.SampleIndexConfiguration; import org.opencb.opencga.storage.core.io.bit.BitBuffer; import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; import org.opencb.opencga.storage.core.metadata.models.SampleMetadata; @@ -80,6 +81,7 @@ public class SampleIndexDriver extends AbstractVariantsTableDriver { private static final String FIXED_ATTRIBUTES = "SampleIndexDriver.fixedAttributes"; private static final String FIXED_SAMPLE_DATA_KEYS = "SampleIndexDriver.fixedSampleDataKeys"; private static final String PARTIAL_SCAN = "SampleIndexDriver.partial_scan"; + public static final String SAMPLE_INDEX_VERSION = "sample-index-version"; private int study; private String outputTable; @@ -114,6 +116,7 @@ protected Map getParams() { params.put("--" + SAMPLE_IDS, ""); params.put("--" + VariantStorageOptions.STUDY.key(), ""); params.put("--" + OUTPUT, ""); + params.put("--" + SAMPLE_INDEX_VERSION, ""); params.put("--" + SECONDARY_ONLY, ""); // params.put("--" + MAIN_ONLY, ""); params.put("--" + VariantQueryParam.REGION.key(), ""); @@ -329,8 +332,9 @@ protected Job setupJob(Job job, String archiveTable, String table) throws IOExce } StudyMetadata studyMetadata = getMetadataManager().getStudyMetadata(getStudyId()); - StudyMetadata.SampleIndexConfigurationVersioned latest = studyMetadata.getSampleIndexConfigurationLatest(); - VariantMapReduceUtil.setSampleIndexConfiguration(job, latest.getConfiguration(), latest.getVersion()); + int sampleIndexVersion = Integer.parseInt(getParam(SAMPLE_INDEX_VERSION)); + SampleIndexConfiguration configuration = studyMetadata.getSampleIndexConfiguration(sampleIndexVersion).getConfiguration(); + VariantMapReduceUtil.setSampleIndexConfiguration(job, configuration, sampleIndexVersion); return job; } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexLoader.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexLoader.java index 4b75881a013..3778b4d047a 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexLoader.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexLoader.java @@ -118,6 +118,7 @@ private void buildSampleIndexBatchMapreduce(int studyId, List samples, options = new ObjectMap(options); options.put(SampleIndexDriver.SAMPLE_IDS, samples); options.put(SampleIndexDriver.OUTPUT, sampleIndexDBAdaptor.getSampleIndexTableName(studyId, schema.getVersion())); + options.put(SampleIndexDriver.SAMPLE_INDEX_VERSION, schema.getVersion()); mrExecutor.run(SampleIndexDriver.class, SampleIndexDriver.buildArgs( diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexSchemaFactory.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexSchemaFactory.java index 85cbcfb8885..a1cade8e4b2 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexSchemaFactory.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexSchemaFactory.java @@ -24,15 +24,18 @@ public SampleIndexSchema getSchema(int studyId, int sampleId) { } public SampleIndexSchema getSchema(int studyId, int sampleId, boolean acceptPartialAnnotationIndex) { + StudyMetadata studyMetadata = metadataManager.getStudyMetadata(studyId); Collection versions = getSampleIndexConfigurationVersions(studyId, sampleId, true); + removeStagingVersions(studyMetadata, versions); if (versions.isEmpty() && acceptPartialAnnotationIndex) { versions = getSampleIndexConfigurationVersions(studyId, sampleId, false); } + removeStagingVersions(studyMetadata, versions); if (versions.isEmpty()) { throw sampleIndexNotFound(Collections.singletonList(metadataManager.getSampleName(studyId, sampleId))); } int version = versions.stream().mapToInt(i -> i).max().getAsInt(); - SampleIndexConfiguration sampleIndexConfiguration = getSampleIndexConfiguration(studyId, version); + SampleIndexConfiguration sampleIndexConfiguration = studyMetadata.getSampleIndexConfiguration(version).getConfiguration(); if (sampleIndexConfiguration == null) { throw new VariantQueryException("Unable to use sample index version " + version + " required to query sample " @@ -49,8 +52,9 @@ public SampleIndexSchema getSchema(int studyId, Collection samples, bool if (samples.isEmpty()) { throw new IllegalArgumentException("Missing samples"); } - int version = getSampleIndexConfigurationVersion(studyId, samples, acceptPartialAnnotationIndex); - SampleIndexConfiguration sampleIndexConfiguration = getSampleIndexConfiguration(studyId, version); + StudyMetadata studyMetadata = metadataManager.getStudyMetadata(studyId); + int version = getSampleIndexConfigurationVersion(studyId, samples, acceptPartialAnnotationIndex, studyMetadata); + SampleIndexConfiguration sampleIndexConfiguration = studyMetadata.getSampleIndexConfiguration(version).getConfiguration(); if (sampleIndexConfiguration == null) { throw new VariantQueryException("Unable to use sample index version " + version + " required to query samples " + samples); @@ -86,22 +90,36 @@ private Collection getSampleIndexConfigurationVersions(int studyId, Obj } } - public int getSampleIndexConfigurationVersion(int studyId, Collection samples) { - return getSampleIndexConfigurationVersion(studyId, samples, false); + public int getSampleIndexConfigurationVersion(int studyId, Collection samples, boolean acceptPartialAnnotationIndex) { + return getSampleIndexConfigurationVersion( + studyId, samples, acceptPartialAnnotationIndex, metadataManager.getStudyMetadata(studyId)); } - public int getSampleIndexConfigurationVersion(int studyId, Collection samples, boolean acceptPartialAnnotationIndex) { + private int getSampleIndexConfigurationVersion(int studyId, Collection samples, boolean acceptPartialAnnotationIndex, + StudyMetadata studyMetadata) { Collection validVersions = getSampleIndexConfigurationVersions(studyId, samples, true); + removeStagingVersions(studyMetadata, validVersions); if (validVersions.isEmpty() && acceptPartialAnnotationIndex) { validVersions = getSampleIndexConfigurationVersions(studyId, samples, false); } - + removeStagingVersions(studyMetadata, validVersions); if (validVersions.isEmpty()) { throw sampleIndexNotFound(samples); } return validVersions.stream().mapToInt(i -> i).max().getAsInt(); } + private void removeStagingVersions(StudyMetadata studyMetadata, Collection validVersions) { + if (studyMetadata.getSampleIndexConfigurations() == null || validVersions.isEmpty()) { + return; + } + for (StudyMetadata.SampleIndexConfigurationVersioned v : studyMetadata.getSampleIndexConfigurations()) { + if (v.getStatus() == StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING) { + validVersions.remove(v.getVersion()); + } + } + } + private VariantQueryException sampleIndexNotFound(Collection samples) { if (samples.size() == 1) { throw new VariantQueryException("Not found valid sample index for sample " + samples.iterator().next()); @@ -110,18 +128,29 @@ private VariantQueryException sampleIndexNotFound(Collection samples) { } } + /** + * Get the latest schema available, including staging schemas. + * @param studyId studyId + * @return Latest schema available + */ public SampleIndexSchema getSchemaLatest(int studyId) { - StudyMetadata.SampleIndexConfigurationVersioned latest = getSampleIndexConfigurationLatest(studyId); - return new SampleIndexSchema(latest.getConfiguration(), latest.getVersion()); + return getSchemaLatest(studyId, true); } - public StudyMetadata.SampleIndexConfigurationVersioned getSampleIndexConfigurationLatest(int studyId) { - return metadataManager.getStudyMetadata(studyId).getSampleIndexConfigurationLatest(); + /** + * Get the latest schema available. + * @param studyId studyId + * @param includeStagingSchemas Include schemas with status + * {@link org.opencb.opencga.storage.core.metadata.models.StudyMetadata.SampleIndexConfigurationVersioned.Status#STAGING}. + * @return Latest schema available + */ + public SampleIndexSchema getSchemaLatest(int studyId, boolean includeStagingSchemas) { + StudyMetadata.SampleIndexConfigurationVersioned latest = getSampleIndexConfigurationLatest(studyId, includeStagingSchemas); + return new SampleIndexSchema(latest.getConfiguration(), latest.getVersion()); } - public SampleIndexConfiguration getSampleIndexConfiguration(int studyId, int version) { - return metadataManager.getStudyMetadata(studyId).getSampleIndexConfiguration(version); + public StudyMetadata.SampleIndexConfigurationVersioned getSampleIndexConfigurationLatest(int studyId, boolean includeStagingSchemas) { + return metadataManager.getStudyMetadata(studyId).getSampleIndexConfigurationLatest(includeStagingSchemas); } - } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java index 2f886ac28cf..771a097574d 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java @@ -28,6 +28,7 @@ import org.opencb.opencga.core.response.VariantQueryResult; import org.opencb.opencga.storage.core.exceptions.StorageEngineException; import org.opencb.opencga.storage.core.metadata.models.SampleMetadata; +import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest; import org.opencb.opencga.storage.core.variant.VariantStorageEngine; import org.opencb.opencga.storage.core.variant.VariantStorageOptions; @@ -108,7 +109,7 @@ public void before() throws Exception { public void load() throws Exception { clearDB(DB_NAME); - + StudyMetadata.SampleIndexConfigurationVersioned versioned; HadoopVariantStorageEngine engine = getVariantStorageEngine(); // Study 1 - single file @@ -126,19 +127,26 @@ public void load() throws Exception { .append(VariantStorageOptions.STATS_CALCULATE.key(), false) .append(VariantStorageOptions.LOAD_SPLIT_DATA.key(), VariantStorageEngine.SplitData.MULTI); - int version = metadataManager.addSampleIndexConfiguration(STUDY_NAME_2, SampleIndexConfiguration.defaultConfiguration() - .addFileIndexField(new IndexFieldConfiguration(IndexFieldConfiguration.Source.SAMPLE, "DS", new double[]{0, 1, 2}))).getVersion(); - System.out.println("version = " + version); + versioned = metadataManager.addSampleIndexConfiguration(STUDY_NAME_2, SampleIndexConfiguration.defaultConfiguration() + .addFileIndexField(new IndexFieldConfiguration(IndexFieldConfiguration.Source.SAMPLE, "DS", new double[]{0, 1, 2})), true); + assertEquals(2, versioned.getVersion()); + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING, versioned.getStatus()); runETL(engine, getResourceUri("by_chr/chr22_1-1.variant-test-file.vcf.gz"), outputUri, params, true, true, true); runETL(engine, getResourceUri("by_chr/chr22_1-2.variant-test-file.vcf.gz"), outputUri, params, true, true, true); runETL(engine, getResourceUri("by_chr/chr22_1-2-DUP.variant-test-file.vcf.gz"), outputUri, params, true, true, true); engine.familyIndex(STUDY_NAME_2, trios, new ObjectMap()); + versioned = metadataManager.getStudyMetadata(STUDY_NAME_2).getSampleIndexConfiguration(versioned.getVersion()); + assertEquals(2, versioned.getVersion()); + // Not annotated + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING, versioned.getStatus()); + + // Study 3 - platinum metadataManager.addSampleIndexConfiguration(STUDY_NAME_3, SampleIndexConfiguration.defaultConfiguration() .addFileIndexField(new IndexFieldConfiguration(IndexFieldConfiguration.Source.FILE, "culprit", - IndexFieldConfiguration.Type.CATEGORICAL, "DP", "FS", "MQ", "QD").setNullable(true))); + IndexFieldConfiguration.Type.CATEGORICAL, "DP", "FS", "MQ", "QD").setNullable(true)), true); params = new ObjectMap() .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_3) @@ -147,15 +155,52 @@ public void load() throws Exception { runETL(engine, getPlatinumFile(0), outputUri, params, true, true, true); runETL(engine, getPlatinumFile(1), outputUri, params, true, true, true); + versioned = metadataManager.getStudyMetadata(STUDY_NAME_3).getSampleIndexConfiguration(versioned.getVersion()); + assertEquals(2, versioned.getVersion()); + // Not annotated + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING, versioned.getStatus()); + + + // ---------------- Annotate this.variantStorageEngine.annotate(new Query(), new QueryOptions(DefaultVariantAnnotationManager.OUT_DIR, outputUri)); engine.familyIndex(STUDY_NAME_3, triosPlatinum, new ObjectMap()); + // Study 1 - extra sample index configuration, not staging, only one sample in that configuration + SampleIndexConfiguration configuration = engine.getMetadataManager().getStudyMetadata(STUDY_NAME).getSampleIndexConfigurationLatest().getConfiguration(); // Don't modify the configuration. - engine.getMetadataManager().addSampleIndexConfiguration(STUDY_NAME, configuration); + versioned = engine.getMetadataManager().addSampleIndexConfiguration(STUDY_NAME, configuration, true); + assertEquals(2, versioned.getVersion()); + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING, versioned.getStatus()); + engine.sampleIndex(STUDY_NAME, Collections.singletonList("NA19660"), new ObjectMap()); engine.sampleIndexAnnotate(STUDY_NAME, Collections.singletonList("NA19660"), new ObjectMap()); + versioned = engine.getMetadataManager().getStudyMetadata(STUDY_NAME).getSampleIndexConfigurationLatest(false); + assertEquals(1, versioned.getVersion()); + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE, versioned.getStatus()); + + versioned = engine.getMetadataManager().getStudyMetadata(STUDY_NAME).getSampleIndexConfigurationLatest(true); + assertEquals(2, versioned.getVersion()); + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING, versioned.getStatus()); + + engine.getMetadataManager().updateStudyMetadata(STUDY_NAME, sm -> { + sm.getSampleIndexConfigurationLatest(true).setStatus(StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE); + }); + versioned = engine.getMetadataManager().getStudyMetadata(STUDY_NAME).getSampleIndexConfigurationLatest(false); + assertEquals(2, versioned.getVersion()); + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE, versioned.getStatus()); + + // Study 2 - Latest should be active + versioned = metadataManager.getStudyMetadata(STUDY_NAME_2).getSampleIndexConfiguration(versioned.getVersion()); + assertEquals(2, versioned.getVersion()); + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE, versioned.getStatus()); + + // Study 3 - Latest should be active + versioned = metadataManager.getStudyMetadata(STUDY_NAME_3).getSampleIndexConfiguration(versioned.getVersion()); + assertEquals(2, versioned.getVersion()); + assertEquals(StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE, versioned.getStatus()); + VariantHbaseTestUtils.printVariants(dbAdaptor, newOutputUri()); }