Skip to content

Commit

Permalink
Tweaks and corrections to rebalancing.
Browse files Browse the repository at this point in the history
Removed un-needed CountDownLatch from RebalanceController.

Increased default number of async operation scheduler threads to 6.

Improvements to MetadataStore:
    * Corrected mutations of REBALANCE_STEAL_INFO
    * Improved synchronization (using a simple lock for now)
    * Corrected clean up behaviour.
  • Loading branch information
afeinberg committed May 7, 2010
1 parent 1fdce3f commit 141b9ac
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 23 deletions.
10 changes: 2 additions & 8 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -28,7 +28,6 @@ public class RebalanceController {
private static Logger logger = Logger.getLogger(RebalanceController.class);

private final AdminClient adminClient;
private final Random random = new Random(SEED);
RebalanceClientConfig rebalanceConfig;

public RebalanceController(String bootstrapUrl, RebalanceClientConfig rebalanceConfig) {
Expand Down Expand Up @@ -132,7 +131,6 @@ public void run() {
final int stealerNodeId = rebalanceTask.getStealerNode();
final SetMultimap<Integer, RebalancePartitionsInfo> rebalanceSubTaskMap = divideRebalanceNodePlan(rebalanceTask);
final Set<Integer> parallelDonors = rebalanceSubTaskMap.keySet();
final CountDownLatch latch = new CountDownLatch(parallelDonors.size());
ExecutorService parallelDonorExecutor = createExecutors(rebalanceConfig.getMaxParallelRebalancing());

for (final int donorNodeId: parallelDonors) {
Expand Down Expand Up @@ -176,20 +174,16 @@ public void run() {
}
} catch(Exception e) {
logger.error("Rebalancing task failed with exception", e);
} finally {
latch.countDown();
}
}
}
});
}

try {
latch.await(rebalanceConfig.getRebalancingClientTimeoutSeconds() * parallelDonors.size(),
TimeUnit.SECONDS);
} catch (InterruptedException e) {
executorShutDown(parallelDonorExecutor);
} catch (Exception e) {
logger.error("Interrupted", e);
Thread.currentThread().interrupt();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -260,7 +260,7 @@ public VoldemortConfig(Props props) {
this.gossipInterval = props.getInt("gossip.interval.ms", 30 * 1000);
this.pusherPollMs = props.getInt("pusher.poll.ms", 2 * 60 * 1000);

this.schedulerThreads = props.getInt("scheduler.threads", 3);
this.schedulerThreads = props.getInt("scheduler.threads", 6);

this.numCleanupPermits = props.getInt("num.cleanup.permits", 1);

Expand Down
49 changes: 35 additions & 14 deletions src/java/voldemort/server/rebalance/Rebalancer.java
Expand Up @@ -53,7 +53,9 @@ private boolean acquireRebalancingPermit(int donorNodeId) {
if (!rebalancePermitMap.containsKey(donorNodeId)) {
rebalancePermitMap.put(donorNodeId, new AtomicBoolean(false));
}
if(rebalancePermitMap.get(donorNodeId).compareAndSet(false, true)) {
AtomicBoolean rebalancePermit = rebalancePermitMap.get(donorNodeId);
if(rebalancePermit.compareAndSet(false, true)) {
rebalancePermitMap.put(donorNodeId, rebalancePermit);
return true;
}
}
Expand Down Expand Up @@ -89,7 +91,7 @@ public void run() {
} else {
logger.warn("Rebalancing for rebalancing task " + stealInfo
+ " failed multiple times, Aborting more trials.");
metadataStore.cleanAllRebalancingState();
metadataStore.cleanRebalancingState(stealInfo);
}
} catch(Exception e) {
logger.error("RebalanceService rebalancing attempt " + stealInfo
Expand Down Expand Up @@ -207,7 +209,8 @@ public void run() {
+ " completed successfully.");
// clean state only if
// successfull.
metadataStore.cleanAllRebalancingState();
// operation, not all operations.
metadataStore.cleanRebalancingState(stealInfo);
} else {
throw new VoldemortRebalancingException("Failed to rebalance task "
+ stealInfo,
Expand Down Expand Up @@ -286,20 +289,38 @@ private void rebalanceStore(String storeName,
return requestId;
}

private synchronized void setRebalancingState(MetadataStore metadataStore, RebalancePartitionsInfo stealInfo) {
metadataStore.put(MetadataStore.SERVER_STATE_KEY, VoldemortState.REBALANCING_MASTER_SERVER);
List<RebalancePartitionsInfo> stealInfoList = metadataStore.getRebalancingStealInfo();
stealInfoList.add(stealInfo);
metadataStore.put(MetadataStore.REBALANCING_STEAL_INFO, stealInfoList);
private void setRebalancingState(MetadataStore metadataStore, RebalancePartitionsInfo stealInfo) {
synchronized (metadataStore.getLock()) {
metadataStore.put(MetadataStore.SERVER_STATE_KEY, VoldemortState.REBALANCING_MASTER_SERVER);
List<RebalancePartitionsInfo> stealInfoList = metadataStore.getRebalancingStealInfo();

int index = -1;
for (int i=0; i < stealInfoList.size(); i++) {
if (stealInfoList.get(i).getDonorId() == stealInfo.getDonorId()) {
index = i;
break;
}
}
if (index != -1) {
stealInfoList.remove(index);
stealInfoList.add(index, stealInfo);
} else {
stealInfoList.add(stealInfo);
}

metadataStore.put(MetadataStore.REBALANCING_STEAL_INFO, stealInfoList);
}
}

private void checkCurrentState(MetadataStore metadataStore, RebalancePartitionsInfo stealInfo) {
if(metadataStore.getServerState().equals(VoldemortState.REBALANCING_MASTER_SERVER)
&& Iterables.any(metadataStore.getRebalancingStealInfo(), new DonorIdPredicate(stealInfo.getDonorId())))
throw new VoldemortException("Server " + metadataStore.getNodeId()
+ " is already rebalancing from:"
+ Joiner.on(",").join(metadataStore.getRebalancingStealInfo())
+ " rejecting rebalance request:" + stealInfo);
synchronized (metadataStore.getLock()) {
if(metadataStore.getServerState().equals(VoldemortState.REBALANCING_MASTER_SERVER)
&& Iterables.any(metadataStore.getRebalancingStealInfo(), new DonorIdPredicate(stealInfo.getDonorId())))
throw new VoldemortException("Server " + metadataStore.getNodeId()
+ " is already rebalancing from:"
+ Joiner.on(",").join(metadataStore.getRebalancingStealInfo())
+ " rejecting rebalance request:" + stealInfo);
}
}

private ExecutorService createExecutors(int numThreads) {
Expand Down
36 changes: 36 additions & 0 deletions src/java/voldemort/store/metadata/MetadataStore.java
Expand Up @@ -108,6 +108,8 @@ public static enum VoldemortState {
private static final StoreDefinitionsMapper storeMapper = new StoreDefinitionsMapper();
private static final RoutingStrategyFactory routingFactory = new RoutingStrategyFactory();

private static final Object lock = new Object();

private static final Logger logger = Logger.getLogger(MetadataStore.class);

public MetadataStore(Store<String, String> innerStore, int nodeId) {
Expand Down Expand Up @@ -206,6 +208,10 @@ public Object getCapability(StoreCapabilityType capability) {
return innerStore.getCapability(capability);
}

public Object getLock() {
return lock;
}

/**
* @param key : keyName strings serialized as bytes eg. 'cluster.xml'
* @return List of values (only 1 for Metadata) versioned byte[] eg. UTF
Expand Down Expand Up @@ -253,6 +259,36 @@ public void cleanAllRebalancingState() {
init(getNodeId());
}

public void cleanRebalancingState(RebalancePartitionsInfo stealInfo) {
synchronized (lock) {
List<RebalancePartitionsInfo> stealInfoList = getRebalancingStealInfo();


// TODO: just implement equals/hashCode for RebalancePartitionsInfo
int index = -1;
for (int i=0; i < stealInfoList.size(); i++) {
if (stealInfoList.get(i).getDonorId() == stealInfo.getDonorId()) {
index = i;
break;
}
}
if (index != -1) {
stealInfoList.remove(index);
} else {
throw new IllegalArgumentException("Couldn't find " + stealInfo + " in stealInfoList " +
RebalancePartitionsInfo.listToJsonString(stealInfoList));
}

if (stealInfoList.isEmpty()) {
logger.debug("stealInfoList empty, cleaning all rebalancing state");
cleanAllRebalancingState();
} else {
put(REBALANCING_STEAL_INFO, stealInfoList);
initCache(REBALANCING_STEAL_INFO);
}
}
}

public List<Version> getVersions(ByteArray key) {
List<Versioned<byte[]>> values = get(key);
List<Version> versions = new ArrayList<Version>(values.size());
Expand Down

0 comments on commit 141b9ac

Please sign in to comment.