Permalink
Browse files

Add support for kerberized grids by making the HDFS fetcher protocol and port configurable via job properties

  • Loading branch information...
1 parent 374d02a commit daa49bf7aecf27aa468711c06ae494e5ce6f6352 @abh1nay abh1nay committed Dec 6, 2012
@@ -98,6 +98,11 @@
private static final String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";
+ // new properties for the push job
+
+ private final String hdfsFetcherPort;
+ private final String hdfsFetcherProtocol;
+
/* Informed stuff */
private final String informedURL = "http://informed.corp.linkedin.com/_post";
private final List<Future> informedResults;
@@ -134,6 +139,12 @@ public VoldemortBuildAndPushJob(String name, Props props) {
this.informedResults = Lists.newArrayList();
this.informedExecutor = Executors.newFixedThreadPool(2);
+ this.hdfsFetcherProtocol = props.getString("voldemort.fetcher.protocol", "hftp");
+ this.hdfsFetcherPort = props.getString("voldemort.fetcher.port", "50070");
+
+ log.info("voldemort.fetcher.protocol is set to : " + hdfsFetcherProtocol);
+ log.info("voldemort.fetcher.port is set to : " + hdfsFetcherPort);
+
isAvroJob = props.getBoolean("build.type.avro", false);
// Set default to false
@@ -43,16 +43,23 @@
private final Props _props;
private VoldemortSwapConf swapConf;
+ private String hdfsFetcherProtocol;
+ private String hdfsFetcherPort;
/**
 * Creates a swap job whose swap configuration is derived from the supplied
 * job properties.
 *
 * @param id    job identifier passed through to the parent job
 * @param props job properties; also consulted for the fetcher overrides
 * @throws IOException if building the {@link VoldemortSwapConf} fails
 */
public VoldemortSwapJob(String id, Props props) throws IOException {
    super(id);
    _props = props;

    // Protocol/port used when rewriting the fetch URL; defaults match the
    // classic HFTP setup on the standard HDFS HTTP port.
    hdfsFetcherProtocol = props.getString("voldemort.fetcher.protocol", "hftp");
    hdfsFetcherPort = props.getString("voldemort.fetcher.port", "50070");
    swapConf = new VoldemortSwapConf(_props);
}
/**
 * Creates a swap job using an externally supplied swap configuration
 * instead of deriving one from the job properties.
 *
 * @param id    job identifier passed through to the parent job
 * @param props job properties; consulted for the fetcher overrides
 * @param conf  pre-built swap configuration to use as-is
 * @throws IOException declared for parity with the other constructor
 */
public VoldemortSwapJob(String id, Props props, VoldemortSwapConf conf) throws IOException {
    super(id);
    _props = props;
    // Same fetcher protocol/port overrides; only the swap configuration
    // source differs between the two constructors.
    hdfsFetcherProtocol = props.getString("voldemort.fetcher.protocol", "hftp");
    hdfsFetcherPort = props.getString("voldemort.fetcher.port", "50070");
    swapConf = conf;
}
@@ -150,17 +157,6 @@ public void run() throws Exception {
dataDir = dataPath.makeQualified(FileSystem.get(conf)).toString();
/*
- * Set the protocol according to config: webhdfs if its enabled
- * Otherwise use hftp.
- */
- Configuration hadoopConfig = new Configuration();
- String protocolName = hadoopConfig.get("dfs.webhdfs.enabled");
- String protocolPort = "";
- if(hadoopConfig.get("dfs.http.address").split(":").length >= 2)
- protocolPort = hadoopConfig.get("dfs.http.address").split(":")[1];
- protocolName = (protocolName == null) ? "hftp" : "webhdfs";
-
- /*
* Replace the default protocol and port with the one derived as above
*/
String existingProtocol = "";
@@ -171,12 +167,10 @@ public void run() throws Exception {
existingPort = pathComponents[2].split("/")[0];
}
info("Existing protocol = " + existingProtocol + " and port = " + existingPort);
- if(protocolName.length() > 0 && protocolPort.length() > 0) {
- dataDir = dataDir.replaceFirst(existingProtocol, protocolName);
- dataDir = dataDir.replaceFirst(existingPort, protocolPort);
+ if(hdfsFetcherProtocol.length() > 0 && hdfsFetcherPort.length() > 0) {
+ dataDir = dataDir.replaceFirst(existingProtocol, this.hdfsFetcherProtocol);
+ dataDir = dataDir.replaceFirst(existingPort, this.hdfsFetcherPort);
}
- info("dfs.webhdfs.enabled = " + hadoopConfig.get("dfs.webhdfs.enabled")
- + " and new protocol = " + protocolName + " and port = " + protocolPort);
// Create admin client
AdminClient client = new AdminClient(cluster,

0 comments on commit daa49bf

Please sign in to comment.