storage: Implement annotation snapshots for storage-hadoop. #830
j-coll committed Apr 26, 2018
1 parent 2eac0d8 commit 3177cab
Showing 14 changed files with 499 additions and 114 deletions.
@@ -91,9 +91,8 @@ public class DefaultVariantAnnotationManager implements VariantAnnotationManager
protected static Logger logger = LoggerFactory.getLogger(DefaultVariantAnnotationManager.class);

public DefaultVariantAnnotationManager(VariantAnnotator variantAnnotator, VariantDBAdaptor dbAdaptor) {
if (dbAdaptor == null || variantAnnotator == null) {
throw new NullPointerException();
}
Objects.requireNonNull(variantAnnotator);
Objects.requireNonNull(dbAdaptor);
this.dbAdaptor = dbAdaptor;
this.variantAnnotator = variantAnnotator;
}
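For context, java.util.Objects.requireNonNull throws a NullPointerException when its argument is null, replacing the manual check above. A minimal sketch of the message-bearing overload (an option this commit does not use, shown only for illustration):

Objects.requireNonNull(variantAnnotator, "variantAnnotator must not be null");
Objects.requireNonNull(dbAdaptor, "dbAdaptor must not be null");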
@@ -6,7 +6,6 @@
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.opencga.storage.core.config.StorageConfiguration;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.annotation.annotators.VariantAnnotator;
@@ -26,12 +25,12 @@
*/
public abstract class VariantAnnotationSnapshotTest extends VariantStorageBaseTest {


@Test
public void testMultiAnnotations() throws Exception {

VariantStorageEngine variantStorageEngine = getVariantStorageEngine();
runDefaultETL(smallInputUri, variantStorageEngine, newStudyConfiguration(), new ObjectMap(VariantStorageEngine.Options.ANNOTATE.key(), false));
runDefaultETL(smallInputUri, variantStorageEngine, newStudyConfiguration(),
new ObjectMap(VariantStorageEngine.Options.ANNOTATE.key(), false));

variantStorageEngine.getOptions()
.append(VARIANT_ANNOTATOR_CLASSNAME, TestAnnotator.class.getName())
@@ -55,8 +54,7 @@ public void testMultiAnnotations() throws Exception {
assertEquals(0, variantStorageEngine.getAnnotation("v1", null).getResult().size());
}

public void checkAnnotationSnapshot(VariantStorageEngine variantStorageEngine, String name, String expectedId)
throws StorageEngineException, VariantAnnotatorException {
public void checkAnnotationSnapshot(VariantStorageEngine variantStorageEngine, String name, String expectedId) throws Exception {
int count = 0;
for (VariantAnnotation annotation: variantStorageEngine.getAnnotation(name, null).getResult()) {
assertEquals(expectedId, annotation.getId());
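For orientation, a hedged sketch of how this helper is presumably exercised by the test above (only the snapshot name "v1" is visible in this excerpt; "v2" and the step that saves each snapshot are assumptions):

// Assumed flow: annotate with TestAnnotator, save a snapshot, re-annotate, save another.
checkAnnotationSnapshot(variantStorageEngine, "v1", "v1"); // every annotation in snapshot "v1" carries id "v1"
checkAnnotationSnapshot(variantStorageEngine, "v2", "v2");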
@@ -0,0 +1,157 @@
package org.opencb.opencga.storage.hadoop.utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.opencb.commons.datastore.core.ObjectMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Created on 24/04/18.
*
* @author Jacobo Coll <jacobo167@gmail.com>
*/
public abstract class AbstractHBaseDriver extends Configured implements Tool {

public static final String COLUMNS_TO_COUNT = "columns_to_count";
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractHBaseDriver.class);
protected String table;

protected abstract String getJobName();

protected abstract void setupJob(Job job, String table) throws IOException;

protected void parseAndValidateParameters() {
}

protected String getUsage() {
return "Usage: " + getClass().getSimpleName()
+ " [generic options] <table> (<key> <value>)*";
}

protected int getFixedSizeArgs() {
return 1;
}

protected void parseAndValidateArgs(String[] args) {
}

protected void preExecution() {

}

protected void postExecution(boolean succeed) {

}

@Override
public int run(String[] args) throws Exception {

Configuration conf = getConf();
HBaseConfiguration.addHbaseResources(conf);
getConf().setClassLoader(AbstractHBaseDriver.class.getClassLoader());
configFromArgs(args);
parseAndValidateParameters();

// int maxKeyValueSize = conf.getInt(HadoopVariantStorageEngine.MAPREDUCE_HBASE_KEYVALUE_SIZE_MAX, 10485760); // 10MB
// logger.info("HBASE: set " + ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY + " to " + maxKeyValueSize);
// conf.setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, maxKeyValueSize); // always overwrite server default (usually 1MB)

/* -------------------------------*/
// JOB setup
Job job = Job.getInstance(getConf(), getJobName());
job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
job.setJarByClass(AbstractHBaseDriver.class); // class that contains mapper

// // Increase the ScannerTimeoutPeriod to avoid ScannerTimeoutExceptions
// // See opencb/opencga#352 for more info.
// int scannerTimeout = getConf().getInt(MAPREDUCE_HBASE_SCANNER_TIMEOUT,
// getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
// logger.info("Set Scanner timeout to " + scannerTimeout + " ...");
// job.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scannerTimeout);

setupJob(job, table);

preExecution();

boolean succeed = executeJob(job);
if (!succeed) {
LOGGER.error("error with job!");
}
postExecution(succeed);

return succeed ? 0 : 1;
}

protected final void configFromArgs(String[] args) {
int fixedSizeArgs = getFixedSizeArgs();

LOGGER.info("Driver arguments: " + Arrays.toString(args));
if (args.length < fixedSizeArgs || (args.length - fixedSizeArgs) % 2 != 0) {
System.err.println(getUsage());
System.err.println("Found " + Arrays.toString(args));
ToolRunner.printGenericCommandUsage(System.err);
throw new IllegalArgumentException("Wrong number of arguments!");
}

// Copy the free-form <key> <value> pairs into the configuration, skipping the fixed-position args.
for (int i = fixedSizeArgs; i < args.length; i = i + 2) {
getConf().set(args[i], args[i + 1]);
}

table = args[0];

/* -------------------------------*/
// Validate parameters
if (StringUtils.isEmpty(table)) {
throw new IllegalArgumentException("No table specified!");
}

parseAndValidateArgs(args);
}

private boolean executeJob(Job job) throws IOException, InterruptedException, ClassNotFoundException {
// Shutdown hook: kill the MapReduce job if the driver JVM exits (e.g. Ctrl+C) before completion.
Thread hook = new Thread(() -> {
try {
if (!job.isComplete()) {
job.killJob();
}
} catch (IOException e) {
LOGGER.error("Error killing job on JVM shutdown", e);
}
});
try {
Runtime.getRuntime().addShutdownHook(hook);
return job.waitForCompletion(true);
} finally {
// The job finished (or failed) normally; the hook is no longer needed.
Runtime.getRuntime().removeShutdownHook(hook);
}
}

public static String[] buildArgs(String table, ObjectMap options) {
List<String> args = new ArrayList<>(1 + options.size() * 2);

args.add(table);
for (String key : options.keySet()) {
String value = options.getString(key);
if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) {
args.add(key);
args.add(value);
}
}
return args.toArray(new String[args.size()]);
}

}
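AbstractHBaseDriver is a template-method base class: run() owns configuration loading, the "<table> (<key> <value>)*" argument convention, and the kill-on-shutdown hook, while subclasses only supply the job name and job wiring. A minimal hypothetical subclass, wiring a plain map-only table scan through HBase's stock TableMapReduceUtil and IdentityTableMapper (RowScanDriver and everything below is illustrative, not part of this commit):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class RowScanDriver extends AbstractHBaseDriver {

    @Override
    protected String getJobName() {
        return "opencga: scan rows from table '" + table + '\'';
    }

    @Override
    protected void setupJob(Job job, String table) throws IOException {
        Scan scan = new Scan();
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // Map-only identity job: read every row, discard the output.
        TableMapReduceUtil.initTableMapperJob(table, scan, IdentityTableMapper.class,
                ImmutableBytesWritable.class, Result.class, job);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0);
    }

    public static void main(String[] args) throws Exception {
        // e.g. args = {"my_table", "hbase.client.retries.number", "3"}
        System.exit(ToolRunner.run(HBaseConfiguration.create(), new RowScanDriver(), args));
    }
}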
@@ -0,0 +1,154 @@
package org.opencb.opencga.storage.hadoop.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine;
import org.opencb.opencga.storage.hadoop.variant.mr.VariantMapReduceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;

/**
* Created on 24/04/18.
*
* @author Jacobo Coll <jacobo167@gmail.com>
*/
public class CopyHBaseColumnDriver extends AbstractHBaseDriver {

public static final String COLUMNS_TO_COPY = "columnsToCopy";
public static final String DELETE_AFTER_COPY = "deleteAfterCopy";
private Map<String, String> columnsToCopyMap;
private static final Logger LOGGER = LoggerFactory.getLogger(CopyHBaseColumnDriver.class);

public CopyHBaseColumnDriver() {
}

public CopyHBaseColumnDriver(Configuration conf) {
if (conf != null) {
setConf(conf);
}
}

@Override
protected String getJobName() {
return "opencga: copy columns from table '" + table + '\'';
}

@Override
protected void setupJob(Job job, String table) throws IOException {
Scan scan = new Scan();
int caching = job.getConfiguration().getInt(HadoopVariantStorageEngine.MAPREDUCE_HBASE_SCAN_CACHING, 100);

LOGGER.info("Scan set Caching to " + caching);
scan.setCaching(caching); // 1 is the default in Scan
scan.setCacheBlocks(false); // don't set to true for MR jobs
for (String column : columnsToCopyMap.keySet()) {
String[] split = column.split(":", 2); // limit 2: qualifiers may themselves contain ':'
scan.addColumn(Bytes.toBytes(split[0]), Bytes.toBytes(split[1]));
}

// MapReduce limits the number of distinct counters per job (mapreduce.job.counters.max, 120 by default), so only count the first 80 columns.
int newSize = Math.min(columnsToCopyMap.size(), 80);
job.getConfiguration().setStrings(COLUMNS_TO_COUNT, new ArrayList<>(columnsToCopyMap.keySet())
.subList(0, newSize).toArray(new String[newSize]));

// set other scan attrs
VariantMapReduceUtil.initTableMapperJob(job, table, table, scan, CopyHBaseColumnMapper.class);
}

@Override
protected void parseAndValidateParameters() {
columnsToCopyMap = getColumnsToCopy(getConf());
}

public static String[] buildArgs(String table, Map<String, String> columnsToCopyMap, ObjectMap options) {
options = options == null ? new ObjectMap() : new ObjectMap(options);
if (columnsToCopyMap == null || columnsToCopyMap.isEmpty()) {
throw new IllegalArgumentException("Invalid empty ColumnsToCopy");
}
options.put(COLUMNS_TO_COPY, columnsToCopyMap.entrySet()
.stream()
.map(entry -> entry.getKey() + '=' + entry.getValue())
.collect(Collectors.joining(",")));
return AbstractHBaseDriver.buildArgs(table, options);
}

private static Map<String, String> getColumnsToCopy(Configuration conf) {
final String malformedColumnsToCopyMessage = "Invalid list of src=target columns to copy.";

String columnsToCopy = conf.get(COLUMNS_TO_COPY);
if (columnsToCopy == null || columnsToCopy.isEmpty()) {
throw new IllegalArgumentException(malformedColumnsToCopyMessage + " Missing '" + COLUMNS_TO_COPY + "'.");
}
Map<String, String> columnsToCopyMap = new HashMap<>();
for (String pair : columnsToCopy.split(",")) {
String[] split = pair.split("=");
if (split.length != 2) {
throw new IllegalArgumentException(malformedColumnsToCopyMessage
+ " Missing target. '" + pair + '\'');
}
String src = split[0];
String dest = split[1];
if (!src.contains(":") || !dest.contains(":")) {
throw new IllegalArgumentException(malformedColumnsToCopyMessage
+ " Missing family. '" + pair + '\'');
}
if (src.equals(dest)) {
throw new IllegalArgumentException(malformedColumnsToCopyMessage
+ " Source and target can not be the same. '" + pair + '\'');
}
columnsToCopyMap.put(src, dest);
}
if (columnsToCopyMap.isEmpty()) {
throw new IllegalArgumentException(malformedColumnsToCopyMessage + " Empty.");
}
return columnsToCopyMap;
}


public static class CopyHBaseColumnMapper extends TableMapper<ImmutableBytesWritable, Mutation> {

private Map<String, String> columnsToCopy;
private Set<String> columnsToCount;
private boolean deleteAfterCopy;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
columnsToCopy = getColumnsToCopy(context.getConfiguration());
deleteAfterCopy = context.getConfiguration().getBoolean(DELETE_AFTER_COPY, false);
columnsToCount = new HashSet<>(Arrays.asList(context.getConfiguration().getStrings(COLUMNS_TO_COUNT)));
}

@Override
protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
for (Cell cell : result.rawCells()) {
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
String c = Bytes.toString(family) + ":" + Bytes.toString(qualifier);
String[] split = columnsToCopy.get(c).split(":", 2);

context.write(key, new Put(result.getRow())
.addColumn(Bytes.toBytes(split[0]), ByteBuffer.wrap(Bytes.toBytes(split[1])), HConstants.LATEST_TIMESTAMP,
CellUtil.getValueBufferShallowCopy(cell)));

if (deleteAfterCopy) {
context.write(key, new Delete(result.getRow()).addColumn(family, qualifier));
}
if (columnsToCount.contains(c)) {
context.getCounter("CopyColumn", c).increment(1);
}
}
}
}

}
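Putting the two new classes together, a hedged usage sketch of launching a column copy (the table name, column names, and the CopyHBaseColumnExample wrapper are illustrative; how this commit's own callers invoke the driver is not shown in this diff):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;
import org.opencb.commons.datastore.core.ObjectMap;

import java.util.Collections;
import java.util.Map;

public class CopyHBaseColumnExample {
    public static void main(String[] args) throws Exception {
        // Copy every cell of column "0:a" into "0:a_v1" ("family:qualifier" on both
        // sides, src != dest) and delete the source cells once copied.
        Map<String, String> columnsToCopy = Collections.singletonMap("0:a", "0:a_v1");
        ObjectMap options = new ObjectMap(CopyHBaseColumnDriver.DELETE_AFTER_COPY, true);

        // buildArgs flattens everything into: <table> <key> <value> <key> <value> ...
        String[] driverArgs = CopyHBaseColumnDriver.buildArgs("opencga_variants", columnsToCopy, options);
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new CopyHBaseColumnDriver(), driverArgs);
        System.exit(exitCode);
    }
}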
