
First version of better versioned pushes

rsumbaly committed Aug 12, 2010
1 parent 3ba4e35 commit d4d39955c1e2dd57d732dc06f615ed15e8cf4c00
.classpath
@@ -50,5 +50,6 @@
<classpathentry kind="lib" path="lib/avro-modified-jdk5-1.3.0.jar"/>
<classpathentry kind="lib" path="contrib/hadoop/lib/pig-0.7.1-dev-core.jar"/>
<classpathentry kind="lib" path="contrib/krati/lib/krati-0.3.4.jar"/>
+ <classpathentry kind="lib" path="lib/jna.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
HdfsFetcher.java
@@ -55,8 +55,6 @@
public class HdfsFetcher implements FileFetcher {
private static final Logger logger = Logger.getLogger(HdfsFetcher.class);
- private static final String DEFAULT_TEMP_DIR = new File(System.getProperty("java.io.tmpdir"),
- "hdfs-fetcher").getAbsolutePath();
private static final int REPORTING_INTERVAL_BYTES = 100 * 1024 * 1024;
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
@@ -70,45 +68,40 @@
public HdfsFetcher(Props props) {
this(props.containsKey("fetcher.max.bytes.per.sec") ? props.getBytes("fetcher.max.bytes.per.sec")
: null,
- new File(props.getString("hdfs.fetcher.tmp.dir", DEFAULT_TEMP_DIR)),
(int) props.getBytes("hdfs.fetcher.buffer.size", DEFAULT_BUFFER_SIZE));
logger.info("Created hdfs fetcher with temp dir = " + tempDir.getAbsolutePath()
+ " and throttle rate " + maxBytesPerSecond + " and buffer size " + bufferSize);
}
public HdfsFetcher() {
- this((Long) null, null, DEFAULT_BUFFER_SIZE);
+ this((Long) null, DEFAULT_BUFFER_SIZE);
}
- public HdfsFetcher(Long maxBytesPerSecond, File tempDir, int bufferSize) {
- if(tempDir == null)
- this.tempDir = new File(DEFAULT_TEMP_DIR);
- else
- this.tempDir = Utils.notNull(new File(tempDir, "hdfs-fetcher"));
+ public HdfsFetcher(Long maxBytesPerSecond, int bufferSize) {
this.maxBytesPerSecond = maxBytesPerSecond;
if(this.maxBytesPerSecond != null)
this.throttler = new EventThrottler(this.maxBytesPerSecond);
this.bufferSize = bufferSize;
this.status = null;
- Utils.mkdirs(this.tempDir);
}
- public File fetch(String fileUrl, String storeName) throws IOException {
- Path path = new Path(fileUrl);
+ public File fetch(String sourceFileUrl, String destinationFile, String storeName)
+ throws IOException {
+ Path path = new Path(sourceFileUrl);
Configuration config = new Configuration();
config.setInt("io.socket.receive.buffer", bufferSize);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
FileSystem fs = path.getFileSystem(config);
- CopyStats stats = new CopyStats(fileUrl, sizeOfPath(fs, path));
+ CopyStats stats = new CopyStats(sourceFileUrl, sizeOfPath(fs, path));
ObjectName jmxName = JmxUtils.registerMbean("hdfs-copy-" + copyCount.getAndIncrement(),
stats);
try {
File storeDir = new File(this.tempDir, storeName + "_" + System.currentTimeMillis());
Utils.mkdirs(storeDir);
- File destination = new File(storeDir.getAbsoluteFile(), path.getName());
+ File destination = new File(destinationFile);
boolean result = fetch(fs, path, destination, stats);
if(result) {
return destination;
@@ -342,9 +335,9 @@ public static void main(String[] args) throws Exception {
config.setInt("io.socket.receive.buffer", 1 * 1024 * 1024 - 10000);
FileStatus status = p.getFileSystem(config).getFileStatus(p);
long size = status.getLen();
- HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec, null, DEFAULT_BUFFER_SIZE);
+ HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec, DEFAULT_BUFFER_SIZE);
long start = System.currentTimeMillis();
- File location = fetcher.fetch(url, storeName);
+ File location = fetcher.fetch(url, System.getProperty("java.io.tmpdir"), storeName);
double rate = size * Time.MS_PER_SECOND / (double) (System.currentTimeMillis() - start);
NumberFormat nf = NumberFormat.getInstance();
nf.setMaximumFractionDigits(2);
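
The rewritten main() above already exercises the new call shape end to end. For callers embedding the fetcher, a minimal sketch of the same pattern follows; the HDFS URL, destination path, and store name are placeholder values, not taken from this commit, and the package of HdfsFetcher is assumed from the Voldemort source tree:

    import java.io.File;
    import java.io.IOException;

    import voldemort.store.readonly.fetcher.HdfsFetcher; // assumed package; adjust to your tree

    public class FetchExample {

        public static void main(String[] args) throws IOException {
            // Two-argument constructor: the temp-dir parameter is gone,
            // and a null rate disables throttling.
            HdfsFetcher fetcher = new HdfsFetcher((Long) null, 64 * 1024);

            // The caller now chooses the destination explicitly.
            File fetched = fetcher.fetch("hdfs://namenode/data/my-store", // placeholder source URL
                                         "/tmp/my-store/version-1",      // explicit destination
                                         "my-store");                    // store name

            // Per the tests below, fetch() returns null on a checksum
            // mismatch, so callers should handle both outcomes.
            if (fetched != null)
                System.out.println("Fetched to " + fetched.getAbsolutePath());
        }
    }
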
HdfsFetcherTest.java
@@ -33,72 +33,87 @@
public class HdfsFetcherTest extends TestCase {
public void testFetch() throws Exception {
- File testDirectory = TestUtils.createTempDir();
+ File testSourceDirectory = TestUtils.createTempDir();
+ File testDestinationDirectory = TestUtils.createTempDir();
- File testFile = File.createTempFile("test", ".dat", testDirectory);
+ File testFile = File.createTempFile("test", ".dat", testSourceDirectory);
testFile.createNewFile();
// Test 1: No checksum file - return correctly
// Required for backward compatibility with existing hadoop stores
HdfsFetcher fetcher = new HdfsFetcher();
- File fetchedFile = fetcher.fetch(testDirectory.getAbsolutePath(), "storeName");
+ File fetchedFile = fetcher.fetch(testSourceDirectory.getAbsolutePath(),
+ testDestinationDirectory.getAbsolutePath(),
+ "storeName");
assertNotNull(fetchedFile);
// Test 2: Add checksum file with incorrect fileName, should not fail
- File checkSumFile = new File(testDirectory, "blahcheckSum.txt");
+ File checkSumFile = new File(testSourceDirectory, "blahcheckSum.txt");
checkSumFile.createNewFile();
- fetchedFile = fetcher.fetch(testDirectory.getAbsolutePath(), "storeName");
+ fetchedFile = fetcher.fetch(testSourceDirectory.getAbsolutePath(),
+ testDestinationDirectory.getAbsolutePath(),
+ "storeName");
assertNotNull(fetchedFile);
checkSumFile.delete();
// Test 3: Add checksum file with correct fileName, but empty = wrong
// md5
- checkSumFile = new File(testDirectory, "adler32checkSum.txt");
+ checkSumFile = new File(testSourceDirectory, "adler32checkSum.txt");
checkSumFile.createNewFile();
- fetchedFile = fetcher.fetch(testDirectory.getAbsolutePath(), "storeName");
+ fetchedFile = fetcher.fetch(testSourceDirectory.getAbsolutePath(),
+ testDestinationDirectory.getAbsolutePath(),
+ "storeName");
assertNull(fetchedFile);
// Test 4: Add wrong contents to file i.e. contents of CRC32 instead of
// Adler
- byte[] checkSumBytes = CheckSumTests.calculateCheckSum(testDirectory.listFiles(),
+ byte[] checkSumBytes = CheckSumTests.calculateCheckSum(testSourceDirectory.listFiles(),
CheckSumType.CRC32);
DataOutputStream os = new DataOutputStream(new FileOutputStream(checkSumFile));
os.write(checkSumBytes);
os.close();
- fetchedFile = fetcher.fetch(testDirectory.getAbsolutePath(), "storeName");
+ fetchedFile = fetcher.fetch(testSourceDirectory.getAbsolutePath(),
+ testDestinationDirectory.getAbsolutePath(),
+ "storeName");
assertNull(fetchedFile);
checkSumFile.delete();
// Test 5: Add correct checksum contents - MD5
- checkSumFile = new File(testDirectory, "md5checkSum.txt");
- byte[] checkSumBytes2 = CheckSumTests.calculateCheckSum(testDirectory.listFiles(),
+ checkSumFile = new File(testSourceDirectory, "md5checkSum.txt");
+ byte[] checkSumBytes2 = CheckSumTests.calculateCheckSum(testSourceDirectory.listFiles(),
CheckSumType.MD5);
os = new DataOutputStream(new FileOutputStream(checkSumFile));
os.write(checkSumBytes2);
os.close();
- fetchedFile = fetcher.fetch(testDirectory.getAbsolutePath(), "storeName");
+ fetchedFile = fetcher.fetch(testSourceDirectory.getAbsolutePath(),
+ testDestinationDirectory.getAbsolutePath(),
+ "storeName");
assertNotNull(fetchedFile);
checkSumFile.delete();
// Test 6: Add correct checksum contents - ADLER32
- checkSumFile = new File(testDirectory, "adler32checkSum.txt");
- byte[] checkSumBytes3 = CheckSumTests.calculateCheckSum(testDirectory.listFiles(),
+ checkSumFile = new File(testSourceDirectory, "adler32checkSum.txt");
+ byte[] checkSumBytes3 = CheckSumTests.calculateCheckSum(testSourceDirectory.listFiles(),
CheckSumType.ADLER32);
os = new DataOutputStream(new FileOutputStream(checkSumFile));
os.write(checkSumBytes3);
os.close();
- fetchedFile = fetcher.fetch(testDirectory.getAbsolutePath(), "storeName");
+ fetchedFile = fetcher.fetch(testSourceDirectory.getAbsolutePath(),
+ testDestinationDirectory.getAbsolutePath(),
+ "storeName");
assertNotNull(fetchedFile);
checkSumFile.delete();
// Test 7: Add correct checksum contents - CRC32
- checkSumFile = new File(testDirectory, "crc32checkSum.txt");
- byte[] checkSumBytes4 = CheckSumTests.calculateCheckSum(testDirectory.listFiles(),
+ checkSumFile = new File(testSourceDirectory, "crc32checkSum.txt");
+ byte[] checkSumBytes4 = CheckSumTests.calculateCheckSum(testSourceDirectory.listFiles(),
CheckSumType.CRC32);
os = new DataOutputStream(new FileOutputStream(checkSumFile));
os.write(checkSumBytes4);
os.close();
- fetchedFile = fetcher.fetch(testDirectory.getAbsolutePath(), "storeName");
+ fetchedFile = fetcher.fetch(testSourceDirectory.getAbsolutePath(),
+ testDestinationDirectory.getAbsolutePath(),
+ "storeName");
assertNotNull(fetchedFile);
checkSumFile.delete();
ReadOnlyStoreManagementServlet.java
@@ -172,6 +172,7 @@ private void doFetch(HttpServletRequest req, HttpServletResponse resp) throws IO
ServletException {
String fetchUrl = getRequired(req, "dir");
String storeName = getOptional(req, "store");
+ ReadOnlyStorageEngine store = this.getStore(storeName);
// fetch the files if necessary
File fetchDir = null;
@@ -181,7 +182,10 @@ private void doFetch(HttpServletRequest req, HttpServletResponse resp) throws IO
} else {
logger.info("Executing fetch of " + fetchUrl);
try {
- fetchDir = fileFetcher.fetch(fetchUrl, storeName);
+ fetchDir = fileFetcher.fetch(fetchUrl,
+ store.getStoreDirPath() + File.separator + "version-"
+ + (store.getMaxVersion() + 1),
+ storeName);
} catch(Exception e) {
throw new ServletException("Exception in Fetcher = " + e.getMessage());
}
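
Both this servlet path and the async admin path below compute the destination the same way: the store's own directory plus a fresh version-N suffix, one higher than the store's current maximum version. This is the heart of the "versioned push". A tiny sketch of the computation, where the accessor names come straight from the diff but the sample values are made up:

    import java.io.File;

    public class VersionedPathExample {

        // Mirrors the expression in doFetch(): storeDirPath + /version-(maxVersion + 1).
        static String nextVersionDir(String storeDirPath, long maxVersion) {
            return storeDirPath + File.separator + "version-" + (maxVersion + 1);
        }

        public static void main(String[] args) {
            // A store at /data/my-store whose newest on-disk version is version-4:
            System.out.println(nextVersionDir("/data/my-store", 4));
            // prints /data/my-store/version-5, where the next fetch lands
        }
    }
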
AdminServiceRequestHandler.java
@@ -329,10 +329,9 @@ public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdateParti
VAdminProto.SwapStoreResponse.Builder response = VAdminProto.SwapStoreResponse.newBuilder();
try {
- ReadOnlyStorageEngine store = (ReadOnlyStorageEngine) storeRepository.getStorageEngine(storeName);
- if(store == null)
- throw new VoldemortException("'" + storeName
- + "' is not a registered read-only store.");
+ ReadOnlyStorageEngine store = (ReadOnlyStorageEngine) getStorageEngine(storeRepository,
+ storeName);
+
if(!Utils.isReadableDir(dir))
throw new VoldemortException("Store directory '" + dir
+ "' is not a readable directory.");
@@ -367,6 +366,9 @@ public void markComplete() {
@Override
public void operate() {
+ ReadOnlyStorageEngine store = (ReadOnlyStorageEngine) getStorageEngine(storeRepository,
+ storeName);
+
File fetchDir = null;
if(fileFetcher == null) {
@@ -377,7 +379,11 @@ public void operate() {
updateStatus("Executing fetch of " + fetchUrl);
try {
fileFetcher.setAsyncOperationStatus(status);
- fetchDir = fileFetcher.fetch(fetchUrl, storeName);
+ fetchDir = fileFetcher.fetch(fetchUrl,
+ store.getStoreDirPath() + File.separator
+ + "version-"
+ + (store.getMaxVersion() + 1),
+ storeName);
updateStatus("Completed fetch of " + fetchUrl);
} catch(Exception e) {
throw new VoldemortException("Exception in Fetcher = " + e.getMessage());
FileFetcher.java
@@ -4,6 +4,7 @@
import java.io.IOException;
import voldemort.server.protocol.admin.AsyncOperationStatus;
+
/**
* An interface to fetch data for readonly store. The fetch could be via rsync
* or hdfs. If the store is already on the local filesystem then no fetcher is
@@ -16,7 +17,7 @@
*/
public interface FileFetcher {
- public File fetch(String fileUrl, String storeName) throws IOException;
+ public File fetch(String source, String dest, String storeName) throws IOException;
public void setAsyncOperationStatus(AsyncOperationStatus status);
}
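
Since the interface now carries an explicit destination, every FileFetcher implementation has to honor it. As the javadoc notes, a store already on the local filesystem needs no real transfer; a purely illustrative local implementation might look like the sketch below (this class is an assumption for illustration, not part of the commit):

    import java.io.File;
    import java.io.IOException;

    import voldemort.server.protocol.admin.AsyncOperationStatus;

    public class LocalFileFetcher implements FileFetcher {

        // Kept for progress-reporting hooks; unused in this sketch.
        private AsyncOperationStatus status;

        // "Fetch" by renaming the local source directory into the destination.
        public File fetch(String source, String dest, String storeName) throws IOException {
            File destination = new File(dest);
            if (!new File(source).renameTo(destination))
                throw new IOException("Could not move " + source + " to " + dest);
            return destination;
        }

        public void setAsyncOperationStatus(AsyncOperationStatus status) {
            this.status = status;
        }
    }
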