wrapper) throws IOException {
- writer.write(wrapper.datum(), encoder);
- // would be a lot faster if the Serializer interface had a flush()
- // method and the Hadoop framework called it when needed rather
- // than for every record.
- encoder.flush();
- }
-
- public void close() throws IOException {
- out.close();
- }
-
+ /**
+ * Gets an object capable of serializing output from a Mapper.
+ *
+ * This may be for Map output keys or Map output values.
+ */
+ public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+ Schema schema;
+ if (AvroKey.class.isAssignableFrom(c)) {
+ schema = AvroJob.getMapOutputKeySchema(getConf());
+ } else if (AvroValue.class.isAssignableFrom(c)) {
+ schema = AvroJob.getMapOutputValueSchema(getConf());
+ } else {
+ throw new IllegalStateException("Only AvroKey and AvroValue are supported.");
+ }
+ return new AvroSerializer<T>(schema);
}
-
}
diff --git a/src/main/java/org/apache/avro/mapreduce/AvroSerializer.java b/src/main/java/org/apache/avro/mapreduce/AvroSerializer.java
new file mode 100644
index 0000000..d9068d0
--- /dev/null
+++ b/src/main/java/org/apache/avro/mapreduce/AvroSerializer.java
@@ -0,0 +1,96 @@
+// (c) Copyright 2011 Odiago, Inc.
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * Serializes AvroWrapper objects within Hadoop.
+ *
+ * Keys and values containing Avro types are more efficiently serialized outside of the
+ * WritableSerialization model, so they are wrapped in {@link
+ * org.apache.avro.mapred.AvroWrapper} objects and serialization is handled by this
+ * class.
+ *
+ * MapReduce jobs that use AvroWrapper objects as keys or values need to be configured
+ * with {@link org.apache.avro.mapreduce.AvroSerialization}. Use {@link
+ * org.apache.avro.mapreduce.AvroJob} to help with Job configuration.
+ *
+ * @param <T> The Java type of the Avro data.
+ */
+public class AvroSerializer<T> implements Serializer<AvroWrapper<T>> {
+ /**
+ * The block size for the Avro encoder.
+ *
+ * This number was copied from the AvroSerialization of org.apache.avro.mapred in Avro 1.5.1.
+ *
+ * TODO(gwu): Do some benchmarking with different numbers here to see if it is important.
+ */
+ private static final int AVRO_ENCODER_BLOCK_SIZE_BYTES = 512;
+
+ /** A factory for creating Avro datum encoders. */
+ private static EncoderFactory mEncoderFactory
+ = new EncoderFactory().configureBlockSize(AVRO_ENCODER_BLOCK_SIZE_BYTES);
+
+ /** The writer schema for the data to serialize. */
+ private final Schema mWriterSchema;
+
+ /** The Avro datum writer for serializing. */
+ private final DatumWriter<T> mAvroDatumWriter;
+
+ /** The Avro encoder for serializing. */
+ private BinaryEncoder mAvroEncoder;
+
+ /** The output stream for serializing. */
+ private OutputStream mOutputStream;
+
+ /**
+ * Constructor.
+ *
+ * @param writerSchema The writer schema for the Avro data being serialized.
+ */
+ public AvroSerializer(Schema writerSchema) {
+ mWriterSchema = writerSchema;
+ mAvroDatumWriter = new SpecificDatumWriter<T>(writerSchema);
+ }
+
+ /**
+ * Gets the writer schema being used for serialization.
+ *
+ * @return The writer schema.
+ */
+ public Schema getWriterSchema() {
+ return mWriterSchema;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void open(OutputStream outputStream) throws IOException {
+ mOutputStream = outputStream;
+ mAvroEncoder = mEncoderFactory.binaryEncoder(outputStream, mAvroEncoder);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void serialize(AvroWrapper<T> avroWrapper) throws IOException {
+ mAvroDatumWriter.write(avroWrapper.datum(), mAvroEncoder);
+ // This would be a lot faster if the Serializer interface had a flush() method and the
+ // Hadoop framework called it when needed. For now, we'll have to flush on every record.
+ mAvroEncoder.flush();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws IOException {
+ mOutputStream.close();
+ }
+}
diff --git a/src/main/java/org/apache/avro/mapreduce/AvroValueDeserializer.java b/src/main/java/org/apache/avro/mapreduce/AvroValueDeserializer.java
new file mode 100644
index 0000000..91fd03c
--- /dev/null
+++ b/src/main/java/org/apache/avro/mapreduce/AvroValueDeserializer.java
@@ -0,0 +1,34 @@
+// (c) Copyright 2011 Odiago, Inc.
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroValue;
+
+/**
+ * Deserializes AvroValue objects within Hadoop.
+ *
+ * @param <D> The Java type of the Avro data to deserialize.
+ *
+ * @see org.apache.avro.mapreduce.AvroDeserializer
+ */
+public class AvroValueDeserializer<D> extends AvroDeserializer<AvroValue<D>, D> {
+ /**
+ * Constructor.
+ *
+ * @param readerSchema The Avro reader schema for the data to deserialize.
+ */
+ public AvroValueDeserializer(Schema readerSchema) {
+ super(readerSchema);
+ }
+
+ /**
+ * Creates a new empty AvroValue<D> instance.
+ *
+ * @return a new empty AvroValue.
+ */
+ @Override
+ protected AvroValue<D> createAvroWrapper() {
+ return new AvroValue<D>(null);
+ }
+}
diff --git a/src/test/java/org/apache/avro/mapreduce/TestAvroKeyDeserializer.java b/src/test/java/org/apache/avro/mapreduce/TestAvroKeyDeserializer.java
new file mode 100644
index 0000000..7ef3403
--- /dev/null
+++ b/src/test/java/org/apache/avro/mapreduce/TestAvroKeyDeserializer.java
@@ -0,0 +1,52 @@
+// (c) Copyright 2011 Odiago, Inc.
+
+package org.apache.avro.mapreduce;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.mapred.AvroKey;
+import org.junit.Test;
+
+public class TestAvroKeyDeserializer {
+ @Test
+ public void testDeserialize() throws IOException {
+ // Create a deserializer.
+ Schema readerSchema = Schema.create(Schema.Type.STRING);
+ AvroKeyDeserializer<CharSequence> deserializer
+ = new AvroKeyDeserializer<CharSequence>(readerSchema);
+
+ // Check the reader schema.
+ assertEquals(readerSchema, deserializer.getReaderSchema());
+
+ // Write some records to deserialize.
+ Schema writerSchema = Schema.create(Schema.Type.STRING);
+ DatumWriter<CharSequence> datumWriter = new GenericDatumWriter<CharSequence>(writerSchema);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+ datumWriter.write("record1", encoder);
+ datumWriter.write("record2", encoder);
+ encoder.flush();
+
+ // Deserialize the records.
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ deserializer.open(inputStream);
+ AvroKey<CharSequence> record = null;
+
+ record = deserializer.deserialize(record);
+ assertEquals("record1", record.datum().toString());
+
+ record = deserializer.deserialize(record);
+ assertEquals("record2", record.datum().toString());
+
+ deserializer.close();
+ }
+}
diff --git a/src/test/java/org/apache/avro/mapreduce/TestAvroSerialization.java b/src/test/java/org/apache/avro/mapreduce/TestAvroSerialization.java
new file mode 100644
index 0000000..a72f999
--- /dev/null
+++ b/src/test/java/org/apache/avro/mapreduce/TestAvroSerialization.java
@@ -0,0 +1,101 @@
+// (c) Copyright 2011 Odiago, Inc.
+
+package org.apache.avro.mapreduce;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+
+public class TestAvroSerialization {
+ @Test
+ public void testAccept() {
+ AvroSerialization serialization = new AvroSerialization();
+
+ assertTrue(serialization.accept(AvroKey.class));
+ assertTrue(serialization.accept(AvroValue.class));
+ assertFalse(serialization.accept(AvroWrapper.class));
+ assertFalse(serialization.accept(String.class));
+ }
+
+ @Test
+ public void testGetSerializerForKey() throws IOException {
+ // Set the writer schema in the job configuration.
+ Schema writerSchema = Schema.create(Schema.Type.STRING);
+ Job job = new Job();
+ AvroJob.setMapOutputKeySchema(job, writerSchema);
+
+ // Get a serializer from the configuration.
+ AvroSerialization serialization
+ = ReflectionUtils.newInstance(AvroSerialization.class, job.getConfiguration());
+ Serializer serializer = serialization.getSerializer(AvroKey.class);
+ assertTrue(serializer instanceof AvroSerializer);
+ AvroSerializer avroSerializer = (AvroSerializer) serializer;
+
+ // Check that the writer schema is set correctly on the serializer.
+ assertEquals(writerSchema, avroSerializer.getWriterSchema());
+ }
+
+ @Test
+ public void testGetSerializerForValue() throws IOException {
+ // Set the writer schema in the job configuration.
+ Schema writerSchema = Schema.create(Schema.Type.STRING);
+ Job job = new Job();
+ AvroJob.setMapOutputValueSchema(job, writerSchema);
+
+ // Get a serializer from the configuration.
+ AvroSerialization serialization
+ = ReflectionUtils.newInstance(AvroSerialization.class, job.getConfiguration());
+ Serializer serializer = serialization.getSerializer(AvroValue.class);
+ assertTrue(serializer instanceof AvroSerializer);
+ AvroSerializer avroSerializer = (AvroSerializer) serializer;
+
+ // Check that the writer schema is set correctly on the serializer.
+ assertEquals(writerSchema, avroSerializer.getWriterSchema());
+ }
+
+ @Test
+ public void testGetDeserializerForKey() throws IOException {
+ // Set the reader schema in the job configuration.
+ Schema readerSchema = Schema.create(Schema.Type.STRING);
+ Job job = new Job();
+ AvroJob.setMapOutputKeySchema(job, readerSchema);
+
+ // Get a deserializer from the configuration.
+ AvroSerialization serialization
+ = ReflectionUtils.newInstance(AvroSerialization.class, job.getConfiguration());
+ Deserializer deserializer = serialization.getDeserializer(AvroKey.class);
+ assertTrue(deserializer instanceof AvroKeyDeserializer);
+ AvroKeyDeserializer avroDeserializer = (AvroKeyDeserializer) deserializer;
+
+ // Check that the reader schema is set correctly on the deserializer.
+ assertEquals(readerSchema, avroDeserializer.getReaderSchema());
+ }
+
+ @Test
+ public void testGetDeserializerForValue() throws IOException {
+ // Set the reader schema in the job configuration.
+ Schema readerSchema = Schema.create(Schema.Type.STRING);
+ Job job = new Job();
+ AvroJob.setMapOutputValueSchema(job, readerSchema);
+
+ // Get a deserializer from the configuration.
+ AvroSerialization serialization
+ = ReflectionUtils.newInstance(AvroSerialization.class, job.getConfiguration());
+ Deserializer deserializer = serialization.getDeserializer(AvroValue.class);
+ assertTrue(deserializer instanceof AvroValueDeserializer);
+ AvroValueDeserializer avroDeserializer = (AvroValueDeserializer) deserializer;
+
+ // Check that the reader schema is set correctly on the deserializer.
+ assertEquals(readerSchema, avroDeserializer.getReaderSchema());
+ }
+}
diff --git a/src/test/java/org/apache/avro/mapreduce/TestAvroSerializer.java b/src/test/java/org/apache/avro/mapreduce/TestAvroSerializer.java
new file mode 100644
index 0000000..f0f692b
--- /dev/null
+++ b/src/test/java/org/apache/avro/mapreduce/TestAvroSerializer.java
@@ -0,0 +1,51 @@
+// (c) Copyright 2011 Odiago, Inc.
+
+package org.apache.avro.mapreduce;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.AvroKey;
+import org.junit.Test;
+
+public class TestAvroSerializer {
+ @Test
+ public void testSerialize() throws IOException {
+ // Create a serializer.
+ Schema writerSchema = Schema.create(Schema.Type.STRING);
+ AvroSerializer<CharSequence> serializer = new AvroSerializer<CharSequence>(writerSchema);
+
+ // Check the writer schema.
+ assertEquals(writerSchema, serializer.getWriterSchema());
+
+ // Serialize two records, 'record1' and 'record2'.
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ serializer.open(outputStream);
+ serializer.serialize(new AvroKey<CharSequence>("record1"));
+ serializer.serialize(new AvroKey<CharSequence>("record2"));
+ serializer.close();
+
+ // Make sure the records were serialized correctly.
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ Schema readerSchema = Schema.create(Schema.Type.STRING);
+ DatumReader<CharSequence> datumReader = new GenericDatumReader<CharSequence>(readerSchema);
+ Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ CharSequence record = null;
+
+ record = datumReader.read(record, decoder);
+ assertEquals("record1", record.toString());
+
+ record = datumReader.read(record, decoder);
+ assertEquals("record2", record.toString());
+
+ inputStream.close();
+ }
+}
diff --git a/src/test/java/org/apache/avro/mapreduce/TestAvroValueDeserializer.java b/src/test/java/org/apache/avro/mapreduce/TestAvroValueDeserializer.java
new file mode 100644
index 0000000..5a2cdba
--- /dev/null
+++ b/src/test/java/org/apache/avro/mapreduce/TestAvroValueDeserializer.java
@@ -0,0 +1,52 @@
+// (c) Copyright 2011 Odiago, Inc.
+
+package org.apache.avro.mapreduce;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.mapred.AvroValue;
+import org.junit.Test;
+
+public class TestAvroValueDeserializer {
+ @Test
+ public void testDeserialize() throws IOException {
+ // Create a deserializer.
+ Schema readerSchema = Schema.create(Schema.Type.STRING);
+ AvroValueDeserializer<CharSequence> deserializer
+ = new AvroValueDeserializer<CharSequence>(readerSchema);
+
+ // Check the reader schema.
+ assertEquals(readerSchema, deserializer.getReaderSchema());
+
+ // Write some records to deserialize.
+ Schema writerSchema = Schema.create(Schema.Type.STRING);
+ DatumWriter<CharSequence> datumWriter = new GenericDatumWriter<CharSequence>(writerSchema);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+ datumWriter.write("record1", encoder);
+ datumWriter.write("record2", encoder);
+ encoder.flush();
+
+ // Deserialize the records.
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ deserializer.open(inputStream);
+ AvroValue<CharSequence> record = null;
+
+ record = deserializer.deserialize(record);
+ assertEquals("record1", record.datum().toString());
+
+ record = deserializer.deserialize(record);
+ assertEquals("record2", record.datum().toString());
+
+ deserializer.close();
+ }
+}