
Add support for other serialization formats. You'll have to implement the RecordWriterProvider interface and specify the implementation by setting the etl.record.writer.provider.class property in your camus properties file, e.g.

etl.record.writer.provider.class=com.jivesoftware.jive.platform.kafka.camus.JSONRecordWriterProvider
1 parent 93344cf · commit 87917a2aea46da9d21c8f67129f6463af52f7aa8 · @sam-meder committed Jun 24, 2013
@@ -7,7 +7,7 @@
/**
* Container for messages. Enables the use of a custom message decoder with knowledge
* of where these values are stored in the message schema
- *
+ *
* @author kgoodhop
*
* @param <R> The type of decoded payload
@@ -20,11 +20,11 @@
public CamusWrapper(R record) {
this(record, System.currentTimeMillis());
}
-
+
public CamusWrapper(R record, long timestamp) {
this(record, timestamp, "unknown_server", "unknown_service");
}
-
+
public CamusWrapper(R record, long timestamp, String server, String service) {
this.record = record;
this.timestamp = timestamp;
@@ -58,14 +58,14 @@ public void put(Writable key, Writable value) {
/**
* Get a value for partitions
- * @returns the value for the given key
+ * @return the value for the given key
*/
public Writable get(Writable key) {
return partitionMap.get(key);
}
/**
- * Get all the parititon key/partitionMap
+ * Get all the partition key/partitionMap
*/
public MapWritable getPartitionMap() {
return partitionMap;
@@ -3,8 +3,8 @@
import java.util.Properties;
/**
- * Decoder interface. Implemenations should return a CamusWrapper with timestamp
- * set at the very least. Camus will instantiate a descendent of this class
+ * Decoder interface. Implementations should return a CamusWrapper with timestamp
+ * set at the very least. Camus will instantiate a descendent of this class
 * based on the property camus.message.decoder.class.
* @author kgoodhop
*
@@ -14,7 +14,7 @@
public abstract class MessageDecoder<M,R> {
protected Properties props;
protected String topicName;
-
+
public void init(Properties props, String topicName){
this.props = props;
this.topicName = topicName;
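
For context, a custom decoder only needs to extend this class and hand back a CamusWrapper with a timestamp. The sketch below is illustrative rather than part of this commit: the class and package names are invented, and it assumes MessageDecoder declares an abstract CamusWrapper<R> decode(M message) method (not visible in this hunk) and lives in com.linkedin.camus.coders alongside CamusWrapper.

package com.example.camus.decoders; // hypothetical package

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;

// Minimal decoder sketch: treats the raw Kafka payload as UTF-8 text and lets
// the one-argument CamusWrapper constructor stamp it with the current time.
public class StringMessageDecoder extends MessageDecoder<byte[], String> {
    @Override
    public CamusWrapper<String> decode(byte[] payload) {
        try {
            return new CamusWrapper<String>(new String(payload, "UTF-8"));
        } catch (java.io.UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}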
@@ -0,0 +1,20 @@
+package com.linkedin.camus.etl;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+/**
+ * Supplies the RecordWriter used to serialize decoded records, so output
+ * formats other than Avro can be plugged in via etl.record.writer.provider.class.
+ */
+public interface RecordWriterProvider {
+
+ String getFilenameExtension();
+
+ RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
+ TaskAttemptContext context, String fileName, CamusWrapper data, FileOutputCommitter committer) throws IOException,
+ InterruptedException;
+}
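
The interface above is all a new output format has to implement. Below is a rough sketch of a newline-delimited JSON provider in the spirit of the JSONRecordWriterProvider mentioned in the commit message; it is not that class, the package name is invented, and it assumes the decoded record's toString() yields a valid JSON document. It follows the same work-path/unique-file pattern as the AvroRecordWriterProvider added further down in this commit.

package com.example.camus.etl; // hypothetical package

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class JsonRecordWriterProvider implements RecordWriterProvider {
    public static final String EXT = ".json";

    @Override
    public String getFilenameExtension() {
        return EXT;
    }

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
            TaskAttemptContext context, String fileName, CamusWrapper data,
            FileOutputCommitter committer) throws IOException, InterruptedException {
        // Write into the committer's work path under a unique per-task file name,
        // mirroring what AvroRecordWriterProvider does.
        Path path = new Path(committer.getWorkPath(),
                EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
        final DataOutputStream out =
                path.getFileSystem(context.getConfiguration()).create(path);

        return new RecordWriter<IEtlKey, CamusWrapper>() {
            @Override
            public void write(IEtlKey ignore, CamusWrapper value) throws IOException {
                // Assumes the decoder produced a record whose toString() is one JSON document.
                out.write(value.getRecord().toString().getBytes("UTF-8"));
                out.write('\n');
            }

            @Override
            public void close(TaskAttemptContext ctx) throws IOException {
                out.close();
            }
        };
    }
}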
@@ -1,5 +1,12 @@
package com.linkedin.camus.etl.kafka;
+import com.linkedin.camus.etl.kafka.common.DateUtils;
+import com.linkedin.camus.etl.kafka.common.EtlCounts;
+import com.linkedin.camus.etl.kafka.common.EtlKey;
+import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
+import com.linkedin.camus.etl.kafka.common.Source;
+import com.linkedin.camus.etl.kafka.mapred.EtlInputFormat;
+import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -15,8 +22,6 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
-
-import org.apache.avro.mapred.AvroWrapper;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -47,21 +52,13 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
-import com.linkedin.camus.etl.kafka.common.DateUtils;
-import com.linkedin.camus.etl.kafka.common.EtlCounts;
-import com.linkedin.camus.etl.kafka.common.EtlKey;
-import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
-import com.linkedin.camus.etl.kafka.common.Source;
-import com.linkedin.camus.etl.kafka.mapred.EtlInputFormat;
-import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.type.TypeReference;
-
public class CamusJob extends Configured implements Tool {
public static final String ETL_EXECUTION_BASE_PATH = "etl.execution.base.path";
@@ -234,9 +231,6 @@ public void run() throws Exception {
job.setInputFormatClass(EtlInputFormat.class);
job.setOutputFormatClass(EtlMultiOutputFormat.class);
- job.setOutputKeyClass(EtlKey.class);
- job.setOutputValueClass(AvroWrapper.class);
-
job.setNumReduceTasks(0);
stopTiming("pre-setup");
@@ -255,15 +249,15 @@ public void run() throws Exception {
stopTiming("hadoop");
startTiming("commit");
-
+
//Send Tracking counts to Kafka
sendTrackingCounts(job, fs,newExecutionOutput);
-
+
//Print any potentail errors encountered
printErrors(fs, newExecutionOutput);
-
+
fs.rename(newExecutionOutput, execHistory);
-
+
System.out.println("Job finished");
stopTiming("commit");
stopTiming("total");
@@ -301,7 +295,7 @@ public void printErrors(FileSystem fs, Path newExecutionOutput) throws IOExcepti
reader.close();
}
}
-
+
//Posts the tracking counts to Kafka
public void sendTrackingCounts(JobContext job, FileSystem fs, Path newExecutionOutput) throws IOException, URISyntaxException
{
@@ -391,11 +385,11 @@ public void sendTrackingCounts(JobContext job, FileSystem fs, Path newExecutionO
}
}
}
-
+
/**
* Creates a diagnostic report mostly focused on timing breakdowns. Useful
* for determining where to optimize.
- *
+ *
* @param job
* @param timingMap
* @throws IOException
@@ -26,6 +26,5 @@ public String generatePartitionedPath(JobContext context, String topic, int brok
DateTime bucket = new DateTime(Long.valueOf(encodedPartition));
sb.append(bucket.toString(OUTPUT_DATE_FORMAT));
return sb.toString();
-
}
}
@@ -0,0 +1,67 @@
+package com.linkedin.camus.etl.kafka.common;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.etl.IEtlKey;
+import com.linkedin.camus.etl.RecordWriterProvider;
+import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
+import java.io.IOException;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * RecordWriterProvider that writes records as Avro container files, preserving
+ * the output behavior Camus had before this abstraction was introduced.
+ */
+public class AvroRecordWriterProvider implements RecordWriterProvider {
+ public final static String EXT = ".avro";
+
+ @Override
+ public String getFilenameExtension() {
+ return EXT;
+ }
+
+ @Override
+ public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
+ TaskAttemptContext context,
+ String fileName,
+ CamusWrapper data,
+ FileOutputCommitter committer) throws IOException, InterruptedException {
+ final DataFileWriter<Object> writer = new DataFileWriter<Object>(
+ new SpecificDatumWriter<Object>());
+
+ if (FileOutputFormat.getCompressOutput(context)) {
+ if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
+ writer.setCodec(CodecFactory.snappyCodec());
+ } else {
+ int level = EtlMultiOutputFormat.getEtlDeflateLevel(context);
+ writer.setCodec(CodecFactory.deflateCodec(level));
+ }
+ }
+
+ Path path = committer.getWorkPath();
+ path = new Path(path, EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
+ writer.create(((GenericRecord) data.getRecord()).getSchema(),
+ path.getFileSystem(context.getConfiguration()).create(path));
+
+ writer.setSyncInterval(EtlMultiOutputFormat.getEtlAvroWriterSyncInterval(context));
+
+ return new RecordWriter<IEtlKey, CamusWrapper>() {
+ @Override
+ public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
+ writer.append(data.getRecord());
+ }
+
+ @Override
+ public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+ writer.close();
+ }
+ };
+ }
+}
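
To keep the existing Avro output, the same property can presumably be pointed at this class instead, e.g. etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider.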
@@ -23,7 +23,7 @@
/**
* Poorly named class that handles kafka pull events within each
* KafkaRecordReader.
- *
+ *
* @author Richard Park
*/
public class KafkaReader {
@@ -46,7 +46,7 @@
private int fetchBufferSize;
/**
- * Construct using the json represention of the kafka request
+ * Construct using the json representation of the kafka request
*/
public KafkaReader(TaskAttemptContext context, EtlRequest request,
int clientTimeout, int fetchBufferSize) throws Exception {
@@ -90,7 +90,7 @@ public boolean hasNext() throws IOException {
/**
* Fetches the next Kafka message and stuffs the results into the key and
* value
- *
+ *
* @param key
* @param payload
* @param pKey
@@ -108,7 +108,7 @@ public boolean getNext(EtlKey key, BytesWritable payload ,BytesWritable pKey) th
byte[] bytes = new byte[origSize];
buf.get(bytes, buf.position(), origSize);
payload.set(bytes, 0, origSize);
-
+
buf = message.key();
if(buf != null){
origSize = buf.remaining();
@@ -133,7 +133,7 @@ public boolean getNext(EtlKey key, BytesWritable payload ,BytesWritable pKey) th
/**
* Creates a fetch request.
- *
+ *
* @return false if there's no more fetches
* @throws IOException
*/
@@ -200,14 +200,14 @@ public boolean fetch() throws IOException {
System.out.println("Skipping offset : " + skippedMessage.offset());
skipped --;
}
-
+
if (!messageIter.hasNext()) {
System.out
.println("No more data left to process. Returning false");
messageIter = null;
return false;
}
-
+
return true;
}
} catch (Exception e) {
@@ -220,7 +220,7 @@ public boolean fetch() throws IOException {
/**
* Closes this context
- *
+ *
* @throws IOException
*/
public void close() throws IOException {
@@ -232,7 +232,7 @@ public void close() throws IOException {
/**
* Returns the total bytes that will be fetched. This is calculated by
* taking the diffs of the offsets
- *
+ *
* @return
*/
public long getTotalBytes() {
@@ -241,7 +241,7 @@ public long getTotalBytes() {
/**
* Returns the total bytes that have been fetched so far
- *
+ *
* @return
*/
public long getReadBytes() {
@@ -250,7 +250,7 @@ public long getReadBytes() {
/**
 * Returns the number of events that have been read
- *
+ *
* @return
*/
public long getCount() {
@@ -259,7 +259,7 @@ public long getCount() {
/**
* Returns the fetch time of the last fetch in ms
- *
+ *
* @return
*/
public long getFetchTime() {
@@ -268,7 +268,7 @@ public long getFetchTime() {
/**
* Returns the totalFetchTime in ms
- *
+ *
* @return
*/
public long getTotalFetchTime() {