Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

both pig indexers actually use the distributed cache to distribute co…

…nfig, wasn't working properly before
  • Loading branch information...
commit 08322f360024e03930da6bf27cc82fe0f49d9cc0 1 parent 0ea5d90
@thedatachef thedatachef authored
View
29 src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java
@@ -37,6 +37,8 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.ExceptionsHelper;
+import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
+
/**
Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) into Elasticsearch. Records are batched up and sent
@@ -81,6 +83,7 @@ Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) in
// Other string constants
private static final String COMMA = ",";
+ private static final String SLASH = "/";
private static final String NO_ID_FIELD = "-1";
private volatile BulkRequestBuilder currentRequest;
@@ -130,28 +133,16 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) {
// Fetches elasticsearch.yml and the plugins directory from the distributed cache
//
try {
- URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
- for (URI cacheFile : cacheFiles) {
- if ((new File(cacheFile)).getName().equals(ES_CONFIG_NAME)) {
- LOG.info("Found ElasticSearch configuration ["+cacheFile.getPath()+"] in the distributed cache");
- System.setProperty(ES_CONFIG, cacheFile.getPath());
- break;
- }
- }
-
- URI[] cacheArchives = DistributedCache.getCacheArchives(conf);
-
- for (URI cacheArchive : cacheArchives) {
- if ((new File(cacheArchive)).getName().equals(ES_PLUGINS_NAME)) {
- LOG.info("Found ElasticSearch configuration ["+cacheArchive.getPath()+"] in the distributed cache");
- System.setProperty(ES_PLUGINS, cacheArchive.getPath());
- break;
- }
- }
+ String taskConfigPath = HadoopUtils.fetchFileFromCache(ES_CONFIG_NAME, conf);
+ LOG.info("Using ["+taskConfigPath+"] as es.config");
+ String taskPluginsPath = HadoopUtils.fetchArchiveFromCache(ES_PLUGINS_NAME, conf);
+ LOG.info("Using ["+taskPluginsPath+"] as es.plugins.dir");
+ System.setProperty(ES_CONFIG, taskConfigPath);
+ System.setProperty(ES_PLUGINS, taskPluginsPath+SLASH+ES_PLUGINS_NAME);
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
start_embedded_client();
initialize_index(indexName);
currentRequest = client.prepareBulk();
View
78 src/main/java/com/infochimps/elasticsearch/hadoop/util/HadoopUtils.java
@@ -0,0 +1,78 @@
+package com.infochimps.elasticsearch.hadoop.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+
+public class HadoopUtils {
+
+ /**
+ Upload a local file to the cluster
+ */
+ public static void uploadLocalFile(Path localsrc, Path hdfsdest, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(hdfsdest) && fs.getFileStatus(hdfsdest).isDir()) {
+ fs.delete(hdfsdest, true);
+ }
+ fs.copyFromLocalFile(false, true, localsrc, hdfsdest);
+ }
+
+ /**
+ Fetches a file with the basename specified from the distributed cache. Returns null if no file is found
+ */
+ public static String fetchFileFromCache(String basename, Configuration conf) throws IOException {
+ Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
+ if (cacheFiles != null && cacheFiles.length > 0) {
+ for (Path cacheFile : cacheFiles) {
+ if (cacheFile.getName().equals(basename)) {
+ return cacheFile.toString();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ Fetches a file with the basename specified from the distributed cache. Returns null if no file is found
+ */
+ public static String fetchArchiveFromCache(String basename, Configuration conf) throws IOException {
+ Path[] cacheArchives = DistributedCache.getLocalCacheArchives(conf);
+ if (cacheArchives != null && cacheArchives.length > 0) {
+ for (Path cacheArchive : cacheArchives) {
+ if (cacheArchive.getName().equals(basename)) {
+ return cacheArchive.toString();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ Takes a path on the hdfs and ships it in the distributed cache if it is not already in the distributed cache
+ */
+ public static void shipFileIfNotShipped(Path hdfsPath, Configuration conf) throws IOException {
+ if (fetchFileFromCache(hdfsPath.getName(), conf) == null) {
+ try {
+ DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ Takes a path on the hdfs and ships it in the distributed cache if it is not already in the distributed cache
+ */
+ public static void shipArchiveIfNotShipped(Path hdfsPath, Configuration conf) throws IOException {
+ if (fetchArchiveFromCache(hdfsPath.getName(), conf) == null) {
+ try {
+ DistributedCache.addCacheArchive(hdfsPath.toUri(), conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
View
14 src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchIndex.java
@@ -39,6 +39,7 @@
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.UDFContext;
+import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
/**
@@ -90,6 +91,8 @@
private static final String DEFAULT_BULK = "1000";
private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
+ private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
+ private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";
public ElasticSearchIndex() {
this(NO_ID_FIELD, DEFAULT_BULK);
@@ -151,8 +154,15 @@ public void setStoreLocation(String location, Job job) throws IOException {
// Adds the elasticsearch.yml file (esConfig) and the plugins directory (esPlugins) to the distributed cache
try {
- DistributedCache.addCacheFile(new URI(LOCAL_SCHEME+esConfig), job.getConfiguration());
- DistributedCache.addCacheArchive(new URI(LOCAL_SCHEME+esPlugins), job.getConfiguration());
+ Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
+ Path hdfsPluginsPath = new Path(ES_PLUGINS_HDFS_PATH);
+
+ HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME+esConfig), hdfsConfigPath, job.getConfiguration());
+ HadoopUtils.shipFileIfNotShipped(hdfsConfigPath, job.getConfiguration());
+
+ HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME+esPlugins), hdfsPluginsPath, job.getConfiguration());
+ HadoopUtils.shipArchiveIfNotShipped(hdfsPluginsPath, job.getConfiguration());
+
} catch (Exception e) {
throw new RuntimeException(e);
}
View
13 src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchJsonIndex.java
@@ -41,6 +41,7 @@
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.UDFContext;
+import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
/**
@@ -90,6 +91,8 @@
private static final String DEFAULT_BULK = "1000";
private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
+ private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
+ private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";
public ElasticSearchJsonIndex() {
this(NO_ID_FIELD, DEFAULT_BULK);
@@ -140,8 +143,14 @@ public void setStoreLocation(String location, Job job) throws IOException {
// Adds the elasticsearch.yml file (esConfig) to the distributed cache
try {
- DistributedCache.addCacheFile(new URI(LOCAL_SCHEME+esConfig), job.getConfiguration());
- DistributedCache.addCacheArchive(new URI(LOCAL_SCHEME+esPlugins), job.getConfiguration());
+ Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
+ Path hdfsPluginsPath = new Path(ES_PLUGINS_HDFS_PATH);
+
+ HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME+esConfig), hdfsConfigPath, job.getConfiguration());
+ HadoopUtils.shipFileIfNotShipped(hdfsConfigPath, job.getConfiguration());
+
+ HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME+esPlugins), hdfsPluginsPath, job.getConfiguration());
+ HadoopUtils.shipArchiveIfNotShipped(hdfsPluginsPath, job.getConfiguration());
} catch (Exception e) {
throw new RuntimeException(e);
}
Please sign in to comment.
Something went wrong with that request. Please try again.