added ability to index arbitrary json documents, added test directory with example data and pig scripts for both types of indexers
1 parent 9fe5e7d commit 74e68fdf12bca62b04a91e62cf88be9add44513b @thedatachef committed May 11, 2011
@@ -1,17 +0,0 @@
-register build/wonderdog.jar
-register /usr/local/share/elasticsearch/lib/elasticsearch-0.14.2.jar
-register /usr/local/share/elasticsearch/lib/jline-0.9.94.jar
-register /usr/local/share/elasticsearch/lib/jna-3.2.7.jar
-register /usr/local/share/elasticsearch/lib/log4j-1.2.15.jar
-register /usr/local/share/elasticsearch/lib/lucene-analyzers-3.0.3.jar
-register /usr/local/share/elasticsearch/lib/lucene-core-3.0.3.jar
-register /usr/local/share/elasticsearch/lib/lucene-fast-vector-highlighter-3.0.3.jar
-register /usr/local/share/elasticsearch/lib/lucene-highlighter-3.0.3.jar
-register /usr/local/share/elasticsearch/lib/lucene-memory-3.0.3.jar
-register /usr/local/share/elasticsearch/lib/lucene-queries-3.0.3.jar
-
-%default INDEX 'ufo_sightings'
-%default OBJ 'ufo_sighting'
-
-ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
-STORE ufo_sightings INTO 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchIndex('-1', '1000');
@@ -36,7 +36,7 @@
/**
- Output format for writing hashmaps into elasticsearch. Records are batched up and sent
+ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) into Elasticsearch. Records are batched up and sent
in a one-hop manner to the elastic search data nodes that will index them.
*/
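
For orientation, here is a minimal sketch of wiring this OutputFormat into a Hadoop job. The class name com.infochimps.elasticsearch.ElasticSearchOutputFormat and the job boilerplate are assumptions from context, not confirmed by this diff; the elasticsearch.* property keys and defaults are taken from the javadoc added later in this commit, and the index/type names from the deleted pig script above.

```java
// Sketch only: the OutputFormat class name is assumed, not shown in this diff.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

public class IndexJobSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("elasticsearch.index.name", "ufo_sightings");   // created if missing
        conf.set("elasticsearch.object.type", "ufo_sighting");
        conf.set("elasticsearch.bulk.size", "1000");             // records per bulk request
        conf.setBoolean("elasticsearch.is_json", true);          // records are raw JSON
        conf.set("elasticsearch.id.field.name", "-1");           // let elasticsearch assign ids
        conf.set("elasticsearch.config", "/etc/elasticsearch/elasticsearch.yml");
        conf.set("elasticsearch.plugins.dir", "/usr/local/share/elasticsearch/plugins");

        Job job = new Job(conf, "wonderdog-index");              // Hadoop 0.20-era API
        job.setOutputFormatClass(com.infochimps.elasticsearch.ElasticSearchOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MapWritable.class);
        // mapper that emits (NullWritable, MapWritable) pairs elided
        job.waitForCompletion(true);
    }
}
```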
@@ -62,34 +62,73 @@
private AtomicLong totalBulkItems = new AtomicLong();
private Random randgen = new Random();
private long runStartTime = System.currentTimeMillis();
+
+ // For hadoop configuration
+ private static final String ES_INDEX_NAME = "elasticsearch.index.name";
+ private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
+ private static final String ES_IS_JSON = "elasticsearch.is_json";
+ private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
+ private static final String ES_FIELD_NAMES = "elasticsearch.field.names";
+ private static final String ES_ID_FIELD = "elasticsearch.id.field";
+ private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
+ private static final String ES_CONFIG = "elasticsearch.config";
+ private static final String ES_PLUGINS = "elasticsearch.plugins.dir";
+
+ // Other string constants
+ private static final String COMMA = ",";
+ private static final String NO_ID_FIELD = "-1";
private volatile BulkRequestBuilder currentRequest;
-
+
+ /**
+ Instantiates a new RecordWriter for Elasticsearch
+ <p>
+ The properties that <b>MUST</b> be set in the hadoop Configuration object
+ are as follows:
+ <ul>
+ <li><b>elasticsearch.index.name</b> - The name of the elasticsearch index that data will be written to. It does not have to exist ahead of time.</li>
+ <li><b>elasticsearch.bulk.size</b> - The number of records to be accumulated into a bulk request before writing to elasticsearch.</li>
+ <li><b>elasticsearch.is_json</b> - A boolean indicating whether the records to be indexed are json records. If false, the records are assumed to be tsv, in which case <b>elasticsearch.field.names</b> must be set to a comma-separated list of field names.</li>
+ <li><b>elasticsearch.object.type</b> - The type of objects being indexed.</li>
+ <li><b>elasticsearch.config</b> - The full path to elasticsearch.yml. It is a local path and must exist on all machines in the hadoop cluster.</li>
+ <li><b>elasticsearch.plugins.dir</b> - The full path to the elasticsearch plugins directory. It is a local path and must exist on all machines in the hadoop cluster.</li>
+ </ul>
+ <p>
+ The following fields depend on whether <b>elasticsearch.is_json</b> is true or false.
+ <ul>
+ <li><b>elasticsearch.id.field.name</b> - When <b>elasticsearch.is_json</b> is true, this is the name of a field in the json document that contains the document's id. If -1 is used then the document is assumed to have no id and one is assigned to it by elasticsearch.</li>
+ <li><b>elasticsearch.field.names</b> - When <b>elasticsearch.is_json</b> is false, this is a comma separated list of field names.</li>
+ <li><b>elasticsearch.id.field</b> - When <b>elasticsearch.is_json</b> is false, this is the numeric index of the field to use as the document id. If -1 is used the document is assumed to have no id and one is assigned to it by elasticsearch.</li>
+ </ul>
+ */
public ElasticSearchRecordWriter(TaskAttemptContext context) {
Configuration conf = context.getConfiguration();
- this.indexName = conf.get("elasticsearch.index.name");
- this.bulkSize = Integer.parseInt(conf.get("elasticsearch.bulk.size"));
- this.isJSON = conf.getBoolean("elasticsearch.is_json", false);
+ this.indexName = conf.get(ES_INDEX_NAME);
+ this.bulkSize = Integer.parseInt(conf.get(ES_BULK_SIZE));
+ this.isJSON = conf.getBoolean(ES_IS_JSON, false);
if (isJSON) {
LOG.info("Documents will be processed as JSON documents");
- this.idFieldName = conf.get("elasticsearch.id.field.name");
- if (idFieldName == null) {
+ this.idFieldName = conf.get(ES_ID_FIELD_NAME);
+ if (idFieldName == null || idFieldName.equals(NO_ID_FIELD)) {
+ LOG.info("Documents will be assigned ids by elasticsearch");
this.idField = -1;
+ } else {
+ LOG.info("Using field:["+idFieldName+"] for document ids");
}
} else {
- this.fieldNames = conf.get("elasticsearch.field.names").split(",");
- this.idField = Integer.parseInt(conf.get("elasticsearch.id.field"));
+ this.fieldNames = conf.get(ES_FIELD_NAMES).split(COMMA);
+ this.idField = Integer.parseInt(conf.get(ES_ID_FIELD));
}
- this.objType = conf.get("elasticsearch.object.type");
- System.setProperty("es.config",conf.get("elasticsearch.config"));
- System.setProperty("es.path.plugins",conf.get("elasticsearch.plugins.dir"));
+ this.objType = conf.get(ES_OBJECT_TYPE);
+ System.setProperty("es.config",conf.get(ES_CONFIG));
+ System.setProperty("es.path.plugins",conf.get(ES_PLUGINS));
start_embedded_client();
initialize_index(indexName);
currentRequest = client.prepareBulk();
}
/**
- Need to index any remaining content.
+ Closes the connection to elasticsearch. Any documents remaining in the bulkRequest object are indexed.
*/
public void close(TaskAttemptContext context) throws IOException {
if (currentRequest.numberOfActions() > 0) {
@@ -102,21 +141,20 @@ public void close(TaskAttemptContext context) throws IOException {
}
LOG.info("Closing record writer");
client.close();
- // LOG.info("Client is closed")
- // if (node != null) {
- // node.close();
- // }
+ LOG.info("Client is closed");
+ if (node != null) {
+ node.close();
+ }
LOG.info("Record writer closed.");
}
+ /**
+ Writes a single MapWritable record to the bulkRequest object. Once <b>elasticsearch.bulk.size</b> records
+ have been accumulated, they are written to elasticsearch.
+ */
public void write(NullWritable key, MapWritable fields) throws IOException {
- XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
- for (Map.Entry<Writable, Writable> entry : fields.entrySet()) {
- String name = entry.getKey().toString();
- String value = entry.getValue().toString();
- builder.field(name, value);
- }
- builder.endObject();
+ XContentBuilder builder = XContentFactory.jsonBuilder();
+ buildContent(builder, fields);
if (idField == -1) {
// Document has no inherent id
currentRequest.add(Requests.indexRequest(indexName).type(objType).source(builder));
@@ -128,44 +166,51 @@ public void write(NullWritable key, MapWritable fields) throws IOException {
} else {
mapKey = new Text(fieldNames[idField]);
}
- LOG.info("map key ["+mapKey+"]");
String record_id = fields.get(mapKey).toString();
- LOG.info("record id ["+record_id+"]");
currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder));
} catch (Exception e) {
- LOG.info("Increment bad record counter");
+ LOG.warn("Encountered malformed record");
}
}
processBulkIfNeeded();
}
- private void buildContent(XContentBuilder builder, String key, Writable value) throws IOException {
+ /**
+ Recursively untangles the MapWritable and writes the fields into the XContentBuilder.
+ */
+ private void buildContent(XContentBuilder builder, Writable value) throws IOException {
if (value instanceof Text) {
- builder.field(key, ((Text)value).toString());
+ builder.value(((Text)value).toString());
} else if (value instanceof LongWritable) {
- builder.field(key, ((LongWritable)value).get());
+ builder.value(((LongWritable)value).get());
} else if (value instanceof IntWritable) {
- builder.field(key, ((IntWritable)value).get());
+ builder.value(((IntWritable)value).get());
} else if (value instanceof DoubleWritable) {
- builder.field(key, ((DoubleWritable)value).get());
+ builder.value(((DoubleWritable)value).get());
} else if (value instanceof FloatWritable) {
- builder.field(key, ((FloatWritable)value).get());
+ builder.value(((FloatWritable)value).get());
} else if (value instanceof MapWritable) {
builder.startObject();
for (Map.Entry<Writable,Writable> entry : ((MapWritable)value).entrySet()) {
- buildContent(builder, entry.getKey().toString(), entry.getValue());
+ if (!(entry.getValue() instanceof NullWritable)) {
+ builder.field(entry.getKey().toString());
+ buildContent(builder, entry.getValue());
+ }
}
builder.endObject();
} else if (value instanceof ArrayWritable) {
builder.startArray();
Writable[] arrayOfThings = ((ArrayWritable)value).get();
for (int i = 0; i < arrayOfThings.length; i++) {
- Writable thing = arrayOfThings[i];
+ buildContent(builder, arrayOfThings[i]);
}
builder.endArray();
- }
+ }
}
+ /**
+ Indexes content to elasticsearch when <b>elasticsearch.bulk.size</b> records have been accumulated.
+ */
private void processBulkIfNeeded() {
totalBulkItems.incrementAndGet();
if (currentRequest.numberOfActions() >= bulkSize) {
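
The hunk is cut off just after the size check. For context, a flush against the 0.14-era elasticsearch Java client typically looks like the sketch below; this is a hedged reconstruction that reuses the class fields declared earlier (client, currentRequest, LOG), not the commit's actual method body.

```java
// Hedged sketch of the bulk flush; the real body is truncated in this view.
// BulkResponse is org.elasticsearch.action.bulk.BulkResponse.
private void flushBulkRequest() {
    try {
        BulkResponse response = currentRequest.execute().actionGet();
        if (response.hasFailures()) {
            LOG.warn("Bulk request had failures");
        }
    } catch (Exception e) {
        LOG.warn("Bulk request failed: " + e.getMessage());
    }
    currentRequest = client.prepareBulk(); // start accumulating a fresh batch
}
```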
@@ -69,16 +69,36 @@
protected String esConfig;
protected String esPlugins;
+ // For hadoop configuration
+ private static final String ES_INDEX_NAME = "elasticsearch.index.name";
+ private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
+ private static final String ES_IS_JSON = "elasticsearch.is_json";
+ private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
+ private static final String ES_FIELD_NAMES = "elasticsearch.field.names";
+ private static final String ES_ID_FIELD = "elasticsearch.id.field";
+ private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
+ private static final String ES_CONFIG = "elasticsearch.config";
+ private static final String ES_PLUGINS = "elasticsearch.plugins.dir";
+ private static final String PIG_ES_FIELD_NAMES = "elasticsearch.pig.field.names";
+
+ // Other string constants
+ private static final String SLASH = "/";
+ private static final String COMMA = ",";
+ private static final String NO_ID_FIELD = "-1";
+ private static final String DEFAULT_BULK = "1000";
+ private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
+ private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
+
public ElasticSearchIndex() {
- this("-1", "1000");
+ this(NO_ID_FIELD, DEFAULT_BULK);
}
public ElasticSearchIndex(String idField, String bulkSize) {
- this(idField, bulkSize, "/etc/elasticsearch/elasticsearch.yml");
+ this(idField, bulkSize, DEFAULT_ES_CONFIG);
}
public ElasticSearchIndex(String idField, String bulkSize, String esConfig) {
- this(idField, bulkSize, esConfig, "/usr/local/share/elasticsearch/plugins");
+ this(idField, bulkSize, esConfig, DEFAULT_ES_PLUGINS);
}
public ElasticSearchIndex(String idField, String bulkSize, String esConfig, String esPlugins) {
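
Given the default constants above, the constructor chain makes the following invocations equivalent (an illustration derived entirely from the defaults in this hunk):

```java
// Equivalent under the defaults defined above.
new ElasticSearchIndex();
new ElasticSearchIndex("-1", "1000");
new ElasticSearchIndex("-1", "1000", "/etc/elasticsearch/elasticsearch.yml");
new ElasticSearchIndex("-1", "1000", "/etc/elasticsearch/elasticsearch.yml",
                       "/usr/local/share/elasticsearch/plugins");
```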
@@ -98,9 +118,9 @@ public void checkSchema(ResourceSchema s) throws IOException {
String fieldNames = "";
for (String field : s.fieldNames()) {
fieldNames += field;
- fieldNames += ",";
+ fieldNames += COMMA;
}
- property.setProperty("elasticsearch.pig.field.names", fieldNames);
+ property.setProperty(PIG_ES_FIELD_NAMES, fieldNames);
}
/**
@@ -110,27 +130,27 @@ public void checkSchema(ResourceSchema s) throws IOException {
*/
@Override
public void setStoreLocation(String location, Job job) throws IOException {
- String[] es_store = location.substring(5).split("/");
+ String[] es_store = location.substring(5).split(SLASH);
if (es_store.length != 2) {
throw new RuntimeException("Please specify a valid elasticsearch index, eg. es://myindex/myobj");
}
Configuration conf = job.getConfiguration();
// Only set if we haven't already
- if (conf.get("elasticsearch.index.name") == null) {
+ if (conf.get(ES_INDEX_NAME) == null) {
try {
- job.getConfiguration().set("elasticsearch.index.name", es_store[0]);
- job.getConfiguration().set("elasticsearch.object.type", es_store[1]);
+ job.getConfiguration().set(ES_INDEX_NAME, es_store[0]);
+ job.getConfiguration().set(ES_OBJECT_TYPE, es_store[1]);
} catch (ArrayIndexOutOfBoundsException e) {
throw new RuntimeException("You must specify both an index and an object type.");
}
- job.getConfiguration().setBoolean("elasticsearch.is_json", false);
- job.getConfiguration().set("elasticsearch.bulk.size", bulkSize);
- job.getConfiguration().set("elasticsearch.id.field", idField);
- job.getConfiguration().set("elasticsearch.config", esConfig);
- job.getConfiguration().set("elasticsearch.plugins.dir", esPlugins);
+ job.getConfiguration().setBoolean(ES_IS_JSON, false);
+ job.getConfiguration().set(ES_BULK_SIZE, bulkSize);
+ job.getConfiguration().set(ES_ID_FIELD, idField);
+ job.getConfiguration().set(ES_CONFIG, esConfig);
+ job.getConfiguration().set(ES_PLUGINS, esPlugins);
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(ResourceSchema.class);
- job.getConfiguration().set("elasticsearch.field.names", property.getProperty("elasticsearch.pig.field.names"));
+ job.getConfiguration().set(ES_FIELD_NAMES, property.getProperty(PIG_ES_FIELD_NAMES));
}
}
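
To make the location parsing concrete: the five-character es:// scheme is stripped and the remainder split on the slash. A small illustration, reusing the index and object type from the deleted pig script above:

```java
// Illustration of how setStoreLocation decomposes an es:// location.
String location = "es://ufo_sightings/ufo_sighting";
String[] es_store = location.substring(5).split("/"); // drops "es://"
// es_store[0] -> "ufo_sightings" (index name)
// es_store[1] -> "ufo_sighting"  (object type)
```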
@@ -154,7 +174,7 @@ public void putNext(Tuple t) throws IOException {
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(ResourceSchema.class);
MapWritable record = new MapWritable();
- String[] fieldNames = property.getProperty("elasticsearch.pig.field.names").split(",");
+ String[] fieldNames = property.getProperty(PIG_ES_FIELD_NAMES).split(COMMA);
for (int i = 0; i < t.size(); i++) {
if (i < fieldNames.length) {
try {
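
The diff is truncated inside putNext's field loop. As a hedged sketch of where it is headed, each tuple element is presumably keyed by its schema field name and wrapped in a Writable before being handed to the record writer; the conversion below is an assumption, not the commit's code.

```java
// Hypothetical continuation of the truncated loop body: key each tuple
// field by its schema name, skipping nulls. A Text-only conversion is assumed.
Object field = t.get(i);
if (field != null) {
    record.put(new Text(fieldNames[i]), new Text(field.toString()));
}
```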