new: usr: Adding support for rubix scheme and populating filesystem counters (#77)
abhishekdas99 committed Nov 28, 2017
1 parent 7511eea commit 3242001
Showing 15 changed files with 114 additions and 25 deletions.
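
Taken together, the diff does two things: CachingFileSystem now accepts URIs with a dedicated rubix scheme and translates them back to the wrapped file system's real scheme before delegating, and the read request chains now credit the bytes they serve to Hadoop's FileSystem.Statistics counters. A minimal usage sketch follows; it is not part of this commit, and the fs.rubix.impl key and the fully qualified class name are assumptions based on Hadoop's usual fs.<scheme>.impl registration convention.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RubixSchemeExample
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration();
    // Assumption: the caching file system is registered for the rubix scheme
    // via the standard Hadoop convention; key and class name are illustrative.
    conf.set("fs.rubix.impl", "com.qubole.rubix.hadoop2.CachingDistributedFileSystem");

    // A rubix:// URI keeps the HDFS authority; CachingFileSystem.initialize()
    // maps it to the underlying scheme before the remote store is touched.
    Path path = new Path("rubix://namenode:8020/user/hive/warehouse/t/part-00000");
    FileSystem fs = path.getFileSystem(conf);

    byte[] buffer = new byte[4096];
    try (FSDataInputStream in = fs.open(path)) {
      int bytesRead = in.read(buffer);
      System.out.println("read " + bytesRead + " bytes through the cache");
    }

    // With the new Statistics plumbing, cached and non-local reads show up
    // in the per-scheme byte counters.
    for (FileSystem.Statistics stats : FileSystem.getAllStatistics()) {
      System.out.println(stats.getScheme() + " bytesRead=" + stats.getBytesRead());
    }
  }
}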
@@ -284,6 +284,7 @@ public boolean readData(String remotePath, long offset, int length, long fileSiz

if (fs == null) {
fs = path.getFileSystem(conf);
log.info("Initializing FileSystem " + fs.toString() + " for Path " + path.toString());
fs.initialize(path.toUri(), conf);

inputStream = fs.open(path, blockSize);
9 changes: 2 additions & 7 deletions rubix-core/pom.xml
@@ -19,15 +19,10 @@

<dependencies>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${dep.hadoop2.version}</version>
<scope>provided</scope>
</dependency>

@@ -122,4 +117,4 @@
</plugins>
</build>

</project>
</project>
@@ -15,6 +15,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;

import java.io.FileInputStream;
import java.io.IOException;
@@ -32,25 +33,27 @@ public class CachedReadRequestChain extends ReadRequestChain
private FileChannel fileChannel = null;
private RandomAccessFile raf;
private int read = 0; // data read
private FileSystem.Statistics statistics = null;

private ByteBuffer directBuffer;

private static final Log log = LogFactory.getLog(CachedReadRequestChain.class);

public CachedReadRequestChain(String fileToRead, ByteBuffer buffer)
public CachedReadRequestChain(String fileToRead, ByteBuffer buffer, FileSystem.Statistics statistics)
throws IOException
{
this.raf = new RandomAccessFile(fileToRead, "r");
FileInputStream fis = new FileInputStream(raf.getFD());
fileChannel = fis.getChannel();
directBuffer = buffer;
this.statistics = statistics;
}

@VisibleForTesting
public CachedReadRequestChain(String fileToRead)
throws IOException
{
this(fileToRead, ByteBuffer.allocate(1024));
this(fileToRead, ByteBuffer.allocate(1024), null);
}

@VisibleForTesting
@@ -100,6 +103,9 @@ public Integer call()
log.info(String.format("Read %d bytes from cached file", read));
fileChannel.close();
raf.close();
if (statistics != null) {
statistics.incrementBytesRead(read);
}
return read;
}

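CachedReadRequestChain now receives the owning file system's FileSystem.Statistics object and, once the cached read completes, credits the bytes via incrementBytesRead(read), so data served from local disk still counts toward the scheme's read counters. For reference, a minimal sketch of that Hadoop counter in isolation (plain Hadoop API, not RubiX code; the "file" scheme name is only illustrative): increments made on worker threads are included when the counter is read.

import org.apache.hadoop.fs.FileSystem;

public class StatisticsCounterSketch
{
  public static void main(String[] args) throws Exception
  {
    final FileSystem.Statistics statistics = new FileSystem.Statistics("file");

    // Simulate a read request chain finishing on its own thread.
    Thread readChain = new Thread(new Runnable()
    {
      @Override
      public void run()
      {
        statistics.incrementBytesRead(4096);
      }
    });
    readChain.start();
    readChain.join();

    // The 4096 bytes credited on the worker thread are included in the total.
    System.out.println("bytesRead=" + statistics.getBytesRead());
  }
}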
@@ -54,6 +54,9 @@ public abstract class CachingFileSystem<T extends FileSystem> extends FileSystem
private static ClusterManager clusterManager;

private boolean cacheSkipped = false;
private boolean isRubixSchemeUsed = false;
private URI uri;
private Path workingDir;

private static CachingFileSystemStats statsMBean;
public BookKeeperFactory bookKeeperFactory = new BookKeeperFactory();
@@ -84,6 +87,8 @@ public CachingFileSystem()
}
}

public abstract String getScheme();

public void setClusterManager(ClusterManager clusterManager)
{
this.clusterManager = clusterManager;
@@ -102,13 +107,17 @@ public void initialize(URI uri, Configuration conf) throws IOException
throw new IOException("Cluster Manager not set");
}
super.initialize(uri, conf);
fs.initialize(uri, conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this);
isRubixSchemeUsed = uri.getScheme().equals(CacheConfig.RUBIX_SCHEME);
URI originalUri = getOriginalURI(uri);
fs.initialize(originalUri, conf);
}

@Override
public URI getUri()
{
return fs.getUri();
return this.uri;
}

@Override
@@ -123,10 +132,12 @@ public FSDataInputStream open(Path path, int bufferSize)
return inputStream;
}

Path originalPath = new Path(getOriginalURI(path.toUri()).getScheme(), path.toUri().getAuthority(),
path.toUri().getPath());
return new FSDataInputStream(
new BufferedFSInputStream(
new CachingInputStream(this, path, this.getConf(), statsMBean,
clusterManager.getClusterType(), bookKeeperFactory, fs, bufferSize),
new CachingInputStream(this, originalPath, this.getConf(), statsMBean,
clusterManager.getClusterType(), bookKeeperFactory, fs, bufferSize, statistics),
CacheConfig.getBlockSize(getConf())));
}

@@ -180,19 +191,23 @@ public boolean delete(Path path, boolean b)
public FileStatus[] listStatus(Path path)
throws FileNotFoundException, IOException
{
return fs.listStatus(path);
FileStatus[] files = fs.listStatus(path);
for (int i = 0; i < files.length; i++) {
files[i].setPath(getRubixPath(files[i].getPath(), isRubixSchemeUsed));
}
return files;
}

@Override
public void setWorkingDirectory(Path path)
{
fs.setWorkingDirectory(path);
this.workingDir = path;
}

@Override
public Path getWorkingDirectory()
{
return fs.getWorkingDirectory();
return this.workingDir;
}

@Override
@@ -206,7 +221,27 @@ public boolean mkdirs(Path path, FsPermission fsPermission)
public FileStatus getFileStatus(Path path)
throws IOException
{
return fs.getFileStatus(path);
FileStatus originalStatus = fs.getFileStatus(path);
originalStatus.setPath(getRubixPath(originalStatus.getPath(), isRubixSchemeUsed));
return originalStatus;
}

private Path getRubixPath(Path remotePath, boolean isRubixSchemeUsed)
{
String remotePathScheme = remotePath.toUri().getScheme();
if (remotePathScheme.equals(getScheme()) && isRubixSchemeUsed) {
return new Path(CacheConfig.RUBIX_SCHEME, remotePath.toUri().getAuthority(), remotePath.toUri().getPath());
}
return remotePath;
}

private URI getOriginalURI(URI actualURI)
{
String actualScheme = actualURI.getScheme();
if (!actualScheme.equals(CacheConfig.RUBIX_SCHEME)) {
return actualURI;
}
return URI.create(getScheme() + "://" + actualURI.getAuthority());
}

@Override
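The scheme support in CachingFileSystem boils down to a two-way mapping: initialize() and open() strip the rubix scheme before handing the URI or path to the wrapped file system, while listStatus() and getFileStatus() re-apply it when the caller originally used a rubix:// path. A standalone sketch of that mapping follows; it mirrors getOriginalURI() and getRubixPath() above, with "hdfs" standing in for getScheme(), a hypothetical namenode authority, and "rubix" assumed as the value of CacheConfig.RUBIX_SCHEME.

import java.net.URI;

import org.apache.hadoop.fs.Path;

public class SchemeTranslationSketch
{
  private static final String RUBIX_SCHEME = "rubix"; // assumed value of CacheConfig.RUBIX_SCHEME
  private static final String UNDERLYING_SCHEME = "hdfs"; // stands in for getScheme()

  // Mirror of getOriginalURI(): drop the rubix scheme so fs.initialize() sees
  // the real one; any other scheme passes through untouched.
  static URI toOriginalUri(URI uri)
  {
    if (!RUBIX_SCHEME.equals(uri.getScheme())) {
      return uri;
    }
    return URI.create(UNDERLYING_SCHEME + "://" + uri.getAuthority());
  }

  // Mirror of getRubixPath(): put the rubix scheme back on results only when
  // the caller asked for a rubix:// path in the first place.
  static Path toRubixPath(Path path, boolean rubixSchemeUsed)
  {
    URI uri = path.toUri();
    if (rubixSchemeUsed && UNDERLYING_SCHEME.equals(uri.getScheme())) {
      return new Path(RUBIX_SCHEME, uri.getAuthority(), uri.getPath());
    }
    return path;
  }

  public static void main(String[] args)
  {
    System.out.println(toOriginalUri(URI.create("rubix://namenode:8020/warehouse/t/part-00000")));
    // prints hdfs://namenode:8020 (the path is dropped, matching getOriginalURI above)
    System.out.println(toRubixPath(new Path("hdfs://namenode:8020/warehouse/t/part-00000"), true));
    // prints rubix://namenode:8020/warehouse/t/part-00000
  }
}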
@@ -72,6 +72,7 @@ public class CachingInputStream
private boolean strictMode = false;
ClusterType clusterType;
FileSystem remoteFileSystem;
FileSystem.Statistics statistics = null;

private static DirectBufferPool bufferPool = new DirectBufferPool();
private ByteBuffer directWriteBuffer = null;
@@ -83,7 +84,7 @@ public class CachingInputStream
public CachingInputStream(FileSystem parentFs, Path backendPath, Configuration conf,
CachingFileSystemStats statsMbean, ClusterType clusterType,
BookKeeperFactory bookKeeperFactory, FileSystem remoteFileSystem,
int bufferSize) throws IOException
int bufferSize, FileSystem.Statistics statistics) throws IOException
{
this.remotePath = backendPath.toString();
FileStatus fileStatus = parentFs.getFileStatus(backendPath);
@@ -94,13 +95,14 @@ public CachingInputStream(FSDataInputStream inputStream, Configuration c
this.statsMbean = statsMbean;
this.clusterType = clusterType;
this.bufferSize = bufferSize;
this.statistics = statistics;
}

@VisibleForTesting
public CachingInputStream(FSDataInputStream inputStream, Configuration conf, Path backendPath,
long size, long lastModified, CachingFileSystemStats statsMbean,
ClusterType clusterType, BookKeeperFactory bookKeeperFactory,
FileSystem remoteFileSystem)
FileSystem remoteFileSystem, int buffersize, FileSystem.Statistics statistics)
throws IOException
{
this.inputStream = inputStream;
@@ -112,6 +114,7 @@ public CachingInputStream(FSDataInputStream inputStream, Configuration Pat
this.clusterType = clusterType;
this.remoteFileSystem = remoteFileSystem;
this.bufferSize = bufferSize;
this.statistics = statistics;
}

private void initialize(Configuration conf, BookKeeperFactory bookKeeperFactory)
@@ -365,7 +368,7 @@ else if (isCached.get(idx).getLocation() == Location.CACHED) {
directReadBuffer = bufferPool.getBuffer(diskReadBufferSize);
}
if (cachedReadRequestChain == null) {
cachedReadRequestChain = new CachedReadRequestChain(localPath, directReadBuffer);
cachedReadRequestChain = new CachedReadRequestChain(localPath, directReadBuffer, statistics);
}
cachedReadRequestChain.addReadRequest(readRequest);
}
@@ -383,7 +386,9 @@ else if (isCached.get(idx).getLocation() == Location.CACHED) {
String remoteLocation = isCached.get(idx).getRemoteLocation();
log.debug(String.format("Sending block %d to NonLocalReadRequestChain to node : %s", blockNum, remoteLocation));
if (!nonLocalRequests.containsKey(remoteLocation)) {
NonLocalReadRequestChain nonLocalReadRequestChain = new NonLocalReadRequestChain(remoteLocation, fileSize, lastModified, conf, remoteFileSystem, remotePath, clusterType.ordinal(), strictMode);
NonLocalReadRequestChain nonLocalReadRequestChain =
new NonLocalReadRequestChain(remoteLocation, fileSize, lastModified, conf,
remoteFileSystem, remotePath, clusterType.ordinal(), strictMode, statistics);
nonLocalRequests.put(remoteLocation, nonLocalReadRequestChain);
}
nonLocalRequests.get(remoteLocation).addReadRequest(readRequest);
@@ -47,12 +47,15 @@ public class NonLocalReadRequestChain extends ReadRequestChain
FileSystem remoteFileSystem;
int clusterType;
public boolean strictMode = false;
FileSystem.Statistics statistics = null;

DirectReadRequestChain directReadChain = null; // Used when Non Local Requests fail

private static final Log log = LogFactory.getLog(NonLocalReadRequestChain.class);

public NonLocalReadRequestChain(String remoteLocation, long fileSize, long lastModified, Configuration conf, FileSystem remoteFileSystem, String remotePath, int clusterType, boolean strictMode)
public NonLocalReadRequestChain(String remoteLocation, long fileSize, long lastModified, Configuration conf,
FileSystem remoteFileSystem, String remotePath, int clusterType,
boolean strictMode, FileSystem.Statistics statistics)
{
this.remoteNodeName = remoteLocation;
this.remoteFileSystem = remoteFileSystem;
@@ -62,6 +65,7 @@ public NonLocalReadRequestChain(String remoteLocation, long fileSize, long lastM
this.conf = conf;
this.clusterType = clusterType;
this.strictMode = strictMode;
this.statistics = statistics;
}

public ReadRequestChainStats getStats()
@@ -147,6 +151,9 @@ public Integer call()
}
}
finally {
if (statistics != null) {
statistics.incrementBytesRead(totalRead);
}
try {
log.info(String.format("Read %d bytes internally from node %s", totalRead, remoteNodeName));
dataTransferClient.close();
@@ -35,6 +35,7 @@ public class MockCachingFileSystem
{
private static final Log log = LogFactory.getLog(MockCachingFileSystem.class);
Configuration conf;
private static final String SCHEME = "file";

@Override
public void initialize(URI uri, Configuration conf)
@@ -44,6 +45,10 @@ public void initialize(URI uri, Configuration conf)
log.debug("Initializing TestCachingFileSystem");
}

public String getScheme()
{
return SCHEME;
}

@Override
public FSDataInputStream open(Path path, int i)
@@ -56,7 +61,8 @@ public FSDataInputStream open(Path path, int i)
new BufferedFSInputStream(
new CachingInputStream(new FSDataInputStream(inputStream), conf, path, file.length(),
file.lastModified(), new CachingFileSystemStats(),
ClusterType.TEST_CLUSTER_MANAGER, bookKeeperFactory, this),
ClusterType.TEST_CLUSTER_MANAGER, bookKeeperFactory, this,
CacheConfig.getBlockSize(conf), statistics),
CacheConfig.getBlockSize(conf)));
}

@@ -36,6 +36,8 @@ public CachingNativeS3FileSystem()
super();
}

private static final String SCHEME = "s3n";

@Override
public void initialize(URI uri, Configuration conf) throws IOException
{
@@ -50,6 +52,11 @@ public void initialize(URI uri, Configuration conf) throws IOException
super.initialize(uri, conf);
}

public String getScheme()
{
return SCHEME;
}

private synchronized void initializeClusterManager(Configuration conf)
{
if (clusterManager != null) {
@@ -27,6 +27,7 @@ public class CachingDistributedFileSystem extends CachingFileSystem<DistributedF
{
private static final Log LOG = LogFactory.getLog(CachingDistributedFileSystem.class);
private ClusterManager clusterManager;
private static final String SCHEME = "hdfs";

@Override
public void initialize(URI uri, Configuration conf) throws IOException
@@ -39,6 +40,11 @@ public void initialize(URI uri, Configuration conf) throws IOException
super.initialize(uri, conf);
}

public String getScheme()
{
return SCHEME;
}

private synchronized void initializeClusterManager(Configuration conf)
{
if (clusterManager != null) {
@@ -28,6 +28,7 @@ public class CachingNativeS3FileSystem
extends CachingFileSystem<NativeS3FileSystem>
{
private static final Log LOG = LogFactory.getLog(CachingNativeS3FileSystem.class);
private static final String SCHEME = "s3n";

private ClusterManager clusterManager;

@@ -37,6 +38,11 @@ public CachingNativeS3FileSystem()
super();
}

public String getScheme()
{
return SCHEME;
}

@Override
public void initialize(URI uri, Configuration conf)
throws IOException
@@ -26,6 +26,7 @@ public class CachingS3AFileSystem extends CachingFileSystem<S3AFileSystem>
{
private static final Log LOG = LogFactory.getLog(CachingS3AFileSystem.class);
private ClusterManager clusterManager;
private static final String SCHEME = "s3a";

public CachingS3AFileSystem() throws IOException
{
@@ -43,6 +44,11 @@ public void initialize(URI uri, Configuration conf) throws IOException
super.initialize(uri, conf);
}

public String getScheme()
{
return SCHEME;
}

private synchronized void initializeClusterManager(Configuration conf)
{
if (clusterManager != null) {