
ElasticSearchStorage for pig can now **read** data from elasticsearch (and allows for lucene style free text queries) and can write to elasticsearch (both json and delimited records)
commit 62380cd0938d9acb9a64a632478d1702922fa420 1 parent a7a18f4
@thedatachef authored
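
For orientation, here is a usage sketch that is not part of this commit. It follows the URI handling added in elasticSearchSetup below, where the URI host names the index, the path names the object type, and the query string carries the options q, json, size, tasks, and id; the es:// scheme, the jar path, and the tweets/tweet index and type names are assumptions for illustration.

REGISTER /path/to/elasticsearch-pig.jar;  -- hypothetical jar path

-- 'q' is a lucene style free text query (default '*') and 'tasks' sets
-- the number of input splits (default 100). Per the new getNext(), each
-- loaded tuple is a (doc_id, source_json) pair of bytearrays.
docs = LOAD 'es://tweets/tweet?q=user:infochimps&json=true&tasks=100'
       USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();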
27 src/main/java/com/infochimps/elasticsearch/ElasticSearchInputFormat.java
@@ -1,6 +1,7 @@
package com.infochimps.elasticsearch;
import java.io.IOException;
+import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
@@ -73,10 +74,11 @@ is a json string of the (source) record contents.
The number of splits is specified in the Hadoop configuration object.
*/
public List<InputSplit> getSplits(JobContext context) {
-
+ setConf(context.getConfiguration());
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits.intValue());
for(int i = 0; i < numSplits; i++) {
- splits.add(new ElasticSearchSplit(queryString, i*numSplitRecords, numSplitRecords-1));
+ Long size = (numSplitRecords == 1) ? 1 : numSplitRecords-1;
+ splits.add(new ElasticSearchSplit(queryString, i*numSplitRecords, size));
}
if (numHits % numSplits > 0) splits.add(new ElasticSearchSplit(queryString, numSplits*numSplitRecords, numHits % numSplits - 1));
LOG.info("Created ["+splits.size()+"] splits for ["+numHits+"] hits");
@@ -132,8 +134,8 @@ private void initiate_search() {
.execute()
.actionGet();
this.numHits = response.hits().totalHits();
- this.numSplitRecords = (numHits/numSplits);
if(numSplits > numHits) numSplits = numHits; // This could be bad
+ this.numSplitRecords = (numHits/numSplits);
}
protected class ElasticSearchRecordReader extends RecordReader<Text, Text> {
@@ -203,22 +205,24 @@ private void start_embedded_client() {
.setSize(recsToRead.intValue())
.setQuery(QueryBuilders.queryString(queryString))
.execute()
- .actionGet();
+ .actionGet();
return response.hits().iterator();
}
@Override
public boolean nextKeyValue() throws IOException {
if (hitsItr!=null) {
- if (recordsRead < recsToRead && hitsItr.hasNext()) {
- SearchHit hit = hitsItr.next();
- currentKey = new Text(hit.id());
- currentValue = new Text(hit.sourceAsString());
- recordsRead += 1;
+ if (recordsRead < recsToRead) {
+ if (hitsItr.hasNext()) {
+ SearchHit hit = hitsItr.next();
+ currentKey = new Text(hit.id());
+ currentValue = new Text(hit.sourceAsString());
+ recordsRead += 1;
+ return true;
+ }
} else {
hitsItr = null;
}
- return true;
} else {
if (recordsRead < recsToRead) {
hitsItr = fetchNextHits();
@@ -229,10 +233,9 @@ public boolean nextKeyValue() throws IOException {
recordsRead += 1;
return true;
}
- return false;
}
- return false;
}
+ return false;
}
@Override
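
One subtlety in the initiate_search change above: numSplitRecords is now computed after numSplits is clamped to numHits. With the old order, a query matching fewer hits than the requested number of splits (say numHits = 7 and numSplits = 100) computed numSplitRecords = 7/100 = 0 before the clamp, producing empty splits; clamping first yields 7/7 = 1 hit per split.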
79 src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java
@@ -1,6 +1,7 @@
package com.infochimps.elasticsearch.pig;
import java.io.IOException;
+import java.lang.InterruptedException;
import java.util.Properties;
import java.util.List;
import java.util.Map;
@@ -29,7 +30,9 @@
import org.apache.pig.ResourceSchema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
import com.infochimps.elasticsearch.ElasticSearchInputFormat;
@@ -51,6 +54,9 @@
private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
private static final String ES_IS_JSON = "elasticsearch.is_json";
private static final String PIG_ES_FIELD_NAMES = "elasticsearch.pig.field.names";
+ private static final String ES_REQUEST_SIZE = "elasticsearch.request.size";
+ private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits";
+ private static final String ES_QUERY_STRING = "elasticsearch.query.string";
private static final String COMMA = ",";
private static final String LOCAL_SCHEME = "file://";
@@ -59,6 +65,8 @@
private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";
+ private static final String ES_CONFIG = "es.config";
+ private static final String ES_PLUGINS = "es.path.plugins";
public ElasticSearchStorage() {
this(DEFAULT_ES_CONFIG, DEFAULT_ES_PLUGINS);
@@ -74,7 +82,19 @@ public ElasticSearchStorage(String esConfig, String esPlugins) {
}
@Override
- public Tuple getNext() throws IOException {
+ public Tuple getNext() throws IOException {
+ try {
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ if (reader.nextKeyValue()) {
+ Text docId = (Text)reader.getCurrentKey();
+ Text docContent = (Text)reader.getCurrentValue();
+ tuple.set(0, new DataByteArray(docId.toString()));
+ tuple.set(1, new DataByteArray(docContent.toString()));
+ return tuple;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
return null;
}
@@ -95,6 +115,7 @@ public void setUDFContextSignature(String signature) {
@Override
public void setLocation(String location, Job job) throws IOException {
+ elasticSearchSetup(location, job);
}
@Override
@@ -171,9 +192,7 @@ public void putNext(Tuple t) throws IOException {
}
}
}
-
-
-
+
try {
writer.write(NullWritable.get(), record);
} catch (InterruptedException e) {
@@ -187,13 +206,9 @@ public void setStoreFuncUDFContextSignature(String signature) {
}
/**
- Look at the passed in uri and hadoop configuration and set options.
- <p>
- <b>WARNING</b> Note that, since this is called more than once, it is
- critical to ensure that we do not change or reset anything we've already set.
- */
- @Override
- public void setStoreLocation(String location, Job job) throws IOException {
+ Pull out the elasticsearch setup code
+ */
+ private void elasticSearchSetup(String location, Job job) {
// Need to use the uri parsing library here to pull out everything
try {
@@ -217,16 +232,25 @@ public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set(ES_INDEX_NAME, esHost);
job.getConfiguration().set(ES_OBJECT_TYPE, parsedLocation.getPath().replaceAll("/", ""));
- // Set the bulk request size in the Hadoop configuration
- String bulkSize = query.get("size");
- if (bulkSize == null) bulkSize = DEFAULT_BULK;
- job.getConfiguration().set(ES_BULK_SIZE, bulkSize);
-
+ // Set the request size in the Hadoop configuration
+ String requestSize = query.get("size");
+ if (requestSize == null) requestSize = DEFAULT_BULK;
+ job.getConfiguration().set(ES_BULK_SIZE, requestSize);
+ job.getConfiguration().set(ES_REQUEST_SIZE, requestSize);
+
// Set the id field name in the Hadoop configuration
String idFieldName = query.get("id");
if (idFieldName == null) idFieldName = "-1";
job.getConfiguration().set(ES_ID_FIELD_NAME, idFieldName);
-
+
+ String queryString = query.get("q");
+ if (queryString==null) queryString = "*";
+ job.getConfiguration().set(ES_QUERY_STRING, queryString);
+
+ String numTasks = query.get("tasks");
+ if (numTasks==null) numTasks = "100";
+ job.getConfiguration().set(ES_NUM_SPLITS, numTasks);
+
// Adds the elasticsearch.yml file (esConfig) and the plugins directory (esPlugins) to the distributed cache
try {
Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
@@ -242,6 +266,9 @@ public void setStoreLocation(String location, Job job) throws IOException {
throw new RuntimeException(e);
}
+ //
+ // This gets set even when loading data from elasticsearch
+ //
String isJson = query.get("json");
if (isJson==null || isJson.equals("false")) {
// We're dealing with delimited records
@@ -249,11 +276,25 @@ public void setStoreLocation(String location, Job job) throws IOException {
Properties property = context.getUDFProperties(ResourceSchema.class);
property.setProperty(ES_IS_JSON, "false");
}
-
+
+ // Need to set this to start the local instance of elasticsearch
+ job.getConfiguration().set(ES_CONFIG, esConfig);
+ job.getConfiguration().set(ES_PLUGINS, esPlugins);
}
} catch (URISyntaxException e) {
throw new RuntimeException(e);
- }
+ }
+ }
+
+ /**
+ Look at the passed in uri and hadoop configuration and set options.
+ <p>
+ <b>WARNING</b> Note that, since this is called more than once, it is
+ critical to ensure that we do not change or reset anything we've already set.
+ */
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ elasticSearchSetup(location, job);
}
/**
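
The write side works the same way from Pig; here is a minimal store sketch under the same assumptions as the read example above. Per elasticSearchSetup, 'size' sets the bulk request size and 'id' names the field to use as the document id (its default is -1, presumably meaning no field is used and elasticsearch assigns ids), while omitting 'json' falls through to the delimited-record path.

-- hypothetical index/type names; delimited records since 'json' is unset
STORE docs INTO 'es://tweets_copy/tweet?size=1000'
      USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();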