Moved some classes from 'mapreduce' package to 'io' package.
Also got rid of some type warnings.
gwu committed Nov 29, 2011
1 parent 43be672 commit 958def1
Showing 33 changed files with 160 additions and 57 deletions.
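
For context on the type-warning half of this change: the cleanup below repeatedly replaces raw generic types with parameterized ones. A minimal illustrative sketch of the pattern (the class and method names here are not from the commit):

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumWriter;

    public class RawTypeCleanupSketch {
      // A raw declaration such as
      //   DatumWriter datumWriter = new GenericDatumWriter<GenericRecord>(schema);
      // compiles but triggers raw-type/unchecked warnings; the parameterized
      // form below is warning-free, and is exactly what the diffs in this commit do.
      public static DataFileWriter<GenericRecord> createWriter(Schema schema) {
        DatumWriter<GenericRecord> datumWriter =
            new GenericDatumWriter<GenericRecord>(schema);
        return new DataFileWriter<GenericRecord>(datumWriter);
      }
    }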
12 changes: 10 additions & 2 deletions .gitignore
@@ -1,4 +1,12 @@
-/target/
+# Editor temporary files.
*~
+
+# IDE project files.
+/.classpath
+/.project
+/.settings/
/prj.el
-/.jde
+/.jde/
+
+# Build artifacts.
+/target/
6 changes: 6 additions & 0 deletions pom.xml
@@ -89,6 +89,12 @@
<version>3.0</version>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>org.hamcrest</groupId>
+<artifactId>hamcrest-all</artifactId>
+<version>1.1</version>
+<scope>test</scope>
+</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
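
The hamcrest-all test dependency added above enables matcher-style assertions in the project's tests. An illustrative JUnit test (not from the commit) using Hamcrest 1.1 matchers:

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.hasItem;
    import static org.hamcrest.Matchers.is;

    import java.util.Arrays;
    import java.util.List;
    import org.junit.Test;

    public class HamcrestStyleTest {
      @Test
      public void matcherAssertions() {
        // assertThat(actual, matcher) reads left to right and yields
        // descriptive failure messages compared to plain assertEquals.
        List<String> keys = Arrays.asList("apple", "banana");
        assertThat(keys, hasItem("apple"));
        assertThat(keys.size(), is(2));
      }
    }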
14 changes: 8 additions & 6 deletions src/main/java/org/apache/avro/file/SortedKeyValueFile.java
@@ -6,6 +6,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

@@ -14,16 +15,15 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.AvroKeyValue;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import org.apache.avro.mapreduce.AvroKeyValue;
import org.apache.avro.util.AvroCharSequenceComparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;

/**
* A SortedKeyValueFile is an indexed Avro container file of KeyValue records sorted by key.
@@ -310,10 +310,10 @@ public static class Writer<K, V> implements Closeable {
private final Schema mIndexSchema;

/** The writer for the data file. */
-private final DataFileWriter mDataFileWriter;
+private final DataFileWriter<GenericRecord> mDataFileWriter;

/** The writer for the index file. */
-private final DataFileWriter mIndexFileWriter;
+private final DataFileWriter<GenericRecord> mIndexFileWriter;

/** We store an indexed key for every mIndexInterval records written to the data file. */
private final int mIndexInterval;
@@ -482,7 +482,8 @@ public Writer(Options options) throws IOException {
Path dataFilePath = new Path(options.getPath(), DATA_FILENAME);
LOG.debug("Creating writer for avro data file: " + dataFilePath);
mRecordSchema = AvroKeyValue.getSchema(mKeySchema, mValueSchema);
-DatumWriter datumWriter = new GenericDatumWriter<GenericRecord>(mRecordSchema);
+DatumWriter<GenericRecord> datumWriter
+    = new GenericDatumWriter<GenericRecord>(mRecordSchema);
OutputStream dataOutputStream = fileSystem.create(dataFilePath);
mDataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)
.setSyncInterval(1 << 20) // Set the auto-sync interval sufficiently large, since
@@ -493,7 +494,8 @@ public Writer(Options options) throws IOException {
Path indexFilePath = new Path(options.getPath(), INDEX_FILENAME);
LOG.debug("Creating writer for avro index file: " + indexFilePath);
mIndexSchema = AvroKeyValue.getSchema(mKeySchema, Schema.create(Schema.Type.LONG));
-DatumWriter indexWriter = new GenericDatumWriter<GenericRecord>(mIndexSchema);
+DatumWriter<GenericRecord> indexWriter
+    = new GenericDatumWriter<GenericRecord>(mIndexSchema);
OutputStream indexOutputStream = fileSystem.create(indexFilePath);
mIndexFileWriter = new DataFileWriter<GenericRecord>(indexWriter)
.create(mIndexSchema, indexOutputStream);
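
A hedged usage sketch for the Writer above: the builder-style Options setters (withKeySchema, withValueSchema, withConfiguration, withPath) are assumptions based on how this API later surfaced in Apache Avro and may differ in this revision; only Writer(Options) and the data/index file layout are visible in this diff.

    import org.apache.avro.Schema;
    import org.apache.avro.file.SortedKeyValueFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class SortedKeyValueFileSketch {
      public static void main(String[] args) throws Exception {
        // Assumed builder-style configuration (see note above).
        SortedKeyValueFile.Writer.Options options = new SortedKeyValueFile.Writer.Options()
            .withKeySchema(Schema.create(Schema.Type.STRING))
            .withValueSchema(Schema.create(Schema.Type.STRING))
            .withConfiguration(new Configuration())
            .withPath(new Path("/tmp/mytable"));
        SortedKeyValueFile.Writer<CharSequence, CharSequence> writer =
            new SortedKeyValueFile.Writer<CharSequence, CharSequence>(options);
        try {
          writer.append("apple", "red");      // records must be appended in key order
          writer.append("banana", "yellow");  // every mIndexInterval-th key is indexed
        } finally {
          writer.close();                     // flushes both the data and index files
        }
      }
    }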
@@ -1,6 +1,6 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import org.apache.avro.Schema;

@@ -1,6 +1,6 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import java.nio.ByteBuffer;

@@ -11,6 +11,7 @@
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.BooleanWritable;
@@ -59,6 +60,7 @@ public AvroDatumConverterFactory(Configuration conf) {
* @param inputClass The type of input data to convert.
* @return A converter that turns objects of type <code>inputClass</code> into Avro data.
*/
+@SuppressWarnings("unchecked")
public <IN, OUT> AvroDatumConverter<IN, OUT> create(Class<IN> inputClass) {
if (AvroKey.class.isAssignableFrom(inputClass)) {
Schema schema = AvroJob.getOutputKeySchema(getConf());
@@ -1,14 +1,11 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.io.serializer.Deserializer;
@@ -1,10 +1,10 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryData;
import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -1,9 +1,10 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroWrapper;

/**
* Deserializes AvroKey objects within Hadoop.
@@ -12,7 +13,7 @@
*
* @see org.apache.avro.mapreduce.AvroDeserializer
*/
-public class AvroKeyDeserializer<D> extends AvroDeserializer<AvroKey<D>, D> {
+public class AvroKeyDeserializer<D> extends AvroDeserializer<AvroWrapper<D>, D> {
/**
* Constructor.
*
@@ -29,7 +30,7 @@ public AvroKeyDeserializer(Schema writerSchema, Schema readerSchema) {
* @return a new empty AvroKey.
*/
@Override
-protected AvroKey<D> createAvroWrapper() {
+protected AvroWrapper<D> createAvroWrapper() {
return new AvroKey<D>(null);
}
}
@@ -1,6 +1,6 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import java.util.Arrays;

@@ -52,6 +52,7 @@ public GenericRecord get() {
*
* @return The key from the key/value generic record.
*/
+@SuppressWarnings("unchecked")
public K getKey() {
return (K) mKeyValueRecord.get(KEY_FIELD);
}
@@ -61,6 +62,7 @@ public K getKey() {
*
* @return The value from the key/value generic record.
*/
+@SuppressWarnings("unchecked")
public V getValue() {
return (V) mKeyValueRecord.get(VALUE_FIELD);
}
@@ -1,6 +1,6 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import java.util.Collection;

@@ -49,9 +49,9 @@ public boolean accept(Class<?> c) {
public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
Configuration conf = getConf();
if (AvroKey.class.isAssignableFrom(c)) {
-return new AvroKeyDeserializer(getKeyWriterSchema(conf), getKeyReaderSchema(conf));
+return new AvroKeyDeserializer<T>(getKeyWriterSchema(conf), getKeyReaderSchema(conf));
} else if (AvroValue.class.isAssignableFrom(c)) {
-return new AvroValueDeserializer(getValueWriterSchema(conf), getValueReaderSchema(conf));
+return new AvroValueDeserializer<T>(getValueWriterSchema(conf), getValueReaderSchema(conf));
} else {
throw new IllegalStateException("Only AvroKey and AvroValue are supported.");
}
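
The widening of AvroKeyDeserializer and AvroValueDeserializer to extend AvroDeserializer<AvroWrapper<D>, D> elsewhere in this commit is what lets the returns above typecheck without the raw types used before: Java generics are invariant, so a Deserializer<AvroKey<T>> is not a Deserializer<AvroWrapper<T>> even though AvroKey extends AvroWrapper. A minimal sketch of the constraint (the class name is illustrative):

    import org.apache.avro.mapred.AvroWrapper;
    import org.apache.hadoop.io.serializer.Deserializer;

    class InvarianceSketch<T> {
      // The Hadoop Serialization contract fixes the return type:
      Deserializer<AvroWrapper<T>> choose(Deserializer<AvroWrapper<T>> d) {
        // Had AvroKeyDeserializer<T> remained a Deserializer<AvroKey<T>>,
        // passing it here would not compile; declaring it against
        // AvroWrapper<T>, as this commit does, makes the types line up.
        return d;
      }
    }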
@@ -1,14 +1,11 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.io.serializer.Serializer;
@@ -1,9 +1,10 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;

/**
* Deserializes AvroValue objects within Hadoop.
@@ -12,7 +13,7 @@
*
* @see org.apache.avro.mapreduce.AvroDeserializer
*/
-public class AvroValueDeserializer<D> extends AvroDeserializer<AvroValue<D>, D> {
+public class AvroValueDeserializer<D> extends AvroDeserializer<AvroWrapper<D>, D> {
/**
* Constructor.
*
@@ -29,7 +30,7 @@ public AvroValueDeserializer(Schema writerSchema, Schema readerSchema) {
* @return a new empty AvroValue.
*/
@Override
-protected AvroValue<D> createAvroWrapper() {
+protected AvroWrapper<D> createAvroWrapper() {
return new AvroValue<D>(null);
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/apache/avro/mapreduce/AvroJob.java
@@ -3,6 +3,8 @@
package org.apache.avro.mapreduce;

import org.apache.avro.Schema;
+import org.apache.avro.io.AvroKeyComparator;
+import org.apache.avro.io.AvroSerialization;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.conf.Configuration;
@@ -64,6 +64,7 @@ protected RecordWriter<AvroKey<T>, NullWritable> create(

/** {@inheritDoc} */
@Override
+@SuppressWarnings("unchecked")
public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
throws IOException {
// Get the writer schema.
@@ -3,9 +3,9 @@
package org.apache.avro.mapreduce;

import java.io.IOException;
-import java.io.OutputStream;

import org.apache.avro.file.CodecFactory;
+import org.apache.avro.io.AvroDatumConverter;
+import org.apache.avro.io.AvroDatumConverterFactory;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -28,6 +28,7 @@
public class AvroKeyValueOutputFormat<K, V> extends AvroOutputFormatBase<K, V> {
/** {@inheritDoc} */
@Override
+@SuppressWarnings("unchecked")
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException {
AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(
context.getConfiguration());
@@ -6,6 +6,7 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.AvroKeyValue;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;

@@ -6,6 +6,8 @@
import java.io.OutputStream;

import org.apache.avro.Schema;
+import org.apache.avro.io.AvroKeyValue;
+import org.apache.avro.io.AvroDatumConverter;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -60,7 +62,7 @@ public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
mValueConverter = valueConverter;

// Create a reusable output record.
-mOutputRecord = new AvroKeyValue(new GenericData.Record(mKeyValuePairSchema));
+mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
}

/**
2 changes: 1 addition & 1 deletion src/main/java/org/apache/hadoop/io/AvroSequenceFile.java
@@ -5,9 +5,9 @@
import java.io.IOException;

import org.apache.avro.Schema;
+import org.apache.avro.io.AvroSerialization;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroSerialization;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -12,8 +12,8 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.AvroKeyValue;
import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapreduce.AvroKeyValue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
@@ -107,7 +107,8 @@ public void testWriter() throws IOException {
File dataFile = new File(directory, SortedKeyValueFile.DATA_FILENAME);
DatumReader<GenericRecord> dataReader = new GenericDatumReader<GenericRecord>(
AvroKeyValue.getSchema(options.getKeySchema(), options.getValueSchema()));
-DataFileReader<GenericRecord> dataFileReader = new DataFileReader(dataFile, dataReader);
+DataFileReader<GenericRecord> dataFileReader
+    = new DataFileReader<GenericRecord>(dataFile, dataReader);

try {
dataFileReader.seek(indexRecords.get(0).getValue());
@@ -1,6 +1,6 @@
// (c) Copyright 2011 Odiago, Inc.

-package org.apache.avro.mapreduce;
+package org.apache.avro.io;

import static org.junit.Assert.*;

@@ -11,6 +11,7 @@
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
@@ -39,6 +40,7 @@ public void testConvertAvroKey() throws IOException {
AvroJob.setOutputKeySchema(mJob, Schema.create(Schema.Type.STRING));

AvroKey<CharSequence> avroKey = new AvroKey<CharSequence>("foo");
+@SuppressWarnings("unchecked")
AvroDatumConverter<AvroKey<CharSequence>, ?> converter = mFactory.create(
(Class<AvroKey<CharSequence>>) avroKey.getClass());
assertEquals("foo", converter.convert(avroKey).toString());
@@ -49,6 +51,7 @@ public void testConvertAvroValue() throws IOException {
AvroJob.setOutputValueSchema(mJob, Schema.create(Schema.Type.INT));

AvroValue<Integer> avroValue = new AvroValue<Integer>(42);
+@SuppressWarnings("unchecked")
AvroDatumConverter<AvroValue<Integer>, Integer> converter = mFactory.create(
(Class<AvroValue<Integer>>) avroValue.getClass());
assertEquals(42, converter.convert(avroValue).intValue());
