
ElasticSearchInputFormat now actually works, and works correctly

It no longer uses scan + scroll, but pages through results the old-fashioned way.
thedatachef committed Jul 7, 2011
1 parent a2f09a9 commit 7bea9b146101df648084d403f3a86638caecd3e7
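For orientation, these are the two request shapes involved, in the 0.x-era Java client this code is written against (the index name and query below are illustrative; both call patterns appear in the diff itself):

// Before: scan + scroll, a server-side cursor that has to be kept alive
SearchResponse scrolled = client.prepareSearchScroll(scrollId)
    .setScroll("10m")
    .execute().actionGet();

// After: stateless paging; each map task requests its own from/size window
SearchResponse page = client.prepareSearch("my_index")
    .setFrom(0)
    .setSize(100)
    .setQuery(QueryBuilders.queryString("*:*"))
    .execute().actionGet();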
@@ -29,7 +29,13 @@
import org.elasticsearch.index.query.xcontent.FilterBuilders.*;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
-
+/**
+
+ A Hadoop InputFormat to read data from an Elasticsearch index. The RecordReader
+ produces records where the key is the Elasticsearch record id and the value is
+ a JSON string of the (source) record contents.
+
+ */
public class ElasticSearchInputFormat extends InputFormat<Text, Text> implements Configurable {
static Log LOG = LogFactory.getLog(ElasticSearchInputFormat.class);
@@ -38,62 +44,57 @@
private Node node;
private Client client;
- private Integer scrollSize;
- private String scrollId;
+ private Integer requestSize;
private Long numHits;
private Long numSplits;
private Long numSplitRecords;
private String indexName;
private String objType;
+ private String queryString;
+
+ private static final String ES_REQUEST_SIZE = "elasticsearch.request.size"; // number of records to fetch at one time
+ private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits"; // number of hadoop map tasks to launch
+ private static final String ES_QUERY_STRING = "elasticsearch.query.string";
- private static final String ES_SCROLL_SIZE = "elasticsearch.scroll.size";
- private static final String ES_NUMHITS = "elasticsearch.num.hits";
- private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits";
- private static final String ES_SCROLL_ID = "elasticsearch.scroll.id";
private static final String ES_CONFIG_NAME = "elasticsearch.yml";
private static final String ES_PLUGINS_NAME = "plugins";
private static final String ES_INDEX_NAME = "elasticsearch.index.name";
private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
private static final String ES_CONFIG = "es.config";
private static final String ES_PLUGINS = "es.path.plugins";
-
private static final String SLASH = "/";
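A hypothetical job setup wiring these keys together (the index, type, and values here are made up; the key strings are the constants defined above):

Configuration conf = new Configuration();
conf.set("elasticsearch.index.name", "tweets");        // ES_INDEX_NAME
conf.set("elasticsearch.object.type", "tweet");        // ES_OBJECT_TYPE
conf.set("elasticsearch.request.size", "100");         // ES_REQUEST_SIZE
conf.set("elasticsearch.num.input.splits", "10");      // ES_NUM_SPLITS
conf.set("elasticsearch.query.string", "*:*");         // ES_QUERY_STRING

Job job = new Job(conf, "es-read");
job.setInputFormatClass(ElasticSearchInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);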
public RecordReader<Text,Text> createRecordReader(InputSplit inputSplit,
TaskAttemptContext context) {
- return new ElasticSearchRecordReader(numSplitRecords, scrollSize, scrollId);
+ return new ElasticSearchRecordReader();
}
/**
- Sort of silly, really. All we're going to do is use the number of desired splits
- and break the total number of results into that many splits.
-
- FIXME: Need to check that the number of splits isn't larger than the number of records
+ The number of splits is specified in the Hadoop configuration object.
*/
public List<InputSplit> getSplits(JobContext context) {
- LOG.info("Generating approximately ["+numSplits+"] splits for ["+numHits+"] results using elasticsearch scroll id ["+scrollId+"]");
- Configuration conf = context.getConfiguration();
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits.intValue());
- for(int i = 0; i < numSplits+1; i++) {
- splits.add(new ElasticSearchSplit());
+ for(int i = 0; i < numSplits; i++) {
+ splits.add(new ElasticSearchSplit(queryString, i*numSplitRecords, numSplitRecords));
}
+ if (numHits % numSplits > 0) splits.add(new ElasticSearchSplit(queryString, numSplits*numSplitRecords, numHits % numSplits));
+ LOG.info("Created ["+splits.size()+"] splits for ["+numHits+"] hits");
return splits;
}
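Worked example of the split arithmetic: with numHits = 10 and numSplits = 3, numSplitRecords = 10/3 = 3, so the loop emits splits (from=0, size=3), (from=3, size=3), (from=6, size=3), and since 10 % 3 = 1 a remainder split (from=9, size=1) covers the last hit.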
-
+
/**
- This is where we will get information from the outside world such as the cluster to talk to,
- which index and object type to refer to, as well as more mundane things like the number of
- documents per scroll request to use. Need to create a single connection object here and
- determine things like the scroll id, etc.
+ Sets the configuration object, opens a connection to Elasticsearch, and
+ issues the initial search request.
*/
@Override
public void setConf(Configuration configuration) {
this.conf = configuration;
this.indexName = conf.get(ES_INDEX_NAME);
this.objType = conf.get(ES_OBJECT_TYPE);
- this.scrollSize = Integer.parseInt(conf.get(ES_SCROLL_SIZE));
+ this.requestSize = Integer.parseInt(conf.get(ES_REQUEST_SIZE));
this.numSplits = Long.parseLong(conf.get(ES_NUM_SPLITS));
+ this.queryString = conf.get(ES_QUERY_STRING);
//
// Need to ensure that this is set in the hadoop configuration so we can
@@ -104,45 +105,37 @@ public void setConf(Configuration configuration) {
System.setProperty(ES_PLUGINS, conf.get(ES_PLUGINS));
start_embedded_client();
- initiate_scan();
+
+ initiate_search();
}
- /**
- Doesn't have to do anything other than make the configuration publicly accessible.
- */
@Override
public Configuration getConf() {
return conf;
}
- //
- // Starts an embedded elasticsearch client (ie. data = false)
- //
+ /**
+ Starts an embedded Elasticsearch client (i.e. data = false)
+ */
private void start_embedded_client() {
LOG.info("Starting embedded elasticsearch client ...");
this.node = NodeBuilder.nodeBuilder().client(true).node();
this.client = node.client();
}
- /**
- FIXME: We would sure like to specify more specific kinds of queries other than
- 'match all'
- */
- private void initiate_scan() {
+ private void initiate_search() {
SearchResponse response = client.prepareSearch(indexName)
.setTypes(objType)
- .setSearchType(SearchType.SCAN)
- .setScroll("10m") // this should be settable
- .setQuery(QueryBuilders.matchAllQuery())
- .setFrom(0)
- .setSize(scrollSize)
+ .setSearchType(SearchType.COUNT)
+ .setQuery(QueryBuilders.queryString(queryString))
+ .setSize(requestSize)
.execute()
.actionGet();
this.numHits = response.hits().totalHits();
- this.scrollId = response.scrollId();
- this.numSplitRecords = (numHits/numSplits);
+ if (numSplits > numHits) numSplits = Math.max(numHits, 1L); // never more splits than hits; also guards the division below
+ this.numSplitRecords = (numHits/numSplits);
}
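A note on the design choice: a SearchType.COUNT request computes the total hit count without materializing any documents, so this initial round trip is cheap; the setSize() call is effectively inert here, and the actual records are only fetched later, one from/size window per split.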
-
+
protected class ElasticSearchRecordReader extends RecordReader<Text, Text> {
private Node node;
@@ -151,18 +144,18 @@ private void initiate_scan() {
private String indexName;
private String objType;
private Long numSplitRecords;
- private Integer scrollSize;
- private String scrollId;
+ private Integer requestSize;
private Text currentKey;
private Text currentValue;
private Integer recordsRead;
private Iterator<SearchHit> hitsItr = null;
+
+ private String queryString;
+ private Long from;
+ private Long recsToRead;
- public ElasticSearchRecordReader(Long numSplitRecords, Integer scrollSize, String scrollId) {
- this.numSplitRecords = numSplitRecords;
- this.scrollSize = scrollSize;
- this.scrollId = scrollId;
- System.out.println(scrollId);
+ public ElasticSearchRecordReader() {
}
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -185,37 +178,39 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
throw new RuntimeException(e);
}
+ queryString = ((ElasticSearchSplit)split).getQueryString();
+ from = ((ElasticSearchSplit)split).getFrom();
+ recsToRead = ((ElasticSearchSplit)split).getSize();
+
+ LOG.info("elasticsearch record reader: query ["+queryString+"], from ["+from+"], size ["+recsToRead+"]");
start_embedded_client();
recordsRead = 0;
}
- //
- // Starts an embedded elasticsearch client (ie. data = false)
- //
+ /**
+ Starts an embedded Elasticsearch client (i.e. data = false)
+ */
private void start_embedded_client() {
LOG.info("Starting embedded elasticsearch client ...");
this.node = NodeBuilder.nodeBuilder().client(true).node();
this.client = node.client();
}
private Iterator<SearchHit> fetchNextHits() {
- SearchResponse response = client.prepareSearchScroll(scrollId)
- .setScroll("10m")
+ SearchResponse response = client.prepareSearch(indexName)
+ .setTypes(objType)
+ .setFrom(from.intValue())
+ .setSize(recsToRead.intValue())
+ .setQuery(QueryBuilders.queryString(queryString))
.execute()
- .actionGet();
+ .actionGet();
return response.hits().iterator();
}
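Because each split asks for its entire window in one from/size request, fetchNextHits normally runs once per reader. The trade-off against scan + scroll is worth naming: to serve a page, Elasticsearch has to collect and discard from + size candidates on each shard, so splits with large from offsets (deep into a big index) get progressively more expensive, which is the cost scroll-based cursors were designed to avoid.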
- /**
- Maintains a buffer of records from elasticsearch. If the buffer
- is empty _and_ the number of records read by _this_ record reader
- hasn't exceeded the numSplitRecords then a new scroll get request
- is made to elasticsearch.
- */
@Override
public boolean nextKeyValue() throws IOException {
if (hitsItr!=null) {
- if (recordsRead <= numSplitRecords && hitsItr.hasNext()) {
+ if (recordsRead < recsToRead && hitsItr.hasNext()) {
SearchHit hit = hitsItr.next();
currentKey = new Text(hit.id());
currentValue = new Text(hit.sourceAsString());
@@ -225,7 +220,7 @@ public boolean nextKeyValue() throws IOException {
}
return true;
} else {
- if (recordsRead <= numSplitRecords) {
+ if (recordsRead < recsToRead) {
hitsItr = fetchNextHits();
if (hitsItr.hasNext()) {
SearchHit hit = hitsItr.next();
@@ -4,13 +4,36 @@
import java.io.DataInput;
import java.io.DataOutput;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
public class ElasticSearchSplit extends InputSplit implements Writable {
- public ElasticSearchSplit() {
+
+ private String queryString;
+ private long from;
+ private long size;
+
+ public ElasticSearchSplit() {}
+
+ public ElasticSearchSplit(String queryString, long from, long size) {
+ this.queryString = queryString;
+ this.from = from;
+ this.size = size;
}
+ public String getQueryString() {
+ return queryString;
+ }
+
+ public long getFrom() {
+ return from;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
@Override
public String[] getLocations() {
return new String[] {};
@@ -23,9 +46,15 @@ public long getLength() {
@Override
public void readFields(DataInput in) throws IOException {
+ queryString = Text.readString(in);
+ from = in.readLong();
+ size = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
+ Text.writeString(out, queryString);
+ out.writeLong(from);
+ out.writeLong(size);
}
}
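A minimal round-trip sketch for the Writable implementation above, handy as a sanity check that write and readFields stay in field order (the query string and offsets are made up):

import java.io.*;

ElasticSearchSplit original = new ElasticSearchSplit("user:foo", 100L, 50L);

ByteArrayOutputStream buf = new ByteArrayOutputStream();
original.write(new DataOutputStream(buf));          // writes queryString, from, size

ElasticSearchSplit copy = new ElasticSearchSplit();
copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

// fields come back in exactly the order they were written
assert copy.getQueryString().equals("user:foo");
assert copy.getFrom() == 100L;
assert copy.getSize() == 50L;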
