Permalink
Browse files

Removed the slop pusher code from Repair Job

  • Loading branch information...
Chinmay Soman
Chinmay Soman committed Aug 11, 2011
1 parent 4a194df commit 389cfbf1f8fac6bb6288948a4d85724862873e87
@@ -19,7 +19,6 @@
import voldemort.store.StorageEngine; import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition; import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore; import voldemort.store.metadata.MetadataStore;
import voldemort.store.slop.Slop;
import voldemort.utils.ByteArray; import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator; import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair; import voldemort.utils.Pair;
@@ -39,31 +38,21 @@
private final StoreRepository storeRepo; private final StoreRepository storeRepo;
private final MetadataStore metadataStore; private final MetadataStore metadataStore;
private final Map<String, Long> storeStats; private final Map<String, Long> storeStats;
private final boolean deleteOnly;
private final int deleteBatchSize; private final int deleteBatchSize;
public RepairJob(StoreRepository storeRepo, public RepairJob(StoreRepository storeRepo,
MetadataStore metadataStore, MetadataStore metadataStore,
Semaphore repairPermits, Semaphore repairPermits,
boolean deleteOnly,
int deleteBatchSize) { int deleteBatchSize) {
this.storeRepo = storeRepo; this.storeRepo = storeRepo;
this.metadataStore = metadataStore; this.metadataStore = metadataStore;
this.repairPermits = Utils.notNull(repairPermits); this.repairPermits = Utils.notNull(repairPermits);
this.storeStats = Maps.newHashMap(); this.storeStats = Maps.newHashMap();
this.deleteOnly = deleteOnly;
this.deleteBatchSize = deleteBatchSize; this.deleteBatchSize = deleteBatchSize;
} }
public RepairJob(StoreRepository storeRepo, MetadataStore metadataStore, Semaphore repairPermits) { public RepairJob(StoreRepository storeRepo, MetadataStore metadataStore, Semaphore repairPermits) {
this(storeRepo, metadataStore, repairPermits, true, DELETE_BATCH_SIZE); this(storeRepo, metadataStore, repairPermits, DELETE_BATCH_SIZE);
}
public RepairJob(StoreRepository storeRepo,
MetadataStore metadataStore,
Semaphore repairPermits,
boolean deleteOnly) {
this(storeRepo, metadataStore, repairPermits, deleteOnly, DELETE_BATCH_SIZE);
} }
@JmxOperation(description = "Start the Repair Job thread", impact = MBeanOperationInfo.ACTION) @JmxOperation(description = "Start the Repair Job thread", impact = MBeanOperationInfo.ACTION)
@@ -116,9 +105,6 @@ public void run() {
// Get routing factory // Get routing factory
RoutingStrategyFactory routingStrategyFactory = new RoutingStrategyFactory(); RoutingStrategyFactory routingStrategyFactory = new RoutingStrategyFactory();
// Get slop store
StorageEngine<ByteArray, Slop, byte[]> slopStorageEngine = storeRepo.getSlopStore()
.asSlopStore();
for(StoreDefinition storeDef: metadataStore.getStoreDefList()) { for(StoreDefinition storeDef: metadataStore.getStoreDefList()) {
if(isWritableStore(storeDef)) { if(isWritableStore(storeDef)) {
logger.info("Repairing store " + storeDef.getName()); logger.info("Repairing store " + storeDef.getName());
@@ -137,22 +123,6 @@ public void run() {
List<Node> nodes = routingStrategy.routeRequest(keyAndVal.getFirst().get()); List<Node> nodes = routingStrategy.routeRequest(keyAndVal.getFirst().get());
if(!hasDestination(nodes)) { if(!hasDestination(nodes)) {
if(!deleteOnly) {
for(Node node: nodes) {
Slop slop = new Slop(storeDef.getName(),
Slop.Operation.PUT,
keyAndVal.getFirst(),
keyAndVal.getSecond().getValue(),
null,
node.getId(),
new Date());
Versioned<Slop> slopVersioned = new Versioned<Slop>(slop,
keyAndVal.getSecond()
.getVersion());
slopStorageEngine.put(slop.makeKey(), slopVersioned, null);
repairSlops++;
}
}
engine.delete(keyAndVal.getFirst(), keyAndVal.getSecond().getVersion()); engine.delete(keyAndVal.getFirst(), keyAndVal.getSecond().getVersion());
numDeletedKeys++; numDeletedKeys++;
} }
@@ -233,7 +233,7 @@ protected void startInner() {
// Create a repair job object and register it with Store repository // Create a repair job object and register it with Store repository
if(voldemortConfig.isRepairEnabled()) { if(voldemortConfig.isRepairEnabled()) {
logger.info("Initializing repair job " + voldemortConfig.getPusherType()); logger.info("Initializing repair job " + voldemortConfig.getPusherType());
RepairJob job = new RepairJob(storeRepository, metadata, scanPermits, true); RepairJob job = new RepairJob(storeRepository, metadata, scanPermits);
JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass())); JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass()));
storeRepository.registerRepairJob(job); storeRepository.registerRepairJob(job);
} }

0 comments on commit 389cfbf

Please sign in to comment.