Permalink
Browse files

Elasticsearch configuration is now distributed via Hadoop's distributed cache. What this means for you: elasticsearch.yml NO LONGER NEEDS TO EXIST ON ALL MACHINES IN THE CLUSTER
  • Loading branch information...
1 parent 74e68fd commit e91f0a91a771c3332130bf4ffcd718c162cd29de @thedatachef thedatachef committed May 13, 2011
@@ -1,12 +1,14 @@
package com.infochimps.elasticsearch;
+import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Random;
+import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -22,6 +24,7 @@
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.filecache.DistributedCache;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -64,6 +67,7 @@ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) in
private long runStartTime = System.currentTimeMillis();
// For hadoop configuration
+ private static final String ES_CONFIG_NAME = "elasticsearch.yml";
private static final String ES_INDEX_NAME = "elasticsearch.index.name";
private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
private static final String ES_IS_JSON = "elasticsearch.is_json";
@@ -120,8 +124,30 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) {
this.idField = Integer.parseInt(conf.get(ES_ID_FIELD));
}
this.objType = conf.get(ES_OBJECT_TYPE);
- System.setProperty("es.config",conf.get(ES_CONFIG));
+
+ //
+ // Fetches elasticsearch.yml from the distributed cache
+ //
+ try {
+ URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
+ for (URI cacheFile : cacheFiles) {
+ if ((new File(cacheFile)).getName().equals(ES_CONFIG_NAME)) {
+ LOG.info("Found ElasticSearch configuration ["+cacheFile.getPath()+"] in the distributed cache");
+ System.setProperty("es.config", cacheFile.getPath());
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ //
+ // FIXME! This needs to use the distributed cache
+ //
System.setProperty("es.path.plugins",conf.get(ES_PLUGINS));
+ //
+ //
+ //
start_embedded_client();
initialize_index(indexName);
currentRequest = client.prepareBulk();
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
+import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -21,6 +22,8 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.filecache.DistributedCache;
+
import org.apache.pig.StoreFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -84,6 +87,7 @@
// Other string constants
private static final String SLASH = "/";
private static final String COMMA = ",";
+ private static final String LOCAL_SCHEME = "file://";
private static final String NO_ID_FIELD = "-1";
private static final String DEFAULT_BULK = "1000";
private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
@@ -146,8 +150,22 @@ public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().setBoolean(ES_IS_JSON, false);
job.getConfiguration().set(ES_BULK_SIZE, bulkSize);
job.getConfiguration().set(ES_ID_FIELD, idField);
- job.getConfiguration().set(ES_CONFIG, esConfig);
+
+ //
+ // FIXME! This needs to use the distributed cache
+ //
job.getConfiguration().set(ES_PLUGINS, esPlugins);
+ //
+ //
+ //
+
+ // Adds the elasticsearch.yml file (esConfig) to the distributed cache
+ try {
+ DistributedCache.addCacheFile(new URI(LOCAL_SCHEME+esConfig), job.getConfiguration());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(ResourceSchema.class);
job.getConfiguration().set(ES_FIELD_NAMES, property.getProperty(PIG_ES_FIELD_NAMES));
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
+import java.net.URI;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.JsonParseException;
@@ -23,6 +24,7 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.pig.StoreFunc;
import org.apache.pig.ResourceSchema;
@@ -86,6 +88,7 @@
// Other string constants
private static final String SLASH = "/";
private static final String NO_ID_FIELD = "-1";
+ private static final String LOCAL_SCHEME = "file://";
private static final String DEFAULT_BULK = "1000";
private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
@@ -136,8 +139,21 @@ public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().setBoolean(ES_IS_JSON, true);
job.getConfiguration().set(ES_BULK_SIZE, bulkSize);
job.getConfiguration().set(ES_ID_FIELD_NAME, idFieldName);
- job.getConfiguration().set(ES_CONFIG, esConfig);
+
+ //
+ // FIXME! This needs to use the distributed cache
+ //
job.getConfiguration().set(ES_PLUGINS, esPlugins);
+ //
+ //
+ //
+
+ // Adds the elasticsearch.yml file (esConfig) to the distributed cache
+ try {
+ DistributedCache.addCacheFile(new URI(LOCAL_SCHEME+esConfig), job.getConfiguration());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}

0 comments on commit e91f0a9

Please sign in to comment.