Permalink
Browse files

Repair job now deletes the data after putting it into slop store + un…

…it tests
  • Loading branch information...
1 parent 6443cc8 commit a82ecad5c426ff6bfdb2bd7f88945253c39231d1 @rsumbaly rsumbaly committed with afeinberg Jan 4, 2011
@@ -124,9 +124,10 @@ public void run() {
slopStorageEngine.put(slop.makeKey(), slopVersioned, null);
repairSlops++;
}
+ engine.delete(keyAndVal.getFirst(), keyAndVal.getSecond().getVersion());
}
}
- iterator.close();
+ closeIterator(iterator);
localStats.put(storeDef.getName(), repairSlops);
logger.info("Completed store " + storeDef.getName());
}
@@ -135,8 +136,7 @@ public void run() {
logger.error(e, e);
terminatedEarly = true;
} finally {
- if(iterator != null)
- iterator.close();
+ closeIterator(iterator);
if(!terminatedEarly) {
resetStats(localStats);
@@ -147,6 +147,15 @@ public void run() {
}
+ private void closeIterator(ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> iterator) {
+ try {
+ if(iterator != null)
+ iterator.close();
+ } catch(Exception e) {
+ logger.error("Error in closing iterator", e);
+ }
+ }
+
private boolean hasDestination(List<Node> nodes) {
for(Node node: nodes) {
if(node.getId() == metadataStore.getNodeId()) {
@@ -47,6 +47,7 @@
@Test
public void testRepair() throws IOException {
+
Cluster cluster = new ClusterMapper().readCluster(new File(clusterXmlFile));
// Create 3 stores - RO, Memory
@@ -93,80 +94,117 @@ public void testRepair() throws IOException {
.setRequiredWrites(1)
.build();
- StoreRepository repository = new StoreRepository();
-
- StorageEngine<ByteArray, byte[], byte[]> store = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test1");
- repository.addStorageEngine(store);
-
- StorageEngine<ByteArray, byte[], byte[]> store2 = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test2");
- repository.addStorageEngine(store2);
-
- StorageEngine<ByteArray, byte[], byte[]> store3 = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test3");
- repository.addStorageEngine(store3);
-
// create new metadata store.
MetadataStore metadata = ServerTestUtils.createMetadataStore(cluster,
Lists.newArrayList(def1,
def2,
def3));
- repository.addLocalStore(metadata);
-
- StorageEngine<ByteArray, byte[], byte[]> slopStore = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("slop");
- SlopStorageEngine slopStorageEngine = new SlopStorageEngine(slopStore,
- metadata.getCluster());
- repository.setSlopStore(slopStorageEngine);
-
- // Populate the stores with keys
- for(int i = 0; i < NUM_KEYS; i++) {
- store.put(new ByteArray(new String("key" + i).getBytes()),
- new Versioned<byte[]>(new String("value" + i).getBytes()),
- null);
- store2.put(new ByteArray(new String("key" + i).getBytes()),
- new Versioned<byte[]>(new String("value" + i).getBytes()),
- null);
- store3.put(new ByteArray(new String("key" + i).getBytes()),
- new Versioned<byte[]>(new String("value" + i).getBytes()),
- null);
- }
-
RoutingStrategyFactory factory = new RoutingStrategyFactory();
RoutingStrategy strategy1 = factory.updateRoutingStrategy(def1, cluster), strategy3 = factory.updateRoutingStrategy(def3,
+
cluster);
for(int nodeId = 0; nodeId < 4; nodeId++) {
+ StoreRepository repository = new StoreRepository();
+
+ StorageEngine<ByteArray, byte[], byte[]> store = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test1");
+ repository.addStorageEngine(store);
+
+ StorageEngine<ByteArray, byte[], byte[]> store2 = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test2");
+ repository.addStorageEngine(store2);
+
+ StorageEngine<ByteArray, byte[], byte[]> store3 = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test3");
+ repository.addStorageEngine(store3);
+
+ repository.addLocalStore(metadata);
+
+ StorageEngine<ByteArray, byte[], byte[]> slopStore = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("slop");
+ SlopStorageEngine slopStorageEngine = new SlopStorageEngine(slopStore,
+ metadata.getCluster());
+ repository.setSlopStore(slopStorageEngine);
+
+ // Populate the stores with keys
+ for(int i = 0; i < NUM_KEYS; i++) {
+ store.put(new ByteArray(new String("key" + i).getBytes()),
+ new Versioned<byte[]>(new String("value" + i).getBytes()),
+ null);
+ store2.put(new ByteArray(new String("key" + i).getBytes()),
+ new Versioned<byte[]>(new String("value" + i).getBytes()),
+ null);
+ store3.put(new ByteArray(new String("key" + i).getBytes()),
+ new Versioned<byte[]>(new String("value" + i).getBytes()),
+ null);
+ }
+
metadata.put(MetadataStore.NODE_ID_KEY, nodeId);
RepairJob job = new RepairJob(repository, metadata, new Semaphore(1));
job.run();
- long slopsTest1 = job.getRepairSlopsPerStore("test1"), slopsTest2 = job.getRepairSlopsPerStore("test2"), slopsTest3 = job.getRepairSlopsPerStore("test3");
-
+ long slopsTest1 = job.getRepairSlopsPerStore("test1");
+ long slopsTest2 = job.getRepairSlopsPerStore("test2");
+ long slopsTest3 = job.getRepairSlopsPerStore("test3");
long slop1 = 0, slop2 = 0, slop3 = 0;
+
// Go over every slop and check if everything should be present
ClosableIterator<Pair<ByteArray, Versioned<Slop>>> iterator = slopStorageEngine.asSlopStore()
.entries();
while(iterator.hasNext()) {
Pair<ByteArray, Versioned<Slop>> keyVal = iterator.next();
String storeName = keyVal.getSecond().getValue().getStoreName();
- byte[] key = keyVal.getSecond().getValue().getKey().get();
+ ByteArray key = keyVal.getSecond().getValue().getKey();
+ List<Versioned<byte[]>> values = null;
if(storeName.compareTo("test1") == 0) {
slop1++;
- assertFalse(containsNode(strategy1.routeRequest(key), nodeId));
+ assertFalse(containsNode(strategy1.routeRequest(key.get()), nodeId));
+
+ values = store.get(key, null);
+ Assert.assertEquals(values.size(), 0);
+
} else if(storeName.compareTo("test3") == 0) {
slop3++;
- assertFalse(containsNode(strategy3.routeRequest(key), nodeId));
+ assertFalse(containsNode(strategy3.routeRequest(key.get()), nodeId));
+
+ values = store3.get(key, null);
+ Assert.assertEquals(values.size(), 0);
+
} else if(storeName.compareTo("test2") == 0) {
- slop2++;
+
Assert.fail("Cannot have slops for any other store");
}
}
Assert.assertEquals(slop1, slopsTest1);
Assert.assertEquals(slop2, slopsTest2);
Assert.assertEquals(slop3, slopsTest3);
+
+ ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> values = store.entries();
+ int valuesLeft = 0;
+ while(values.hasNext()) {
+ values.next();
+ valuesLeft++;
+ }
+ Assert.assertEquals(valuesLeft, NUM_KEYS - (slop1 / 2));
+
+ values = store2.entries();
+ valuesLeft = 0;
+ while(values.hasNext()) {
+ values.next();
+ valuesLeft++;
+ }
+ Assert.assertEquals(valuesLeft, NUM_KEYS - (slop2 / 2));
+
+ values = store3.entries();
+ valuesLeft = 0;
+ while(values.hasNext()) {
+ values.next();
+ valuesLeft++;
+ }
+ Assert.assertEquals(valuesLeft, NUM_KEYS - (slop3 / 2));
+
slopStore.truncate();
- }
+ }
}
private boolean containsNode(List<Node> nodes, int nodeId) {

0 comments on commit a82ecad

Please sign in to comment.