Skip to content

Commit

Permalink
Finalized changes to HdfsFetcher to make it work with a Kerberized Ha…
Browse files Browse the repository at this point in the history
…doop cluster over webhdfs
  • Loading branch information
Chinmay Soman committed Dec 6, 2012
1 parent 44c4667 commit 22b99c3
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 93 deletions.
2 changes: 1 addition & 1 deletion bin/run-class.sh
Expand Up @@ -44,4 +44,4 @@ if [ -z "$VOLD_OPTS" ]; then
fi

export CLASSPATH
java -Dlog4j.configuration=$base_dir/src/java/log4j.properties $VOLD_OPTS -cp $CLASSPATH $@
java -Djava.net.preferIPv4Stack=true -Djava.security.krb5.realm=GRID.LINKEDIN.COM -Djava.security.krb5.kdc=eat1-ns01.grid.linkedin.com -Dlog4j.configuration=$base_dir/src/java/log4j.properties $VOLD_OPTS -cp $CLASSPATH $@
Expand Up @@ -21,10 +21,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.PrivilegedExceptionAction;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Comparator;
Expand Down Expand Up @@ -152,43 +150,79 @@ public File fetch(String sourceFileUrl, String destinationFile, String hadoopCon
try {

final Configuration config = new Configuration();
FileSystem fs = null;
config.setInt("io.socket.receive.buffer", bufferSize);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
config.set("hadoop.security.group.mapping",
"org.apache.hadoop.security.ShellBasedUnixGroupsMapping");
config.addResource(new Path(hadoopConfigPath + "/core-site.xml"));
config.addResource(new Path(hadoopConfigPath + "/hdfs-site.xml"));

FileSystem fs = null;

final Path path = new Path(sourceFileUrl);
logger.info("Using path : " + path);

String security = config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
if(hadoopConfigPath.length() > 0) {

if(security == null || !security.equals("kerberos")) {
logger.info("Security isn't turned on in the conf: "
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + " = "
+ config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
logger.info("Fix that. Exiting.");
return null;
} else {
logger.info("Security is turned on in the conf. That's good");
config.addResource(new Path(hadoopConfigPath + "/core-site.xml"));
config.addResource(new Path(hadoopConfigPath + "/hdfs-site.xml"));

}
String security = config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);

if(security == null || !security.equals("kerberos")) {
logger.info("Security isn't turned on in the conf: "
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+ " = "
+ config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
logger.info("Fix that. Exiting.");
return null;
} else {
logger.info("Security is turned on in the conf. Trying to authenticate ...");

System.err.println(HdfsFetcher.keytabPath);
}
}

try {
UserGroupInformation.setConfiguration(config);
UserGroupInformation.loginUserFromKeytab(HdfsFetcher.kerberosPrincipal,
HdfsFetcher.keytabPath);
fs = path.getFileSystem(config);
// fs = FileSystem.get(config);

if(HdfsFetcher.keytabPath.length() > 0) {

// First login using the specified principal and keytab file
UserGroupInformation.setConfiguration(config);
UserGroupInformation.loginUserFromKeytab(HdfsFetcher.kerberosPrincipal,
HdfsFetcher.keytabPath);

/*
* If login is successful, get the filesystem object. NOTE:
* Ideally we do not need a doAs block for this. Consider
* removing it in the future once the Hadoop jars have the
* corresponding patch (tracked in the Hadoop Apache
* project: HDFS-3367)
*/
try {
logger.info("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = path.getFileSystem(config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
} catch(Exception e) {
logger.error("Got an exception while getting the filesystem object: ");
logger.error("Exception class : " + e.getClass());
e.printStackTrace();
for(StackTraceElement et: e.getStackTrace()) {
logger.error(et.toString());
}
}
} else {
fs = path.getFileSystem(config);
}

} catch(IOException e) {
e.printStackTrace();
System.err.println("Error !!! Exiting !!!");
logger.error("Error in authenticating or getting the Filesystem object !!! Exiting !!!");
System.exit(-1);
}

Expand Down Expand Up @@ -509,33 +543,19 @@ public void setAsyncOperationStatus(AsyncOperationStatus status) {
this.status = status;
}

/*
* Function to add a resource to classpath at runtime In read-only case, it
* is used to load the Hadoop config in classpath
*/
public static void addPath(String s) throws Exception {
File f = new File(s);
URL u = f.toURI().toURL();
URLClassLoader urlClassLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
Class urlClass = URLClassLoader.class;
Method method = urlClass.getDeclaredMethod("addURL", new Class[] { URL.class });
method.setAccessible(true);
method.invoke(urlClassLoader, new Object[] { u });
}

/*
* Main method for testing fetching
*/
public static void main(String[] args) throws Exception {
if(args.length < 4)
if(args.length < 1)
Utils.croak("USAGE: java " + HdfsFetcher.class.getName()
+ " url [keytab location] [kerberos username] [hadoop-config-path]");
String url = args[0];

String keytabLocation = "";
String kerberosUser = "";
String hadoopPath = "";
if(args.length >= 4) {
if(args.length == 4) {
keytabLocation = args[1];
kerberosUser = args[2];
hadoopPath = args[3];
Expand All @@ -550,77 +570,82 @@ public static void main(String[] args) throws Exception {
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
config.setInt("io.socket.receive.buffer", 1 * 1024 * 1024 - 10000);
config.set("hadoop.security.group.mapping",
"org.apache.hadoop.security.ShellBasedUnixGroupsMapping");
config.addResource(new Path(hadoopPath + "/core-site.xml"));
config.addResource(new Path(hadoopPath + "/hdfs-site.xml"));

FileSystem fs = null;

p = new Path(url);
System.err.println("Using path : " + p);
HdfsFetcher.keytabPath = keytabLocation;
HdfsFetcher.kerberosPrincipal = kerberosUser;

String security = config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
if(hadoopPath.length() > 0) {
config.set("hadoop.security.group.mapping",
"org.apache.hadoop.security.ShellBasedUnixGroupsMapping");

if(security == null || !security.equals("kerberos")) {
logger.info("Security isn't turned on in the conf: "
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + " = "
+ config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
logger.info("Fix that. Exiting.");
return;
} else {
logger.info("Security is turned on in the conf. That's good");
config.addResource(new Path(hadoopPath + "/core-site.xml"));
config.addResource(new Path(hadoopPath + "/hdfs-site.xml"));

}
String security = config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);

// Add the Hadoop config to classpath
// HdfsFetcher.addPath(hadoopPath);
if(security == null || !security.equals("kerberos")) {
logger.info("Security isn't turned on in the conf: "
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + " = "
+ config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
logger.info("Fix that. Exiting.");
return;
} else {
logger.info("Security is turned on in the conf. Trying to authenticate ...");
}
}

HdfsFetcher.keytabPath = keytabLocation;
HdfsFetcher.kerberosPrincipal = kerberosUser;
try {

/*
* Get the filesystem object in a secured (authenticated) block in case
* this server is talking to a Kerberized Hadoop cluster.
*
* Otherwise get the default filesystem object.
*/
/*
* if(keytabLocation.length() > 0) { logger.debug("keytab path = " +
* keytabLocation + " and Kerberos user = " + kerberosUser);
* UserGroupInformation.loginUserFromKeytab(kerberosUser,
* keytabLocation); logger.debug("I've logged in and am now Doasing as "
* + UserGroupInformation.getCurrentUser().getUserName()); try { fs =
* UserGroupInformation.getCurrentUser() .doAs(new
* PrivilegedExceptionAction<FileSystem>() {
*
* public FileSystem run() throws Exception { FileSystem fs =
* FileSystem.get(uri, config); return fs; } }); }
* catch(InterruptedException e) { logger.error(e.getMessage()); } }
* else { fs = FileSystem.get(uri, config); }
*/
// Get the filesystem object
if(keytabLocation.length() > 0) {
UserGroupInformation.setConfiguration(config);
UserGroupInformation.loginUserFromKeytab(kerberosUser, keytabLocation);

final Path path = p;
try {
logger.debug("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = path.getFileSystem(config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
} catch(Exception e) {
logger.error("Got an exception while getting the filesystem object: ");
logger.error("Exception class : " + e.getClass());
e.printStackTrace();
for(StackTraceElement et: e.getStackTrace()) {
logger.error(et.toString());
}
}
} else {
fs = p.getFileSystem(config);
}

try {
UserGroupInformation.setConfiguration(config);
UserGroupInformation.loginUserFromKeytab(kerberosUser, keytabLocation);
// fs = FileSystem.get(uri, config);
fs = p.getFileSystem(config);
} catch(IOException e) {
e.printStackTrace();
System.err.println("Error !!! Exiting !!!");
System.exit(-1);
}

// FileStatus status = fs.getFileStatus(p);
FileStatus status = fs.listStatus(p)[0];
long size = status.getLen();
HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec,
HdfsFetcher fetcher = new HdfsFetcher(null,
maxBytesPerSec,
VoldemortConfig.REPORTING_INTERVAL_BYTES,
VoldemortConfig.DEFAULT_BUFFER_SIZE);
VoldemortConfig.DEFAULT_BUFFER_SIZE,
0,
keytabLocation,
kerberosUser);
long start = System.currentTimeMillis();
// File location = fetcher.fetch(url,
// System.getProperty("java.io.tmpdir") + File.separator
// + start);

File location = fetcher.fetch(url, System.getProperty("java.io.tmpdir") + File.separator
+ start, hadoopPath);

Expand Down

0 comments on commit 22b99c3

Please sign in to comment.