Commit cf71e88

Use DataFileWriter.appendEncoded to avoid an extra deserialization/serialization step.

tomwhite committed Jun 14, 2013
1 parent a2cc066
Showing 1 changed file with 9 additions and 20 deletions.
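
This commit removes a decode/re-encode round trip from the serializer's hot path. Event bodies arriving at this serializer are already Avro binary data; rather than deserializing each body into a record and re-serializing it into the Avro container file, DataFileWriter.appendEncoded copies the encoded bytes straight into the current file block. The contract is that each event body holds exactly one datum in Avro binary encoding, written with the same schema the file is created with. A minimal producer-side sketch that satisfies that contract (the EventBodyEncoder class is illustrative, not part of this commit):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class EventBodyEncoder {
  // Encode a single record as a raw Avro binary datum (no container-file
  // framing), which is the form appendEncoded() expects.
  public static byte[] encodeBody(Schema schema, GenericRecord record)
      throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }
}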
@@ -22,18 +22,15 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -61,9 +58,8 @@
  * #AVRO_SCHEMA_URL_HEADER}. Schemas read from URLs are cached by instances of this
  * class so that the overhead of retrieval is minimized.
  * </p>
- * @param <T> data type that can be written in the Avro schema
  */
-public class AvroEventSerializer<T> implements EventSerializer, Configurable {
+public class AvroEventSerializer implements EventSerializer, Configurable {
 
   private static final Logger logger =
       LoggerFactory.getLogger(AvroEventSerializer.class);
@@ -72,12 +68,9 @@ public class AvroEventSerializer<T> implements EventSerializer, Configurable {
   public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url";
 
   private final OutputStream out;
-  private DatumWriter<T> writer = null;
-  private DataFileWriter<T> dataFileWriter = null;
+  private DatumWriter<Object> writer = null;
+  private DataFileWriter<Object> dataFileWriter = null;
 
-  private DatumReader<T> reader;
-  private BinaryDecoder decoder;
-  private T record;
   private int syncIntervalBytes;
   private String compressionCodec;
   private Map<String, Schema> schemaCache = new HashMap<String, Schema>();
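
The state of the serializer shrinks accordingly: reader, decoder, and record existed only to support the old decode/re-encode round trip and are deleted outright, while writer and dataFileWriter lose the <T> type parameter because no typed records flow through them anymore.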
@@ -107,12 +100,10 @@ public void afterReopen() throws IOException {
 
   @Override
   public void write(Event event) throws IOException {
-    if (reader == null) {
+    if (dataFileWriter == null) {
       initialize(event);
     }
-    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
-    record = reader.read(record, decoder);
-    dataFileWriter.append(record);
+    dataFileWriter.appendEncoded(ByteBuffer.wrap(event.getBody()));
   }
 
   private void initialize(Event event) throws IOException {
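
The write() hunk above is the heart of the change: the old path decoded event.getBody() into a reflected record and re-encoded it via append(), while the new path wraps the bytes in a ByteBuffer and hands them to appendEncoded(), which writes them into the current block without interpreting them. Note that the initialization guard now tests dataFileWriter rather than the deleted reader field. One trade-off: since nothing decodes the body anymore, a corrupt or schema-mismatched event is not caught at write time and will only surface when the file is read back. If that matters while debugging producers, a check along these lines could be run off the hot path (a hypothetical helper, not part of this commit, and it reintroduces the decode cost the commit removes):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class BodyValidator {
  // Throws an exception if the body is not a decodable Avro binary datum
  // for the given schema; intended for debugging, not the write path.
  public static void validate(Schema schema, byte[] body) throws IOException {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(body, null);
    new GenericDatumReader<Object>(schema).read(null, decoder);
  }
}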
@@ -133,8 +124,8 @@ private void initialize(Event event) throws IOException {
       schema = new Schema.Parser().parse(schemaString);
     }
 
-    writer = new ReflectDatumWriter<T>(schema);
-    dataFileWriter = new DataFileWriter<T>(writer);
+    writer = new GenericDatumWriter<Object>(schema);
+    dataFileWriter = new DataFileWriter<Object>(writer);
 
     dataFileWriter.setSyncInterval(syncIntervalBytes);
 
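
With appendEncoded() on the write path, the datum writer created above never actually serializes a record; DataFileWriter only uses it to bind the schema when create() is called. Any DatumWriter implementation would do, so the reflection-based ReflectDatumWriter<T> can be swapped for a plain GenericDatumWriter<Object>, which is what lets the class drop its type parameter.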
@@ -147,8 +138,6 @@ private void initialize(Event event) throws IOException {
     }
 
     dataFileWriter.create(schema, out);
-
-    reader = new ReflectDatumReader<T>(schema);
   }
 
   private Schema loadFromUrl(String schemaUrl) throws IOException {
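
Putting the pieces together, a producer feeding this serializer attaches one of the schema headers and an already-encoded body. A sketch reusing the hypothetical EventBodyEncoder from above (the schema URL is illustrative):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class ProducerExample {
  public static Event buildEvent(Schema schema, GenericData.Record record)
      throws IOException {
    Map<String, String> headers = new HashMap<String, String>();
    // Point the serializer at the writer schema; fetched schemas are
    // cached per serializer instance.
    headers.put("flume.avro.schema.url", "http://example.com/schemas/event.avsc");
    // The body must already be Avro binary in the same schema.
    return EventBuilder.withBody(EventBodyEncoder.encodeBody(schema, record), headers);
  }
}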
