Permalink
Browse files

works with elasticsearch v 0.19.8

  • Loading branch information...
1 parent 04d0d3c commit 5410c6e943819c186449dc1aaa3fe3559080d44c Dhruv Bansal committed Oct 24, 2012
View
@@ -18,7 +18,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>0.18.7</version>
+ <version>0.19.8</version>
</dependency>
<dependency>
@@ -32,7 +32,7 @@
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.ExceptionsHelper;
@@ -1,8 +1,12 @@
package com.infochimps.elasticsearch;
import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.FileReader;
import java.util.List;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -14,12 +18,11 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.loader.YamlSettingsLoader;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.client.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
@@ -36,6 +39,7 @@
private String indexName;
private static final String ES_TYPE_OPT = "elasticsearch.input.type";
+ private static final String ES_DEFAULT_TYPE = "streaming_record";
private String typeName;
private static final String ES_NUM_SPLITS_OPT = "elasticsearch.input.splits";
@@ -57,6 +61,8 @@
private static final String ES_PLUGINS_OPT = "es.path.plugins";
private static final String ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
+
+ private static final String ES_UNICAST_HOSTS_NAME = "discovery.zen.ping.unicast.hosts";
private TransportClient client;
@@ -93,7 +99,7 @@ public void setLocalElasticSearchInstallation(JobConf conf) {
private void parseInput(JobConf conf) {
this.indexName = conf.get(ES_INDEX_OPT, ES_DEFAULT_INDEX);
- this.typeName = conf.get(ES_TYPE_OPT, "");
+ this.typeName = conf.get(ES_TYPE_OPT, ES_DEFAULT_TYPE);
// this.numSplits = Integer.parseInt(conf.get(ES_NUM_SPLITS_OPT, ES_NUM_SPLITS));
this.queryJSON = conf.get(ES_QUERY_OPT, ES_QUERY);
String message = "Using input /"+indexName;
@@ -111,24 +117,62 @@ private void parseInput(JobConf conf) {
//
private void startTransportClient(JobConf conf) {
+ this.client = new TransportClient();
+ Map<String,String> settings = parsedSettings(conf);
+ String host = hostname(settings);
+ if (host.toString().length() == 0) {
+ System.exit(1);
+ }
+ LOG.info("Attempting to connect to Elasticsearch node at " + host + ":9300");
+ this.client = new TransportClient().addTransportAddress(new InetSocketTransportAddress(host, 9300));
+ LOG.info("Connected to Elasticsearch cluster");
+ }
+
+ private Map<String,String> parsedSettings(JobConf conf) {
String esConfigPath = conf.get(ES_CONFIG_OPT, ES_CONFIG);
String esPluginsPath = conf.get(ES_PLUGINS_OPT, ES_PLUGINS);
- Settings settings = ImmutableSettings.settingsBuilder()
- .put(ES_CONFIG_OPT, esConfigPath)
- .put(ES_PLUGINS_OPT, esPluginsPath)
- .build();
-
- // FIXME -- can't figure out how to get settings from
- // elasticsearch.yml to control TransportClient.
- this.client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("10.124.115.89", 9300));
+ try {
+ BufferedReader reader = new BufferedReader( new FileReader(esConfigPath));
+ String line = null;
+ StringBuilder stringBuilder = new StringBuilder();
+ String ls = System.getProperty("line.separator");
+ while( ( line = reader.readLine() ) != null ) {
+ stringBuilder.append( line );
+ stringBuilder.append( ls );
+ }
+ return new YamlSettingsLoader().load(stringBuilder.toString());
+ } catch (IOException e) {
+ LOG.error("Could not find or read the configuration file " + esConfigPath + ".");
+ return new HashMap<String,String>();
+ }
+ }
- LOG.info("Connected to Elasticsearch cluster");
+ private String hostname(Map<String,String> settings) {
+ String hostsString = settings.get(ES_UNICAST_HOSTS_NAME);
+ if (hostsString.toString().length() == 0) {
+ LOG.error("Could not find hosts. Did you set the '" + ES_UNICAST_HOSTS_NAME + "' key?");
+ return "";
+ }
+
+ String[] hosts = hostsString.split(",");
+ if (hosts.length > 0) {
+ String host = hosts[0];
+ if (host.toString().length() == 0) {
+ LOG.error("Could not parse hosts from '" + ES_UNICAST_HOSTS_NAME + "' key.");
+ return "";
+ } else {
+ return host;
+ }
+ } else {
+ LOG.error("Could not find any hosts in the '" + ES_UNICAST_HOSTS_NAME + "' key.");
+ return "";
+ }
}
private void stopTransportClient() {
if (client != null) client.close();
- LOG.info("Disconnected from Elasticsearch cluster");
+ LOG.info("Disconnected from Elasticsearch cluster");
}
private void findNumHits() {
@@ -155,26 +199,31 @@ private void readjustSplitsByHits() {
}
private InputSplit[] createSplits() {
- // This could be bad
+ // Say that
+ //
+ // numHits = 7
+ // numSplits = 2
if((long) numSplits > numHits) {
numSplits = (int) numHits;
}
- this.recordsPerSplit = (int) (numHits/((long)numSplits));
+ this.recordsPerSplit = (int) (numHits/((long)numSplits)); // == 3 records/split
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+
+ // i == 0, 1
for(int i = 0; i < numSplits; i++) {
Integer from = i * recordsPerSplit;
- Integer size = (recordsPerSplit == 1) ? 1 : recordsPerSplit;
- splits.add(new ElasticSearchStreamingSplit(indexName, typeName, numSplits, queryJSON, numHits, from, size));
+ splits.add(new ElasticSearchStreamingSplit(indexName, typeName, numSplits, queryJSON, numHits, from, recordsPerSplit));
}
+ // 7 is > (2 * 3) == 6
if (numHits > ((long) (numSplits * recordsPerSplit))) {
Integer from = numSplits * recordsPerSplit;
Integer size = (int) (numHits - ((long) from));
splits.add(new ElasticSearchStreamingSplit(indexName, typeName, numSplits, queryJSON, numHits, from, size));
}
- LOG.info("Splitting "+String.valueOf(numHits)+" hits across "+String.valueOf(numSplits)+" splits ("+String.valueOf(recordsPerSplit)+" hits/split)");
+ LOG.info("Splitting "+String.valueOf(numHits)+" hits across "+String.valueOf(splits.size())+" splits ("+String.valueOf(recordsPerSplit)+" hits/split)");
return splits.toArray(new InputSplit[splits.size()]);
}
@@ -17,8 +17,8 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.action.search.SearchScrollRequestBuilder;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.Scroll;
@@ -20,7 +20,7 @@
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.ExceptionsHelper;
@@ -11,8 +11,8 @@
import org.elasticsearch.search.Scroll;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.action.search.SearchScrollRequestBuilder;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.SearchType;
public class ElasticSearchStreamingSplit implements InputSplit, Writable {
@@ -33,7 +33,7 @@
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.ExceptionsHelper;

0 comments on commit 5410c6e

Please sign in to comment.