Skip to content

Commit

Permalink
Tools depend on Node 0 to be available
Browse files Browse the repository at this point in the history
Added a new overload of getRemoteStoreDefList which takes no parameters
and picks one of the available nodes for the request to use.

Fixed all the tools' code (not used in the production code path, but
possibly used by SREs) that defaulted to NodeId 0, so that it now uses
the new overload.

VoldemortMultiStoreBuildAndPushJob seems to have the same issue, but it
is unclear whether it is used in production, so only a comment was added.
  • Loading branch information
arunthirupathi committed Aug 22, 2014
1 parent c07b777 commit 105915d
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 58 deletions.
Expand Up @@ -108,6 +108,8 @@ public VoldemortMultiStoreBuildAndPushJob(String name, Props props) throws IOExc
super(name);
this.props = props;
this.log = Logger.getLogger(name);
// TODO: this is a bug, it needs to initialize with -1 since 0 can be an
// invalid node id; but not touching it, as this is production code.
this.nodeId = props.getInt("check.node", 0);

// Get the input directories
Expand Down
43 changes: 18 additions & 25 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -788,6 +788,16 @@ public static void main(String[] args) throws Exception {
}
}

/**
 * Retrieves the store definitions from the cluster.
 *
 * @param adminClient client used to issue the metadata request
 * @param nodeId node to query; a negative value means "no preference",
 *        in which case the AdminClient chooses an available node itself
 * @return the list of store definitions known to the queried node
 */
private static List<StoreDefinition> getStoreDefinitions(AdminClient adminClient, int nodeId) {
    final Versioned<List<StoreDefinition>> versionedDefs = (nodeId >= 0)
            ? adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId)
            : adminClient.metadataMgmtOps.getRemoteStoreDefList();
    return versionedDefs.getValue();
}

private static void executeUpdateStoreDefinitions(Integer nodeId,
AdminClient adminClient,
List<StoreDefinition> storesList) {
Expand Down Expand Up @@ -1422,9 +1432,8 @@ private static void executeROMetadata(Integer nodeId,
if(storeNames == null) {
// Retrieve list of read-only stores
storeNames = Lists.newArrayList();
for(StoreDefinition storeDef: adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId > 0 ? nodeId
: 0)
.getValue()) {

for(StoreDefinition storeDef: getStoreDefinitions(adminClient, nodeId)) {
if(storeDef.getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0) {
storeNames.add(storeDef.getName());
}
Expand Down Expand Up @@ -1588,8 +1597,7 @@ private static void executeFetchEntries(Integer nodeId,
boolean useAscii,
boolean fetchOrphaned) throws IOException {

List<StoreDefinition> storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId)
.getValue();
List<StoreDefinition> storeDefinitionList = getStoreDefinitions(adminClient, nodeId);
HashMap<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
for(StoreDefinition storeDefinition: storeDefinitionList) {
storeDefinitionMap.put(storeDefinition.getName(), storeDefinition);
Expand Down Expand Up @@ -1747,8 +1755,7 @@ private static void executeUpdateEntries(Integer nodeId,
AdminClient adminClient,
List<String> storeNames,
String inputDirPath) throws IOException {
List<StoreDefinition> storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId)
.getValue();
List<StoreDefinition> storeDefinitionList = getStoreDefinitions(adminClient, nodeId);
Map<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
for(StoreDefinition storeDefinition: storeDefinitionList) {
storeDefinitionMap.put(storeDefinition.getName(), storeDefinition);
Expand Down Expand Up @@ -1834,8 +1841,7 @@ private static void executeFetchKeys(Integer nodeId,
List<String> storeNames,
boolean useAscii,
boolean fetchOrphaned) throws IOException {
List<StoreDefinition> storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId)
.getValue();
List<StoreDefinition> storeDefinitionList = getStoreDefinitions(adminClient, nodeId);
Map<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
for(StoreDefinition storeDefinition: storeDefinitionList) {
storeDefinitionMap.put(storeDefinition.getName(), storeDefinition);
Expand Down Expand Up @@ -2005,8 +2011,7 @@ private static void executeDeletePartitions(Integer nodeId,
List<String> stores = storeNames;
if(stores == null) {
stores = Lists.newArrayList();
List<StoreDefinition> storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId)
.getValue();
List<StoreDefinition> storeDefinitionList = getStoreDefinitions(adminClient, nodeId);
for(StoreDefinition storeDefinition: storeDefinitionList) {
stores.add(storeDefinition.getName());
}
Expand All @@ -2024,17 +2029,6 @@ private static void executeQueryKey(final Integer nodeId,
List<String> storeNames,
String keyString,
String keyFormat) throws IOException {
// decide queryNode for storeDef
int storeDefNodeId;
if(nodeId < 0) {
Iterator<Node> nodeIterator = adminClient.getAdminClientCluster().getNodes().iterator();
if(!nodeIterator.hasNext()) {
throw new VoldemortException("No nodes in this cluster");
}
storeDefNodeId = nodeIterator.next().getId();
} else {
storeDefNodeId = nodeId;
}

// decide queryingNode(s) for Key
List<Integer> queryingNodes = new ArrayList<Integer>();
Expand All @@ -2047,8 +2041,7 @@ private static void executeQueryKey(final Integer nodeId,
}

// get basic info
List<StoreDefinition> storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(storeDefNodeId)
.getValue();
List<StoreDefinition> storeDefinitionList = getStoreDefinitions(adminClient, nodeId);
Map<String, StoreDefinition> storeDefinitions = new HashMap<String, StoreDefinition>();
for(StoreDefinition storeDef: storeDefinitionList) {
storeDefinitions.put(storeDef.getName(), storeDef);
Expand Down Expand Up @@ -2240,7 +2233,7 @@ private static void executeShowRoutingPlan(AdminClient adminClient,
List<String> keyList) throws DecoderException {

Cluster cluster = adminClient.getAdminClientCluster();
List<StoreDefinition> storeDefs = adminClient.metadataMgmtOps.getRemoteStoreDefList(0)
List<StoreDefinition> storeDefs = adminClient.metadataMgmtOps.getRemoteStoreDefList()
.getValue();
StoreDefinition storeDef = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefs,
storeName);
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/VoldemortAvroClientShell.java
Expand Up @@ -105,7 +105,7 @@ private static Pair<Schema, Schema> getLatestKeyValueSchema(String url, String s
AdminClient adminClient = null;
try {
adminClient = new AdminClient(url, new AdminClientConfig(), new ClientConfig());
List<StoreDefinition> storeDefs = adminClient.metadataMgmtOps.getRemoteStoreDefList(0)
List<StoreDefinition> storeDefs = adminClient.metadataMgmtOps.getRemoteStoreDefList()
.getValue();

for(StoreDefinition storeDef: storeDefs) {
Expand Down
8 changes: 8 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -1350,6 +1350,14 @@ public Versioned<List<StoreDefinition>> getRemoteStoreDefList(int nodeId)
false);
return new Versioned<List<StoreDefinition>>(storeList, value.getVersion());
}

/**
 * Fetches the store definitions from an arbitrary node of the cluster.
 *
 * Store definitions are expected to be identical on every node, so any
 * reachable node can serve the request. This overload exists so that
 * callers (mostly tools) do not have to hard-code node id 0, which may
 * not exist in a given cluster.
 *
 * @return the versioned list of store definitions
 * @throws VoldemortException if the cluster contains no nodes
 */
public Versioned<List<StoreDefinition>> getRemoteStoreDefList() throws VoldemortException {
    if(AdminClient.this.getAdminClientCluster().getNodeIds().isEmpty()) {
        // Fail with a descriptive error instead of the bare
        // NoSuchElementException that iterator().next() would raise.
        throw new VoldemortException("No nodes in the cluster to fetch the store definitions from");
    }
    int nodeId = AdminClient.this.getAdminClientCluster().getNodeIds().iterator().next();
    return getRemoteStoreDefList(nodeId);
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/tools/KeySamplerCLI.java
Expand Up @@ -95,7 +95,7 @@ public KeySamplerCLI(String url,
}
this.adminClient = new AdminClient(url, new AdminClientConfig(), new ClientConfig());
this.cluster = adminClient.getAdminClientCluster();
this.storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0).getValue();
this.storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList().getValue();
this.storeNameSet = new HashSet<String>();
for(StoreDefinition storeDefinition: storeDefinitions) {
String storeName = storeDefinition.getName();
Expand Down
4 changes: 2 additions & 2 deletions src/java/voldemort/tools/admin/command/AdminCommandDebug.java
Expand Up @@ -252,7 +252,7 @@ public static void doDebugQueryKeys(AdminClient adminClient,
// decide queryNode for storeDef
Integer storeDefNodeId = nodeIds.get(0);
Map<String, StoreDefinition> storeDefinitions = AdminToolUtils.getUserStoreDefMapOnNode(adminClient,
storeDefNodeId);
storeDefNodeId);

BufferedWriter out = new BufferedWriter(new OutputStreamWriter(System.out));

Expand Down Expand Up @@ -573,7 +573,7 @@ public static void doDebugRoute(AdminClient adminClient,
List<String> keyStrings,
String keyType) throws DecoderException {
Cluster cluster = adminClient.getAdminClientCluster();
List<StoreDefinition> storeDefs = adminClient.metadataMgmtOps.getRemoteStoreDefList(0)
List<StoreDefinition> storeDefs = adminClient.metadataMgmtOps.getRemoteStoreDefList()
.getValue();
StoreDefinition storeDef = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefs,
storeName);
Expand Down
53 changes: 25 additions & 28 deletions src/java/voldemort/utils/ClusterForkLiftTool.java
Expand Up @@ -46,17 +46,17 @@
* When used in conjunction with a client that "double writes" to both the
* clusters, this can be a used as a feasible store migration tool to move an
* existing store to a new cluster.
*
*
* There are two modes around how the divergent versions of a key are
* consolidated from the source cluster. :
*
*
* 1) Primary only Resolution (
* {@link ClusterForkLiftTool#SinglePartitionForkLiftTask}: The entries on the
* primary partition are moved over to the destination cluster with empty vector
* clocks. if any key has multiple versions on the primary, they are resolved.
* This approach is fast and is best suited if you deem the replicas being very
* much in sync with each other. This is the DEFAULT mode
*
*
* 2) Global Resolution (
* {@link ClusterForkLiftTool#SinglePartitionGloballyResolvingForkLiftTask} :
* The keys belonging to a partition are fetched out of the primary replica, and
Expand All @@ -67,48 +67,48 @@
* potentially cross colo) and hence should be used when thorough version
 * resolution is necessary or the admin deems the replicas being fairly
* out-of-sync
*
*
*
*
* In both mode, the default chained resolver (
* {@link VectorClockInconsistencyResolver} +
* {@link TimeBasedInconsistencyResolver} is used to determine a final resolved
* version.
*
*
* NOTES:
*
*
* 1) If the tool fails for some reason in the middle, the admin can restart the
* tool for the failed partitions alone. The keys that were already written in
* the failed partitions, will all experience {@link ObsoleteVersionException}
* and the un-inserted keys will be inserted.
*
*
* 2) Since the forklift writes are issued with empty vector clocks, they will
* always yield to online writes happening on the same key, before or during the
* forklift window. Of course, after the forklift window, the destination
* cluster resumes normal operation.
*
*
* 3) For now, we will fallback to fetching the key from the primary replica,
* fetch the values out manually, resolve and write it back. PitFalls : primary
* somehow does not have the key.
*
*
* Two scenarios.
*
*
* 1) Key active after double writes: the situation is the result of slop not
* propagating to the primary. But double writes would write the key back to
* destination cluster anyway. We are good.
*
*
* 2) Key inactive after double writes: This indicates a problem elsewhere. This
* is a base guarantee voldemort should offer.
*
*
* 4) Zoned <-> Non Zoned forklift implications.
*
*
* When forklifting data from a non-zoned to zoned cluster, both destination
* zones will be populated with data, by simply running the tool once with the
* respective bootstrap urls. If you need to forklift data from zoned to
* non-zoned clusters (i.e your replication between datacenters is not handled
* by Voldemort), then you need to run the tool twice for each destination
* non-zoned cluster. Zoned -> Zoned and Non-Zoned -> Non-Zoned forklifts are
* trivial.
*
*
*/
public class ClusterForkLiftTool implements Runnable {

Expand Down Expand Up @@ -146,9 +146,7 @@ enum ForkLiftTaskMode {
private final Boolean overwrite;

/**
 * Retrieves all store definitions from the source cluster.
 *
 * Uses the parameterless getRemoteStoreDefList() overload so the
 * AdminClient picks an available node instead of this tool hard-coding
 * a node id that might not exist in the cluster.
 *
 * NOTE(review): the original span contained two consecutive return
 * statements (old and new diff bodies concatenated) — the second was
 * unreachable, which is a compile error in Java; kept only the intended
 * post-change body.
 *
 * @param adminClient client connected to the source cluster
 * @return the list of store definitions
 */
private static List<StoreDefinition> getStoreDefinitions(AdminClient adminClient) {
    return adminClient.metadataMgmtOps.getRemoteStoreDefList().getValue();
}

public ClusterForkLiftTool(String srcBootstrapUrl,
Expand Down Expand Up @@ -178,8 +176,7 @@ public ClusterForkLiftTool(String srcBootstrapUrl,
if(storesList != null) {
this.storesList = storesList;
} else {
this.storesList = StoreUtils.getStoreNames(getStoreDefinitions(srcAdminClient),
true);
this.storesList = StoreUtils.getStoreNames(getStoreDefinitions(srcAdminClient), true);
}
this.srcStoreDefMap = checkStoresOnBothSides();

Expand Down Expand Up @@ -235,7 +232,7 @@ private HashMap<String, StoreDefinition> checkStoresOnBothSides() {
* TODO this base class can potentially provide some framework of execution
* for the subclasses, to yield a better objected oriented design (progress
* tracking etc)
*
*
*/
abstract class SinglePartitionForkLiftTask {

Expand Down Expand Up @@ -282,7 +279,7 @@ void printSummary() {
* Fetches keys belonging the primary partition, and then fetches values for
* that key from all replicas in a non-streaming fashion, applies the
* default resolver and writes it back to the destination cluster
*
*
* TODO a streaming N way merge is the more efficient & correct solution.
* Without this, the resolving can be very slow due to cross data center
* get(..)
Expand Down Expand Up @@ -335,7 +332,8 @@ public void run() {
+ ByteUtils.toHexString(keyToResolve.get())
+ " vals:" + resolvedVersions);
}
Versioned<byte[]> value = new Versioned<byte[]>(resolvedVersions.get(0).getValue());
Versioned<byte[]> value = new Versioned<byte[]>(resolvedVersions.get(0)
.getValue());
streamingPut(keyToResolve, value);
}
printSummary();
Expand All @@ -348,7 +346,7 @@ public void run() {
}

/**
*
*
* @param nodeIdList
* @param keyInBytes
* @return
Expand Down Expand Up @@ -378,7 +376,7 @@ private Map<Integer, QueryKeyResult> doReads(final List<Integer> nodeIdList,
* Simply fetches the data for the partition from the primary replica and
* writes it into the destination cluster. Works well when the replicas are
* fairly consistent.
*
*
*/
class SinglePartitionPrimaryResolvingForkLiftTask extends SinglePartitionForkLiftTask implements
Runnable {
Expand Down Expand Up @@ -455,7 +453,7 @@ public void run() {
* Simply fetches the data for the partition from the primary replica and
* writes it into the destination cluster without resolving any of the
* conflicting values
*
*
*/
class SinglePartitionNoResolutionForkLiftTask extends SinglePartitionForkLiftTask implements
Runnable {
Expand Down Expand Up @@ -568,7 +566,7 @@ public Object call() throws Exception {

/**
* Return args parser
*
*
* @return program parser
* */
private static OptionParser getParser() {
Expand Down Expand Up @@ -617,7 +615,6 @@ private static OptionParser getParser() {
+ ForkLiftTaskMode.primary_resolution.toString()
+ " Fetch from primary alone ]");


parser.accepts(OVERWRITE_OPTION, OVERWRITE_WARNING_MESSAGE)
.withOptionalArg()
.describedAs("overwriteExistingValue")
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/utils/ConsistencyFix.java
Expand Up @@ -78,7 +78,7 @@ public class ConsistencyFix {
Cluster cluster = adminClient.getAdminClientCluster();
logger.info("Cluster determined to be: " + cluster.getName());

Versioned<List<StoreDefinition>> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0);
Versioned<List<StoreDefinition>> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList();
List<StoreDefinition> storeDefs = storeDefinitions.getValue();
StoreDefinition storeDefinition = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefs,
storeName);
Expand Down

0 comments on commit 105915d

Please sign in to comment.