Permalink
Browse files

Added configurable Kerberos support to HdfsFetcher and upgraded hadoop jars to 1.0.2
  • Loading branch information...
1 parent daa49bf commit c745f800aab2751edc22a3fdfa69fbb63d459778 Chinmay Soman committed May 14, 2012
View
@@ -15,11 +15,12 @@
<classpathentry kind="src" path="contrib/collections/src/java"/>
<classpathentry kind="src" path="contrib/collections/test"/>
<classpathentry kind="lib" path="lib/catalina-ant.jar"/>
- <classpathentry kind="lib" path="lib/commons-codec-1.3.jar"/>
+ <classpathentry kind="lib" path="lib/commons-codec-1.4.jar"/>
<classpathentry kind="lib" path="lib/commons-dbcp-1.2.2.jar"/>
<classpathentry kind="lib" path="lib/colt-1.2.0.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/commons-cli-2.0-SNAPSHOT.jar"/>
- <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.20.2-core.jar"/>
+ <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/commons-configuration-1.6.jar"/>
+ <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-core-1.0.2-p1.jar"/>
<classpathentry kind="lib" path="lib/junit-4.6.jar"/>
<classpathentry kind="lib" path="lib/log4j-1.2.15.jar"/>
<classpathentry kind="lib" path="lib/jetty-6.1.18.jar"/>
Binary file not shown.
Binary file not shown.
@@ -21,6 +21,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Comparator;
@@ -36,6 +38,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
@@ -69,6 +72,8 @@
private long minBytesPerSecond = 0;
private DynamicThrottleLimit globalThrottleLimit = null;
private static final int NUM_RETRIES = 3;
+ private String keytabLocation = "";
+ private String proxyUser = "voldemrt";
public HdfsFetcher(VoldemortConfig config) {
this(config.getMaxBytesPerSecond(),
@@ -82,9 +87,12 @@ public HdfsFetcher(VoldemortConfig config) {
public HdfsFetcher(VoldemortConfig config, DynamicThrottleLimit dynThrottleLimit) {
this(dynThrottleLimit,
+ null,
config.getReportingIntervalBytes(),
config.getFetcherBufferSize(),
- config.getMinBytesPerSecond());
+ config.getMinBytesPerSecond(),
+ config.getReadOnlyKeytabPath(),
+ config.getReadOnlyKerberosProxyUser());
logger.info("Created hdfs fetcher with throttle rate " + dynThrottleLimit.getRate()
+ ", buffer size " + bufferSize + ", reporting interval bytes "
@@ -98,21 +106,16 @@ public HdfsFetcher() {
}
public HdfsFetcher(Long maxBytesPerSecond, Long reportingIntervalBytes, int bufferSize) {
- this(null, maxBytesPerSecond, reportingIntervalBytes, bufferSize, 0);
- }
-
- public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
- Long reportingIntervalBytes,
- int bufferSize,
- long minBytesPerSecond) {
- this(dynThrottleLimit, null, reportingIntervalBytes, bufferSize, minBytesPerSecond);
+ this(null, maxBytesPerSecond, reportingIntervalBytes, bufferSize, 0, "", "");
}
public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
Long maxBytesPerSecond,
Long reportingIntervalBytes,
int bufferSize,
- long minBytesPerSecond) {
+ long minBytesPerSecond,
+ String keytabLocation,
+ String proxyUser) {
if(maxBytesPerSecond != null) {
this.maxBytesPerSecond = maxBytesPerSecond;
this.throttler = new EventThrottler(this.maxBytesPerSecond);
@@ -128,6 +131,8 @@ public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
this.bufferSize = bufferSize;
this.status = null;
this.minBytesPerSecond = minBytesPerSecond;
+ this.keytabLocation = keytabLocation;
+ this.proxyUser = proxyUser;
}
public File fetch(String sourceFileUrl, String destinationFile) throws IOException {
@@ -140,12 +145,40 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
ObjectName jmxName = null;
try {
- Path path = new Path(sourceFileUrl);
- Configuration config = new Configuration();
+ final Path path = new Path(sourceFileUrl);
+ final Configuration config = new Configuration();
config.setInt("io.socket.receive.buffer", bufferSize);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
- FileSystem fs = path.getFileSystem(config);
+ FileSystem fs = null;
+
+ /*
+ * Get the filesystem object in a secured (authenticated) block in
+ * case this server is talking to a Kerberized Hadoop cluster.
+ *
+ * Otherwise get the default filesystem object.
+ */
+ if(this.keytabLocation.length() > 0) {
+ logger.debug("keytab path = " + keytabLocation + " and proxy user = " + proxyUser);
+ UserGroupInformation.loginUserFromKeytab(proxyUser, keytabLocation);
+ logger.debug("I've logged in and am now Doasing as "
+ + UserGroupInformation.getCurrentUser().getUserName());
+ try {
+ fs = UserGroupInformation.getCurrentUser()
+ .doAs(new PrivilegedExceptionAction<FileSystem>() {
+
+ public FileSystem run() throws Exception {
+ FileSystem fs = path.getFileSystem(config);
+ return fs;
+ }
+ });
+ } catch(InterruptedException e) {
+ logger.error(e.getMessage());
+ return null;
+ }
+ } else {
+ fs = path.getFileSystem(config);
+ }
CopyStats stats = new CopyStats(sourceFileUrl, sizeOfPath(fs, path));
jmxName = JmxUtils.registerMbean("hdfs-copy-" + copyCount.getAndIncrement(), stats);
@@ -158,6 +191,9 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
boolean result = fetch(fs, path, destination, stats);
+ // Close the filesystem
+ fs.close();
+
if(result) {
return destination;
} else {
@@ -460,16 +496,50 @@ public void setAsyncOperationStatus(AsyncOperationStatus status) {
*/
public static void main(String[] args) throws Exception {
if(args.length != 1)
- Utils.croak("USAGE: java " + HdfsFetcher.class.getName() + " url");
+ Utils.croak("USAGE: java " + HdfsFetcher.class.getName()
+ + " url [keytab location] [kerberos username]");
String url = args[0];
+ String keytabLocation = args[1];
+ String proxyUser = args[2];
long maxBytesPerSec = 1024 * 1024 * 1024;
Path p = new Path(url);
- Configuration config = new Configuration();
+
+ final Configuration config = new Configuration();
+ final URI uri = new URI(url);
config.setInt("io.file.buffer.size", VoldemortConfig.DEFAULT_BUFFER_SIZE);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
config.setInt("io.socket.receive.buffer", 1 * 1024 * 1024 - 10000);
- FileStatus status = p.getFileSystem(config).getFileStatus(p);
+ FileSystem fs = null;
+
+ /*
+ * Get the filesystem object in a secured (authenticated) block in case
+ * this server is talking to a Kerberized Hadoop cluster.
+ *
+ * Otherwise get the default filesystem object.
+ */
+ if(keytabLocation.length() > 0) {
+ logger.debug("keytab path = " + keytabLocation + " and proxy user = " + proxyUser);
+ UserGroupInformation.loginUserFromKeytab(proxyUser, keytabLocation);
+ logger.debug("I've logged in and am now Doasing as "
+ + UserGroupInformation.getCurrentUser().getUserName());
+ try {
+ fs = UserGroupInformation.getCurrentUser()
+ .doAs(new PrivilegedExceptionAction<FileSystem>() {
+
+ public FileSystem run() throws Exception {
+ FileSystem fs = FileSystem.get(uri, config);
+ return fs;
+ }
+ });
+ } catch(InterruptedException e) {
+ logger.error(e.getMessage());
+ }
+ } else {
+ fs = FileSystem.get(uri, config);
+ }
+
+ FileStatus status = fs.getFileStatus(p);
long size = status.getLen();
HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec,
VoldemortConfig.REPORTING_INTERVAL_BYTES,
@@ -482,5 +552,6 @@ public static void main(String[] args) throws Exception {
nf.setMaximumFractionDigits(2);
System.out.println("Fetch to " + location + " completed: "
+ nf.format(rate / (1024.0 * 1024.0)) + " MB/sec.");
+ fs.close();
}
}
Binary file not shown.
Binary file not shown.
@@ -1490,11 +1490,15 @@ public String waitForCompletion(int nodeId,
long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);
String description = null;
+ String oldStatus = "";
while(System.currentTimeMillis() < waitUntil) {
try {
AsyncOperationStatus status = getAsyncRequestStatus(nodeId, requestId);
- logger.info("Status from node " + nodeId + " (" + status.getDescription() + ") - "
- + status.getStatus());
+ if(!status.getStatus().equalsIgnoreCase(oldStatus))
+ logger.info("Status from node " + nodeId + " (" + status.getDescription()
+ + ") - " + status.getStatus());
+ oldStatus = status.getStatus();
+
if(higherStatus != null) {
higherStatus.setStatus("Status from node " + nodeId + " ("
+ status.getDescription() + ") - " + status.getStatus());
@@ -108,6 +108,8 @@
private long minBytesPerSecond;
private long reportingIntervalBytes;
private int fetcherBufferSize;
+ private String readOnlyKeytabPath;
+ private String readOnlyKerberosProxyUser;
private OpTimeMap testingSlowQueueingDelays;
private OpTimeMap testingSlowConcurrentDelays;
@@ -269,6 +271,8 @@ public VoldemortConfig(Props props) {
REPORTING_INTERVAL_BYTES);
this.fetcherBufferSize = (int) props.getBytes("hdfs.fetcher.buffer.size",
DEFAULT_BUFFER_SIZE);
+ this.readOnlyKeytabPath = props.getString("readonly.keytab.path", "");
+ this.readOnlyKerberosProxyUser = props.getString("readonly.kerberos.proxyuser", "voldemrt");
// TODO probably turn to false by default?
this.setUseMlock(props.getBoolean("readonly.mlock.index", true));
@@ -1582,6 +1586,22 @@ public void setReadOnlyDeleteBackupMs(int readOnlyDeleteBackupTimeMs) {
this.readOnlyDeleteBackupTimeMs = readOnlyDeleteBackupTimeMs;
}
+ public String getReadOnlyKeytabPath() {
+ return readOnlyKeytabPath;
+ }
+
+ public void setReadOnlyKeytabPath(String readOnlyKeytabPath) {
+ this.readOnlyKeytabPath = readOnlyKeytabPath;
+ }
+
+ public String getReadOnlyKerberosProxyUser() {
+ return readOnlyKerberosProxyUser;
+ }
+
+ public void setReadOnlyKerberosProxyUser(String readOnlyKerberosProxyUser) {
+ this.readOnlyKerberosProxyUser = readOnlyKerberosProxyUser;
+ }
+
public int getSocketBufferSize() {
return socketBufferSize;
}

0 comments on commit c745f80

Please sign in to comment.