Deprecating ElasticSearchIndex and ElasticSearchJsonIndex for Pig in favor of a unified ElasticSearchStorage UDF

1 parent 7bea9b1 commit a7a18f490ee81b951d9e2d11feca4e9df9efe7e0 thedatachef committed Jul 11, 2011
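
For context, here is a minimal Pig sketch of how the unified UDF introduced by this commit might be invoked. The relation, file, field, index, and type names (and the jar name) are hypothetical; the class name, the es://<index_name>/<object_type>?<params> URI shape, and the json, size, and id query parameters are taken from the ElasticSearchStorage source below, and the optional constructor arguments default to /etc/elasticsearch/elasticsearch.yml and /usr/local/share/elasticsearch/plugins.

    -- Hypothetical jar name: register whichever jar packages the UDF
    REGISTER 'wonderdog.jar';

    -- Delimited records (json=false): field names come from the Pig schema via checkSchema()
    records = LOAD 'data.tsv' AS (user_id:chararray, name:chararray, score:int);
    STORE records INTO 'es://my_index/my_type?json=false&size=1000'
        USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();

    -- JSON records (json=true): a single chararray field holding one JSON document per tuple;
    -- id=user_id tells the writer to use that key's value as the elasticsearch document id
    json_records = LOAD 'data.json' AS (json_record:chararray);
    STORE json_records INTO 'es://my_index/my_type?json=true&size=1000&id=user_id'
        USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
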
@@ -60,7 +60,6 @@ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) in
private String idFieldName;
private String objType;
private String[] fieldNames;
- private boolean isJSON;
// Used for bookkeeping purposes
private AtomicLong totalBulkTime = new AtomicLong();
@@ -73,9 +72,7 @@ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) in
private static final String ES_PLUGINS_NAME = "plugins";
private static final String ES_INDEX_NAME = "elasticsearch.index.name";
private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
- private static final String ES_IS_JSON = "elasticsearch.is_json";
private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
- private static final String ES_FIELD_NAMES = "elasticsearch.field.names";
private static final String ES_ID_FIELD = "elasticsearch.id.field";
private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
private static final String ES_CONFIG = "es.config";
@@ -113,19 +110,12 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) {
Configuration conf = context.getConfiguration();
this.indexName = conf.get(ES_INDEX_NAME);
this.bulkSize = Integer.parseInt(conf.get(ES_BULK_SIZE));
- this.isJSON = conf.getBoolean(ES_IS_JSON, false);
- if (isJSON) {
- LOG.info("Documents will be processed as JSON documents");
- this.idFieldName = conf.get(ES_ID_FIELD_NAME);
- if (idFieldName.equals(NO_ID_FIELD)) {
- LOG.info("Documents will be assigned ids by elasticsearch");
- this.idField = -1;
- } else {
- LOG.info("Using field:["+idFieldName+"] for document ids");
- }
+ this.idFieldName = conf.get(ES_ID_FIELD_NAME);
+ if (idFieldName.equals(NO_ID_FIELD)) {
+ LOG.info("Documents will be assigned ids by elasticsearch");
+ this.idField = -1;
} else {
- this.fieldNames = conf.get(ES_FIELD_NAMES).split(COMMA);
- this.idField = Integer.parseInt(conf.get(ES_ID_FIELD));
+ LOG.info("Using field:["+idFieldName+"] for document ids");
}
this.objType = conf.get(ES_OBJECT_TYPE);
@@ -181,12 +171,7 @@ public void write(NullWritable key, MapWritable fields) throws IOException {
currentRequest.add(Requests.indexRequest(indexName).type(objType).source(builder));
} else {
try {
- Text mapKey = null;
- if (isJSON) {
- mapKey = new Text(idFieldName);
- } else {
- mapKey = new Text(fieldNames[idField]);
- }
+ Text mapKey = new Text(idFieldName);
String record_id = fields.get(mapKey).toString();
currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder));
} catch (Exception e) {
@@ -0,0 +1,312 @@
+package com.infochimps.elasticsearch.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.io.*;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+
+import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
+import com.infochimps.elasticsearch.ElasticSearchInputFormat;
+import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
+
+public class ElasticSearchStorage extends LoadFunc implements StoreFuncInterface {
+
+ private String contextSignature = null;
+ private RecordReader reader;
+ protected RecordWriter writer = null;
+ protected ObjectMapper mapper = new ObjectMapper();
+ protected String esConfig;
+ protected String esPlugins;
+
+ // For hadoop configuration
+ private static final String ES_INDEX_NAME = "elasticsearch.index.name";
+ private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
+ private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
+ private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
+ private static final String ES_IS_JSON = "elasticsearch.is_json";
+ private static final String PIG_ES_FIELD_NAMES = "elasticsearch.pig.field.names";
+
+ private static final String COMMA = ",";
+ private static final String LOCAL_SCHEME = "file://";
+ private static final String DEFAULT_BULK = "1000";
+ private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
+ private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
+ private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
+ private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";
+
+ public ElasticSearchStorage() {
+ this(DEFAULT_ES_CONFIG, DEFAULT_ES_PLUGINS);
+ }
+
+ public ElasticSearchStorage(String esConfig) {
+ this(esConfig, DEFAULT_ES_PLUGINS);
+ }
+
+ public ElasticSearchStorage(String esConfig, String esPlugins) {
+ this.esConfig = esConfig;
+ this.esPlugins = esPlugins;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
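+ // Loading from elasticsearch is not implemented here; returning null signals end of input to Pig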
+ return null;
+ }
+
+ @Override
+ public InputFormat getInputFormat() {
+ return new ElasticSearchInputFormat();
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature) {
+ this.contextSignature = signature;
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new ElasticSearchOutputFormat();
+ }
+
+ /**
+ Here we record the field names from the store schema in the UDFContext, even if we
+ only end up needing them for the delimited (non-JSON) case in putNext().
+ */
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(ResourceSchema.class);
+ String fieldNames = "";
+ for (String field : s.fieldNames()) {
+ fieldNames += field;
+ fieldNames += COMMA;
+ }
+ property.setProperty(PIG_ES_FIELD_NAMES, fieldNames);
+ }
+
+ // Suppressing rawtypes warnings for RecordWriter, which the StoreFuncInterface signature leaves unparameterized
+ @Override
+ public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
+ this.writer = writer;
+ }
+
+ /**
+ Here we handle both the delimited record case and the json case.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putNext(Tuple t) throws IOException {
+
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(ResourceSchema.class);
+ MapWritable record = new MapWritable();
+
+ String isJson = property.getProperty(ES_IS_JSON);
+ // Handle delimited records (i.e. isJson is "false")
+ if (isJson != null && isJson.equals("false")) {
+ String[] fieldNames = property.getProperty(PIG_ES_FIELD_NAMES).split(COMMA);
+ for (int i = 0; i < t.size(); i++) {
+ if (i < fieldNames.length) {
+ try {
+ record.put(new Text(fieldNames[i]), new Text(t.get(i).toString()));
+ } catch (NullPointerException e) {
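+ // A tuple field with a null value is simply skipped rather than written to the record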
+ //LOG.info("Increment null field counter.");
+ }
+ }
+ }
+ } else {
+ if (!t.isNull(0)) {
+ String jsonData = t.get(0).toString();
+ // parse json data and put into mapwritable record
+ try {
+ HashMap<String,Object> data = mapper.readValue(jsonData, HashMap.class);
+ record = (MapWritable)toWritable(data);
+ } catch (JsonParseException e) {
+ e.printStackTrace();
+ } catch (JsonMappingException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ try {
+ writer.write(NullWritable.get(), record);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ this.contextSignature = signature;
+ }
+
+ /**
+ Look at the passed in uri and hadoop configuration and set options.
+ <p>
+ <b>WARNING</b> Note that, since this is called more than once, it is
+ critical to ensure that we do not change or reset anything we've already set.
+ */
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ // Use java.net.URI to parse the location, pulling out the index name, object type, and query parameters
+ try {
+
+ // Parse the passed in location URI, pulling out the arguments as well
+ URI parsedLocation = new URI(location);
+ HashMap<String, String> query = parseURIQuery(parsedLocation.getQuery());
+
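+ // The "es://" scheme prefix is 5 characters; the index name is whatever sits between it and the first '/'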
+ String esHost = location.substring(5).split("/")[0];
+ // split("/") never yields null elements, so an absent index or object type shows up as an empty string
+ if (esHost == null || esHost.isEmpty()) {
+ throw new RuntimeException("Missing elasticsearch index name, URI must be formatted as es://<index_name>/<object_type>?<params>");
+ }
+
+ if (parsedLocation.getPath() == null || parsedLocation.getPath().isEmpty()) {
+ throw new RuntimeException("Missing elasticsearch object type, URI must be formatted as es://<index_name>/<object_type>?<params>");
+ }
+
+ Configuration conf = job.getConfiguration();
+ if (conf.get(ES_INDEX_NAME) == null) {
+
+ // Set elasticsearch index and object type in the Hadoop configuration
+ job.getConfiguration().set(ES_INDEX_NAME, esHost);
+ job.getConfiguration().set(ES_OBJECT_TYPE, parsedLocation.getPath().replaceAll("/", ""));
+
+ // Set the bulk request size in the Hadoop configuration
+ String bulkSize = query.get("size");
+ if (bulkSize == null) bulkSize = DEFAULT_BULK;
+ job.getConfiguration().set(ES_BULK_SIZE, bulkSize);
+
+ // Set the id field name in the Hadoop configuration
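+ // ("-1" means no id field was specified, in which case elasticsearch assigns document ids itself)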
+ String idFieldName = query.get("id");
+ if (idFieldName == null) idFieldName = "-1";
+ job.getConfiguration().set(ES_ID_FIELD_NAME, idFieldName);
+
+ // Adds the elasticsearch.yml file (esConfig) and the plugins directory (esPlugins) to the distributed cache
+ try {
+ Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
+ Path hdfsPluginsPath = new Path(ES_PLUGINS_HDFS_PATH);
+
+ HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME+esConfig), hdfsConfigPath, job.getConfiguration());
+ HadoopUtils.shipFileIfNotShipped(hdfsConfigPath, job.getConfiguration());
+
+ HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME+esPlugins), hdfsPluginsPath, job.getConfiguration());
+ HadoopUtils.shipArchiveIfNotShipped(hdfsPluginsPath, job.getConfiguration());
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ String isJson = query.get("json");
+ if (isJson==null || isJson.equals("false")) {
+ // We're dealing with delimited records
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(ResourceSchema.class);
+ property.setProperty(ES_IS_JSON, "false");
+ }
+
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ Given a URI query string, e.g. "foo=bar&happy=true", returns
+ a hashmap ({'foo' => 'bar', 'happy' => 'true'})
+ */
+ private HashMap<String, String> parseURIQuery(String query) {
+ HashMap<String, String> argMap = new HashMap<String, String>();
+ if (query != null) {
+ String[] pairs = query.split("&");
+ for (String pair : pairs) {
+ String[] splitPair = pair.split("=");
+ // Guard against value-less parameters (a bare "?flag" would otherwise throw ArrayIndexOutOfBoundsException)
+ if (splitPair.length > 1) argMap.put(splitPair[0], splitPair[1]);
+ }
+ }
+ return argMap;
+ }
+
+ /**
+ Recursively converts an arbitrary object into the appropriate writable. Please enlighten me if there is an existing
+ method for doing this.
+ */
+ private Writable toWritable(Object thing) {
+ if (thing instanceof String) {
+ return new Text((String)thing);
+ } else if (thing instanceof Long) {
+ return new LongWritable((Long)thing);
+ } else if (thing instanceof Integer) {
+ return new IntWritable((Integer)thing);
+ } else if (thing instanceof Double) {
+ return new DoubleWritable((Double)thing);
+ } else if (thing instanceof Float) {
+ return new FloatWritable((Float)thing);
+ } else if (thing instanceof Map) {
+ MapWritable result = new MapWritable();
+ for (Map.Entry<String,Object> entry : ((Map<String,Object>)thing).entrySet()) {
+ result.put(new Text(entry.getKey().toString()), toWritable(entry.getValue()));
+ }
+ return result;
+ } else if (thing instanceof List) {
+ if (((List)thing).size() > 0) {
+ Object first = ((List)thing).get(0);
+ Writable[] listOfThings = new Writable[((List)thing).size()];
+ for (int i = 0; i < listOfThings.length; i++) {
+ listOfThings[i] = toWritable(((List)thing).get(i));
+ }
+ return new ArrayWritable(toWritable(first).getClass(), listOfThings);
+ }
+ }
+ return NullWritable.get();
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ }
+}
