Skip to content

Commit

Permalink
Separate simple admin ops from streaming ops and clean up ConsistencyFix
Browse files Browse the repository at this point in the history
src/java/voldemort/client/protocol/admin/AdminClient.java
- separated simple StoerOperations from StreamingOperations
- note: renamed 'StreamingStoreOperations storeOps' to 'StreamingOperations streamingOps'
- pulled the exception handling logic out of queryKeys/repairEntries and reanmed to more generic names (getNodeKey and putNodeKeyValue).

src/java/voldemort/client/protocol/admin/QueryKeyResult.java
- made CTors public

src/java/voldemort/utils/ConsistencyFix.java
- significant clean up & refactoring
- match changes in voldemortadmintool

other
- some other TODOS
- other files touched due to renames within AdminClient
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent 3c2fe12 commit 02e8063
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 195 deletions.
4 changes: 2 additions & 2 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -1363,7 +1363,7 @@ private static void executeUpdateEntries(Integer nodeId,
for(String storeName: storeNames) {
Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = readEntriesBinary(inputDir,
storeName);
adminClient.storeOps.updateEntries(nodeId, storeName, iterator, null);
adminClient.streamingOps.updateEntries(nodeId, storeName, iterator, null);
}

}
Expand Down Expand Up @@ -1623,7 +1623,7 @@ private static void executeQueryKeys(final Integer nodeId,
listKeys.add(new ByteArray(serializer.toBytes(key)));
}
for(final String storeName: storeNames) {
final Iterator<QueryKeyResult> iterator = adminClient.storeOps.queryKeys(nodeId.intValue(),
final Iterator<QueryKeyResult> iterator = adminClient.streamingOps.queryKeys(nodeId.intValue(),
storeName,
listKeys.iterator());
List<StoreDefinition> storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId)
Expand Down
102 changes: 47 additions & 55 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -158,7 +158,8 @@ public class AdminClient {
final public AdminClient.StoreManagementOperations storeMgmtOps;
final public AdminClient.StoreMaintenanceOperations storeMntOps;
final public AdminClient.BulkStreamingFetchOperations bulkFetchOps;
final public AdminClient.StreamingStoreOperations storeOps;
final public AdminClient.StreamingOperations streamingOps;
final public AdminClient.StoreOperations storeOps;
final public AdminClient.RestoreOperations restoreOps;
final public AdminClient.RebalancingOperations rebalanceOps;
final public AdminClient.ReadOnlySpecificOperations readonlyOps;
Expand All @@ -175,7 +176,8 @@ private AdminClient(AdminClientConfig adminClientConfig) {
this.storeMgmtOps = this.new StoreManagementOperations();
this.storeMntOps = this.new StoreMaintenanceOperations();
this.bulkFetchOps = this.new BulkStreamingFetchOperations();
this.storeOps = this.new StreamingStoreOperations();
this.streamingOps = this.new StreamingOperations();
this.storeOps = this.new StoreOperations();
this.restoreOps = this.new RestoreOperations();
this.rebalanceOps = this.new RebalancingOperations();
this.readonlyOps = this.new ReadOnlySpecificOperations();
Expand Down Expand Up @@ -1913,8 +1915,8 @@ private class NodeStore {

private final ConcurrentMap<NodeStore, SocketStore> nodeStoreSocketCache;

// TODO: Pass in a ClientConfig or a AdminClientConfig?
AdminStoreClient() {
// TODO: Pass in a ClientConfig or a AdminClientConfig?
this.clientConfig = new ClientConfig();
clientPool = new ClientRequestExecutorPool(clientConfig.getSelectors(),
clientConfig.getMaxConnectionsPerNode(),
Expand Down Expand Up @@ -1956,15 +1958,47 @@ public void stop() {
}
}

// TODO: Rename StreamingStoreOperations to StoreOperations? Or pull out the
// query/update/repair stuff that operates on individual keys into new inner
// class StoreOperations.
public class StoreOperations {

/**
* This method updates exactly one key/value for a specific store on a
* specific node.
*
* @param storeName Name of the store
* @param nodeKeyValue A specific key/value to update on a specific
* node.
* @return RepairEntryResult with success/exception details.
*/
public void putNodeKeyValue(String storeName, NodeValue<ByteArray, byte[]> nodeKeyValue) {
SocketStore socketStore = adminStoreClient.getSocketStore(nodeKeyValue.getNodeId(),
storeName);

socketStore.put(nodeKeyValue.getKey(), nodeKeyValue.getVersioned(), null);
}

/**
* Fetch key/value tuple for given key for a specific store on specified
* node.
*
* @param storeName Name of the store
* @param nodeId Id of the node to query from
* @param key for which to query
* @return List<Versioned<byte[]>> of values for the specified NodeKey.
*/
public List<Versioned<byte[]>> getNodeKey(String storeName, int nodeId, ByteArray key) {
SocketStore socketStore = adminStoreClient.getSocketStore(nodeId, storeName);
return socketStore.get(key, null);
}

// As needed, add 'getall', 'delete', and so on interfaces...
}

/**
* Encapsulates all steaming operations that actually read and write
* key-value pairs into the cluster
*
*/
public class StreamingStoreOperations {
public class StreamingOperations {

/**
* Update a stream of key/value entries at the given node. The iterator
Expand Down Expand Up @@ -2054,50 +2088,8 @@ public void updateEntries(int nodeId,
}
}

/**
* This method updates exactly one key/value for a specific store on a
* specific node.
*
* @param storeName Name of the store
* @param nodeKeyValue A specific key/value to update on a specific
* node.
* @return RepairEntryResult with success/exception details.
*/
public RepairEntryResult repairEntry(String storeName,
NodeValue<ByteArray, byte[]> nodeKeyValue) {
SocketStore socketStore = adminStoreClient.getSocketStore(nodeKeyValue.getNodeId(),
storeName);

try {
socketStore.put(nodeKeyValue.getKey(), nodeKeyValue.getVersioned(), null);
return new RepairEntryResult();
} catch(VoldemortException ve) {
return new RepairEntryResult(ve);
}
}

/**
* Fetch key/value tuple for given key on specified node
*
* @param storeName Name of the store
* @param nodeId Id of the node to query from
* @param key for which to query
* @return QueryKeyResult with key & value or key & exception, depending
* on result.
*/
public QueryKeyResult queryKey(String storeName, int nodeId, ByteArray key) {
SocketStore socketStore = adminStoreClient.getSocketStore(nodeId, storeName);

List<Versioned<byte[]>> value = null;
try {
value = socketStore.get(key, null);
return new QueryKeyResult(key, value);
} catch(VoldemortException ve) {
return new QueryKeyResult(key, ve);
}
}

// TODO: Use queryKey method (and so adminStoreClient too)?
// TODO: Use storeOperation.getNodeKey()? Or some other way of using
// adminStoreClient?
/**
* Fetch key/value tuples belonging to a node with given key values
*
Expand Down Expand Up @@ -2760,10 +2752,10 @@ public void run() {
partitionIdList,
null,
false);
currentAdminClient.storeOps.updateEntries(nodeId,
storeName,
iterator,
null);
currentAdminClient.streamingOps.updateEntries(nodeId,
storeName,
iterator,
null);

logger.info("Mirroring data for store:" + storeName + " from node "
+ nodeIdToMirrorFrom + " completed.");
Expand Down
4 changes: 2 additions & 2 deletions src/java/voldemort/client/protocol/admin/QueryKeyResult.java
Expand Up @@ -15,13 +15,13 @@ public class QueryKeyResult {
private final List<Versioned<byte[]>> values;
private final Exception exception;

QueryKeyResult(ByteArray key, List<Versioned<byte[]>> values) {
public QueryKeyResult(ByteArray key, List<Versioned<byte[]>> values) {
this.key = key;
this.values = values;
this.exception = null;
}

QueryKeyResult(ByteArray key, Exception exception) {
public QueryKeyResult(ByteArray key, Exception exception) {
this.key = key;
this.values = null;
this.exception = exception;
Expand Down
54 changes: 0 additions & 54 deletions src/java/voldemort/client/protocol/admin/RepairEntryResult.java

This file was deleted.

Expand Up @@ -44,7 +44,7 @@ public void run() throws VoldemortException {
while(!nodeIterator.done) {
try {
nodeIterator.reset();
adminClient.storeOps.updateEntries(nodeId, storeName, nodeIterator, null);
adminClient.streamingOps.updateEntries(nodeId, storeName, nodeIterator, null);
nodeIterator.purge();
} catch(VoldemortException e) {
if(e.getCause() instanceof IOException) {
Expand Down
Expand Up @@ -422,7 +422,7 @@ public void run() {
}
this.startTime = System.currentTimeMillis();
iterator = new SlopIterator(slopQueue, current);
adminClient.storeOps.updateSlopEntries(nodeId, iterator);
adminClient.streamingOps.updateSlopEntries(nodeId, iterator);
} while(!iterator.isComplete());

// Clear up both previous and current
Expand Down
2 changes: 2 additions & 0 deletions src/java/voldemort/store/routed/NodeValue.java
Expand Up @@ -35,6 +35,8 @@
*/
public final class NodeValue<K, V> implements Serializable, Cloneable {

// TODO: Rename NodeValue to NodeKeyValue

private static final long serialVersionUID = 1;

private final int nodeId;
Expand Down

0 comments on commit 02e8063

Please sign in to comment.