Skip to content

Commit

Permalink
storage: Update sample index schema status after FamilyIndex #1901
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Mar 17, 2022
1 parent d1ba365 commit 5c4b84c
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ public SampleIndexConfigurationVersioned getSampleIndexConfigurationLatest(boole
return new SampleIndexConfigurationVersioned(SampleIndexConfiguration.defaultConfiguration(),
DEFAULT_SAMPLE_INDEX_VERSION, Date.from(Instant.now()), SampleIndexConfigurationVersioned.Status.ACTIVE);
} else {
SampleIndexConfigurationVersioned conf = sampleIndexConfigurations.get(0);
SampleIndexConfigurationVersioned conf = null;
for (SampleIndexConfigurationVersioned thisConf : sampleIndexConfigurations) {
if (includeStagingSchemas || thisConf.getStatus() != SampleIndexConfigurationVersioned.Status.STAGING) {
if (thisConf.getVersion() > conf.getVersion()) {
if (thisConf.getStatus() == SampleIndexConfigurationVersioned.Status.ACTIVE
|| includeStagingSchemas && thisConf.getStatus() == SampleIndexConfigurationVersioned.Status.STAGING) {
if (conf == null) {
conf = thisConf;
} else if (thisConf.getVersion() > conf.getVersion()) {
conf = thisConf;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.SampleMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.utils.BatchUtils;
import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions;
import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor;
import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor;
Expand Down Expand Up @@ -101,22 +102,23 @@ public DataResult<List<String>> load(String study, List<List<String>> trios, Obj

int batchSize = options.getInt(HadoopVariantStorageOptions.SAMPLE_INDEX_FAMILY_MAX_TRIOS_PER_MR.key(),
HadoopVariantStorageOptions.SAMPLE_INDEX_FAMILY_MAX_TRIOS_PER_MR.defaultValue());
List<List<List<String>>> batches = splitLists(trios, batchSize);
List<List<List<String>>> batches = BatchUtils.splitBatches(trios, batchSize);
if (batches.size() == 1) {
run(study, trios, options, studyId);
runBatch(study, trios, options, studyId);
} else {
logger.warn("Unable to run family index in one single MapReduce operation.");
logger.info("Split in {} jobs of {} samples each.", batches, batches.get(0).size());
for (int i = 0; i < batches.size(); i++) {
List<List<String>> batch = batches.get(i);
logger.info("Running MapReduce {}/{} over {} trios", i + 1, batches, batch.size());
run(study, batch, options, studyId);
runBatch(study, batch, options, studyId);
}
}
postIndex(studyId, version);
return dr;
}

private void run(String study, List<List<String>> trios, ObjectMap options, int studyId) throws StorageEngineException {
private void runBatch(String study, List<List<String>> trios, ObjectMap options, int studyId) throws StorageEngineException {
if (trios.size() < 500) {
options.put(FamilyIndexDriver.TRIOS, trios.stream().map(trio -> String.join(",", trio)).collect(Collectors.joining(";")));
} else {
Expand All @@ -134,15 +136,9 @@ private void run(String study, List<List<String>> trios, ObjectMap options, int
"Precompute mendelian errors for " + (trios.size() == 1 ? "trio " + trios.get(0) : trios.size() + " trios"));
}

private static <T> List<List<T>> splitLists(List<T> list, int maxBatchSize) {
int batchSize = maxBatchSize;
int batches = (int) Math.round(Math.ceil(list.size() / ((float) batchSize)));
batchSize = (int) Math.round(Math.ceil(list.size() / ((float) batches)));
List<List<T>> parts = new ArrayList<>(batches);
for (int i = 0; i < batches; i++) {
parts.add(list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())));
}
return parts;
/**
 * Post-index hook: forwards to the sample index DB adaptor so it can re-evaluate
 * the status of the given sample index schema version.
 *
 * @param studyId numeric id of the study
 * @param version sample index schema version passed through to the adaptor
 * @throws StorageEngineException declared by this method; presumably propagated from the adaptor call
 */
public void postIndex(int studyId, int version)
throws StorageEngineException {
sampleIndexDBAdaptor.updateSampleIndexSchemaStatus(studyId, version);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -795,19 +795,7 @@ public void updateSampleIndexSchemaStatus(int studyId, int version) throws Stora
// Update only if status is STAGING
return;
}
Iterator<SampleMetadata> it = metadataManager.sampleMetadataIterator(studyId);
boolean allSamplesWithThisVersion = true;
while (it.hasNext()) {
SampleMetadata sampleMetadata = it.next();
if (sampleMetadata.getIndexStatus() == TaskMetadata.Status.READY) {
// Only check indexed samples
if (sampleMetadata.getSampleIndexStatus(version) != TaskMetadata.Status.READY
&& sampleMetadata.getSampleIndexAnnotationStatus(version) != TaskMetadata.Status.READY) {
allSamplesWithThisVersion = false;
break;
}
}
}
boolean allSamplesWithThisVersion = isAllSamplesWithThisVersion(studyId, version);
if (allSamplesWithThisVersion) {
metadataManager.updateStudyMetadata(studyId, sm -> {
sm.getSampleIndexConfiguration(version)
Expand All @@ -818,11 +806,80 @@ public void updateSampleIndexSchemaStatus(int studyId, int version) throws Stora
}
}

// public void deprecateSampleIndexSchemas(int studyId) throws StorageEngineException {
//
//
//
// }
/**
 * Flags as DEPRECATED every ACTIVE or STAGING sample index schema of the study,
 * except the one returned by {@code getSampleIndexConfigurationLatest(false)}.
 *
 * Nothing is modified when:
 * - there is no latest configuration,
 * - the latest one is the only candidate, or
 * - at least one indexed sample (index status READY) is not fully available in the latest version.
 *
 * @param studyId numeric id of the study
 * @throws StorageEngineException declared by this method; presumably raised by the metadata manager calls
 */
public void deprecateSampleIndexSchemas(int studyId) throws StorageEngineException {
    StudyMetadata study = metadataManager.getStudyMetadata(studyId);
    String studyName = study.getName();

    // Collect the versions of every schema that is still in use (ACTIVE or STAGING).
    Set<Integer> versionsToDeprecate = new LinkedHashSet<>();
    for (StudyMetadata.SampleIndexConfigurationVersioned schema : study.getSampleIndexConfigurations()) {
        StudyMetadata.SampleIndexConfigurationVersioned.Status schemaStatus = schema.getStatus();
        boolean inUse = schemaStatus == StudyMetadata.SampleIndexConfigurationVersioned.Status.ACTIVE
                || schemaStatus == StudyMetadata.SampleIndexConfigurationVersioned.Status.STAGING;
        if (inUse) {
            versionsToDeprecate.add(schema.getVersion());
        }
    }

    StudyMetadata.SampleIndexConfigurationVersioned latestSchema = study.getSampleIndexConfigurationLatest(false);
    if (latestSchema == null) {
        logger.info("No active sample index available for study '{}'", studyName);
        return;
    }

    // The latest schema must survive, so take it out of the candidates.
    int latestVersion = latestSchema.getVersion();
    versionsToDeprecate.remove(latestVersion);
    if (versionsToDeprecate.isEmpty()) {
        logger.info("No sample indexes to deprecate for study '{}'", studyName);
        return;
    }

    // Count the indexed samples, and how many of them are missing from the latest schema.
    int indexedSamples = 0;
    int missingFromLatest = 0;
    for (SampleMetadata sample : metadataManager.sampleMetadataIterable(studyId)) {
        if (sample.getIndexStatus() != TaskMetadata.Status.READY) {
            // Only check indexed samples
            continue;
        }
        indexedSamples++;
        if (!sampleWithThisVersion(sample, latestVersion)) {
            missingFromLatest++;
        }
    }
    if (missingFromLatest > 0) {
        logger.warn("Unable to deprecate sample indexes. {} samples out of {} not in the latest sample index for study '{}'",
                missingFromLatest, indexedSamples, studyName);
        return;
    }

    logger.info("Deprecate sample indexes {} for study '{}'", versionsToDeprecate, studyName);
    metadataManager.updateStudyMetadata(studyId, sm -> {
        for (Integer oldVersion : versionsToDeprecate) {
            sm.getSampleIndexConfiguration(oldVersion)
                    .setStatus(StudyMetadata.SampleIndexConfigurationVersioned.Status.DEPRECATED);
        }
    });
}

/**
 * Tells whether every indexed sample of the study is available in the given sample index version.
 *
 * Samples whose index status is not READY are ignored. Iteration stops at the first
 * indexed sample that fails {@link #sampleWithThisVersion}.
 *
 * @param studyId numeric id of the study
 * @param version sample index schema version to check
 * @return {@code false} if any indexed sample is missing from that version, {@code true} otherwise
 *         (including when there are no indexed samples)
 */
private boolean isAllSamplesWithThisVersion(int studyId, int version) {
    for (SampleMetadata sample : metadataManager.sampleMetadataIterable(studyId)) {
        if (sample.getIndexStatus() != TaskMetadata.Status.READY) {
            // Only check indexed samples
            continue;
        }
        if (!sampleWithThisVersion(sample, version)) {
            return false;
        }
    }
    return true;
}

/**
 * Checks that a sample is fully available in the given sample index version:
 * SampleIndex + SampleIndexAnnotation + FamilyIndex (if needed).
 *
 * The family index is only required when the sample's mendelian error status is READY.
 *
 * @param sampleMetadata metadata of the sample to inspect
 * @param version        sample index schema version to check
 * @return {@code true} only if every required status for that version is READY
 */
private boolean sampleWithThisVersion(SampleMetadata sampleMetadata, int version) {
    final TaskMetadata.Status ready = TaskMetadata.Status.READY;

    boolean coreIndexesReady = sampleMetadata.getSampleIndexStatus(version) == ready
            && sampleMetadata.getSampleIndexAnnotationStatus(version) == ready;
    if (!coreIndexesReady) {
        return false;
    }

    // The family index only matters for samples with precomputed mendelian errors.
    boolean needsFamilyIndex = sampleMetadata.getMendelianErrorStatus() == ready;
    return !needsFamilyIndex || sampleMetadata.getFamilyIndexStatus(version) == ready;
}


}

0 comments on commit 5c4b84c

Please sign in to comment.