Permalink
Browse files

inputformat to dump records from elasticsearch actually works, now just needs some tweaking to make it useful
  • Loading branch information...
1 parent 6209899 commit a2f09a92e5c35ec215719bc3061a170a780ecb6c @thedatachef thedatachef committed Jul 7, 2011
Showing with 105 additions and 14 deletions.
  1. +105 −14 src/main/java/com/infochimps/elasticsearch/ElasticSearchInputFormat.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -22,6 +23,7 @@
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
+import org.elasticsearch.search.SearchHit;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.xcontent.FilterBuilders.*;
@@ -59,7 +61,7 @@
public RecordReader<Text,Text> createRecordReader(InputSplit inputSplit,
TaskAttemptContext context) {
- return new ElasticSearchRecordReader();
+ return new ElasticSearchRecordReader(numSplitRecords, scrollSize, scrollId); // passes this format's numSplitRecords, scrollSize and scrollId fields to the new reader; inputSplit and context are not used here
}
/**
@@ -92,10 +94,14 @@ public void setConf(Configuration configuration) {
this.objType = conf.get(ES_OBJECT_TYPE);
this.scrollSize = Integer.parseInt(conf.get(ES_SCROLL_SIZE));
this.numSplits = Long.parseLong(conf.get(ES_NUM_SPLITS));
-
- // FIXME: need to get this in some other way
- System.setProperty(ES_CONFIG, "/etc/elasticsearch/elasticsearch.yml");
- System.setProperty(ES_PLUGINS, "/usr/local/share/elasticsearch/plugins");
+
+ //
+ // The ES_CONFIG and ES_PLUGINS locations must be present in the hadoop
+ // configuration so that a local client can be instantiated here: no files
+ // are available in the distributed cache at the time setConf is called.
+ //
+ System.setProperty(ES_CONFIG, conf.get(ES_CONFIG));
+ System.setProperty(ES_PLUGINS, conf.get(ES_PLUGINS));
start_embedded_client();
initiate_scan();
@@ -139,41 +145,126 @@ private void initiate_scan() {
protected class ElasticSearchRecordReader extends RecordReader<Text, Text> {
+ private Node node;
+ private Client client;
+
+ private String indexName;
+ private String objType;
+ private Long numSplitRecords;
+ private Integer scrollSize;
+ private String scrollId;
private Text currentKey;
private Text currentValue;
- private boolean val = true;
- public ElasticSearchRecordReader() {
+ private Integer recordsRead;
+ private Iterator<SearchHit> hitsItr = null;
+
+ /**
+ Creates a reader that continues an existing elasticsearch scroll.
+ Only stores its arguments; the embedded client is started later, in initialize().
+
+ @param numSplitRecords record quota that nextKeyValue() checks before each read
+ @param scrollSize stored on the reader; not used by the methods of this class shown here
+ @param scrollId id of the scroll that fetchNextHits() continues
+ */
+ public ElasticSearchRecordReader(Long numSplitRecords, Integer scrollSize, String scrollId) {
+ this.numSplitRecords = numSplitRecords;
+ this.scrollSize = scrollSize;
+ this.scrollId = scrollId;
+ // Report through the task log, like the rest of this class, rather than raw stdout.
+ LOG.info("Created elasticsearch record reader for scroll id ["+scrollId+"]");
}
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
+ //
+ // Prepares this reader for use: reads the index name and object type from the
+ // task configuration, points the ES_CONFIG / ES_PLUGINS system properties at the
+ // copies fetched from the distributed cache, starts the embedded client and
+ // resets the record counter. The split argument is not consulted.
+ //
+ Configuration conf = context.getConfiguration();
+ this.indexName = conf.get(ES_INDEX_NAME);
+ this.objType = conf.get(ES_OBJECT_TYPE);
+ LOG.info("Initializing elasticsearch record reader on index ["+indexName+"] and object type ["+objType+"]");
+
+ //
+ // Fetches elasticsearch.yml and the plugins directory from the distributed cache
+ //
+ try {
+ String taskConfigPath = HadoopUtils.fetchFileFromCache(ES_CONFIG_NAME, conf);
+ LOG.info("Using ["+taskConfigPath+"] as es.config");
+ String taskPluginsPath = HadoopUtils.fetchArchiveFromCache(ES_PLUGINS_NAME, conf);
+ // Log the directory that is actually handed to elasticsearch, not just the
+ // archive location it lives under.
+ String taskPluginsDir = taskPluginsPath+SLASH+ES_PLUGINS_NAME;
+ LOG.info("Using ["+taskPluginsDir+"] as es.plugins.dir");
+ System.setProperty(ES_CONFIG, taskConfigPath);
+ System.setProperty(ES_PLUGINS, taskPluginsDir);
+ } catch (Exception e) {
+ // Any failure locating the cached files is fatal for this task; keep the cause.
+ throw new RuntimeException(e);
+ }
+
+ // The system properties above must be in place before the node is built.
+ start_embedded_client();
+ recordsRead = 0;
}
- @Override
- public void close() throws IOException {
+ //
+ // Starts an embedded elasticsearch client (ie. data = false)
+ //
+ private void start_embedded_client() {
+ LOG.info("Starting embedded elasticsearch client ...");
+ this.node = NodeBuilder.nodeBuilder().client(true).node();
+ this.client = node.client();
}
+ /**
+ Requests the next page of hits for this reader's scroll, asking elasticsearch
+ to keep the scroll open for "10m", and remembers the scroll id returned with
+ the page for the following request.
+
+ @return iterator over the hits contained in the returned page
+ */
+ private Iterator<SearchHit> fetchNextHits() {
+ SearchResponse response = client.prepareSearchScroll(scrollId)
+ .setScroll("10m")
+ .execute()
+ .actionGet();
+ // Every scroll response carries the id to use for the next request and it is
+ // not guaranteed to stay the same, so continue from the most recent one.
+ String nextScrollId = response.scrollId();
+ if (nextScrollId != null) {
+ scrollId = nextScrollId;
+ }
+ return response.hits().iterator();
+ }
+
+ /**
+ Advances to the next record of the scroll.
+ Hits are served from a buffered page; when that page is exhausted a new
+ scroll request is made through fetchNextHits(). Once the record quota
+ (numSplitRecords) has been passed, or elasticsearch returns an empty page,
+ this returns false and leaves the current key and value untouched.
+
+ @return true only when a new hit has been loaded into the current key and value
+ */
@Override
public boolean nextKeyValue() throws IOException {
- if(val) {
- val = false;
+ // NOTE(review): the inclusive bound is kept from the previous implementation, so
+ // a reader can return numSplitRecords + 1 records; confirm that is intended.
+ if (recordsRead > numSplitRecords) {
+ return false;
+ }
+ // Refill the buffer when there is none yet or the current page is used up.
+ if (hitsItr == null || !hitsItr.hasNext()) {
+ hitsItr = fetchNextHits();
+ if (!hitsItr.hasNext()) {
+ // An empty page means there is nothing left to read from the scroll.
+ return false;
+ }
+ }
+ // Only report success after actually moving to a fresh hit, so the previous
+ // record is never handed out a second time.
+ SearchHit hit = hitsItr.next();
+ currentKey = new Text(hit.id());
+ currentValue = new Text(hit.sourceAsString());
+ recordsRead += 1;
return true;
- }
- return false;
}
@Override
public Text getCurrentKey() {
- return new Text("howdy");
+ return currentKey; // id of the hit most recently loaded by nextKeyValue(); null until a hit has been read
}
@Override
public Text getCurrentValue() {
- return new Text("dood");
+ return currentValue; // source (as a string) of the hit most recently loaded by nextKeyValue(); null until a hit has been read
}
@Override
public float getProgress() throws IOException {
return 0;
}
+
+ /**
+ Shuts down the embedded client and then the node it came from.
+ Either handle may still be null if initialize() never completed, so both
+ are checked before use.
+ */
+ @Override
+ public void close() throws IOException {
+ LOG.info("Closing record reader");
+ try {
+ if (client != null) {
+ client.close();
+ LOG.info("Client is closed");
+ }
+ } finally {
+ // Always release the node, even when closing the client failed.
+ if (node != null) {
+ node.close();
+ }
+ }
+ LOG.info("Record reader closed.");
+ }
+
}
}

0 comments on commit a2f09a9

Please sign in to comment.