ElasticSearchStorage for pig can now **read** data from elasticsearch (and allows for lucene style free text queries) and can write to elasticsearch (both json and delimited records)
Jacob Perkins committed Jul 11, 2011
1 parent a7a18f4 commit 62380cd
Showing 2 changed files with 75 additions and 31 deletions.
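The read side is driven by three new Hadoop configuration keys, defined in ElasticSearchStorage below and consumed by ElasticSearchInputFormat. A minimal sketch (not part of the commit) of wiring them up from a bare Hadoop job — the key names come from this diff; the values and job setup are assumptions for illustration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import com.infochimps.elasticsearch.ElasticSearchInputFormat;

public class EsReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("elasticsearch.query.string", "*");        // lucene-style query; "*" matches everything
        conf.set("elasticsearch.num.input.splits", "100");  // how many input splits to carve the hits into
        conf.set("elasticsearch.request.size", "1000");     // hits pulled per search request
        Job job = new Job(conf, "es-read-sketch");
        job.setInputFormatClass(ElasticSearchInputFormat.class);
        // The record reader emits (document id, document source) as Text pairs;
        // add a mapper and an output path, then submit with job.waitForCompletion(true).
    }
}
```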
com/infochimps/elasticsearch/ElasticSearchInputFormat.java
@@ -1,6 +1,7 @@
package com.infochimps.elasticsearch;

import java.io.IOException;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
@@ -73,10 +74,11 @@ public RecordReader<Text,Text> createRecordReader(InputSplit inputSplit,
The number of splits is specified in the Hadoop configuration object.
*/
public List<InputSplit> getSplits(JobContext context) {
+setConf(context.getConfiguration());
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits.intValue());
for(int i = 0; i < numSplits; i++) {
-splits.add(new ElasticSearchSplit(queryString, i*numSplitRecords, numSplitRecords-1));
+Long size = (numSplitRecords == 1) ? 1 : numSplitRecords-1;
+splits.add(new ElasticSearchSplit(queryString, i*numSplitRecords, size));
}
if (numHits % numSplits > 0) splits.add(new ElasticSearchSplit(queryString, numSplits*numSplitRecords, numHits % numSplits - 1));
LOG.info("Created ["+splits.size()+"] splits for ["+numHits+"] hits");
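The split arithmetic is worth tracing with concrete numbers. The size argument is one less than the per-split record count, which suggests ElasticSearchSplit treats its end as inclusive — that class is not shown in this diff, so the convention is an inference. A runnable walk-through with made-up numbers:

```java
public class SplitMathSketch {
    public static void main(String[] args) {
        // Illustrative values, not from the commit:
        long numHits = 1003, numSplits = 100;
        long numSplitRecords = numHits / numSplits;  // 10 records per split
        for (long i = 0; i < numSplits; i++) {
            long from = i * numSplitRecords;         // 0, 10, 20, ..., 990
            long size = (numSplitRecords == 1) ? 1 : numSplitRecords - 1;  // 9
            // split i appears to cover records [from, from + size] inclusive
            System.out.println("split " + i + ": [" + from + ", " + (from + size) + "]");
        }
        long leftover = numHits % numSplits;         // 3 > 0, so one tail split
        if (leftover > 0) {
            // offset numSplits * numSplitRecords = 1000, size 3 - 1 = 2,
            // i.e. records [1000, 1002]
            System.out.println("tail split: [" + (numSplits * numSplitRecords) + ", "
                    + (numSplits * numSplitRecords + leftover - 1) + "]");
        }
    }
}
```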
@@ -132,8 +134,8 @@ private void initiate_search() {
.execute()
.actionGet();
this.numHits = response.hits().totalHits();
-this.numSplitRecords = (numHits/numSplits);
if(numSplits > numHits) numSplits = numHits; // This could be bad
+this.numSplitRecords = (numHits/numSplits);
}

protected class ElasticSearchRecordReader extends RecordReader<Text, Text> {
@@ -203,22 +205,24 @@ private Iterator<SearchHit> fetchNextHits() {
.setSize(recsToRead.intValue())
.setQuery(QueryBuilders.queryString(queryString))
.execute()
.actionGet();
return response.hits().iterator();
}

@Override
public boolean nextKeyValue() throws IOException {
if (hitsItr!=null) {
-if (recordsRead < recsToRead && hitsItr.hasNext()) {
-SearchHit hit = hitsItr.next();
-currentKey = new Text(hit.id());
-currentValue = new Text(hit.sourceAsString());
-recordsRead += 1;
+if (recordsRead < recsToRead) {
+if (hitsItr.hasNext()) {
+SearchHit hit = hitsItr.next();
+currentKey = new Text(hit.id());
+currentValue = new Text(hit.sourceAsString());
+recordsRead += 1;
+return true;
+}
} else {
hitsItr = null;
}
-return true;
} else {
if (recordsRead < recsToRead) {
hitsItr = fetchNextHits();
@@ -229,10 +233,9 @@ public boolean nextKeyValue() throws IOException {
recordsRead += 1;
return true;
}
-return false;
}
-return false;
}
+return false;
}

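fetchNextHits() pages through the result set with plain from/size searches rather than a scroll. A hedged sketch of the same call pattern against the 0.x-era Java client: every call below except setFrom() appears verbatim in the diff; the Client handle, the index argument, and the setFrom() offset are assumptions.

```java
import java.util.Iterator;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class PagingSketch {
    static void page(Client client, String index, String queryString, int from, int pageSize) {
        SearchResponse response = client.prepareSearch(index)
            .setFrom(from)                                     // offset of this page (assumed call)
            .setSize(pageSize)                                 // matches setSize(recsToRead) in the diff
            .setQuery(QueryBuilders.queryString(queryString))  // lucene-style free text query
            .execute()
            .actionGet();
        Iterator<SearchHit> hits = response.hits().iterator();
        while (hits.hasNext()) {
            SearchHit hit = hits.next();
            // Same id/source pair the record reader turns into (key, value) Text:
            System.out.println(hit.id() + "\t" + hit.sourceAsString());
        }
    }
}
```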
com/infochimps/elasticsearch/pig/ElasticSearchStorage.java
@@ -1,6 +1,7 @@
package com.infochimps.elasticsearch.pig;

import java.io.IOException;
+import java.lang.InterruptedException;
import java.util.Properties;
import java.util.List;
import java.util.Map;
@@ -29,7 +30,9 @@
import org.apache.pig.ResourceSchema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;

import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
import com.infochimps.elasticsearch.ElasticSearchInputFormat;
@@ -51,6 +54,9 @@ public class ElasticSearchStorage extends LoadFunc implements StoreFuncInterface
private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
private static final String ES_IS_JSON = "elasticsearch.is_json";
private static final String PIG_ES_FIELD_NAMES = "elasticsearch.pig.field.names";
+private static final String ES_REQUEST_SIZE = "elasticsearch.request.size";
+private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits";
+private static final String ES_QUERY_STRING = "elasticsearch.query.string";

private static final String COMMA = ",";
private static final String LOCAL_SCHEME = "file://";
@@ -59,6 +65,8 @@ public class ElasticSearchStorage extends LoadFunc implements StoreFuncInterface
private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";
+private static final String ES_CONFIG = "es.config";
+private static final String ES_PLUGINS = "es.path.plugins";

public ElasticSearchStorage() {
this(DEFAULT_ES_CONFIG, DEFAULT_ES_PLUGINS);
@@ -74,7 +82,19 @@ public ElasticSearchStorage(String esConfig, String esPlugins) {
}

@Override
public Tuple getNext() throws IOException {
+try {
+Tuple tuple = TupleFactory.getInstance().newTuple(2);
+if (reader.nextKeyValue()) {
+Text docId = (Text)reader.getCurrentKey();
+Text docContent = (Text)reader.getCurrentValue();
+tuple.set(0, new DataByteArray(docId.toString()));
+tuple.set(1, new DataByteArray(docContent.toString()));
+return tuple;
+}
+} catch (InterruptedException e) {
+throw new IOException(e);
+}
return null;
}

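On the Pig side, then, each Elasticsearch document loads as a two-field tuple. A small sketch of the shape getNext() produces — the literal values are invented for illustration:

```java
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class TupleShapeSketch {
    public static Tuple example() throws Exception {
        // Field 0 carries hit.id(), field 1 carries hit.sourceAsString(),
        // both wrapped as DataByteArray.
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, new DataByteArray("doc-42"));
        t.set(1, new DataByteArray("{\"title\":\"hadoop\"}"));
        return t;
    }
}
```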
@@ -95,6 +115,7 @@ public void setUDFContextSignature(String signature) {

@Override
public void setLocation(String location, Job job) throws IOException {
+elasticSearchSetup(location, job);
}

@Override
@@ -171,9 +192,7 @@ record = (MapWritable)toWritable(data);
}
}
}

try {
writer.write(NullWritable.get(), record);
} catch (InterruptedException e) {
@@ -187,13 +206,9 @@ public void setStoreFuncUDFContextSignature(String signature) {
}

/**
-Look at the passed in uri and hadoop configuration and set options.
-<p>
-<b>WARNING</b> Note that, since this is called more than once, it is
-critical to ensure that we do not change or reset anything we've already set.
-*/
-@Override
-public void setStoreLocation(String location, Job job) throws IOException {
+Pull out the elasticsearch setup code
+*/
+private void elasticSearchSetup(String location, Job job) {
// Need to use the uri parsing library here to pull out everything
try {

@@ -217,16 +232,25 @@ public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set(ES_INDEX_NAME, esHost);
job.getConfiguration().set(ES_OBJECT_TYPE, parsedLocation.getPath().replaceAll("/", ""));

-// Set the bulk request size in the Hadoop configuration
-String bulkSize = query.get("size");
-if (bulkSize == null) bulkSize = DEFAULT_BULK;
-job.getConfiguration().set(ES_BULK_SIZE, bulkSize);

+// Set the request size in the Hadoop configuration
+String requestSize = query.get("size");
+if (requestSize == null) requestSize = DEFAULT_BULK;
+job.getConfiguration().set(ES_BULK_SIZE, requestSize);
+job.getConfiguration().set(ES_REQUEST_SIZE, requestSize);

// Set the id field name in the Hadoop configuration
String idFieldName = query.get("id");
if (idFieldName == null) idFieldName = "-1";
job.getConfiguration().set(ES_ID_FIELD_NAME, idFieldName);


+String queryString = query.get("q");
+if (queryString==null) queryString = "*";
+job.getConfiguration().set(ES_QUERY_STRING, queryString);
+
+String numTasks = query.get("tasks");
+if (numTasks==null) numTasks = "100";
+job.getConfiguration().set(ES_NUM_SPLITS, numTasks);
+
// Adds the elasticsearch.yml file (esConfig) and the plugins directory (esPlugins) to the distributed cache
try {
Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
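Everything this method sets is driven off the location URI's query string. One location exercising every parameter parsed above — the host maps to the index name and the path to the object type per the parsing code, while the "es://" scheme and the concrete values are assumptions for illustration:

```java
public class LocationSketch {
    static final String LOCATION =
        "es://my_index/my_type?q=title:hadoop&size=1000&tasks=100&id=-1&json=false";
    // q     -> ES_QUERY_STRING                  (default "*")
    // size  -> ES_BULK_SIZE and ES_REQUEST_SIZE (default DEFAULT_BULK)
    // tasks -> ES_NUM_SPLITS                    (default "100")
    // id    -> ES_ID_FIELD_NAME                 (default "-1", presumably meaning no id field)
    // json  -> ES_IS_JSON                       ("false" or absent selects delimited records)
}
```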
@@ -242,18 +266,35 @@ public void setStoreLocation(String location, Job job) throws IOException {
throw new RuntimeException(e);
}

+//
+// This gets set even when loading data from elasticsearch
+//
String isJson = query.get("json");
if (isJson==null || isJson.equals("false")) {
// We're dealing with delimited records
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(ResourceSchema.class);
property.setProperty(ES_IS_JSON, "false");
}

+
+// Need to set this to start the local instance of elasticsearch
+job.getConfiguration().set(ES_CONFIG, esConfig);
+job.getConfiguration().set(ES_PLUGINS, esPlugins);
}
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
}

+/**
+Look at the passed in uri and hadoop configuration and set options.
+<p>
+<b>WARNING</b> Note that, since this is called more than once, it is
+critical to ensure that we do not change or reset anything we've already set.
+*/
+@Override
+public void setStoreLocation(String location, Job job) throws IOException {
+elasticSearchSetup(location, job);
+}
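The setup code assumes elasticsearch.yml and the plugins directory are already sitting at fixed HDFS paths before it registers them in the distributed cache. A sketch of staging them there — the HDFS destinations are the ES_CONFIG_HDFS_PATH and ES_PLUGINS_HDFS_PATH constants from this file, while the local source paths are assumptions:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StageEsConfigSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Destinations match the constants above; sources are illustrative.
        fs.copyFromLocalFile(new Path("/etc/elasticsearch/elasticsearch.yml"),
                             new Path("/tmp/elasticsearch/elasticsearch.yml"));
        fs.copyFromLocalFile(new Path("/usr/local/share/elasticsearch/plugins"),
                             new Path("/tmp/elasticsearch/plugins"));
    }
}
```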

