Skip to content

Commit

Permalink
Doing authentication in a synchronized block for the Hdfs fetcher, se…
Browse files Browse the repository at this point in the history
…tting correct permission for the hadoop files
  • Loading branch information
Chinmay Soman committed Dec 6, 2012
1 parent 7693d6b commit e6b24c6
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 22 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
Expand Down Expand Up @@ -146,7 +147,12 @@ public void conf(JobConf job) {
this.fs = this.taskIndexFileName.getFileSystem(job);

this.indexFileStream = fs.create(this.taskIndexFileName);
fs.setPermission(this.taskIndexFileName, new FsPermission("755"));
logger.info("Setting permission to 755 for " + this.taskIndexFileName);

this.valueFileStream = fs.create(this.taskValueFileName);
fs.setPermission(this.taskValueFileName, new FsPermission("755"));
logger.info("Setting permission to 755 for " + this.taskValueFileName);

logger.info("Opening " + this.taskIndexFileName + " and " + this.taskValueFileName
+ " for writing.");
Expand Down Expand Up @@ -304,6 +310,8 @@ public void close() throws IOException {
// Create output directory, if it doesn't exist
FileSystem outputFs = nodeDir.getFileSystem(this.conf);
outputFs.mkdirs(nodeDir);
outputFs.setPermission(nodeDir, new FsPermission("755"));
logger.info("Setting permission to 755 for " + nodeDir);

// Write the checksum and output files
if(this.checkSumType != CheckSumType.NONE) {
Expand All @@ -313,10 +321,12 @@ public void close() throws IOException {
Path checkSumValueFile = new Path(nodeDir, fileNamePrefix + ".data.checksum");

FSDataOutputStream output = outputFs.create(checkSumIndexFile);
outputFs.setPermission(checkSumIndexFile, new FsPermission("755"));
output.write(this.checkSumDigestIndex.getCheckSum());
output.close();

output = outputFs.create(checkSumValueFile);
outputFs.setPermission(checkSumValueFile, new FsPermission("755"));
output.write(this.checkSumDigestValue.getCheckSum());
output.close();
} else {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
Expand Down Expand Up @@ -117,7 +118,12 @@ public void conf(JobConf job) {
this.fs = this.taskIndexFileName[chunkId].getFileSystem(job);

this.indexFileStream[chunkId] = fs.create(this.taskIndexFileName[chunkId]);
fs.setPermission(this.taskIndexFileName[chunkId], new FsPermission("755"));
logger.info("Setting permission to 755 for " + this.taskIndexFileName[chunkId]);

this.valueFileStream[chunkId] = fs.create(this.taskValueFileName[chunkId]);
fs.setPermission(this.taskValueFileName[chunkId], new FsPermission("755"));
logger.info("Setting permission to 755 for " + this.taskValueFileName[chunkId]);

logger.info("Opening " + this.taskIndexFileName[chunkId] + " and "
+ this.taskValueFileName[chunkId] + " for writing.");
Expand Down Expand Up @@ -278,6 +284,8 @@ public void close() throws IOException {
// Create output directory, if it doesn't exist
FileSystem outputFs = nodeDir.getFileSystem(this.conf);
outputFs.mkdirs(nodeDir);
outputFs.setPermission(nodeDir, new FsPermission("755"));
logger.info("Setting permission to 755 for " + nodeDir);

// Write the checksum and output files
for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) {
Expand All @@ -291,10 +299,12 @@ public void close() throws IOException {
Path checkSumValueFile = new Path(nodeDir, chunkFileName + ".data.checksum");

FSDataOutputStream output = outputFs.create(checkSumIndexFile);
outputFs.setPermission(checkSumIndexFile, new FsPermission("755"));
output.write(this.checkSumDigestIndex[chunkId].getCheckSum());
output.close();

output = outputFs.create(checkSumValueFile);
outputFs.setPermission(checkSumValueFile, new FsPermission("755"));
output.write(this.checkSumDigestValue[chunkId].getCheckSum());
output.close();
} else {
Expand Down
Expand Up @@ -158,26 +158,29 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
*
* Otherwise get the default filesystem object.
*/
if(this.keytabLocation.length() > 0) {
logger.debug("keytab path = " + keytabLocation + " and proxy user = " + proxyUser);
UserGroupInformation.loginUserFromKeytab(proxyUser, keytabLocation);
logger.debug("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
try {
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = path.getFileSystem(config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
return null;
synchronized(this) {
if(this.keytabLocation.length() > 0) {
logger.info("keytab path = " + keytabLocation + " and proxy user = "
+ proxyUser);
UserGroupInformation.loginUserFromKeytab(proxyUser, keytabLocation);
logger.info("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
try {
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = path.getFileSystem(config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
return null;
}
} else {
fs = path.getFileSystem(config);
}
} else {
fs = path.getFileSystem(config);
}

CopyStats stats = new CopyStats(sourceFileUrl, sizeOfPath(fs, path));
Expand Down Expand Up @@ -316,7 +319,7 @@ private void copyFileWithCheckSum(FileSystem fs,
File dest,
CopyStats stats,
CheckSum fileCheckSumGenerator) throws IOException {
logger.info("Starting copy of " + source + " to " + dest);
logger.debug("Starting copy of " + source + " to " + dest);
FSDataInputStream input = null;
OutputStream output = null;
for(int attempt = 0; attempt < NUM_RETRIES; attempt++) {
Expand Down Expand Up @@ -378,7 +381,7 @@ private void copyFileWithCheckSum(FileSystem fs,
}

}

logger.debug("Completed copy of " + source + " to " + dest);
}
}

Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
Expand Down Expand Up @@ -470,6 +471,8 @@ public void build() {
logger.info("No data generated for node " + node.getId()
+ ". Generating empty folder");
outputFs.mkdirs(nodePath); // Create empty folder
outputFs.setPermission(nodePath, new FsPermission("755"));
logger.info("Setting permission to 755 for " + nodePath);
}

if(checkSumType != CheckSumType.NONE) {
Expand Down Expand Up @@ -518,7 +521,10 @@ public boolean accept(Path arg0) {
}

// Write metadata
FSDataOutputStream metadataStream = outputFs.create(new Path(nodePath, ".metadata"));
Path metadataPath = new Path(nodePath, ".metadata");
FSDataOutputStream metadataStream = outputFs.create(metadataPath);
outputFs.setPermission(metadataPath, new FsPermission("755"));
logger.info("Setting permission to 755 for " + metadataPath);
metadataStream.write(metadata.toJsonString().getBytes());
metadataStream.flush();
metadataStream.close();
Expand Down

0 comments on commit e6b24c6

Please sign in to comment.