
Commit

some clean up
abh1nay committed Oct 4, 2012
1 parent 65e8bee commit 29d370b
Showing 8 changed files with 50 additions and 20 deletions.
29 changes: 29 additions & 0 deletions config/single_node_cluster/config/stores.xml
@@ -15,4 +15,33 @@
<type>string</type>
</value-serializer>
</store>
<store>
<name>avro-test</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners> harry@hogwarts.edu, hermoine@hogwarts.edu </owners>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>avro-generic</type>
<schema-info version="0">"int"</schema-info>
</key-serializer>
<value-serializer>
<type>avro-generic</type>
<schema-info version="0">
{
"type": "record",
"name": "myrec",

"fields": [
{ "name": "original", "type": "string" },
{ "name": "new-field", "type": [ "string" , "null"], "default":"null" }

]
}
</schema-info>
</value-serializer>
</store>
</stores>
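For orientation only (not part of this commit): a store declared with the avro-generic serializer is accessed through the ordinary Voldemort client API. The sketch below is a hedged example; the bootstrap URL, the Integer/GenericData.Record client types, and the put/get flow are assumptions for a local single-node setup, not code from this repository.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;

public class AvroTestStoreExample {

    public static void main(String[] args) {
        // Bootstrap URL is an assumption for a local single-node cluster
        StoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls("tcp://localhost:6666"));
        StoreClient<Integer, GenericData.Record> client = factory.getStoreClient("avro-test");

        // Value schema copied verbatim from the store definition above; note that
        // the hyphenated field name is taken as-is and stricter Avro versions may
        // validate names differently.
        Schema valueSchema = Schema.parse("{\"type\": \"record\", \"name\": \"myrec\", "
                                          + "\"fields\": [{\"name\": \"original\", \"type\": \"string\"}, "
                                          + "{\"name\": \"new-field\", \"type\": [\"string\", \"null\"], \"default\": \"null\"}]}");
        GenericData.Record record = new GenericData.Record(valueSchema);
        record.put("original", "hello");
        record.put("new-field", "world");

        client.put(1, record);
        System.out.println(client.getValue(1));
    }
}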
@@ -26,6 +26,8 @@
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

// The default Voldemort key/value writer;
// it generates the store's index and data files
public class HadoopStoreWriter implements KeyValueWriter<BytesWritable, BytesWritable> {

private static final Logger logger = Logger.getLogger(HadoopStoreWriter.class);
@@ -6,6 +6,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

// Interface used by reducers to lay out the data on disk
public interface KeyValueWriter<K, V> {

public static enum CollisionCounter {
@@ -35,6 +35,9 @@
import azkaban.common.utils.Props;
import azkaban.common.utils.Utils;

// Avro container files are not sequence-file inputs;
// they contain records instead of key/value pairs,
// so we use an AvroMapper to consume them.
public class AvroStoreBuilderMapper extends
AvroMapper<GenericData.Record, Pair<ByteBuffer, ByteBuffer>> implements JobConfigurable {

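To illustrate the comment above: an AvroMapper receives whole Avro records rather than key/value pairs. The following is a hedged sketch of that call pattern only; the field names and the way the fields are packed into ByteBuffers are placeholders, not the logic of AvroStoreBuilderMapper in this commit.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;

// Hedged sketch: shows the AvroMapper call pattern, not the real
// AvroStoreBuilderMapper logic (which also builds Voldemort keys per partition).
public class SimpleAvroRecordMapper extends
        AvroMapper<GenericData.Record, Pair<ByteBuffer, ByteBuffer>> {

    @Override
    public void map(GenericData.Record record,
                    AvroCollector<Pair<ByteBuffer, ByteBuffer>> collector,
                    Reporter reporter) throws IOException {
        // "key" and "value" are assumed field names (cf. avro.key.field / avro.value.field)
        ByteBuffer keyBytes = ByteBuffer.wrap(record.get("key").toString().getBytes("UTF-8"));
        ByteBuffer valueBytes = ByteBuffer.wrap(record.get("value").toString().getBytes("UTF-8"));
        collector.collect(new Pair<ByteBuffer, ByteBuffer>(keyBytes, valueBytes));
    }
}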
@@ -23,6 +23,9 @@
public class AvroStoreBuilderReducer implements
Reducer<AvroKey<ByteBuffer>, AvroValue<ByteBuffer>, Text, Text>, JobConfigurable, Closeable {

// The class implementing KeyValueWriter;
// this provides a pluggable mechanism for generating your own on-disk
// format for the data and index files.
String keyValueWriterClass;
@SuppressWarnings("rawtypes")
KeyValueWriter writer;
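As a hedged illustration of that pluggability: the configured writer class can be resolved from the JobConf with standard Hadoop reflection utilities. The property name "writer.class" is an assumption (the real key may differ), and the sketch assumes it lives alongside KeyValueWriter and HadoopStoreWriter, which the comment above describes as the default writer.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

// Hedged sketch: instantiate whatever KeyValueWriter implementation the job configured,
// falling back to the default HadoopStoreWriter.
public class WriterFactory {

    @SuppressWarnings("rawtypes")
    public static KeyValueWriter createWriter(JobConf conf) {
        // "writer.class" is an illustrative property name, not taken from this commit
        Class<? extends KeyValueWriter> writerClass =
                conf.getClass("writer.class", HadoopStoreWriter.class, KeyValueWriter.class);
        return ReflectionUtils.newInstance(writerClass, conf);
    }
}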
@@ -582,7 +582,7 @@ public void buildAvro() {

conf.setSpeculativeExecution(false);

System.out.println(config.get("avro.rec.schema"));
// AvroJob configuration for the Avro mapper
AvroJob.setInputSchema(conf, Schema.parse(config.get("avro.rec.schema")));

AvroJob.setOutputSchema(conf,
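The setOutputSchema call at the end of this hunk is truncated in the diff. For reference, the AvroJob wiring the comment refers to typically looks like the hedged sketch below; the map-output schema here (a pair of raw bytes) is inferred from the mapper's Pair<ByteBuffer, ByteBuffer> signature, not copied from the elided lines of buildAvro().

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.JobConf;

// Hedged sketch of AvroJob wiring for an Avro store-builder job.
public class AvroJobWiringSketch {

    public static void configure(JobConf conf, String recordSchemaJson) {
        // Input: whole Avro records read from the container files
        AvroJob.setInputSchema(conf, Schema.parse(recordSchemaJson));

        // Map output: (key bytes, value bytes) pairs handed to the reducer;
        // this pair-of-bytes schema is an assumption based on the mapper signature
        AvroJob.setOutputSchema(conf,
                                Pair.getPairSchema(Schema.create(Schema.Type.BYTES),
                                                   Schema.create(Schema.Type.BYTES)));
    }
}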
@@ -64,6 +64,7 @@ public class VoldemortBuildAndPushJob extends AbstractJob {

private final List<String> dataDirs;

// Read from the job properties to check whether this job takes Avro input
private final boolean isAvroJob;

private final String keyField;
@@ -112,8 +113,6 @@ public VoldemortBuildAndPushJob(String name, Props props) {

valueField = props.getString("avro.value.field", "value");

System.out.println("keyfield = " + keyField);
log.info("valueField = " + valueField);
}

@Override
@@ -495,36 +494,38 @@ private Path getInputPath() throws IOException {
return HadoopUtils.getSanitizedPath(path);
}

// Get the schema for the Avro Record from the object container file
public String getRecordSchema() throws IOException {
Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());

// schema.getField("key").schema();
String recSchema = schema.toString();

return recSchema;

}

// Extract schema of the key field
public String getKeySchema() throws IOException {
Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());

// schema.getField("key").schema();
String keySchema = schema.getField(keyField).schema().toString();

return keySchema;

}

// Extract schema of the value field
public String getValueSchema() throws IOException {
Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());

// schema.getField("key").schema();
String valueSchema = schema.getField(valueField).schema().toString();

return valueSchema;

}
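The three helpers above lean on AvroUtils.getAvroSchemaFromPath. Reading a schema out of an Avro object container file can be done with the standard Avro file reader, roughly as in this hedged sketch; the helper's actual implementation in this repository may differ.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hedged sketch: open a container file at the given path and return its embedded schema.
public class AvroSchemaPeek {

    public static Schema readSchema(Path file, Configuration conf) throws IOException {
        FileSystem fs = file.getFileSystem(conf);
        DataFileStream<Object> reader =
                new DataFileStream<Object>(fs.open(file), new GenericDatumReader<Object>());
        try {
            return reader.getSchema();
        } finally {
            reader.close();
        }
    }
}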

// Verify that the new Avro schema being pushed matches the existing one.
// There is no logic to check for Avro schema evolution yet.
public void verifyAvroSchema(String url) throws Exception {
// create a new store def with the schema from the metadata in the input
// path
@@ -534,9 +535,7 @@ public void verifyAvroSchema(String url) throws Exception {
int requiredWrites = props.getInt("build.required.writes", 1);
String description = props.getString("push.store.description", "");
String owners = props.getString("push.store.owners", "");
// String keySchema = schema.getField("key").schema();

// schema.getField("key").schema();
String keySchema = "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
+ schema.getField(keyField).schema() + "</schema-info>\n\t";
String valSchema = "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
@@ -595,17 +594,7 @@ public void verifyAvroSchema(String url) throws Exception {
// want to push, we need
// to worry
if(!remoteStoreDef.equals(newStoreDef)) {
// it is possible that the stores actually DO match, but the json in the
// key/value serializers is out of order (eg {'a': 'int32', 'b': 'int32'}
// could have a/b reversed). this is just a reflection of the fact that
// voldemort json type defs use hashmaps that are unordered, and pig uses
// bags that are unordered as well. it's therefore unpredictable what
// order the keys will come out of pig.

// let's check to see if the key/value serializers are
// REALLY equal.
SerializerDefinition localKeySerializerDef = newStoreDef.getKeySerializer();
@@ -148,7 +148,7 @@ public VoldemortStoreBuilderConf(JobConf configuration, Props props) throws Exce
props.getInt("num.chunks", -1));
}

// new constructor to include the key val and record schema
// new constructor that includes the key, value, and record schemas for the Avro job

public VoldemortStoreBuilderConf(int replicationFactor,
int chunkSize,
@@ -340,6 +340,9 @@ public void setKeyField(String keyField) {
@Override
public void run() throws Exception {
JobConf configuration = this.createJobConf(VoldemortStoreBuilderMapper.class);

// Only if it is an Avro job do we supply some additional fields
// for the key and value schemas of the Avro record
if(isAvro) {
String recSchema = conf.getRecSchema();
String keySchema = conf.getKeySchema();
