Merge branch 'eb-dev' into rc_file_exp
Conflicts:
	src/java/com/twitter/elephantbird/pig/load/LzoBaseLoadFunc.java
	src/test/com/twitter/elephantbird/pig/piggybank/TestThriftToPig.java
Raghu Angadi committed Nov 28, 2011
2 parents b784885 + 342f336 commit 3d97a41
Showing 22 changed files with 674 additions and 66 deletions.
4 changes: 2 additions & 2 deletions build.xml
@@ -40,6 +40,7 @@
     <property name="test.junit.maxmemory" value="512m" />
     <property name="javac.debug" value="on"/>
     <property name="javac.optimize" value="on"/>
+    <property name="test.library.path" value="none"/>

     <path id="test.classpath">
       <pathelement location="${classes.dir}"/>
@@ -263,8 +264,7 @@
       <sysproperty key="test.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.source.dir" value="${test.src.dir}"/>
       <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
-      <sysproperty key="java.library.path"
-                   value="${lib.dir}/*.jar:${classes.dir}:${test.build.classes}:${basedir}/bin"/>
+      <sysproperty key="java.library.path" value="${test.library.path}"/>
       <classpath refid="${test.classpath.id}"/>
       <formatter type="${test.junit.output.format}" />
       <batchtest todir="${test.log.dir}" unless="testcase">
@@ -89,14 +89,23 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
   public static boolean decodeLineToJson(JSONParser parser, Text line, MapWritable value) {
     try {
       JSONObject jsonObj = (JSONObject)parser.parse(line.toString());
-      for (Object key: jsonObj.keySet()) {
-        Text mapKey = new Text(key.toString());
-        Text mapValue = new Text();
-        if (jsonObj.get(key) != null) {
-          mapValue.set(jsonObj.get(key).toString());
-        }
-
-        value.put(mapKey, mapValue);
+      if (jsonObj != null) {
+        for (Object key: jsonObj.keySet()) {
+          Text mapKey = new Text(key.toString());
+          Text mapValue = new Text();
+          if (jsonObj.get(key) != null) {
+            mapValue.set(jsonObj.get(key).toString());
+          }
+
+          value.put(mapKey, mapValue);
+        }
+      }
+      else {
+        // JSONParser#parse(String) may return a null reference, e.g. when
+        // the input parameter is the string "null". A single line with
+        // "null" is not valid JSON though.
+        LOG.warn("Could not json-decode string: " + line);
+        return false;
       }
       return true;
     } catch (ParseException e) {
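For reference, a small usage sketch of the patched decodeLineToJson(). The enclosing class is not named in this hunk, so LzoJsonRecordReader is an assumption here, and the bare "null" input simply illustrates the newly handled case.

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.json.simple.parser.JSONParser;

public class DecodeExample {
  public static void main(String[] args) {
    JSONParser parser = new JSONParser();
    MapWritable value = new MapWritable();
    // json-simple parses the bare string "null" to a null JSONObject; the
    // patched method now logs a warning and returns false instead of
    // failing on jsonObj.keySet().
    boolean ok = LzoJsonRecordReader.decodeLineToJson(   // assumed enclosing class
        parser, new Text("null"), value);
    System.out.println(ok); // expected: false
  }
}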
@@ -0,0 +1,170 @@
package com.twitter.elephantbird.mapreduce.input;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.thrift.TBase;

import com.google.protobuf.Message;
import com.twitter.data.proto.BlockStorage.SerializedBlock;
import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
import com.twitter.elephantbird.util.HadoopUtils;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;

/**
* The input can consist of a heterogeneous mix of formats storing
* compatible objects. Currently supported formats:
* <ol>
* <li> Lzo Block storage of Thrift and Protobuf objects
* <li> Lzo B64Line storage of Thrift and Protobuf objects
* </ol>
*
* <p>
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
*/
public class MultiInputFormat<M>
extends LzoInputFormat<LongWritable, BinaryWritable<M>> {

// TODO: need to handle multiple input formats in a job better.
// might be better to store classname in the input split rather than in config.
private static String CLASS_CONF_KEY = "elephantbird.class.for.MultiInputFormat";

private TypeRef<M> typeRef;

public MultiInputFormat() {}

public MultiInputFormat(TypeRef<M> typeRef) {
this.typeRef = typeRef;
}

private static enum Format {
LZO_BLOCK,
LZO_B64LINE;
};

/**
* Sets the job's input format to {@link MultiInputFormat} and stores the
* supplied clazz's name in the job configuration. This configuration is
* read on the remote tasks to initialize the input format correctly.
*/
public static void setInputFormatClass(Class<?> clazz, Job job) {
job.setInputFormatClass(MultiInputFormat.class);
HadoopUtils.setInputFormatClass(job.getConfiguration(), CLASS_CONF_KEY, clazz);
}

@SuppressWarnings("unchecked") // return type is runtime dependent
@Override
public RecordReader<LongWritable, BinaryWritable<M>>
createRecordReader(InputSplit split, TaskAttemptContext taskAttempt)
throws IOException, InterruptedException {
Configuration conf = taskAttempt.getConfiguration();
if (typeRef == null) {
setTypeRef(conf);
}
Class<?> recordClass = typeRef.getRawClass();

Format fileFormat = determineFileFormat(split, conf);

// Thrift
if (TBase.class.isAssignableFrom(recordClass)) {
switch (fileFormat) {
case LZO_BLOCK:
return new LzoThriftBlockRecordReader(typeRef);
case LZO_B64LINE:
return new LzoThriftB64LineRecordReader(typeRef);
}
}

// Protobuf
if (Message.class.isAssignableFrom(recordClass)) {
switch (fileFormat) {
case LZO_BLOCK:
return new LzoProtobufBlockRecordReader(typeRef);
case LZO_B64LINE:
return new LzoProtobufB64LineRecordReader(typeRef);
}
}

throw new IOException("could not determine reader for "
+ ((FileSplit)split).getPath() + " with class " + recordClass.getName());
}

/** set typeRef from conf */
private void setTypeRef(Configuration conf) {
String className = conf.get(CLASS_CONF_KEY);

if (className == null) {
throw new RuntimeException(CLASS_CONF_KEY + " is not set");
}

Class<?> clazz = null;
try {
clazz = conf.getClassByName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("failed to instantiate class '" + className + "'", e);
}

typeRef = new TypeRef<M>(clazz){};
}

/**
* Checks to see if the input records are stored as {@link SerializedBlock}.
* The block format starts with {@link Protobufs#KNOWN_GOOD_POSITION_MARKER}.
* Otherwise the input is assumed to be Base64 encoded lines.
*/
private static Format determineFileFormat(InputSplit split,
Configuration conf)
throws IOException {
FileSplit fileSplit = (FileSplit)split;

Path file = fileSplit.getPath();

/* we could have an optional configuration that maps a file-name regex
* to a format, e.g. ".*-block.lzo" to LZO_BLOCK.
*/

// most of the cost is opening the file and
// reading first lzo block (about 256k of compressed data)

CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
if (codec == null) {
throw new IOException("No codec for file " + file + " found");
}

InputStream in = file.getFileSystem(conf).open(file);
InputStream lzoIn = null;

// check if the file starts with magic bytes for Block storage format.
try {
lzoIn = codec.createInputStream(in);

for(byte magic : Protobufs.KNOWN_GOOD_POSITION_MARKER) {
int b = lzoIn.read();
if (b < 0 || (byte)b != magic) {
return Format.LZO_B64LINE;
}
}
} finally {
IOUtils.closeStream(lzoIn);
IOUtils.closeStream(in);
}

// the check passed
return Format.LZO_BLOCK;
}
}
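A minimal sketch of wiring this input format into a job through the setInputFormatClass() helper above. MyThriftStruct is a placeholder for whatever Thrift (or protobuf) class the LZO block or B64Line files actually contain, and the input path is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;

public class MultiInputJobSetup {
  public static Job configure(Configuration conf) throws Exception {
    Job job = new Job(conf, "multi-input-example");
    // sets the input format and records the class name in the job
    // configuration; remote tasks read it back via setTypeRef(conf), and
    // determineFileFormat() then picks a block or B64Line reader per split
    MultiInputFormat.setInputFormatClass(MyThriftStruct.class, job); // MyThriftStruct: placeholder
    FileInputFormat.setInputPaths(job, new Path("/data/events"));    // illustrative path
    return job;
  }
}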


@@ -5,12 +5,15 @@
 import com.twitter.elephantbird.mapreduce.io.BinaryConverter;
 import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
+import com.twitter.elephantbird.mapreduce.io.ThriftConverter;
+import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
 import com.twitter.elephantbird.util.Codecs;
 import com.twitter.elephantbird.util.Protobufs;

 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TBase;

 /**
  * A RecordWriter-derived class for use with the LzoProtobufB64LineOutputFormat.
@@ -43,4 +46,11 @@ public void close(TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
     out.close();
   }
+
+  // for convenience
+  public static <M extends TBase<?, ?>> LzoBinaryB64LineRecordWriter<M, ThriftWritable<M>>
+  newThriftWriter(Class<M> tClass, DataOutputStream out) {
+    return new LzoBinaryB64LineRecordWriter<M, ThriftWritable<M>>
+        (ThriftConverter.newInstance(tClass), out);
+  }
 }
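A hedged sketch of the new convenience factory in use. MyThriftStruct is again a placeholder Thrift class, a plain FileOutputStream stands in for whatever stream the output format would normally supply, and the import path for the writer follows the package layout used elsewhere in this commit.

import java.io.DataOutputStream;
import java.io.FileOutputStream;

import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import com.twitter.elephantbird.mapreduce.output.LzoBinaryB64LineRecordWriter;

public class ThriftWriterExample {
  public static void main(String[] args) throws Exception {
    DataOutputStream out = new DataOutputStream(new FileOutputStream("part-00000.b64"));
    LzoBinaryB64LineRecordWriter<MyThriftStruct, ThriftWritable<MyThriftStruct>> writer =
        LzoBinaryB64LineRecordWriter.newThriftWriter(MyThriftStruct.class, out); // placeholder class
    // records would be emitted through the usual RecordWriter#write(...) calls here
    writer.close(null); // per the hunk above, close() just closes the wrapped stream
  }
}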
144 changes: 144 additions & 0 deletions src/java/com/twitter/elephantbird/pig/load/FilterLoadFunc.java
@@ -0,0 +1,144 @@
package com.twitter.elephantbird.pig.load;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.Expression;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;

/**
* A wrapper LoadFunc that delegates all the functionality to another loader.
* Similar to a FilterInputStream.
*/
public class FilterLoadFunc extends LoadFunc implements LoadMetadata, LoadPushDown {

protected LoadFunc loader;

/**
* @param loader may be null, since it may not be feasible to set the
* loader during construction; it can be set later with setLoader()
*/
public FilterLoadFunc(LoadFunc loader) {
this.loader = loader;
}

public void setLoader(LoadFunc loader) {
this.loader = loader;
}

// just for readability
private boolean isSet() {
return loader != null;
}
// LoadFunc implementation:

@Override @SuppressWarnings("unchecked")
public InputFormat getInputFormat() throws IOException {
return isSet() ? loader.getInputFormat() : null;
}

@Override
public LoadCaster getLoadCaster() throws IOException {
return isSet() ? loader.getLoadCaster() : null;
}

@Override
public Tuple getNext() throws IOException {
return isSet() ? loader.getNext() : null;
}

@Override @SuppressWarnings("unchecked")
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
if (isSet()) {
loader.prepareToRead(reader, split);
}
}

@Override
public String relativeToAbsolutePath(String location, Path curDir)
throws IOException {
return isSet() ?
loader.relativeToAbsolutePath(location, curDir):
super.relativeToAbsolutePath(location, curDir);
}

@Override
public void setLocation(String location, Job job) throws IOException {
if (isSet()) {
loader.setLocation(location, job);
}
}

@Override
public void setUDFContextSignature(String signature) {
if (isSet()) {
loader.setUDFContextSignature(signature);
} else {
super.setUDFContextSignature(signature);
}
}

// LoadMetadata & LoadPushDown interface.

// helpers for casting:
private static LoadMetadata asLoadMetadata(LoadFunc loader) {
return loader instanceof LoadMetadata ? (LoadMetadata) loader : null;
}

private static LoadPushDown asLoadPushDown(LoadFunc loader) {
return loader instanceof LoadPushDown ? (LoadPushDown) loader : null;
}


@Override
public String[] getPartitionKeys(String location, Job job) throws IOException {
LoadMetadata metadata = asLoadMetadata(loader);
return metadata == null ? null : metadata.getPartitionKeys(location, job);
}

@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
LoadMetadata metadata = asLoadMetadata(loader);
return metadata == null ? null : metadata.getSchema(location, job);
}

@Override
public ResourceStatistics getStatistics(String location, Job job) throws IOException {
LoadMetadata metadata = asLoadMetadata(loader);
return metadata == null ? null : metadata.getStatistics(location, job);
}

@Override
public void setPartitionFilter(Expression partitionFilter) throws IOException {
LoadMetadata metadata = asLoadMetadata(loader);
if ( metadata != null ) {
metadata.setPartitionFilter(partitionFilter);
}
}

@Override
public List<OperatorSet> getFeatures() {
LoadPushDown pushDown = asLoadPushDown(loader);
return pushDown == null ? null : pushDown.getFeatures();
}

@Override
public RequiredFieldResponse pushProjection(
RequiredFieldList requiredFieldList) throws FrontendException {
LoadPushDown pushDown = asLoadPushDown( loader );
return pushDown == null ? null : pushDown.pushProjection( requiredFieldList );
}

}
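The intended extension pattern, as a hedged sketch: a subclass supplies (or later swaps) the wrapped loader and inherits all of the delegation above. PigStorage is used purely as an illustrative delegate; PassThroughLoader is not part of this commit.

import org.apache.pig.builtin.PigStorage;

import com.twitter.elephantbird.pig.load.FilterLoadFunc;

public class PassThroughLoader extends FilterLoadFunc {
  public PassThroughLoader() {
    super(new PigStorage()); // delegate; could also be injected later via setLoader()
  }
}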
