Skip to content
Browse files

Merge pull request #3 from jghoman/prefetchToken

Prefetch token
  • Loading branch information...
2 parents 77f51be + 8300c32 commit 04bac086a47a26dba4da3b33e4384922c00c7447 @rbpark committed
View
5 azkaban/src/java/azkaban/jobs/builtin/PigProcessJob.java
@@ -30,6 +30,7 @@
import static azkaban.util.SecurityUtils.PROXY_USER;
import static azkaban.util.SecurityUtils.TO_PROXY;
import static azkaban.util.SecurityUtils.shouldProxy;
+import static azkaban.jobs.builtin.SecurePigWrapper.OBTAIN_BINARY_TOKEN;
public class PigProcessJob extends JavaProcessJob {
@@ -73,6 +74,10 @@ protected String getJVMArguments() {
secure = " -D" + PROXY_USER + "=" + p.getProperty(PROXY_USER);
secure += " -D" + PROXY_KEYTAB_LOCATION + "=" + p.getProperty(PROXY_KEYTAB_LOCATION);
secure += " -D" + TO_PROXY + "=" + p.getProperty(TO_PROXY);
+ String extraToken = p.getProperty(OBTAIN_BINARY_TOKEN);
+ if(extraToken != null) {
+ secure += " -D" + OBTAIN_BINARY_TOKEN + "=" + extraToken;
+ }
info("Secure settings = " + secure);
args += secure;
} else {
View
59 azkaban/src/java/azkaban/jobs/builtin/SecurePigWrapper.java
@@ -16,9 +16,18 @@
package azkaban.jobs.builtin;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
import org.apache.pig.Main;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
@@ -26,17 +35,61 @@
import static azkaban.util.SecurityUtils.getProxiedUser;
public class SecurePigWrapper {
+
+ public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
+ public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
+
public static void main(final String[] args) throws IOException, InterruptedException {
- Logger logger = Logger.getRootLogger();
- Properties p = System.getProperties();
- Configuration conf = new Configuration();
+ final Logger logger = Logger.getRootLogger();
+ final Properties p = System.getProperties();
+ final Configuration conf = new Configuration();
getProxiedUser(p, logger, conf).doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
+ prefetchToken();
Main.main(args);
return null;
}
+
+ // For Pig jobs that need to do extra communication with the JobTracker,
+ // it's necessary to pre-fetch a token and include it in the credentials
+ // cache
+ private void prefetchToken() throws InterruptedException, IOException {
+ String shouldPrefetch = p.getProperty(OBTAIN_BINARY_TOKEN);
+ if(shouldPrefetch != null && shouldPrefetch.equals("true") ) {
+ logger.info("Pre-fetching token");
+ Job job = new Job(conf, "totally phony, extremely fake, not real job");
+
+ JobConf jc = new JobConf(conf);
+ JobClient jobClient = new JobClient(jc);
+ logger.info("Pre-fetching: Got new JobClient: " + jc);
+ Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("hi"));
+ job.getCredentials().addToken(new Text("howdy"), mrdt);
+
+ File temp = File.createTempFile("mr-azkaban", ".token");
+ temp.deleteOnExit();
+
+ FileOutputStream fos = null;
+ DataOutputStream dos = null;
+ try {
+ fos = new FileOutputStream(temp);
+ dos = new DataOutputStream(fos);
+ job.getCredentials().writeTokenStorageToStream(dos);
+ } finally {
+ if(dos != null) {
+ dos.close();
+ }
+ if(fos != null) {
+ fos.close();
+ }
+ }
+ logger.info("Setting " + MAPREDUCE_JOB_CREDENTIALS_BINARY + " to " + temp.getAbsolutePath());
+ System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, temp.getAbsolutePath());
+ } else {
+ logger.info("Not pre-fetching token");
+ }
+ }
});
}
View
2 azkaban/src/java/azkaban/util/SecurityUtils.java
@@ -41,7 +41,7 @@ public static synchronized UserGroupInformation getProxiedUser(String toProxy, P
throw new IllegalArgumentException("toProxy can't be null");
}
if(conf == null) {
- throw new IllegalAccessError("conf can't be null");
+ throw new IllegalArgumentException("conf can't be null");
}
if (loginUser == null) {

0 comments on commit 04bac08

Please sign in to comment.
Something went wrong with that request. Please try again.