diff --git a/rubix-bookkeeper/src/main/java/com/qubole/rubix/bookkeeper/BookKeeper.java b/rubix-bookkeeper/src/main/java/com/qubole/rubix/bookkeeper/BookKeeper.java index 599d7537..6364adae 100644 --- a/rubix-bookkeeper/src/main/java/com/qubole/rubix/bookkeeper/BookKeeper.java +++ b/rubix-bookkeeper/src/main/java/com/qubole/rubix/bookkeeper/BookKeeper.java @@ -284,6 +284,7 @@ public boolean readData(String remotePath, long offset, int length, long fileSiz if (fs == null) { fs = path.getFileSystem(conf); + log.info("Initializing FileSystem " + fs.toString() + " for Path " + path.toString()); fs.initialize(path.toUri(), conf); inputStream = fs.open(path, blockSize); diff --git a/rubix-core/pom.xml b/rubix-core/pom.xml index 60cbfca3..ea9adb9c 100644 --- a/rubix-core/pom.xml +++ b/rubix-core/pom.xml @@ -19,15 +19,10 @@ - - org.apache.hadoop - hadoop-core - provided - - org.apache.hadoop hadoop-common + ${dep.hadoop2.version} provided @@ -122,4 +117,4 @@ - \ No newline at end of file + diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java b/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java index b7f2203c..fdb1e8bc 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java @@ -15,6 +15,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; import java.io.FileInputStream; import java.io.IOException; @@ -32,25 +33,27 @@ public class CachedReadRequestChain extends ReadRequestChain private FileChannel fileChannel = null; private RandomAccessFile raf; private int read = 0; // data read + private FileSystem.Statistics statistics = null; private ByteBuffer directBuffer; private static final Log log = LogFactory.getLog(CachedReadRequestChain.class); - public CachedReadRequestChain(String fileToRead, ByteBuffer buffer) + public CachedReadRequestChain(String fileToRead, ByteBuffer buffer, FileSystem.Statistics statistics) throws IOException { this.raf = new RandomAccessFile(fileToRead, "r"); FileInputStream fis = new FileInputStream(raf.getFD()); fileChannel = fis.getChannel(); directBuffer = buffer; + this.statistics = statistics; } @VisibleForTesting public CachedReadRequestChain(String fileToRead) throws IOException { - this(fileToRead, ByteBuffer.allocate(1024)); + this(fileToRead, ByteBuffer.allocate(1024), null); } @VisibleForTesting @@ -100,6 +103,9 @@ public Integer call() log.info(String.format("Read %d bytes from cached file", read)); fileChannel.close(); raf.close(); + if (statistics != null) { + statistics.incrementBytesRead(read); + } return read; } diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java b/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java index 05f570c0..5edd5245 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java @@ -54,6 +54,9 @@ public abstract class CachingFileSystem extends FileSystem private static ClusterManager clusterManager; private boolean cacheSkipped = false; + private boolean isRubixSchemeUsed = false; + private URI uri; + private Path workingDir; private static CachingFileSystemStats statsMBean; public BookKeeperFactory bookKeeperFactory = new BookKeeperFactory(); @@ -84,6 +87,8 @@ public CachingFileSystem() } } + public abstract String getScheme(); + public void setClusterManager(ClusterManager clusterManager) { this.clusterManager = clusterManager; @@ -102,13 +107,17 @@ public void initialize(URI uri, Configuration conf) throws IOException throw new IOException("Cluster Manager not set"); } super.initialize(uri, conf); - fs.initialize(uri, conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + this.workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this); + isRubixSchemeUsed = uri.getScheme().equals(CacheConfig.RUBIX_SCHEME); + URI originalUri = getOriginalURI(uri); + fs.initialize(originalUri, conf); } @Override public URI getUri() { - return fs.getUri(); + return this.uri; } @Override @@ -123,10 +132,12 @@ public FSDataInputStream open(Path path, int bufferSize) return inputStream; } + Path originalPath = new Path(getOriginalURI(path.toUri()).getScheme(), path.toUri().getAuthority(), + path.toUri().getPath()); return new FSDataInputStream( new BufferedFSInputStream( - new CachingInputStream(this, path, this.getConf(), statsMBean, - clusterManager.getClusterType(), bookKeeperFactory, fs, bufferSize), + new CachingInputStream(this, originalPath, this.getConf(), statsMBean, + clusterManager.getClusterType(), bookKeeperFactory, fs, bufferSize, statistics), CacheConfig.getBlockSize(getConf()))); } @@ -180,19 +191,23 @@ public boolean delete(Path path, boolean b) public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { - return fs.listStatus(path); + FileStatus[] files = fs.listStatus(path); + for (int i = 0; i < files.length; i++) { + files[i].setPath(getRubixPath(files[i].getPath(), isRubixSchemeUsed)); + } + return files; } @Override public void setWorkingDirectory(Path path) { - fs.setWorkingDirectory(path); + this.workingDir = path; } @Override public Path getWorkingDirectory() { - return fs.getWorkingDirectory(); + return this.workingDir; } @Override @@ -206,7 +221,27 @@ public boolean mkdirs(Path path, FsPermission fsPermission) public FileStatus getFileStatus(Path path) throws IOException { - return fs.getFileStatus(path); + FileStatus originalStatus = fs.getFileStatus(path); + originalStatus.setPath(getRubixPath(originalStatus.getPath(), isRubixSchemeUsed)); + return originalStatus; + } + + private Path getRubixPath(Path remotePath, boolean isRubixSchemeUsed) + { + String remotePathScheme = remotePath.toUri().getScheme(); + if (remotePathScheme.equals(getScheme()) && isRubixSchemeUsed) { + return new Path(CacheConfig.RUBIX_SCHEME, remotePath.toUri().getAuthority(), remotePath.toUri().getPath()); + } + return remotePath; + } + + private URI getOriginalURI(URI actualURI) + { + String actualScheme = actualURI.getScheme(); + if (!actualScheme.equals(CacheConfig.RUBIX_SCHEME)) { + return actualURI; + } + return URI.create(getScheme() + "://" + actualURI.getAuthority()); } @Override diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java b/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java index 37c154b2..15a4aa01 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java @@ -72,6 +72,7 @@ public class CachingInputStream private boolean strictMode = false; ClusterType clusterType; FileSystem remoteFileSystem; + FileSystem.Statistics statistics = null; private static DirectBufferPool bufferPool = new DirectBufferPool(); private ByteBuffer directWriteBuffer = null; @@ -83,7 +84,7 @@ public class CachingInputStream public CachingInputStream(FileSystem parentFs, Path backendPath, Configuration conf, CachingFileSystemStats statsMbean, ClusterType clusterType, BookKeeperFactory bookKeeperFactory, FileSystem remoteFileSystem, - int bufferSize) throws IOException + int bufferSize, FileSystem.Statistics statistics) throws IOException { this.remotePath = backendPath.toString(); FileStatus fileStatus = parentFs.getFileStatus(backendPath); @@ -94,13 +95,14 @@ public CachingInputStream(FileSystem parentFs, Path backendPath, Configuration c this.statsMbean = statsMbean; this.clusterType = clusterType; this.bufferSize = bufferSize; + this.statistics = statistics; } @VisibleForTesting public CachingInputStream(FSDataInputStream inputStream, Configuration conf, Path backendPath, long size, long lastModified, CachingFileSystemStats statsMbean, ClusterType clusterType, BookKeeperFactory bookKeeperFactory, - FileSystem remoteFileSystem) + FileSystem remoteFileSystem, int buffersize, FileSystem.Statistics statistics) throws IOException { this.inputStream = inputStream; @@ -112,6 +114,7 @@ public CachingInputStream(FSDataInputStream inputStream, Configuration conf, Pat this.clusterType = clusterType; this.remoteFileSystem = remoteFileSystem; this.bufferSize = bufferSize; + this.statistics = statistics; } private void initialize(Configuration conf, BookKeeperFactory bookKeeperFactory) @@ -365,7 +368,7 @@ else if (isCached.get(idx).getLocation() == Location.CACHED) { directReadBuffer = bufferPool.getBuffer(diskReadBufferSize); } if (cachedReadRequestChain == null) { - cachedReadRequestChain = new CachedReadRequestChain(localPath, directReadBuffer); + cachedReadRequestChain = new CachedReadRequestChain(localPath, directReadBuffer, statistics); } cachedReadRequestChain.addReadRequest(readRequest); } @@ -383,7 +386,9 @@ else if (isCached.get(idx).getLocation() == Location.CACHED) { String remoteLocation = isCached.get(idx).getRemoteLocation(); log.debug(String.format("Sending block %d to NonLocalReadRequestChain to node : %s", blockNum, remoteLocation)); if (!nonLocalRequests.containsKey(remoteLocation)) { - NonLocalReadRequestChain nonLocalReadRequestChain = new NonLocalReadRequestChain(remoteLocation, fileSize, lastModified, conf, remoteFileSystem, remotePath, clusterType.ordinal(), strictMode); + NonLocalReadRequestChain nonLocalReadRequestChain = + new NonLocalReadRequestChain(remoteLocation, fileSize, lastModified, conf, + remoteFileSystem, remotePath, clusterType.ordinal(), strictMode, statistics); nonLocalRequests.put(remoteLocation, nonLocalReadRequestChain); } nonLocalRequests.get(remoteLocation).addReadRequest(readRequest); diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java b/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java index 2f06e76a..b23bd9df 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java @@ -47,12 +47,15 @@ public class NonLocalReadRequestChain extends ReadRequestChain FileSystem remoteFileSystem; int clusterType; public boolean strictMode = false; + FileSystem.Statistics statistics = null; DirectReadRequestChain directReadChain = null; // Used when Non Local Requests fail private static final Log log = LogFactory.getLog(NonLocalReadRequestChain.class); - public NonLocalReadRequestChain(String remoteLocation, long fileSize, long lastModified, Configuration conf, FileSystem remoteFileSystem, String remotePath, int clusterType, boolean strictMode) + public NonLocalReadRequestChain(String remoteLocation, long fileSize, long lastModified, Configuration conf, + FileSystem remoteFileSystem, String remotePath, int clusterType, + boolean strictMode, FileSystem.Statistics statistics) { this.remoteNodeName = remoteLocation; this.remoteFileSystem = remoteFileSystem; @@ -62,6 +65,7 @@ public NonLocalReadRequestChain(String remoteLocation, long fileSize, long lastM this.conf = conf; this.clusterType = clusterType; this.strictMode = strictMode; + this.statistics = statistics; } public ReadRequestChainStats getStats() @@ -147,6 +151,9 @@ public Integer call() } } finally { + if (statistics != null) { + statistics.incrementBytesRead(totalRead); + } try { log.info(String.format("Read %d bytes internally from node %s", totalRead, remoteNodeName)); dataTransferClient.close(); diff --git a/rubix-core/src/test/java/com/qubole/rubix/core/MockCachingFileSystem.java b/rubix-core/src/test/java/com/qubole/rubix/core/MockCachingFileSystem.java index d30a58a7..5260248f 100644 --- a/rubix-core/src/test/java/com/qubole/rubix/core/MockCachingFileSystem.java +++ b/rubix-core/src/test/java/com/qubole/rubix/core/MockCachingFileSystem.java @@ -35,6 +35,7 @@ public class MockCachingFileSystem { private static final Log log = LogFactory.getLog(MockCachingFileSystem.class); Configuration conf; + private static final String SCHEME = "file"; @Override public void initialize(URI uri, Configuration conf) @@ -44,6 +45,10 @@ public void initialize(URI uri, Configuration conf) log.debug("Initializing TestCachingFileSystem"); } + public String getScheme() + { + return SCHEME; + } @Override public FSDataInputStream open(Path path, int i) @@ -56,7 +61,8 @@ public FSDataInputStream open(Path path, int i) new BufferedFSInputStream( new CachingInputStream(new FSDataInputStream(inputStream), conf, path, file.length(), file.lastModified(), new CachingFileSystemStats(), - ClusterType.TEST_CLUSTER_MANAGER, bookKeeperFactory, this), + ClusterType.TEST_CLUSTER_MANAGER, bookKeeperFactory, this, + CacheConfig.getBlockSize(conf), statistics), CacheConfig.getBlockSize(conf))); } diff --git a/rubix-hadoop1/src/main/java/com/qubole/rubix/hadoop1/CachingNativeS3FileSystem.java b/rubix-hadoop1/src/main/java/com/qubole/rubix/hadoop1/CachingNativeS3FileSystem.java index 09bcd290..99cd40a5 100644 --- a/rubix-hadoop1/src/main/java/com/qubole/rubix/hadoop1/CachingNativeS3FileSystem.java +++ b/rubix-hadoop1/src/main/java/com/qubole/rubix/hadoop1/CachingNativeS3FileSystem.java @@ -36,6 +36,8 @@ public CachingNativeS3FileSystem() super(); } + private static final String SCHEME = "s3n"; + @Override public void initialize(URI uri, Configuration conf) throws IOException { @@ -50,6 +52,11 @@ public void initialize(URI uri, Configuration conf) throws IOException super.initialize(uri, conf); } + public String getScheme() + { + return SCHEME; + } + private synchronized void initializeClusterManager(Configuration conf) { if (clusterManager != null) { diff --git a/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingDistributedFileSystem.java b/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingDistributedFileSystem.java index 1096d012..66cab0ac 100644 --- a/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingDistributedFileSystem.java +++ b/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingDistributedFileSystem.java @@ -27,6 +27,7 @@ public class CachingDistributedFileSystem extends CachingFileSystem { private static final Log LOG = LogFactory.getLog(CachingNativeS3FileSystem.class); + private static final String SCHEME = "s3n"; private ClusterManager clusterManager; @@ -37,6 +38,11 @@ public CachingNativeS3FileSystem() super(); } + public String getScheme() + { + return SCHEME; + } + @Override public void initialize(URI uri, Configuration conf) throws IOException diff --git a/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingS3AFileSystem.java b/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingS3AFileSystem.java index 433363c4..996e5b47 100644 --- a/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingS3AFileSystem.java +++ b/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/CachingS3AFileSystem.java @@ -26,6 +26,7 @@ public class CachingS3AFileSystem extends CachingFileSystem { private static final Log LOG = LogFactory.getLog(CachingS3AFileSystem.class); private ClusterManager clusterManager; + private static final String SCHEME = "s3a"; public CachingS3AFileSystem() throws IOException { @@ -43,6 +44,11 @@ public void initialize(URI uri, Configuration conf) throws IOException super.initialize(uri, conf); } + public String getScheme() + { + return SCHEME; + } + private synchronized void initializeClusterManager(Configuration conf) { if (clusterManager != null) { diff --git a/rubix-presto/src/main/java/com/qubole/rubix/presto/CachingPrestoS3FileSystem.java b/rubix-presto/src/main/java/com/qubole/rubix/presto/CachingPrestoS3FileSystem.java index 3cf3a6b1..fdeeb20d 100644 --- a/rubix-presto/src/main/java/com/qubole/rubix/presto/CachingPrestoS3FileSystem.java +++ b/rubix-presto/src/main/java/com/qubole/rubix/presto/CachingPrestoS3FileSystem.java @@ -30,6 +30,7 @@ public CachingPrestoS3FileSystem() { super(); } + private static final String SCHEME = "s3n"; @Override public void initialize(URI uri, Configuration conf) throws IOException @@ -42,6 +43,11 @@ public void initialize(URI uri, Configuration conf) throws IOException super.initialize(uri, conf); } + public String getScheme() + { + return SCHEME; + } + private synchronized void initializeClusterManager(Configuration conf) { if (clusterManager == null) { diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java index 111355a1..1c5bb1f3 100644 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java @@ -42,6 +42,7 @@ public class CacheConfig { + public static final String RUBIX_SCHEME = "rubix"; public static final String DATA_CACHE_ENABLED = "hadoop.cache.data.enabled"; public static final String DATA_CACHE_TABLE_WHITELIST = "hadoop.cache.data.table.whitelist"; public static final String DATA_CACHE_TABLE = "hadoop.cache.data.table"; diff --git a/rubix-tests/src/test/java/com/qubole/rubix/tests/TestCachingInputStream.java b/rubix-tests/src/test/java/com/qubole/rubix/tests/TestCachingInputStream.java index c55d3a1c..1b02f0e3 100644 --- a/rubix-tests/src/test/java/com/qubole/rubix/tests/TestCachingInputStream.java +++ b/rubix-tests/src/test/java/com/qubole/rubix/tests/TestCachingInputStream.java @@ -127,7 +127,7 @@ public void createCachingStream(Configuration conf) // This should be after server comes up else client could not be created inputStream = new CachingInputStream(fsDataInputStream, conf, backendPath, file.length(), file.lastModified(), new CachingFileSystemStats(), ClusterType.TEST_CLUSTER_MANAGER, - new BookKeeperFactory(), null); + new BookKeeperFactory(), null, CacheConfig.getBlockSize(conf), null); } diff --git a/rubix-tests/src/test/java/com/qubole/rubix/tests/TestNonLocalReadRequestChain.java b/rubix-tests/src/test/java/com/qubole/rubix/tests/TestNonLocalReadRequestChain.java index 907ded7f..58e73438 100644 --- a/rubix-tests/src/test/java/com/qubole/rubix/tests/TestNonLocalReadRequestChain.java +++ b/rubix-tests/src/test/java/com/qubole/rubix/tests/TestNonLocalReadRequestChain.java @@ -112,7 +112,9 @@ public void run() conf.setClass("fs.testfile.impl", MockCachingFileSystem.class, FileSystem.class); MockCachingFileSystem fs = new MockCachingFileSystem(); fs.initialize(null, conf); - nonLocalReadRequestChain = new NonLocalReadRequestChain("localhost", backendFile.length(), backendFile.lastModified(), conf, fs, backendPath.toString(), ClusterType.TEST_CLUSTER_MANAGER.ordinal(), false); + nonLocalReadRequestChain = new NonLocalReadRequestChain("localhost", backendFile.length(), + backendFile.lastModified(), conf, fs, backendPath.toString(), + ClusterType.TEST_CLUSTER_MANAGER.ordinal(), false, null); } @Test