Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIBI-401. Clean up the Avro serialization code.
- Loading branch information
Showing
10 changed files
with
579 additions
and
157 deletions.
There are no files selected for viewing
91 changes: 91 additions & 0 deletions
91
src/main/java/org/apache/avro/mapreduce/AvroDeserializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
// (c) Copyright 2011 Odiago, Inc. | ||
|
||
package org.apache.avro.mapreduce; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.avro.io.BinaryDecoder; | ||
import org.apache.avro.io.DatumReader; | ||
import org.apache.avro.io.DecoderFactory; | ||
import org.apache.avro.mapred.AvroWrapper; | ||
import org.apache.avro.specific.SpecificDatumReader; | ||
import org.apache.hadoop.io.serializer.Deserializer; | ||
|
||
/** | ||
* Deserializes AvroWrapper objects within Hadoop. | ||
* | ||
* <p>Keys and values containing Avro tyeps are more efficiently serialized outside of the | ||
* WritableSerialization model, so they are wrapper in {@link | ||
* org.apache.avro.mapred.AvroWrapper} objects and deserialization is handled by this | ||
* class.</p> | ||
* | ||
* <p>MapReduce jobs that use AvroWrapper objects as keys or values need to be configured | ||
* with {@link org.apache.avro.mapreduce.AvroSerialization}. Use {@link | ||
* org.apache.avro.mapreduce.AvroJob} to help with Job configuration.</p> | ||
* | ||
* @param <T> The type of Avro wrapper. | ||
* @param <D> The Java type of the Avro data being wrapped. | ||
*/ | ||
public abstract class AvroDeserializer<T extends AvroWrapper<D>, D> implements Deserializer<T> { | ||
/** The Avro reader schema for deserializing. */ | ||
private final Schema mReaderSchema; | ||
|
||
/** The Avro datum reader for deserializing. */ | ||
private final DatumReader<D> mAvroDatumReader; | ||
|
||
/** An Avro binary decoder for deserializing. */ | ||
private BinaryDecoder mAvroDecoder; | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param readerSchema The Avro reader schema for the data to deserialize. | ||
*/ | ||
protected AvroDeserializer(Schema readerSchema) { | ||
mReaderSchema = readerSchema; | ||
mAvroDatumReader = new SpecificDatumReader<D>(readerSchema); | ||
} | ||
|
||
/** | ||
* Gets the reader schema used for deserializing. | ||
* | ||
* @return The reader schema. | ||
*/ | ||
public Schema getReaderSchema() { | ||
return mReaderSchema; | ||
} | ||
|
||
/** {@inheritDoc} */ | ||
@Override | ||
public void open(InputStream inputStream) throws IOException { | ||
mAvroDecoder = DecoderFactory.get().directBinaryDecoder(inputStream, mAvroDecoder); | ||
} | ||
|
||
/** {@inheritDoc} */ | ||
@Override | ||
public T deserialize(T avroWrapperToReuse) throws IOException { | ||
// Create a new Avro wrapper if there isn't one to reuse. | ||
if (null == avroWrapperToReuse) { | ||
avroWrapperToReuse = createAvroWrapper(); | ||
} | ||
|
||
// Deserialize the Avro datum from the input stream. | ||
avroWrapperToReuse.datum(mAvroDatumReader.read(avroWrapperToReuse.datum(), mAvroDecoder)); | ||
return avroWrapperToReuse; | ||
} | ||
|
||
/** {@inheritDoc} */ | ||
@Override | ||
public void close() throws IOException { | ||
mAvroDecoder.inputStream().close(); | ||
} | ||
|
||
/** | ||
* Creates a new empty <code>T</code> (extends AvroWrapper) instance. | ||
* | ||
* @return A new empty <code>T</code> instance. | ||
*/ | ||
protected abstract T createAvroWrapper(); | ||
} |
54 changes: 24 additions & 30 deletions
54
src/main/java/org/apache/avro/mapreduce/AvroKeyComparator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,51 +1,45 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
// (c) Copyright 2011 Odiago, Inc. | ||
|
||
package org.apache.avro.mapreduce; | ||
|
||
import org.apache.hadoop.io.RawComparator; | ||
import org.apache.hadoop.conf.Configured; | ||
import org.apache.hadoop.conf.Configuration; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.avro.io.BinaryData; | ||
import org.apache.avro.mapred.AvroKey; | ||
import org.apache.avro.specific.SpecificData; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.conf.Configured; | ||
import org.apache.hadoop.io.RawComparator; | ||
|
||
/** The {@link RawComparator} used by jobs configured with {@link AvroJob}. */ | ||
/** | ||
* The {@link org.apache.hadoop.io.RawComparator} used by jobs configured with | ||
* {@link org.apache.avro.mapreduce.AvroJob}. | ||
* | ||
* <p>Compares AvroKeys output from the map phase for sorting.</p> | ||
*/ | ||
public class AvroKeyComparator<T> extends Configured implements RawComparator<AvroKey<T>> { | ||
/** The schema of the Avro data in the key to compare. */ | ||
private Schema mSchema; | ||
|
||
private Schema schema; | ||
|
||
/** {@inheritDoc} */ | ||
@Override | ||
public void setConf(Configuration conf) { | ||
super.setConf(conf); | ||
if (conf != null) { | ||
schema = AvroJob.getMapOutputKeySchema(conf); | ||
if (null != conf) { | ||
// The MapReduce framework will be using this comparator to sort AvroKey objects | ||
// output from the map phase, so use the schema defined for the map output key. | ||
mSchema = AvroJob.getMapOutputKeySchema(conf); | ||
} | ||
} | ||
|
||
/** {@inheritDoc} */ | ||
@Override | ||
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { | ||
return BinaryData.compare(b1, s1, b2, s2, schema); | ||
return BinaryData.compare(b1, s1, b2, s2, mSchema); | ||
} | ||
|
||
/** {@inheritDoc} */ | ||
@Override | ||
public int compare(AvroKey<T> x, AvroKey<T> y) { | ||
return SpecificData.get().compare(x.datum(), y.datum(), schema); | ||
return SpecificData.get().compare(x.datum(), y.datum(), mSchema); | ||
} | ||
|
||
} | ||
} |
34 changes: 34 additions & 0 deletions
34
src/main/java/org/apache/avro/mapreduce/AvroKeyDeserializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// (c) Copyright 2011 Odiago, Inc. | ||
|
||
package org.apache.avro.mapreduce; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.avro.mapred.AvroKey; | ||
|
||
/** | ||
* Deserializes AvroKey objects within Hadoop. | ||
* | ||
* @param <D> The java type of the avro data to deserialize. | ||
* | ||
* @see org.apache.avro.mapreduce.AvroDeserializer | ||
*/ | ||
public class AvroKeyDeserializer<D> extends AvroDeserializer<AvroKey<D>, D> { | ||
/** | ||
* Constructor. | ||
* | ||
* @param readerSchema The Avro reader schema for the data to deserialize. | ||
*/ | ||
public AvroKeyDeserializer(Schema readerSchema) { | ||
super(readerSchema); | ||
} | ||
|
||
/** | ||
* Creates a new empty <code>AvroKey</code> instance. | ||
* | ||
* @return a new empty AvroKey. | ||
*/ | ||
@Override | ||
protected AvroKey<D> createAvroWrapper() { | ||
return new AvroKey<D>(null); | ||
} | ||
} |
171 changes: 44 additions & 127 deletions
171
src/main/java/org/apache/avro/mapreduce/AvroSerialization.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,143 +1,60 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
// (c) Copyright 2011 Odiago, Inc. | ||
|
||
package org.apache.avro.mapreduce; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
|
||
import org.apache.hadoop.io.serializer.Serialization; | ||
import org.apache.hadoop.io.serializer.Deserializer; | ||
import org.apache.hadoop.io.serializer.Serializer; | ||
import org.apache.hadoop.conf.Configured; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.avro.io.BinaryEncoder; | ||
import org.apache.avro.io.BinaryDecoder; | ||
import org.apache.avro.io.DecoderFactory; | ||
import org.apache.avro.io.DatumReader; | ||
import org.apache.avro.io.DatumWriter; | ||
import org.apache.avro.io.EncoderFactory; | ||
import org.apache.avro.mapred.AvroKey; | ||
import org.apache.avro.mapred.AvroValue; | ||
import org.apache.avro.mapred.AvroWrapper; | ||
import org.apache.avro.reflect.ReflectDatumWriter; | ||
import org.apache.avro.specific.SpecificDatumReader; | ||
|
||
/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */ | ||
public class AvroSerialization<T> extends Configured | ||
implements Serialization<AvroWrapper<T>> { | ||
import org.apache.hadoop.conf.Configured; | ||
import org.apache.hadoop.io.serializer.Deserializer; | ||
import org.apache.hadoop.io.serializer.Serialization; | ||
import org.apache.hadoop.io.serializer.Serializer; | ||
|
||
/** | ||
* The {@link org.apache.hadoop.io.serializer.Serialization} used by jobs configured with | ||
* {@link org.apache.avro.mapreduce.AvroJob}. | ||
* | ||
* @param <T> The Java type of the Avro data to serialize. | ||
*/ | ||
public class AvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> { | ||
/** {@inheritDoc} */ | ||
@Override | ||
public boolean accept(Class<?> c) { | ||
return AvroWrapper.class.isAssignableFrom(c); | ||
} | ||
|
||
/** Returns the specified map output deserializer. Defaults to the final | ||
* output deserializer if no map output schema was specified. */ | ||
public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) { | ||
// We need not rely on mapred.task.is.map here to determine whether map | ||
// output or final output is desired, since the mapreduce framework never | ||
// creates a deserializer for final output, only for map output. | ||
boolean isKey = AvroKey.class.isAssignableFrom(c); | ||
Schema schema = isKey | ||
? AvroJob.getMapOutputKeySchema(getConf()) | ||
: AvroJob.getMapOutputValueSchema(getConf()); | ||
return new AvroWrapperDeserializer(new SpecificDatumReader<T>(schema), | ||
isKey); | ||
return AvroKey.class.isAssignableFrom(c) || AvroValue.class.isAssignableFrom(c); | ||
} | ||
|
||
private static final DecoderFactory FACTORY = DecoderFactory.get(); | ||
|
||
private class AvroWrapperDeserializer | ||
implements Deserializer<AvroWrapper<T>> { | ||
|
||
private DatumReader<T> reader; | ||
private BinaryDecoder decoder; | ||
private boolean isKey; | ||
|
||
public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) { | ||
this.reader = reader; | ||
this.isKey = isKey; | ||
} | ||
|
||
public void open(InputStream in) { | ||
this.decoder = FACTORY.directBinaryDecoder(in, decoder); | ||
} | ||
|
||
public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper) | ||
throws IOException { | ||
T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder); | ||
if (wrapper == null) { | ||
wrapper = isKey? new AvroKey<T>(datum) : new AvroValue<T>(datum); | ||
} else { | ||
wrapper.datum(datum); | ||
} | ||
return wrapper; | ||
} | ||
|
||
public void close() throws IOException { | ||
decoder.inputStream().close(); | ||
/** | ||
* Gets an object capable of deserializing the output from a Mapper. | ||
* | ||
* @param c The class to get a deserializer for. | ||
* @return A deserializer for objects of class <code>c</code>. | ||
*/ | ||
@Override | ||
public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) { | ||
if (AvroKey.class.isAssignableFrom(c)) { | ||
return new AvroKeyDeserializer(AvroJob.getMapOutputKeySchema(getConf())); | ||
} else if (AvroValue.class.isAssignableFrom(c)) { | ||
return new AvroValueDeserializer(AvroJob.getMapOutputValueSchema(getConf())); | ||
} else { | ||
throw new IllegalStateException("Only AvroKey and AvroValue are supported."); | ||
} | ||
|
||
} | ||
|
||
/** Returns the specified output serializer. */ | ||
public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) { | ||
// Here we must rely on mapred.task.is.map to tell whether the map output | ||
// or final output is needed. | ||
boolean isMap = getConf().getBoolean("mapred.task.is.map", false); | ||
Schema schema = !isMap | ||
? AvroJob.getOutputSchema(getConf()) | ||
: (AvroKey.class.isAssignableFrom(c) | ||
? AvroJob.getMapOutputKeySchema(getConf()) | ||
: AvroJob.getMapOutputValueSchema(getConf())); | ||
return new AvroWrapperSerializer(new ReflectDatumWriter<T>(schema)); | ||
} | ||
|
||
private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> { | ||
|
||
private DatumWriter<T> writer; | ||
private OutputStream out; | ||
private BinaryEncoder encoder; | ||
|
||
public AvroWrapperSerializer(DatumWriter<T> writer) { | ||
this.writer = writer; | ||
} | ||
|
||
public void open(OutputStream out) { | ||
this.out = out; | ||
this.encoder = new EncoderFactory().configureBlockSize(512) | ||
.binaryEncoder(out, null); | ||
} | ||
|
||
public void serialize(AvroWrapper<T> wrapper) throws IOException { | ||
writer.write(wrapper.datum(), encoder); | ||
// would be a lot faster if the Serializer interface had a flush() | ||
// method and the Hadoop framework called it when needed rather | ||
// than for every record. | ||
encoder.flush(); | ||
} | ||
|
||
public void close() throws IOException { | ||
out.close(); | ||
} | ||
|
||
/** | ||
* Gets an object capable of serializing output from a Mapper. | ||
* | ||
* <p>This may be for Map output | ||
*/ | ||
public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) { | ||
Schema schema; | ||
if (AvroKey.class.isAssignableFrom(c)) { | ||
schema = AvroJob.getMapOutputKeySchema(getConf()); | ||
} else if (AvroValue.class.isAssignableFrom(c)) { | ||
schema = AvroJob.getMapOutputValueSchema(getConf()); | ||
} else { | ||
throw new IllegalStateException("Only AvroKey and AvroValue are supported."); | ||
} | ||
return new AvroSerializer<T>(schema); | ||
} | ||
|
||
} |
Oops, something went wrong.