Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'delegation'

  • Loading branch information...
commit cbe2b4aafb0b6e121a141df263c295f559b4053b 2 parents 495431e + 1334add
Costin Leau authored
View
22 src/main/java/org/springframework/data/hadoop/fs/FileSystemFactoryBean.java
@@ -22,6 +22,7 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.util.StringUtils;
/**
* FactoryBean for creating Hadoop {@link FileSystem} instances. Useful for interacting with
@@ -35,10 +36,19 @@
private FileSystem fs;
private Configuration configuration;
private URI uri;
+ private String user;
public void afterPropertiesSet() throws Exception {
Configuration cfg = (configuration != null ? configuration : new Configuration(true));
- fs = (uri != null ? FileSystem.get(uri, cfg) : FileSystem.get(cfg));
+ if (uri == null) {
+ uri = FileSystem.getDefaultUri(cfg);
+ }
+ if (StringUtils.hasText(user)) {
+ fs = FileSystem.get(uri, cfg, user);
+ }
+ else {
+ fs = FileSystem.get(uri, cfg);
+ }
}
public void destroy() throws Exception {
@@ -78,4 +88,14 @@ public void setConfiguration(Configuration configuration) {
public void setUri(URI uri) {
this.uri = uri;
}
+
+ /**
+ * Sets the (optional) user to impersonate when creating this file-system.
+ * Should be used when running against a Hadoop Kerberos cluster.
+ *
+ * @param user name of the user to impersonate; ignored if {@code null} or blank
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
}
View
3  src/main/java/org/springframework/data/hadoop/fs/HdfsResourceLoader.java
@@ -34,6 +34,7 @@
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PathMatcher;
+import org.springframework.util.StringUtils;
/**
* Spring ResourceLoader over Hadoop FileSystem.
@@ -72,7 +73,7 @@ public HdfsResourceLoader(Configuration config, URI uri, String user) {
if (uri == null) {
uri = FileSystem.getDefaultUri(config);
}
- tempFS = (user != null ? FileSystem.get(uri, config, user) : FileSystem.get(uri, config));
+ tempFS = (StringUtils.hasText(user) ? FileSystem.get(uri, config, user) : FileSystem.get(uri, config));
} catch (Exception ex) {
tempFS = null;
throw new IllegalStateException("Cannot create filesystem", ex);
View
31 src/main/java/org/springframework/data/hadoop/mapreduce/JobFactoryBean.java
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -36,6 +37,7 @@
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.FactoryBean;
@@ -95,6 +97,8 @@
private Boolean validatePaths = Boolean.TRUE;
private ClassLoader beanClassLoader;
+ private String user;
+
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.beanClassLoader = classLoader;
@@ -118,11 +122,24 @@ public boolean isSingleton() {
@SuppressWarnings("rawtypes")
public void afterPropertiesSet() throws Exception {
- Configuration cfg = (properties != null ? ConfigurationUtils.createFrom(configuration, properties) : (configuration != null ? configuration : new Configuration()));
+ final Configuration cfg = (properties != null ? ConfigurationUtils.createFrom(configuration, properties) : (configuration != null ? configuration : new Configuration()));
buildGenericOptions(cfg);
- job = new Job(cfg);
+ if (StringUtils.hasText(user)) {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+
+ @Override
+ public Void run() throws Exception {
+ job = new Job(cfg);
+ return null;
+ }
+ });
+ }
+ else {
+ job = new Job(cfg);
+ }
ClassLoader loader = (beanClassLoader != null ? beanClassLoader : org.springframework.util.ClassUtils.getDefaultClassLoader());
@@ -493,4 +510,14 @@ public void setValidatePaths(Boolean validatePaths) {
public void setProperties(Properties properties) {
this.properties = properties;
}
+
+ /**
+ * Sets the (optional) user to impersonate when running this job.
+ * Should be used when running against a Hadoop Kerberos cluster.
+ *
+ * @param user name of the user to impersonate; ignored if {@code null} or blank
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
}
View
34 src/main/java/org/springframework/data/hadoop/mapreduce/StreamJobFactoryBean.java
@@ -17,6 +17,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.LinkedHashMap;
@@ -26,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.streaming.StreamJob;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.FactoryBean;
@@ -57,6 +59,8 @@
private Properties properties;
private Properties cmdEnv;
+ private String user;
+
public void setBeanName(String name) {
this.name = name;
}
@@ -77,7 +81,7 @@ public void afterPropertiesSet() throws Exception {
Assert.isTrue(!ObjectUtils.isEmpty(input), "at least one input required");
Assert.hasText(output, "the output is required");
- Configuration cfg = (properties != null ? ConfigurationUtils.createFrom(configuration, properties) : (configuration != null ? configuration : new Configuration()));
+ final Configuration cfg = (properties != null ? ConfigurationUtils.createFrom(configuration, properties) : (configuration != null ? configuration : new Configuration()));
buildGenericOptions(cfg);
@@ -96,7 +100,7 @@ public void afterPropertiesSet() throws Exception {
addArgument(numReduceTasks.toString(), "-numReduceTasks", args);
// translate map to list
- List<String> argsList = new ArrayList<String>(args.size() * 2 + 16);
+ final List<String> argsList = new ArrayList<String>(args.size() * 2 + 16);
for (Map.Entry<String, String> entry : args.entrySet()) {
argsList.add(entry.getKey());
@@ -116,7 +120,21 @@ public void afterPropertiesSet() throws Exception {
// add recurring arguments
addArgument(input, "-input", argsList);
- job = new Job(createStreamJob(cfg, argsList.toArray(new String[argsList.size()])));
+ if (StringUtils.hasText(user)) {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+
+ @Override
+ public Void run() throws Exception {
+ job = new Job(createStreamJob(cfg, argsList.toArray(new String[argsList.size()])));
+ return null;
+ }
+ });
+ }
+ else {
+ job = new Job(createStreamJob(cfg, argsList.toArray(new String[argsList.size()])));
+ }
+
job.setJobName(name);
}
@@ -273,4 +291,14 @@ public void setNumReduceTasks(Integer numReduceTasks) {
public void setProperties(Properties properties) {
this.properties = properties;
}
+
+ /**
+ * Sets the (optional) user to impersonate when running this job.
+ * Should be used when running against a Hadoop Kerberos cluster.
+ *
+ * @param user name of the user to impersonate; ignored if {@code null} or blank
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
}
View
34 src/main/java/org/springframework/data/hadoop/mapreduce/ToolExecutor.java
@@ -15,9 +15,11 @@
*/
package org.springframework.data.hadoop.mapreduce;
+import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanClassLoaderAware;
@@ -25,6 +27,7 @@
import org.springframework.data.hadoop.configuration.ConfigurationUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
+import org.springframework.util.StringUtils;
/**
* Base class for configuring a Tool.
@@ -41,9 +44,11 @@
Resource jar;
private ClassLoader beanClassLoader;
+ private String user;
+
int runTool() throws Exception {
- Configuration cfg = ConfigurationUtils.createFrom(configuration, properties);
+ final Configuration cfg = ConfigurationUtils.createFrom(configuration, properties);
ClassLoader cl = beanClassLoader;
Tool t = tool;
@@ -57,9 +62,24 @@ int runTool() throws Exception {
Thread th = Thread.currentThread();
ClassLoader oldTccl = th.getContextClassLoader();
+ final Tool ft = t;
+
try {
th.setContextClassLoader(cl);
- return org.apache.hadoop.util.ToolRunner.run(cfg, t, arguments);
+
+ if (StringUtils.hasText(user)) {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+ return ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+
+ @Override
+ public Integer run() throws Exception {
+ return org.apache.hadoop.util.ToolRunner.run(cfg, ft, arguments);
+ }
+ });
+ }
+ else {
+ return org.apache.hadoop.util.ToolRunner.run(cfg, ft, arguments);
+ }
} finally {
th.setContextClassLoader(oldTccl);
}
@@ -126,4 +146,14 @@ public void setProperties(Properties properties) {
public void setBeanClassLoader(ClassLoader classLoader) {
this.beanClassLoader = classLoader;
}
+
+ /**
+ * Sets the (optional) user to impersonate when running this task.
+ * Should be used when running against a Hadoop Kerberos cluster.
+ *
+ * @param user name of the user to impersonate; ignored if {@code null} or blank
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
}
View
33 src/main/java/org/springframework/data/hadoop/pig/PigServerFactoryBean.java
@@ -17,9 +17,11 @@
import java.io.IOException;
import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pig.PigServer;
import org.apache.pig.impl.PigContext;
import org.springframework.beans.factory.BeanInitializationException;
@@ -46,6 +48,8 @@
private Boolean validateEachStatement;
private String beanName;
+ private String user;
+
public PigServer getObject() throws Exception {
return createPigInstance();
}
@@ -58,11 +62,24 @@ public boolean isSingleton() {
return false;
}
- protected PigServer createPigInstance() throws IOException {
- PigContext ctx = (pigContext != null ? pigContext : new PigContext());
+ protected PigServer createPigInstance() throws Exception {
+ final PigContext ctx = (pigContext != null ? pigContext : new PigContext());
// apparently if not connected, pig can cause all kinds of errors
- PigServer pigServer = new PigServer(ctx, true);
+ PigServer pigServer = null;
+
+ if (StringUtils.hasText(user)) {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+ pigServer = ugi.doAs(new PrivilegedExceptionAction<PigServer>() {
+ @Override
+ public PigServer run() throws Exception {
+ return new PigServer(ctx, true);
+ }
+ });
+ }
+ else {
+ pigServer = new PigServer(ctx, true);
+ }
if (!CollectionUtils.isEmpty(pathToSkip)) {
for (String path : pathToSkip) {
@@ -175,4 +192,14 @@ public void setJobPriority(String jobPriority) {
public void setValidateEachStatement(Boolean validateEachStatement) {
this.validateEachStatement = validateEachStatement;
}
+
+ /**
+ * Sets the (optional) user to impersonate when executing Pig jobs.
+ * Should be used when running against a Hadoop Kerberos cluster.
+ *
+ * @param user name of the user to impersonate; ignored if {@code null} or blank
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
}
View
12 src/test/resources/core-site.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+ <property>
+ <name>hadoop.security.authentication</name>
+ <value>simple</value>
+ </property>
+
+</configuration>
Please sign in to comment.
Something went wrong with that request. Please try again.