Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Updates to files based on review comments (regarding style, dead code…

… and unused semaphore)
  • Loading branch information...
commit 9bc9159121f2d62fd95f04c1364f1a4d56f1df1b 1 parent ee466b9
Chinmay Soman authored
View
18 bin/vold-gen-data.sh
@@ -1,18 +0,0 @@
-#!/bin/bash
-
-# Check argument
-[ $# -lt 1 ] && echo "Arguments : #keys <output_file>" && exit -1;
-
-NUM_KEYS=${1};
-OUTPUT_FILE="bin/vold_auto_populate.txt"
-
-[ $# -gt 1 ] && OUTPUT_FILE=${2};
-
-# Delete output file if it already exists
-`rm -f ${OUTPUT_FILE} > /dev/null 2>&1`
-
-for((i=0;i<${NUM_KEYS};i++))
-do
- echo "put \"${i}\" \"${i}_value\"" >> ${OUTPUT_FILE}
-done
-
View
1  config/test_config2/config/server.properties
@@ -4,6 +4,7 @@ bdb.flush.transactions=false
bdb.cache.size=100MB
max.threads=100
+enable.repair=true
http.enable=true
socket.enable=true
View
4 src/java/voldemort/VoldemortAdminTool.java
@@ -430,10 +430,10 @@ public static void main(String[] args) throws Exception {
private static void executeRepairJob(Integer nodeId, AdminClient adminClient) {
if(nodeId < 0) {
for(Node node: adminClient.getAdminClientCluster().getNodes()) {
- adminClient.RepairJob(node.getId());
+ adminClient.repairJob(node.getId());
}
} else {
- adminClient.RepairJob(nodeId);
+ adminClient.repairJob(nodeId);
}
}
View
2  src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -1585,7 +1585,7 @@ public void rollbackStore(int nodeId, String storeName, long pushVersion) {
*
* @param nodeId The id of the node on which to do the repair
*/
- public void RepairJob(int nodeId) {
+ public void repairJob(int nodeId) {
VAdminProto.RepairJobRequest.Builder rebalanceRepairRequest = VAdminProto.RepairJobRequest.newBuilder();
VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder()
View
8 src/java/voldemort/server/StoreRepository.java
@@ -78,7 +78,7 @@
/*
* Repair Job object registered with StoreRepository
*/
- private RepairJob _repairJob;
+ private RepairJob repairJob;
public StoreRepository() {
super();
@@ -228,10 +228,10 @@ public boolean hasSlopStore() {
}
public RepairJob getRepairJob() {
- return _repairJob;
+ return repairJob;
}
- public void registerRepairJob(RepairJob repairJob) {
- _repairJob = repairJob;
+ public void registerRepairJob(RepairJob job) {
+ repairJob = job;
}
}
View
9 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -27,7 +27,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
@@ -103,9 +102,6 @@
private final StreamStats stats;
private FileFetcher fileFetcher;
- // Rebalance repair semaphore
- private Semaphore repairSemaphore;
-
public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper,
StorageService storageService,
StoreRepository storeRepository,
@@ -125,9 +121,6 @@ public AdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper,
this.rebalancer = rebalancer;
this.stats = stats;
setFetcherClass(voldemortConfig);
-
- // Rebalance repair semaphore initialization
- repairSemaphore = new Semaphore(1);
}
private void setFetcherClass(VoldemortConfig voldemortConfig) {
@@ -550,8 +543,6 @@ public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdateParti
public VAdminProto.RepairJobResponse handleRebalanceRepair(VAdminProto.RepairJobRequest request) {
VAdminProto.RepairJobResponse.Builder response = VAdminProto.RepairJobResponse.newBuilder();
try {
- // RepairJob job = new RepairJob(storeRepository, metadataStore,
- // repairSemaphore, true);
RepairJob job = storeRepository.getRepairJob();
logger.info("Starting the repair job now on ID : " + metadataStore.getNodeId());
job.run();
View
19 src/java/voldemort/server/scheduler/slop/RepairJob.java
@@ -30,7 +30,7 @@
public class RepairJob implements Runnable {
- private final int DELETE_BATCH = 1000;
+ private int DELETE_BATCH = 1000;
private final static Logger logger = Logger.getLogger(RepairJob.class.getName());
public final static List<String> blackList = Arrays.asList("mysql", "krati", "read-only");
@@ -39,7 +39,7 @@
private final StoreRepository storeRepo;
private final MetadataStore metadataStore;
private final Map<String, Long> storeStats;
- private final boolean deleteOnly;
+ private boolean deleteOnly;
public RepairJob(StoreRepository storeRepo, MetadataStore metadataStore, Semaphore repairPermits) {
this.storeRepo = storeRepo;
@@ -53,13 +53,19 @@ public RepairJob(StoreRepository storeRepo,
MetadataStore metadataStore,
Semaphore repairPermits,
boolean deleteOnly) {
- this.storeRepo = storeRepo;
- this.metadataStore = metadataStore;
- this.repairPermits = Utils.notNull(repairPermits);
- this.storeStats = Maps.newHashMap();
+ this(storeRepo, metadataStore, repairPermits);
this.deleteOnly = deleteOnly;
}
+ public RepairJob(StoreRepository storeRepo,
+ MetadataStore metadataStore,
+ Semaphore repairPermits,
+ boolean deleteOnly,
+ int deleteBatchSize) {
+ this(storeRepo, metadataStore, repairPermits, deleteOnly);
+ this.DELETE_BATCH = deleteBatchSize;
+ }
+
@JmxOperation(description = "Start the Repair Job thread", impact = MBeanOperationInfo.ACTION)
public void startRepairJob() {
run();
@@ -98,7 +104,6 @@ public void run() {
Date startTime = new Date();
boolean terminatedEarly = false;
logger.info("Started repair job at " + startTime);
- System.out.println("Started repair job at " + startTime);
Map<String, Long> localStats = Maps.newHashMap();
for(StoreDefinition storeDef: metadataStore.getStoreDefList()) {
View
18 src/java/voldemort/server/storage/StorageService.java
@@ -228,18 +228,14 @@ protected void startInner() {
scanPermits),
nextRun,
voldemortConfig.getSlopFrequencyMs());
+ }
- // Run the repair job only if slop pusher job is enabled
- // CHANGE: The repiar job is not scheduled automatically.
- // Invoked only via JMX
- if(voldemortConfig.isRepairEnabled()) {
- logger.info("Initializing repair job " + voldemortConfig.getPusherType());
- RepairJob job = new RepairJob(storeRepository, metadata, scanPermits);
- JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass()));
- storeRepository.registerRepairJob(job);
-
- // scheduler.schedule("repair", job, nextRun);
- }
+ // Create a repair job object and register it with Store repository
+ if(voldemortConfig.isRepairEnabled()) {
+ logger.info("Initializing repair job " + voldemortConfig.getPusherType());
+ RepairJob job = new RepairJob(storeRepository, metadata, scanPermits);
+ JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass()));
+ storeRepository.registerRepairJob(job);
}
}
View
31 src/java/voldemort/utils/KeyLocationValidation.java
@@ -13,6 +13,11 @@
public class KeyLocationValidation {
+ private final int nodeId;
+ private final ByteArray keyList;
+ private final Cluster cluster;
+ private final StoreDefinition storeDef;
+
public KeyLocationValidation(Cluster cluster,
int nodeId,
StoreDefinition storeDef,
@@ -23,24 +28,21 @@ public KeyLocationValidation(Cluster cluster,
this.storeDef = storeDef;
}
- private int nodeId;
- private ByteArray keyList;
- private Cluster cluster;
- private StoreDefinition storeDef;
-
/*
* Validate location of the 'keyList'
*
- * @param testType Indicates how to validate True: Positive test (the keys
- * should be present on nodeId). False : Negative test (the keys should not
- * be present on nodeId)
+ * @param positiveTest Indicates how to validate True: Positive test (the
+ * keys should be present on nodeId). False : Negative test (the keys should
+ * not be present on nodeId)
*/
- public boolean validate(boolean testType) {
+ public boolean validate(boolean positiveTest) {
+ boolean retVal = false;
+
SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2,
10000,
100000,
32 * 1024);
- // Cache connections to all nodes for this store in advance
+ // Cache connections to all nodes for this store and given node
Store<ByteArray, byte[], byte[]> socketStore = socketStoreFactory.create(storeDef.getName(),
cluster.getNodeById(nodeId)
.getHost(),
@@ -50,12 +52,13 @@ public boolean validate(boolean testType) {
RequestRoutingType.IGNORE_CHECKS);
List<Versioned<byte[]>> value = socketStore.get(keyList, null);
- if(testType == false && (value == null || value.size() == 0)) {
- return true;
+ if(!positiveTest && (value == null || value.size() == 0)) {
+ retVal = true;
} else if(value != null && value.size() != 0) {
- return true;
+ retVal = true;
}
- return false;
+ socketStore.close();
+ return retVal;
}
}
View
52 test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java
@@ -423,11 +423,13 @@ public void testRebalanceCleanPrimary() throws Exception {
Lists.newArrayList(3));
// start servers 0 , 1, 2
+ Map<String, String> configProps = new HashMap<String, String>();
+ configProps.put("enable.repair", "true");
List<Integer> serverList = Arrays.asList(0, 1, 2);
currentCluster = startServers(currentCluster,
rwStoreDefFileWithReplication,
serverList,
- null);
+ configProps);
// Update the cluster information based on the node information
targetCluster = updateCluster(targetCluster);
@@ -446,13 +448,13 @@ public void testRebalanceCleanPrimary() throws Exception {
ByteArray[] checkKeysNegative = new ByteArray[20];
List<Integer> movedPartitions = new ArrayList<Integer>();
movedPartitions.add(3);
- AdminClient _admin = rebalanceClient.getAdminClient();
+ AdminClient admin = rebalanceClient.getAdminClient();
Iterator<ByteArray> keys = null;
- keys = _admin.fetchKeys(1,
- rwStoreDefWithReplication.getName(),
- movedPartitions,
- null,
- false);
+ keys = admin.fetchKeys(1,
+ rwStoreDefWithReplication.getName(),
+ movedPartitions,
+ null,
+ false);
int keyIndex = 0;
while(keys.hasNext() && keyIndex < 20) {
checkKeysNegative[keyIndex++] = keys.next();
@@ -462,11 +464,11 @@ public void testRebalanceCleanPrimary() throws Exception {
List<Integer> stablePartitions = new ArrayList<Integer>();
stablePartitions.add(1);
Iterator<ByteArray> keys2 = null;
- keys2 = _admin.fetchKeys(1,
- rwStoreDefWithReplication.getName(),
- stablePartitions,
- null,
- false);
+ keys2 = admin.fetchKeys(1,
+ rwStoreDefWithReplication.getName(),
+ stablePartitions,
+ null,
+ false);
int keyIndex2 = 0;
while(keys2.hasNext() && keyIndex2 < 20) {
checkKeysPositive[keyIndex2++] = keys2.next();
@@ -482,7 +484,7 @@ public void testRebalanceCleanPrimary() throws Exception {
// Do the cleanup operation
for(int i = 0; i < 3; i++) {
- _admin.RepairJob(i);
+ admin.repairJob(i);
}
boolean cleanNode = true;
@@ -549,13 +551,13 @@ public void testRebalanceCleanSecondary() throws Exception {
ByteArray[] checkKeysNegative = new ByteArray[20];
List<Integer> movedPartitions = new ArrayList<Integer>();
movedPartitions.add(3);
- AdminClient _admin = rebalanceClient.getAdminClient();
+ AdminClient admin = rebalanceClient.getAdminClient();
Iterator<ByteArray> keys = null;
- keys = _admin.fetchKeys(1,
- rwStoreDefWithReplication.getName(),
- movedPartitions,
- null,
- false);
+ keys = admin.fetchKeys(1,
+ rwStoreDefWithReplication.getName(),
+ movedPartitions,
+ null,
+ false);
int keyIndex = 0;
while(keys.hasNext() && keyIndex < 20) {
checkKeysNegative[keyIndex++] = keys.next();
@@ -565,11 +567,11 @@ public void testRebalanceCleanSecondary() throws Exception {
List<Integer> stablePartitions = new ArrayList<Integer>();
stablePartitions.add(3);
Iterator<ByteArray> keys2 = null;
- keys2 = _admin.fetchKeys(0,
- rwStoreDefWithReplication.getName(),
- stablePartitions,
- null,
- false);
+ keys2 = admin.fetchKeys(0,
+ rwStoreDefWithReplication.getName(),
+ stablePartitions,
+ null,
+ false);
int keyIndex2 = 0;
while(keys2.hasNext() && keyIndex2 < 20) {
checkKeysPositive[keyIndex2++] = keys2.next();
@@ -585,7 +587,7 @@ public void testRebalanceCleanSecondary() throws Exception {
// Do the cleanup operation
for(int i = 0; i < 3; i++) {
- _admin.RepairJob(i);
+ admin.repairJob(i);
}
boolean cleanNode = true;
Please sign in to comment.
Something went wrong with that request. Please try again.