Permalink
Browse files

Airlifting code from infochimps-piggybank

  • Loading branch information...
1 parent 38a662f commit b1c7f2d2742af55fb91221fc633121f4a796168b @temujin9 temujin9 committed Sep 7, 2011
Showing with 25 additions and 35 deletions.
  1. +25 −35 src/main/java/com/infochimps/elasticsearch/hadoop/util/HadoopUtils.java
@@ -1,8 +1,10 @@
-package com.infochimps.elasticsearch.hadoop.util;
+package com.infochimps.hadoop.util;
import java.io.IOException;
+import java.io.FileNotFoundException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -14,36 +16,37 @@
*/
public static void uploadLocalFile(Path localsrc, Path hdfsdest, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
- if (fs.exists(hdfsdest) && fs.getFileStatus(hdfsdest).isDir()) {
- fs.delete(hdfsdest, true);
- }
- fs.copyFromLocalFile(false, true, localsrc, hdfsdest);
+ fs.copyFromLocalFile(false, true, localsrc, hdfsdest);
}
+
/**
- Fetches a file with the basename specified from the distributed cache. Returns null if no file is found
+ Upload a local file to the cluster if the destination is missing or has an older modification time than the local file (the local file is read through the local filesystem, the destination through the configured default one)
*/
- public static String fetchFileFromCache(String basename, Configuration conf) throws IOException {
- Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
- if (cacheFiles != null && cacheFiles.length > 0) {
- for (Path cacheFile : cacheFiles) {
- if (cacheFile.getName().equals(basename)) {
- return cacheFile.toString();
- }
+ public static void uploadLocalFileIfChanged(Path localsrc, Path hdfsdest, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus l_stat = FileSystem.getLocal(conf).getFileStatus(localsrc); // localsrc lives on the local filesystem, not on the default (cluster) filesystem held in fs
+ try {
+ FileStatus h_stat = fs.getFileStatus(hdfsdest);
+ if ( l_stat.getModificationTime() > h_stat.getModificationTime() ) {
+ uploadLocalFile(localsrc, hdfsdest, conf);
}
}
- return null;
+ catch (FileNotFoundException e) {
+ uploadLocalFile(localsrc, hdfsdest, conf);
+ }
}
+
/**
Fetches a file with the basename specified from the distributed cache. Returns null if no file is found
*/
- public static String fetchArchiveFromCache(String basename, Configuration conf) throws IOException {
- Path[] cacheArchives = DistributedCache.getLocalCacheArchives(conf);
- if (cacheArchives != null && cacheArchives.length > 0) {
- for (Path cacheArchive : cacheArchives) {
- if (cacheArchive.getName().equals(basename)) {
- return cacheArchive.toString();
+ public static String fetchFromCache(String basename, Configuration conf) throws IOException {
+ Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
+ if (cacheFiles != null && cacheFiles.length > 0) {
+ for (Path cacheFile : cacheFiles) {
+ if (cacheFile.getName().equals(basename)) {
+ return cacheFile.toString();
}
}
}
@@ -53,26 +56,13 @@ public static String fetchArchiveFromCache(String basename, Configuration conf)
/**
Takes a path on the hdfs and ships it in the distributed cache if it is not already in the distributed cache
*/
- public static void shipFileIfNotShipped(Path hdfsPath, Configuration conf) throws IOException {
- if (fetchFileFromCache(hdfsPath.getName(), conf) == null) {
+ public static void shipIfNotShipped(Path hdfsPath, Configuration conf) throws IOException {
+ if (fetchFromCache(hdfsPath.getName(), conf) == null) {
try {
DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
-
- /**
- Takes a path on the hdfs and ships it in the distributed cache if it is not already in the distributed cache
- */
- public static void shipArchiveIfNotShipped(Path hdfsPath, Configuration conf) throws IOException {
- if (fetchArchiveFromCache(hdfsPath.getName(), conf) == null) {
- try {
- DistributedCache.addCacheArchive(hdfsPath.toUri(), conf);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
}

0 comments on commit b1c7f2d

Please sign in to comment.