Add Avro source.

commit 7099aa30ee0324a0a0f41b9809ab0b29ab418802 1 parent 40b96d0
Tom White authored
Showing with 974 additions and 0 deletions.
  1. +9 −0 avro/build.xml
  2. +11 −0 avro/ivy.xml
  3. +35 −0 avro/src/main/c/dump_pairs.c
  4. +116 −0 avro/src/main/java/AvroGenericMaxTemperature.java
  5. +51 −0 avro/src/main/java/AvroProjection.java
  6. +78 −0 avro/src/main/java/AvroSort.java
  7. +96 −0 avro/src/main/java/AvroSpecificMaxTemperature.java
  8. +263 −0 avro/src/main/java/AvroTest.java
  9. +90 −0 avro/src/main/java/NcdcRecordParser.java
  10. +23 −0 avro/src/main/java/Pair.java
  11. +26 −0 avro/src/main/java/WeatherRecord.java
  12. +49 −0 avro/src/main/py/test_avro.py
  13. +28 −0 avro/src/main/py/write_pairs.py
  14. +4 −0 avro/src/main/resources/Array.avsc
  15. +6 −0 avro/src/main/resources/Enum.avsc
  16. +5 −0 avro/src/main/resources/Fixed.avsc
  17. +4 −0 avro/src/main/resources/Map.avsc
  18. +10 −0 avro/src/main/resources/NewPair.avsc
  19. +10 −0 avro/src/main/resources/NewPairWithNull.avsc
  20. +9 −0 avro/src/main/resources/Pair.avsc
  21. +8 −0 avro/src/main/resources/ProjectedPair.avsc
  22. +10 −0 avro/src/main/resources/Record.avsc
  23. +9 −0 avro/src/main/resources/SortedPair.avsc
  24. +9 −0 avro/src/main/resources/SwitchedPair.avsc
  25. +5 −0 avro/src/main/resources/Union.avsc
  26. +10 −0 avro/src/main/resources/WeatherRecord.avsc
9 avro/build.xml
@@ -0,0 +1,9 @@
+<project name="hadoop-book-avro" default="resolve"
+ xmlns:ivy="antlib:org.apache.ivy.ant"
+ xmlns:mvn="urn:maven-artifact-ant">
+
+ <target name="resolve">
+ <ivy:retrieve />
+ </target>
+
+</project>
11 avro/ivy.xml
@@ -0,0 +1,11 @@
+<ivy-module version="2.0">
+
+ <info organisation="com.hadoopbook" module="avro"/>
+
+ <dependencies>
+ <dependency org="junit" name="junit" rev="4.8.1"/>
+ <dependency org="org.apache.hadoop" name="avro" rev="1.3.2" conf="default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"/>
+ </dependencies>
+
+</ivy-module>
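
(A note on the coordinates above: Avro 1.3.2 predates Avro's graduation to a top-level Apache project, which is why the artifact is published under the org.apache.hadoop organisation rather than org.apache.avro.)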
35 avro/src/main/c/dump_pairs.c
@@ -0,0 +1,35 @@
+#include <avro.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+int main(int argc, char *argv[]) {
+ if (argc != 2) {
+ fprintf(stderr, "Usage: dump_pairs <data_file>\n");
+ exit(EXIT_FAILURE);
+ }
+
+ const char *avrofile = argv[1];
+ avro_file_reader_t filereader;
+ avro_datum_t pair;
+ avro_datum_t left;
+ avro_datum_t right;
+ int rval;
+ char *p;
+
+ if (avro_file_reader(avrofile, &filereader)) {
+ fprintf(stderr, "Error opening %s\n", avrofile);
+ exit(EXIT_FAILURE);
+ }
+ while (1) {
+ rval = avro_file_reader_read(filereader, NULL, &pair);
+ if (rval) break;
+ if (avro_record_get(pair, "left", &left) == 0) {
+ avro_string_get(left, &p);
+ fprintf(stdout, "%s,", p);
+ }
+ if (avro_record_get(pair, "right", &right) == 0) {
+ avro_string_get(right, &p);
+ fprintf(stdout, "%s\n", p);
+ }
+ }
+ avro_file_reader_close(filereader);
+ return 0;
+}
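
A note on building and running this: assuming the Avro C library and headers are installed where the compiler can find them, something like gcc dump_pairs.c -o dump_pairs -lavro should produce the binary; ./dump_pairs pairs.avro then prints one left,right line per record in the container file (for example, one written by write_pairs.py later in this commit).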
116 avro/src/main/java/AvroGenericMaxTemperature.java
@@ -0,0 +1,116 @@
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroCollector;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroMapper;
+import org.apache.avro.mapred.AvroReducer;
+import org.apache.avro.mapred.AvroUtf8InputFormat;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class AvroGenericMaxTemperature extends Configured implements Tool {
+
+ private static final Schema SCHEMA = Schema.parse("{\"type\":\"record\", \"name\":\"WeatherRecord\", \"fields\":"
+ + "[{\"name\":\"year\", \"type\":\"int\"}, " +
+ "{\"name\":\"temperature\", \"type\":\"int\", \"order\": \"ignore\"}, " +
+ "{\"name\":\"stationId\", \"type\":\"string\", \"order\": \"ignore\"}]}");
+
+ private static GenericRecord newWeatherRecord(int year, int temperature, String stationId) {
+ GenericRecord value = new GenericData.Record(SCHEMA);
+ value.put("year", year);
+ value.put("temperature", temperature);
+ value.put("stationId", new Utf8(stationId));
+ return value;
+ }
+
+ private static GenericRecord newWeatherRecord(GenericRecord other) {
+ GenericRecord value = new GenericData.Record(SCHEMA);
+ value.put("year", other.get("year"));
+ value.put("temperature", other.get("temperature"));
+ value.put("stationId", other.get("stationId"));
+ return value;
+ }
+
+ public static class MaxTemperatureMapper extends AvroMapper<Utf8, Pair<Integer, GenericRecord>> {
+ private NcdcRecordParser parser = new NcdcRecordParser();
+ @Override
+ public void map(Utf8 line,
+ AvroCollector<Pair<Integer, GenericRecord>> collector,
+ Reporter reporter) throws IOException {
+ parser.parse(line.toString());
+ if (parser.isValidTemperature()) {
+ GenericRecord record = newWeatherRecord(
+ parser.getYearInt(), parser.getAirTemperature(), parser.getStationId());
+ Pair<Integer, GenericRecord> pair =
+ new Pair<Integer, GenericRecord>(parser.getYearInt(), record);
+ collector.collect(pair);
+ }
+ }
+ }
+
+ public static class MaxTemperatureReducer extends
+ AvroReducer<Integer, GenericRecord, GenericRecord> {
+
+ @Override
+ public void reduce(Integer key, Iterable<GenericRecord> values,
+ AvroCollector<GenericRecord> collector, Reporter reporter) throws IOException {
+ GenericRecord max = null;
+ for (GenericRecord value : values) {
+ if (max == null) {
+ max = newWeatherRecord(value);
+ } else {
+ int previousMax = (Integer) max.get("temperature");
+ int currentMax = (Integer) value.get("temperature");
+ if (currentMax > previousMax) {
+ max = newWeatherRecord(value);
+ }
+ }
+ }
+ collector.collect(max);
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ System.err.printf("Usage: %s [generic options] <input> <output>\n",
+ getClass().getSimpleName());
+ ToolRunner.printGenericCommandUsage(System.err);
+ return -1;
+ }
+
+ JobConf conf = new JobConf(getConf(), getClass());
+ conf.setJobName("Max temperature");
+
+ FileInputFormat.addInputPath(conf, new Path(args[0]));
+ FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+ AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
+ AvroJob.setMapOutputSchema(conf,
+ Pair.getPairSchema(Schema.create(Schema.Type.INT), SCHEMA));
+ AvroJob.setOutputSchema(conf, SCHEMA);
+ conf.setInputFormat(AvroUtf8InputFormat.class);
+
+ AvroJob.setMapperClass(conf, MaxTemperatureMapper.class);
+ AvroJob.setReducerClass(conf, MaxTemperatureReducer.class);
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
+ System.exit(exitCode);
+ }
+}
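
A note on the reducer above: MapReduce typically reuses the objects it passes between iterations of the values iterable, so holding a reference to value across iterations would see its fields overwritten by later records. That is presumably why newWeatherRecord(value) takes a defensive copy of the running maximum rather than keeping value itself.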
51 avro/src/main/java/AvroProjection.java
@@ -0,0 +1,51 @@
+import java.io.File;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class AvroProjection extends Configured implements Tool {
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 3) {
+ System.err.printf("Usage: %s [generic options] <input> <output> <schema-file>\n",
+ getClass().getSimpleName());
+ ToolRunner.printGenericCommandUsage(System.err);
+ return -1;
+ }
+
+ String input = args[0];
+ String output = args[1];
+ String schemaFile = args[2];
+
+ JobConf conf = new JobConf(getConf(), getClass());
+ conf.setJobName("Avro projection");
+
+ FileInputFormat.addInputPath(conf, new Path(input));
+ FileOutputFormat.setOutputPath(conf, new Path(output));
+
+ Schema schema = Schema.parse(new File(schemaFile));
+ AvroJob.setInputSchema(conf, schema);
+ AvroJob.setMapOutputSchema(conf, schema);
+ AvroJob.setOutputSchema(conf, schema);
+ conf.setNumReduceTasks(0);
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new AvroProjection(), args);
+ System.exit(exitCode);
+ }
+
+}
78 avro/src/main/java/AvroSort.java
@@ -0,0 +1,78 @@
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroCollector;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroMapper;
+import org.apache.avro.mapred.AvroReducer;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class AvroSort extends Configured implements Tool {
+
+ static class SortMapper<K> extends AvroMapper<K, Pair<K, Void>> {
+ public void map(K datum, AvroCollector<Pair<K, Void>> collector,
+ Reporter reporter) throws IOException {
+ collector.collect(new Pair<K, Void>(datum, null, null, null));
+ }
+ }
+
+ static class SortReducer<K> extends AvroReducer<K, Void, K> {
+ public void reduce(K key, Iterable<Void> values, AvroCollector<K> collector,
+ Reporter reporter) throws IOException {
+ collector.collect(key);
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 3) {
+ System.err.printf(
+ "Usage: %s [generic options] <input> <output> <schema-file>\n",
+ getClass().getSimpleName());
+ ToolRunner.printGenericCommandUsage(System.err);
+ return -1;
+ }
+
+ String input = args[0];
+ String output = args[1];
+ String schemaFile = args[2];
+
+ JobConf conf = new JobConf(getConf(), getClass());
+ conf.setJobName("Avro sort");
+
+ FileInputFormat.addInputPath(conf, new Path(input));
+ FileOutputFormat.setOutputPath(conf, new Path(output));
+
+ Schema schema = Schema.parse(new File(schemaFile));
+ AvroJob.setInputSchema(conf, schema);
+ // The intermediate schema is a Pair schema whose key is the input schema
+ // and value is null
+ Schema intermediateSchema = Pair.getPairSchema(schema,
+ Schema.create(Schema.Type.NULL));
+ AvroJob.setMapOutputSchema(conf, intermediateSchema);
+ AvroJob.setOutputSchema(conf, schema);
+
+ AvroJob.setMapperClass(conf, SortMapper.class);
+ AvroJob.setReducerClass(conf, SortReducer.class);
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new AvroSort(), args);
+ System.exit(exitCode);
+ }
+
+}
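
Note that no comparator is configured anywhere here: the sort order comes entirely from the schema file passed as the third argument. Passing SortedPair.avsc (later in this commit), for instance, sorts pairs by the right field in descending order, since that schema marks left with "order": "ignore" and right with "order": "descending".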
96 avro/src/main/java/AvroSpecificMaxTemperature.java
@@ -0,0 +1,96 @@
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroCollector;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroMapper;
+import org.apache.avro.mapred.AvroReducer;
+import org.apache.avro.mapred.AvroUtf8InputFormat;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class AvroSpecificMaxTemperature extends Configured implements Tool {
+
+ public static class MaxTemperatureMapper extends AvroMapper<Utf8, Pair<Integer, WeatherRecord>> {
+ private NcdcRecordParser parser = new NcdcRecordParser();
+ private WeatherRecord record = new WeatherRecord();
+ @Override
+ public void map(Utf8 line,
+ AvroCollector<Pair<Integer, WeatherRecord>> collector, Reporter reporter)
+ throws IOException {
+ parser.parse(line.toString());
+ if (parser.isValidTemperature()) {
+ record.year = parser.getYearInt();
+ record.temperature = parser.getAirTemperature();
+ record.stationId = new Utf8(parser.getStationId());
+ collector.collect(
+ new Pair<Integer, WeatherRecord>(parser.getYearInt(), record));
+ }
+ }
+ }
+
+ public static class MaxTemperatureReducer extends
+ AvroReducer<Integer, WeatherRecord, WeatherRecord> {
+
+ @Override
+ public void reduce(Integer key, Iterable<WeatherRecord> values,
+ AvroCollector<WeatherRecord> collector, Reporter reporter) throws IOException {
+ WeatherRecord max = null;
+ for (WeatherRecord value : values) {
+ if (max == null || value.temperature > max.temperature) {
+ max = newWeatherRecord(value);
+ }
+ }
+ collector.collect(max);
+ }
+ private WeatherRecord newWeatherRecord(WeatherRecord value) {
+ WeatherRecord record = new WeatherRecord();
+ record.year = value.year;
+ record.temperature = value.temperature;
+ record.stationId = value.stationId;
+ return record;
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ System.err.printf("Usage: %s [generic options] <input> <output>\n",
+ getClass().getSimpleName());
+ ToolRunner.printGenericCommandUsage(System.err);
+ return -1;
+ }
+
+ JobConf conf = new JobConf(getConf(), getClass());
+ conf.setJobName("Max temperature");
+
+ FileInputFormat.addInputPath(conf, new Path(args[0]));
+ FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+ AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
+ AvroJob.setMapOutputSchema(conf,
+ Pair.getPairSchema(Schema.create(Schema.Type.INT), WeatherRecord.SCHEMA$));
+ AvroJob.setOutputSchema(conf, WeatherRecord.SCHEMA$);
+ conf.setInputFormat(AvroUtf8InputFormat.class);
+
+ AvroJob.setMapperClass(conf, MaxTemperatureMapper.class);
+ AvroJob.setReducerClass(conf, MaxTemperatureReducer.class);
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new AvroSpecificMaxTemperature(), args);
+ System.exit(exitCode);
+ }
+}
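
Unlike the generic version above, this mapper reuses a single WeatherRecord instance across map() calls. That is safe because collect() serializes the record straight away, so its fields can be overwritten on the next call without corrupting already-emitted output.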
263 avro/src/main/java/AvroTest.java
@@ -0,0 +1,263 @@
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+public class AvroTest {
+
+ @Test
+ public void testInt() throws IOException {
+ Schema schema = Schema.parse("\"int\"");
+
+ int datum = 163;
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DatumWriter<Integer> writer = new GenericDatumWriter<Integer>(schema);
+ Encoder encoder = new BinaryEncoder(out);
+ writer.write(datum, encoder); // boxed
+ encoder.flush();
+ out.close();
+
+ DatumReader<Integer> reader = new GenericDatumReader<Integer>(schema); // have to tell it the schema - it's not in the data stream!
+ Decoder decoder = DecoderFactory.defaultFactory()
+ .createBinaryDecoder(out.toByteArray(), null /* reuse */);
+ Integer result = reader.read(null /* reuse */, decoder);
+ assertThat(result, is(163));
+
+ try {
+ reader.read(null, decoder);
+ fail("Expected EOFException");
+ } catch (EOFException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testPairGeneric() throws IOException {
+ Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));
+
+ GenericRecord datum = new GenericData.Record(schema);
+ datum.put("left", new Utf8("L"));
+ datum.put("right", new Utf8("R"));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
+ Encoder encoder = new BinaryEncoder(out);
+ writer.write(datum, encoder);
+ encoder.flush();
+ out.close();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+ Decoder decoder = DecoderFactory.defaultFactory()
+ .createBinaryDecoder(out.toByteArray(), null);
+ GenericRecord result = reader.read(null, decoder);
+ assertThat(result.get("left").toString(), is("L"));
+ assertThat(result.get("right").toString(), is("R"));
+ }
+
+ @Test
+ public void testPairSpecific() throws IOException {
+
+ Pair datum = new Pair();
+ datum.left = new Utf8("L");
+ datum.right = new Utf8("R");
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DatumWriter<Pair> writer = new SpecificDatumWriter<Pair>(Pair.class);
+ Encoder encoder = new BinaryEncoder(out);
+ writer.write(datum, encoder);
+ encoder.flush();
+ out.close();
+
+ DatumReader<Pair> reader = new SpecificDatumReader<Pair>(Pair.class);
+ Decoder decoder = DecoderFactory.defaultFactory()
+ .createBinaryDecoder(out.toByteArray(), null);
+ Pair result = reader.read(null, decoder);
+ assertThat(result.left.toString(), is("L"));
+ assertThat(result.right.toString(), is("R"));
+ }
+
+ @Test
+ public void testDataFile() throws IOException {
+ Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));
+
+ GenericRecord datum = new GenericData.Record(schema);
+ datum.put("left", new Utf8("L"));
+ datum.put("right", new Utf8("R"));
+
+ File file = new File("data.avro");
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
+ DataFileWriter<GenericRecord> dataFileWriter =
+ new DataFileWriter<GenericRecord>(writer);
+ dataFileWriter.create(schema, file);
+ dataFileWriter.append(datum);
+ dataFileWriter.close();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+ DataFileReader<GenericRecord> dataFileReader =
+ new DataFileReader<GenericRecord>(file, reader);
+ assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));
+
+ assertThat(dataFileReader.hasNext(), is(true));
+ GenericRecord result = dataFileReader.next();
+ assertThat(result.get("left").toString(), is("L"));
+ assertThat(result.get("right").toString(), is("R"));
+ assertThat(dataFileReader.hasNext(), is(false));
+ }
+
+ @Test
+ public void testDataFileIteration() throws IOException {
+ Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));
+
+ GenericRecord datum = new GenericData.Record(schema);
+ datum.put("left", new Utf8("L"));
+ datum.put("right", new Utf8("R"));
+
+ File file = new File("data.avro");
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
+ DataFileWriter<GenericRecord> dataFileWriter =
+ new DataFileWriter<GenericRecord>(writer);
+ dataFileWriter.create(schema, file);
+ dataFileWriter.append(datum);
+ datum.put("right", new Utf8("r"));
+ dataFileWriter.append(datum);
+ dataFileWriter.close();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+ DataFileReader<GenericRecord> dataFileReader =
+ new DataFileReader<GenericRecord>(file, reader);
+ assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));
+
+ }
+
+ @Test
+ public void testSchemaResolution() throws IOException {
+ Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));
+ Schema newSchema = Schema.parse(getClass().getResourceAsStream("NewPair.avsc"));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
+ Encoder encoder = new BinaryEncoder(out);
+ GenericRecord datum = new GenericData.Record(schema); // no description
+ datum.put("left", new Utf8("L"));
+ datum.put("right", new Utf8("R"));
+ writer.write(datum, encoder);
+ encoder.flush();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema, newSchema); // write schema, read schema
+ Decoder decoder = DecoderFactory.defaultFactory()
+ .createBinaryDecoder(out.toByteArray(), null);
+ GenericRecord result = reader.read(null, decoder);
+ assertThat(result.get("left").toString(), is("L"));
+ assertThat(result.get("right").toString(), is("R"));
+ assertThat(result.get("description").toString(), is(""));
+ }
+
+ @Test
+ public void testSchemaResolutionWithNull() throws IOException {
+ Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));
+ Schema newSchema = Schema.parse(getClass().getResourceAsStream("NewPairWithNull.avsc"));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
+ Encoder encoder = new BinaryEncoder(out);
+ GenericRecord datum = new GenericData.Record(schema); // no description
+ datum.put("left", new Utf8("L"));
+ datum.put("right", new Utf8("R"));
+ writer.write(datum, encoder);
+ encoder.flush();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema, newSchema); // write schema, read schema
+ Decoder decoder = DecoderFactory.defaultFactory()
+ .createBinaryDecoder(out.toByteArray(), null);
+ GenericRecord result = reader.read(null, decoder);
+ assertThat(result.get("left").toString(), is("L"));
+ assertThat(result.get("right").toString(), is("R"));
+ assertThat(result.get("description"), is((Object) null));
+ }
+
+ @Test
+ public void testIncompatibleSchemaResolution() throws IOException {
+ Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));
+ Schema newSchema = Schema.parse("{\"type\": \"array\", \"items\": \"string\"}");
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
+ Encoder encoder = new BinaryEncoder(out);
+ GenericRecord datum = new GenericData.Record(schema); // no description
+ datum.put("left", new Utf8("L"));
+ datum.put("right", new Utf8("R"));
+ writer.write(datum, encoder);
+ encoder.flush();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema, newSchema); // write schema, read schema
+ Decoder decoder = DecoderFactory.defaultFactory()
+ .createBinaryDecoder(out.toByteArray(), null);
+ try {
+ reader.read(null, decoder);
+ fail("Expected AvroTypeException");
+ } catch (AvroTypeException e) {
+ // expected
+ }
+
+ }
+
+ @Test
+ public void testSchemaResolutionWithDataFile() throws IOException {
+ Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));
+ Schema newSchema = Schema.parse(getClass().getResourceAsStream("NewPair.avsc"));
+
+ File file = new File("data.avro");
+
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
+ DataFileWriter<GenericRecord> dataFileWriter =
+ new DataFileWriter<GenericRecord>(writer);
+ dataFileWriter.create(schema, file);
+ GenericRecord datum = new GenericData.Record(schema);
+ datum.put("left", new Utf8("L"));
+ datum.put("right", new Utf8("R"));
+ dataFileWriter.append(datum);
+ dataFileWriter.close();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(null, newSchema); // specify the read schema
+ DataFileReader<GenericRecord> dataFileReader =
+ new DataFileReader<GenericRecord>(file, reader);
+ assertThat(schema, is(dataFileReader.getSchema())); // schema is the actual (write) schema
+
+ assertThat(dataFileReader.hasNext(), is(true));
+ GenericRecord result = dataFileReader.next();
+ assertThat(result.get("left").toString(), is("L"));
+ assertThat(result.get("right").toString(), is("R"));
+ assertThat(result.get("description").toString(), is(""));
+ assertThat(dataFileReader.hasNext(), is(false));
+ }
+
+ // TODO: show specific types
+
+}
90 avro/src/main/java/NcdcRecordParser.java
@@ -0,0 +1,90 @@
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.io.Text;
+
+public class NcdcRecordParser {
+
+ private static final int MISSING_TEMPERATURE = 9999;
+
+ private static final DateFormat DATE_FORMAT =
+ new SimpleDateFormat("yyyyMMddHHmm");
+
+ private String stationId;
+ private String observationDateString;
+ private String year;
+ private String airTemperatureString;
+ private int airTemperature;
+ private boolean airTemperatureMalformed;
+ private String quality;
+
+ public void parse(String record) {
+ stationId = record.substring(4, 10) + "-" + record.substring(10, 15);
+ observationDateString = record.substring(15, 27);
+ year = record.substring(15, 19);
+ airTemperatureMalformed = false;
+ // Remove the leading plus sign, as parseInt doesn't like it
+ if (record.charAt(87) == '+') {
+ airTemperatureString = record.substring(88, 92);
+ airTemperature = Integer.parseInt(airTemperatureString);
+ } else if (record.charAt(87) == '-') {
+ airTemperatureString = record.substring(87, 92);
+ airTemperature = Integer.parseInt(airTemperatureString);
+ } else {
+ airTemperatureMalformed = true;
+ }
+ quality = record.substring(92, 93);
+ }
+
+ public void parse(Text record) {
+ parse(record.toString());
+ }
+
+ public boolean isValidTemperature() {
+ return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
+ && quality.matches("[01459]");
+ }
+
+ public boolean isMalformedTemperature() {
+ return airTemperatureMalformed;
+ }
+
+ public boolean isMissingTemperature() {
+ return airTemperature == MISSING_TEMPERATURE;
+ }
+
+ public String getStationId() {
+ return stationId;
+ }
+
+ public Date getObservationDate() {
+ try {
+ return DATE_FORMAT.parse(observationDateString);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public String getYear() {
+ return year;
+ }
+
+ public int getYearInt() {
+ return Integer.parseInt(year);
+ }
+
+ public int getAirTemperature() {
+ return airTemperature;
+ }
+
+ public String getAirTemperatureString() {
+ return airTemperatureString;
+ }
+
+ public String getQuality() {
+ return quality;
+ }
+
+}
23 avro/src/main/java/Pair.java
@@ -0,0 +1,23 @@
+@SuppressWarnings("all")
+/** A pair of strings. */
+public class Pair extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Pair\",\"fields\":[{\"name\":\"left\",\"type\":\"string\"},{\"name\":\"right\",\"type\":\"string\"}],\"doc\":\"A pair of strings.\"}");
+ public org.apache.avro.util.Utf8 left;
+ public org.apache.avro.util.Utf8 right;
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return left;
+ case 1: return right;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: left = (org.apache.avro.util.Utf8)value$; break;
+ case 1: right = (org.apache.avro.util.Utf8)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+}
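
Pair.java (and WeatherRecord.java below) have the shape of classes produced by Avro's specific compiler from the corresponding .avsc files: a SCHEMA$ constant plus positional get()/put() methods, which SpecificDatumReader and SpecificDatumWriter use to move values in and out of the public fields.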
26 avro/src/main/java/WeatherRecord.java
@@ -0,0 +1,26 @@
+@SuppressWarnings("all")
+/** A weather reading. */
+public class WeatherRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"WeatherRecord\",\"fields\":[{\"name\":\"year\",\"type\":\"int\"},{\"name\":\"temperature\",\"type\":\"int\"},{\"name\":\"stationId\",\"type\":\"string\"}],\"doc\":\"A weather reading.\"}");
+ public int year;
+ public int temperature;
+ public org.apache.avro.util.Utf8 stationId;
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return year;
+ case 1: return temperature;
+ case 2: return stationId;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: year = (java.lang.Integer)value$; break;
+ case 1: temperature = (java.lang.Integer)value$; break;
+ case 2: stationId = (org.apache.avro.util.Utf8)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+}
49 avro/src/main/py/test_avro.py
@@ -0,0 +1,49 @@
+import os
+import unittest
+from avro import schema
+from avro import io
+from avro import datafile
+
+class TestAvro(unittest.TestCase):
+
+ def test_container(self):
+ writer = open('data.avro', 'wb')
+ datum_writer = io.DatumWriter()
+ schema_object = schema.parse("""\
+{ "type": "record",
+ "name": "Pair",
+ "doc": "A pair of strings.",
+ "fields": [
+ {"name": "left", "type": "string"},
+ {"name": "right", "type": "string"}
+ ]
+}
+ """)
+ dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
+ datum = {'left':'L', 'right':'R'}
+ dfw.append(datum)
+ dfw.close()
+
+ reader = open('data.avro', 'rb')
+ datum_reader = io.DatumReader()
+ dfr = datafile.DataFileReader(reader, datum_reader)
+ data = []
+ for record in dfr:
+ data.append(record)
+ dfr.close()
+
+ self.assertEquals(1, len(data))
+ self.assertEquals(datum, data[0])
+
+ def test_write_data(self):
+ writer = open('pairs.avro', 'wb')
+ datum_writer = io.DatumWriter()
+ schema_path = os.path.join(os.path.dirname(__file__), '..', 'resources', 'Pair.avsc')
+ schema_object = schema.parse(open(schema_path).read())
+ dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
+ dfw.append({'left':'a', 'right':'1'})
+ dfw.append({'left':'c', 'right':'2'})
+ dfw.append({'left':'b', 'right':'3'})
+ dfw.append({'left':'b', 'right':'2'})
+ dfw.close()
+
+if __name__ == '__main__':
+ unittest.main()
28 avro/src/main/py/write_pairs.py
@@ -0,0 +1,28 @@
+import sys
+
+from avro import schema
+from avro import io
+from avro import datafile
+
+if __name__ == '__main__':
+ if len(sys.argv) != 2:
+ sys.exit('Usage: %s <data_file>' % sys.argv[0])
+ avro_file = sys.argv[1]
+ writer = open(avro_file, 'wb')
+ datum_writer = io.DatumWriter()
+ schema_object = schema.parse("""\
+{ "type": "record",
+ "name": "Pair",
+ "doc": "A pair of strings.",
+ "fields": [
+ {"name": "left", "type": "string"},
+ {"name": "right", "type": "string"}
+ ]
+}""")
+ dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
+ for line in sys.stdin.readlines():
+ (left, right) = line.strip().split(',')
+ dfw.append({'left':left, 'right':right})
+ dfw.close()
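
The container files these scripts produce are ordinary Avro data files, so they can be read back from Java with the same generic API exercised in AvroTest above. A minimal sketch of such a reader (the DumpPairs class name and command-line handling are illustrative, not part of this commit):

    import java.io.File;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class DumpPairs {
      public static void main(String[] args) throws Exception {
        // A data file carries its own schema, so no reader schema is needed here
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
            new File(args[0]), new GenericDatumReader<GenericRecord>());
        for (GenericRecord pair : reader) {
          System.out.println(pair.get("left") + "," + pair.get("right"));
        }
        reader.close();
      }
    }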
4 avro/src/main/resources/Array.avsc
@@ -0,0 +1,4 @@
+{
+ "type": "array",
+ "items": "long"
+}
6 avro/src/main/resources/Enum.avsc
@@ -0,0 +1,6 @@
+{
+ "type": "enum",
+ "name": "Cutlery",
+ "doc": "An eating utensil.",
+ "symbols": ["KNIFE", "FORK", "SPOON"]
+}
5 avro/src/main/resources/Fixed.avsc
@@ -0,0 +1,5 @@
+{
+ "type": "fixed",
+ "name": "Md5Hash",
+ "size": 16
+}
4 avro/src/main/resources/Map.avsc
@@ -0,0 +1,4 @@
+{
+ "type": "map",
+ "values": "string"
+}
10 avro/src/main/resources/NewPair.avsc
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "Pair",
+ "doc": "A pair of strings with an added field.",
+ "fields": [
+ {"name": "left", "type": "string"},
+ {"name": "right", "type": "string"},
+ {"name": "description", "type": "string", "default": ""}
+ ]
+}
10 avro/src/main/resources/NewPairWithNull.avsc
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "Pair",
+ "doc": "A pair of strings with an added (nullable) field.",
+ "fields": [
+ {"name": "left", "type": "string"},
+ {"name": "right", "type": "string"},
+ {"name": "description", "type": ["null", "string"], "default": "null"}
+ ]
+}
9 avro/src/main/resources/Pair.avsc
@@ -0,0 +1,9 @@
+{
+ "type": "record",
+ "name": "Pair",
+ "doc": "A pair of strings.",
+ "fields": [
+ {"name": "left", "type": "string"},
+ {"name": "right", "type": "string"}
+ ]
+}
8 avro/src/main/resources/ProjectedPair.avsc
@@ -0,0 +1,8 @@
+{
+ "type": "record",
+ "name": "Pair",
+ "doc": "The right field of a pair of strings.",
+ "fields": [
+ {"name": "right", "type": "string"}
+ ]
+}
10 avro/src/main/resources/Record.avsc
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "WeatherRecord",
+ "doc": "A weather reading.",
+ "fields": [
+ {"name": "year", "type": "int"},
+ {"name": "temperature", "type": "int"},
+ {"name": "stationId", "type": "string"}
+ ]
+}
9 avro/src/main/resources/SortedPair.avsc
@@ -0,0 +1,9 @@
+{
+ "type": "record",
+ "name": "Pair",
+ "doc": "A pair of strings, sorted by right field descending.",
+ "fields": [
+ {"name": "left", "type": "string", "order": "ignore"},
+ {"name": "right", "type": "string", "order": "descending"}
+ ]
+}
9 avro/src/main/resources/SwitchedPair.avsc
@@ -0,0 +1,9 @@
+{
+ "type": "record",
+ "name": "Pair",
+ "doc": "A pair of strings, sorted by right then left.",
+ "fields": [
+ {"name": "right", "type": "string"},
+ {"name": "left", "type": "string"}
+ ]
+}
5 avro/src/main/resources/Union.avsc
@@ -0,0 +1,5 @@
+[
+ "null",
+ "string",
+ {"type": "map", "values": "string"}
+]
10 avro/src/main/resources/WeatherRecord.avsc
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "WeatherRecord",
+ "doc": "A weather reading.",
+ "fields": [
+ {"name": "year", "type": "int"},
+ {"name": "temperature", "type": "int"},
+ {"name": "stationId", "type": "string"}
+ ]
+}