
storage: Add deleteAnnotationSnapshot operation to storage-hadoop. #830
j-coll committed Apr 26, 2018
1 parent 5fd1d59 commit 89d1321
Showing 3 changed files with 68 additions and 32 deletions.
@@ -225,7 +225,6 @@ public Scan parseQuery(VariantQueryUtils.SelectVariantElements selectElements, Q

         Scan scan = new Scan();
         byte[] family = genomeHelper.getColumnFamily();
-        scan.addFamily(family);
         FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
         // FilterList regionFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
         // filters.addFilter(regionFilters);
@@ -480,11 +479,11 @@ public Scan parseQuery(VariantQueryUtils.SelectVariantElements selectElements, Q
             scan.addColumn(family, Bytes.toBytes(VariantPhoenixHelper.getAnnotationSnapshotColumn(name)));
         } else {
             scan.addColumn(family, FULL_ANNOTATION.bytes());
-        }
-        if (defaultStudyConfiguration != null) {
-            int release = defaultStudyConfiguration.getAttributes().getInt(RELEASE.key(), RELEASE.defaultValue());
-            for (int i = 1; i <= release; i++) {
-                scan.addColumn(family, VariantPhoenixHelper.buildReleaseColumnKey(release));
+            if (defaultStudyConfiguration != null) {
+                int release = defaultStudyConfiguration.getAttributes().getInt(RELEASE.key(), RELEASE.defaultValue());
+                for (int i = 1; i <= release; i++) {
+                    scan.addColumn(family, VariantPhoenixHelper.buildReleaseColumnKey(release));
+                }
             }
         }
     }
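
Taken together, the hunk above narrows the scan so that release marker columns are only requested alongside the live FULL_ANNOTATION column, never alongside a named snapshot column. A self-contained sketch of that selection logic, using only the plain HBase client API; the "0" column family and the "A_"/"R_" qualifier schemes are illustrative assumptions, not the real VariantPhoenixHelper naming. Note also that the committed loop passes release rather than the loop index i to buildReleaseColumnKey, which the sketch deliberately does not reproduce:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class AnnotationScanSketch {

    // Select either a named annotation snapshot column, or the live annotation
    // column plus one release marker column per release.
    public static Scan buildAnnotationScan(String snapshotName, int release) {
        byte[] family = Bytes.toBytes("0"); // assumed column family
        Scan scan = new Scan();
        if (snapshotName != null) {
            scan.addColumn(family, Bytes.toBytes("A_" + snapshotName)); // hypothetical snapshot qualifier
        } else {
            scan.addColumn(family, Bytes.toBytes("FULL_ANNOTATION"));
            for (int i = 1; i <= release; i++) {
                scan.addColumn(family, Bytes.toBytes("R_" + i)); // hypothetical release key
            }
        }
        return scan;
    }
}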
@@ -32,14 +32,14 @@
 import org.opencb.opencga.storage.core.variant.annotation.DefaultVariantAnnotationManager;
 import org.opencb.opencga.storage.core.variant.annotation.annotators.VariantAnnotator;
 import org.opencb.opencga.storage.hadoop.utils.CopyHBaseColumnDriver;
+import org.opencb.opencga.storage.hadoop.utils.DeleteHBaseColumnDriver;
 import org.opencb.opencga.storage.hadoop.utils.HBaseDataWriter;
 import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine;
 import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor;
 import org.opencb.opencga.storage.hadoop.variant.converters.annotation.VariantAnnotationToHBaseConverter;
 import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor;
 import org.opencb.opencga.storage.hadoop.variant.index.phoenix.VariantPhoenixHelper;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;

@@ -91,15 +91,8 @@ protected QueryOptions getIteratorQueryOptions(Query query, ObjectMap params) {

     @Override
     public void createAnnotationSnapshot(String name, ObjectMap inputOptions) throws StorageEngineException {
-        QueryOptions options = new QueryOptions(baseOptions);
-        if (inputOptions != null) {
-            options.putAll(inputOptions);
-        }
-        String hadoopRoute = options.getString(HadoopVariantStorageEngine.HADOOP_BIN, "hadoop");
-        String jar = HadoopVariantStorageEngine.getJarWithDependencies(options);
+        QueryOptions options = getOptions(inputOptions);
 
-        Class execClass = CopyHBaseColumnDriver.class;
-        String executable = hadoopRoute + " jar " + jar + ' ' + execClass.getName();
         String columnFamily = Bytes.toString(dbAdaptor.getGenomeHelper().getColumnFamily());
         String targetColumn = VariantPhoenixHelper.getAnnotationSnapshotColumn(name);
         Map<String, String> columnsToCopyMap = Collections.singletonMap(
@@ -109,25 +102,28 @@ public void createAnnotationSnapshot(String name, ObjectMap inputOptions) throws
                 dbAdaptor.getTableNameGenerator().getVariantTableName(),
                 columnsToCopyMap, options);
 
-        long startTime = System.currentTimeMillis();
-        logger.info("------------------------------------------------------");
-        logger.info("Copy current annotation into " + targetColumn);
-        logger.debug(executable + ' ' + Arrays.toString(args));
-        logger.info("------------------------------------------------------");
-        int exitValue = mrExecutor.run(executable, args);
-        logger.info("------------------------------------------------------");
-        logger.info("Exit value: {}", exitValue);
-        logger.info("Total time: {}s", (System.currentTimeMillis() - startTime) / 1000.0);
-        if (exitValue != 0) {
-            throw new StorageEngineException("Error creating snapshot of current annotation. "
-                    + "Exception while copying column " + VariantPhoenixHelper.VariantColumn.FULL_ANNOTATION.column()
-                    + " into " + targetColumn);
-        }
-
+        mrExecutor.run(CopyHBaseColumnDriver.class, args, options, "Create new annotation snapshot with name '" + name + '\'');
     }
 
     @Override
-    public void deleteAnnotationSnapshot(String name, ObjectMap options) throws StorageEngineException {
-        super.deleteAnnotationSnapshot(name, options);
+    public void deleteAnnotationSnapshot(String name, ObjectMap inputOptions) throws StorageEngineException {
+        QueryOptions options = getOptions(inputOptions);
+
+        String columnFamily = Bytes.toString(dbAdaptor.getGenomeHelper().getColumnFamily());
+        String targetColumn = VariantPhoenixHelper.getAnnotationSnapshotColumn(name);
+
+        String[] args = DeleteHBaseColumnDriver.buildArgs(
+                dbAdaptor.getTableNameGenerator().getVariantTableName(),
+                Collections.singletonList(columnFamily + ':' + targetColumn), options);
+
+        mrExecutor.run(DeleteHBaseColumnDriver.class, args, options, "Delete annotation snapshot '" + name + '\'');
     }
+
+    public QueryOptions getOptions(ObjectMap inputOptions) {
+        QueryOptions options = new QueryOptions(baseOptions);
+        if (inputOptions != null) {
+            options.putAll(inputOptions);
+        }
+        return options;
+    }
 }
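
The new deleteAnnotationSnapshot hands DeleteHBaseColumnDriver a single "family:qualifier" string and lets the MapReduce job remove that column from every row of the variants table. A minimal client-side sketch of the same effect, without MapReduce; the table name, column family and qualifier here are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DeleteSnapshotColumnSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        byte[] family = Bytes.toBytes("0");        // assumed column family
        byte[] qualifier = Bytes.toBytes("A_v1");  // assumed snapshot qualifier
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("variants"))) { // assumed table name
            Scan scan = new Scan();
            scan.addColumn(family, qualifier); // visit only rows where the column exists
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    Delete delete = new Delete(result.getRow());
                    delete.addColumns(family, qualifier); // delete all versions of the cell
                    table.delete(delete);
                }
            }
        }
    }
}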
@@ -16,7 +16,17 @@

 package org.opencb.opencga.storage.hadoop.variant.executors;
 
+import org.apache.hadoop.util.StopWatch;
+import org.apache.hadoop.util.Tool;
 import org.apache.tools.ant.types.Commandline;
+import org.opencb.commons.datastore.core.ObjectMap;
+import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
+import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created on 18/01/16 .
@@ -25,6 +35,37 @@
  */
 public interface MRExecutor {
 
+    default <T extends Tool> void run(Class<T> execClass, String[] args, ObjectMap options, String taskDescription)
+            throws StorageEngineException {
+        Logger logger = LoggerFactory.getLogger(MRExecutor.class);
+
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        logger.info("------------------------------------------------------");
+        logger.info(taskDescription);
+        logger.info("------------------------------------------------------");
+        int exitValue = run(execClass, args, options);
+        logger.info("------------------------------------------------------");
+        logger.info("Exit value: {}", exitValue);
+        logger.info("Total time: {}s", (stopWatch.now(TimeUnit.MILLISECONDS)) / 1000.0);
+
+        if (exitValue != 0) {
+            throw new StorageEngineException("Error executing MapReduce for : \"" + taskDescription + "\"");
+        }
+    }
+
+    default <T extends Tool> int run(Class<T> execClass, String[] args, ObjectMap options) throws StorageEngineException {
+        String hadoopRoute = options.getString(HadoopVariantStorageEngine.HADOOP_BIN, "hadoop");
+        String jar = HadoopVariantStorageEngine.getJarWithDependencies(options);
+        String executable = hadoopRoute + " jar " + jar + ' ' + execClass.getName();
+        Logger logger = LoggerFactory.getLogger(MRExecutor.class);
+        if (logger.isDebugEnabled()) {
+            logger.debug(executable + ' ' + Arrays.toString(args));
+        }
+
+        return run(executable, Commandline.toString(args));
+    }
+
     default int run(String executable, String[] args) {
         return run(executable, Commandline.toString(args));
     }
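
Both new default methods ultimately delegate to an abstract run(String executable, String args) that each MRExecutor implementation provides (not shown in this hunk). Assuming that is the interface's only abstract method, which this diff does not confirm, a minimal local implementation could look like this sketch:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical implementation: shells out and returns the exit code, so the
// default methods above can log it and fail on non-zero values.
public class LocalMRExecutor implements MRExecutor {

    @Override
    public int run(String executable, String args) {
        // Naive whitespace tokenization; enough for "hadoop jar <jar> <DriverClass>".
        List<String> command = new ArrayList<>(Arrays.asList(executable.split(" ")));
        command.addAll(Arrays.asList(args.split(" ")));
        try {
            Process process = new ProcessBuilder(command)
                    .inheritIO() // stream MapReduce job progress to the console
                    .start();
            return process.waitFor();
        } catch (IOException e) {
            return 1; // sketch only: real code would wrap this in a StorageEngineException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 1;
        }
    }
}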
