Skip to content

Commit

Permalink
Merge branch 'thrift-reimp' of https://github.com/rangadi/elephant-bird
Browse files Browse the repository at this point in the history
… into eb1.2.4
  • Loading branch information
dvryaboy committed Mar 28, 2011
2 parents 2e5e66d + d9afb47 commit fa5e16e
Show file tree
Hide file tree
Showing 14 changed files with 897 additions and 779 deletions.
13 changes: 6 additions & 7 deletions build.xml
Expand Up @@ -38,7 +38,8 @@
<property name="test.junit.fork.mode" value="perTest" />
<property name="test.junit.haltonfailure" value="no" />
<property name="test.junit.maxmemory" value="512m" />

<property name="javac.debug" value="on"/>
<property name="javac.optimize" value="on"/>

<path id="test.classpath">
<pathelement location="${classes.dir}"/>
Expand All @@ -52,13 +53,11 @@
<target name="debug" description="sets properties for debugging (logging on, debug symbols, etc)">
<echo message="Building in debug mode..."/>
<property name="compile.mode" value="debug"/>
<property name="java.debug" value="true"/>
</target>

<target name="release" description="sets properties for release builds.">
<echo message="Building in release mode..."/>
<property name="compile.mode" value="release"/>
<property name="java.debug" value="false"/>
</target>

<target name="noproto" depends="init">
Expand Down Expand Up @@ -87,7 +86,7 @@
<macrodef name="compile-noprotobuf">
<sequential>
<!-- Compile the java code from ${src.java} into ${build} -->
<javac srcdir="${src.java.dir}" destdir="${classes.dir}" debug="${java.debug}" debuglevel="lines,vars,source" excludes="**/*Protobuf*, **/proto/">
<javac srcdir="${src.java.dir}" destdir="${classes.dir}" debug="${javac.debug}" optimize="${javac.optimize}" excludes="**/*Protobuf*, **/proto/">
<classpath>
<fileset dir="lib">
<include name="**/*.jar"/>
Expand All @@ -100,7 +99,7 @@
<macrodef name="compile-nonothing">
<sequential>
<!-- Compile the java code from ${src.java} into ${build} -->
<javac srcdir="${src.java.dir}" destdir="${classes.dir}" debug="${java.debug}" debuglevel="lines,vars,source" excludes="**/*Protobuf*, **/proto/, **/*Thrift*, **/thrift/, **/*Binary*" >
<javac srcdir="${src.java.dir}" destdir="${classes.dir}" debug="${javac.debug}" optimize="${javac.optimize}" excludes="**/*Protobuf*, **/proto/, **/*Thrift*, **/thrift/, **/*Binary*" >
<classpath>
<fileset dir="lib">
<include name="**/*.jar"/>
Expand All @@ -113,7 +112,7 @@
<macrodef name="compile-java-protobuf">
<sequential>
<!-- Compile the java code from ${src.java} into ${build} -->
<javac srcdir="${src.java.dir}" destdir="${classes.dir}" debug="${java.debug}" debuglevel="lines,vars,source" includes="**/*Protobuf*, **/proto/">
<javac srcdir="${src.java.dir}" destdir="${classes.dir}" debug="${javac.debug}" optimize="${javac.optimize}" includes="**/*Protobuf*, **/proto/">
<classpath>
<fileset dir="lib">
<include name="**/*.jar"/>
Expand Down Expand Up @@ -227,7 +226,7 @@
<echo message="classes dir: ${classes.dir}" />
<delete dir="${test.build.classes}"/>
<mkdir dir="${test.build.classes}"/>
<javac srcdir="${test.src.dir}" destdir="${test.build.classes}" deprecation="on" debug="${java.debug}" debuglevel="lines,vars,source">
<javac srcdir="${test.src.dir}" destdir="${test.build.classes}" deprecation="on" debug="${javac.debug}" optimize="${javac.optimize}">
<classpath refid="${test.classpath.id}"/>
<include name="**/*.java" />
</javac>
Expand Down
Expand Up @@ -8,7 +8,7 @@
import com.twitter.data.proto.BlockStorage.SerializedBlock;
import com.twitter.elephantbird.util.Protobufs;

/**
/**
* A class to write blocks of serialized objects.
*/
public abstract class BinaryBlockWriter<M> {
Expand All @@ -26,7 +26,7 @@ protected BinaryBlockWriter(OutputStream out, Class<M> protoClass, BinaryConvert
numRecordsPerBlock_ = numRecordsPerBlock;
protobufClass_ = protoClass;
protoConverter_ = protoConverter;

builder_ = reinitializeBlockBuilder();
}

Expand All @@ -37,7 +37,7 @@ public void write(M message) throws IOException {
} else {
builder_.addProtoBlobs(ByteString.copyFrom(protoConverter_.toBytes(message)));
}

numRecordsWritten_++;

if (builder_.getProtoBlobsCount() == numRecordsPerBlock_) {
Expand All @@ -52,14 +52,14 @@ public SerializedBlock.Builder reinitializeBlockBuilder() {
.setProtoClassName(protobufClass_.getCanonicalName());
}


public void finish() throws IOException {
if (builder_.getProtoBlobsCount() > 0) {
serialize();
}
}

public void close() throws IOException {
finish();
out_.close();
}

Expand Down
Expand Up @@ -7,9 +7,6 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.elephantbird.mapreduce.io.BinaryConverter;
import com.twitter.elephantbird.mapreduce.io.ThriftConverter;
Expand All @@ -21,7 +18,6 @@
* Loader for LZO files with line-oriented base64 encoded Thrift objects.
*/
public class LzoThriftB64LinePigLoader<M extends TBase<?, ?>> extends LzoBinaryB64LinePigLoader {
private static final Logger LOG = LoggerFactory.getLogger(LzoThriftB64LinePigLoader.class);

private final TypeRef<M> typeRef_;
private ThriftConverter<M> converter_;
Expand All @@ -36,11 +32,7 @@ public LzoThriftB64LinePigLoader(String thriftClassName) {
public Tuple fromBytes(byte[] messageBuffer) {
M value = converter_.fromBytes(messageBuffer);
if (value != null) {
try {
return thriftToPig_.getPigTuple(value);
} catch (TException e) {
LOG.warn("ThriftToTuple error :", e); // may be struct mismatch
}
return thriftToPig_.getLazyTuple(value);
}
return null;
}
Expand Down
Expand Up @@ -8,9 +8,6 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Pair;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.elephantbird.mapreduce.io.ThriftBlockReader;
import com.twitter.elephantbird.pig.util.PigUtil;
Expand All @@ -19,22 +16,19 @@


public class LzoThriftBlockPigLoader<M extends TBase<?, ?>> extends LzoBaseLoadFunc {
private static final Logger LOG = LoggerFactory.getLogger(LzoThriftBlockPigLoader.class);

private final TypeRef<M> typeRef_;
private final ThriftToPig<M> thriftToPig_;
private ThriftBlockReader<M> reader_;

private Pair<String, String> thriftStructsRead;
private Pair<String, String> thriftErrors;

public LzoThriftBlockPigLoader(String thriftClassName) {
typeRef_ = PigUtil.getThriftTypeRef(thriftClassName);
thriftToPig_ = ThriftToPig.newInstance(typeRef_);

String group = "LzoBlocks of " + typeRef_.getRawClass().getName();
thriftStructsRead = new Pair<String, String>(group, "Thrift Structs Read");
thriftErrors = new Pair<String, String>(group, "Errors");

setLoaderSpec(getClass(), new String[]{thriftClassName});
}
Expand Down Expand Up @@ -72,15 +66,8 @@ public Tuple getNext() throws IOException {

M value;
while ((value = reader_.readNext()) != null) {
try {
Tuple t = thriftToPig_.getPigTuple(value);
incrCounter(thriftStructsRead, 1L);
return t;
} catch (TException e) {
incrCounter(thriftErrors, 1L);
LOG.warn("ThriftToTuple error :", e); // may be corrupt data.
// try next
}
incrCounter(thriftStructsRead, 1L);
return thriftToPig_.getLazyTuple(value);
}
return null;
}
Expand Down
Expand Up @@ -43,9 +43,7 @@ public Tuple exec(Tuple input) throws IOException {
try {
DataByteArray bytes = (DataByteArray) input.get(0);
M value = thriftConverter.fromBytes(bytes.get());
return thriftToPig.getPigTuple(value);
} catch (TException e) {
throw new IOException(e);
return value == null ? null : thriftToPig.getPigTuple(value);
} catch (IOException e) {
return null;
}
Expand Down
@@ -0,0 +1,41 @@
package com.twitter.elephantbird.pig.store;

import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import org.apache.pig.data.Tuple;
import org.apache.thrift.TBase;

import com.twitter.elephantbird.mapreduce.io.ThriftConverter;
import com.twitter.elephantbird.pig.util.PigToThrift;
import com.twitter.elephantbird.pig.util.PigUtil;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;

/**
* Serializes Pig Tuples into Base-64 encoded, line-delimited Thrift objects.
* The fields in the pig tuple must correspond exactly to the fields in
* the Thrift object, as no name-matching is performed (names of the tuple
* fields are not currently accessible to a StoreFunc. It will be in 0.7,
* so something more flexible will be possible)
*/
public class LzoThriftB64LinePigStorage<T extends TBase<?, ?>> extends LzoBaseStoreFunc {

private TypeRef<T> typeRef;
private Base64 base64 = new Base64();
private PigToThrift<T> pigToThrift;
private ThriftConverter<T> converter;

public LzoThriftB64LinePigStorage(String thriftClassName) {
typeRef = PigUtil.getThriftTypeRef(thriftClassName);
pigToThrift = PigToThrift.newInstance(typeRef);
converter = ThriftConverter.newInstance(typeRef);
}

public void putNext(Tuple f) throws IOException {
if (f == null) return;
T tObj = pigToThrift.getThriftObject(f);
os_.write(base64.encode(converter.toBytes(tObj)));
os_.write(Protobufs.NEWLINE_UTF8_BYTE);
}
}
@@ -0,0 +1,52 @@
package com.twitter.elephantbird.pig.store;

import java.io.IOException;

import org.apache.pig.data.Tuple;
import org.apache.thrift.TBase;

import com.twitter.elephantbird.mapreduce.io.ThriftBlockWriter;
import com.twitter.elephantbird.pig.util.PigToThrift;
import com.twitter.elephantbird.pig.util.PigUtil;
import com.twitter.elephantbird.util.TypeRef;
import java.io.OutputStream;

/**
* Serializes Pig Tuples into Block encoded Thrift objects.
* The fields in the pig tuple must correspond exactly to the fields in
* Thrift struct, as no name-matching is performed (names of the tuple
* fields are not currently accessible to a StoreFunc.
* It will be in 0.7, so something more flexible will be possible)
*/
public class LzoThriftBlockPigStorage<T extends TBase<?, ?>> extends LzoBaseStoreFunc {

private TypeRef<T> typeRef;
private ThriftBlockWriter<T> writer;
private PigToThrift<T> pigToThrift;
private int numRecordsPerBlock = 10000; // is this too high?

public LzoThriftBlockPigStorage(String thriftClassName) {
typeRef = PigUtil.getThriftTypeRef(thriftClassName);
pigToThrift = PigToThrift.newInstance(typeRef);
}

@Override
public void bindTo(OutputStream os) throws IOException {
super.bindTo(os);
writer = new ThriftBlockWriter<T>(os_, typeRef.getRawClass(), numRecordsPerBlock);
}

@Override
public void putNext(Tuple f) throws IOException {
if (f == null) return;
writer.write(pigToThrift.getThriftObject(f));
}

@Override
public void finish() throws IOException {
if (writer != null) {
writer.close();
}
super.finish();
}
}

0 comments on commit fa5e16e

Please sign in to comment.