From 7234b65bba50c2604f39c69125935ab7836fd645 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 9 Apr 2013 18:05:00 -0700 Subject: [PATCH] Tool to forklift data over for store migrations --- .../protocol/admin/StreamingClient.java | 11 +- src/java/voldemort/store/StoreUtils.java | 10 + .../utils/AbstractConsistencyFixer.java | 337 ++++++++++ .../voldemort/utils/ClusterForkLiftTool.java | 574 ++++++++++++++++++ .../voldemort/utils/ConsistencyFixWorker.java | 271 +-------- 5 files changed, 935 insertions(+), 268 deletions(-) create mode 100644 src/java/voldemort/utils/AbstractConsistencyFixer.java create mode 100644 src/java/voldemort/utils/ClusterForkLiftTool.java diff --git a/src/java/voldemort/client/protocol/admin/StreamingClient.java b/src/java/voldemort/client/protocol/admin/StreamingClient.java index 4de4dda835..50430d2c40 100644 --- a/src/java/voldemort/client/protocol/admin/StreamingClient.java +++ b/src/java/voldemort/client/protocol/admin/StreamingClient.java @@ -105,8 +105,8 @@ public class StreamingClient { protected EventThrottler throttler; - AdminClient adminClient; - AdminClientConfig adminClientConfig; + private AdminClient adminClient; + private AdminClientConfig adminClientConfig; String bootstrapURL; @@ -135,7 +135,12 @@ public StreamingClient(StreamingClientConfig config) { this.bootstrapURL = config.getBootstrapURL(); CHECKPOINT_COMMIT_SIZE = config.getBatchSize(); THROTTLE_QPS = config.getThrottleQPS(); + adminClientConfig = new AdminClientConfig(); + adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig()); + } + public AdminClient getAdminClient() { + return adminClient; } public synchronized void updateThrottleLimit(int throttleQPS) { @@ -297,8 +302,6 @@ public synchronized void initStreamingSessions(List stores, List blackListedNodes) { logger.info("Initializing a streaming session"); - adminClientConfig = new AdminClientConfig(); - adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig()); this.checkpointCallback = checkpointCallback; this.recoveryCallback = recoveryCallback; this.allowMerge = allowMerge; diff --git a/src/java/voldemort/store/StoreUtils.java b/src/java/voldemort/store/StoreUtils.java index d4122bad1f..7cb2cdb6bf 100644 --- a/src/java/voldemort/store/StoreUtils.java +++ b/src/java/voldemort/store/StoreUtils.java @@ -215,4 +215,14 @@ public static List getStoreNames(List list, boolean ign storeNameSet.add(def.getName()); return storeNameSet; } + + public static HashMap getStoreDefsAsMap(List storeDefs) { + if(storeDefs == null) + return null; + + HashMap storeDefMap = new HashMap(); + for(StoreDefinition def: storeDefs) + storeDefMap.put(def.getName(), def); + return storeDefMap; + } } diff --git a/src/java/voldemort/utils/AbstractConsistencyFixer.java b/src/java/voldemort/utils/AbstractConsistencyFixer.java new file mode 100644 index 0000000000..b14da40e3a --- /dev/null +++ b/src/java/voldemort/utils/AbstractConsistencyFixer.java @@ -0,0 +1,337 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.QueryKeyResult; +import voldemort.store.routed.NodeValue; +import voldemort.store.routed.ReadRepairer; +import voldemort.utils.ConsistencyFix.BadKey; +import voldemort.utils.ConsistencyFix.Status; +import voldemort.versioning.VectorClock; +import voldemort.versioning.Versioned; + +import com.google.common.collect.Lists; + +abstract class AbstractConsistencyFixer { + + private static final Logger logger = Logger.getLogger(AbstractConsistencyFixer.class); + private static final int fakeNodeID = Integer.MIN_VALUE; + + protected final BadKey badKey; + protected final StoreInstance storeInstance; + protected final AdminClient adminClient; + protected final QueryKeyResult orphanedValues; + + /** + * Normal use case constructor. + * + * @param keyInHexFormat + * @param consistencyFix + * @param badKeyQOut + */ + AbstractConsistencyFixer(BadKey badKey, StoreInstance storeInstance, AdminClient adminClient) { + this(badKey, storeInstance, adminClient, null); + } + + /** + * Constructor for "orphaned values" use case. I.e., there are values for + * the specific key that exist somewhere and may need to be written to the + * nodes which actually host the key. + * + * @param keyInHexFormat + * @param consistencyFix + * @param badKeyQOut + * @param orphanedValues Set to null if no orphaned values to be included. + */ + AbstractConsistencyFixer(BadKey badKey, + StoreInstance storeInstance, + AdminClient adminClient, + QueryKeyResult orphanedValues) { + this.badKey = badKey; + this.storeInstance = storeInstance; + this.adminClient = adminClient; + this.orphanedValues = orphanedValues; + } + + public Status doConsistencyFix() { + // Initialization. 
+ byte[] keyInBytes; + List nodeIdList = null; + int masterPartitionId = -1; + try { + keyInBytes = ByteUtils.fromHexString(badKey.getKeyInHexFormat()); + masterPartitionId = this.storeInstance.getMasterPartitionId(keyInBytes); + nodeIdList = this.storeInstance.getReplicationNodeList(masterPartitionId); + } catch(Exception exception) { + logger.info("Aborting fixKey due to bad init."); + if(logger.isDebugEnabled()) { + exception.printStackTrace(); + } + return Status.BAD_INIT; + } + ByteArray keyAsByteArray = new ByteArray(keyInBytes); + + // Do the reads + Map nodeIdToKeyValues = doReads(nodeIdList, + keyInBytes, + badKey.getKeyInHexFormat()); + + // Process read replies (i.e., nodeIdToKeyValues) + ProcessReadRepliesResult result = processReadReplies(nodeIdList, + keyAsByteArray, + badKey.getKeyInHexFormat(), + nodeIdToKeyValues); + if(result.status != Status.SUCCESS) { + return result.status; + } + + // Resolve conflicts indicated in nodeValues + List> toReadRepair = resolveReadConflicts(result.nodeValues); + if(logger.isTraceEnabled()) { + if(toReadRepair.size() == 0) { + logger.trace("Nothing to repair"); + } + for(NodeValue nodeValue: toReadRepair) { + logger.trace(nodeValue.getNodeId() + " --- " + nodeValue.getKey().toString()); + } + } + + // Do the repairs + Status status = doRepairPut(toReadRepair); + + // return status of last operation (success or otherwise) + return status; + } + + /** + * + * @param nodeIdList + * @param keyInBytes + * @param keyInHexFormat + * @return + */ + private Map doReads(final List nodeIdList, + final byte[] keyInBytes, + final String keyInHexFormat) { + Map nodeIdToKeyValues = new HashMap(); + + ByteArray key = new ByteArray(keyInBytes); + for(int nodeId: nodeIdList) { + List> values = null; + try { + values = this.adminClient.storeOps.getNodeKey(this.storeInstance.getStoreDefinition() + .getName(), + nodeId, + key); + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); + } catch(VoldemortException ve) { + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); + } + } + + return nodeIdToKeyValues; + } + + /** + * Result of an invocation of processReadReplies + */ + private class ProcessReadRepliesResult { + + public final Status status; + public final List> nodeValues; + + /** + * Constructor for error status + */ + ProcessReadRepliesResult(Status status) { + this.status = status; + this.nodeValues = null; + } + + /** + * Constructor for success + */ + ProcessReadRepliesResult(List> nodeValues) { + this.status = Status.SUCCESS; + this.nodeValues = nodeValues; + } + } + + /** + * + * @param nodeIdList + * @param keyAsByteArray + * @param keyInHexFormat + * @param nodeIdToKeyValues + * @param nodeValues Effectively the output of this method. Must pass in a + * non-null object to be populated by this method. 
+ * @return + */ + private ProcessReadRepliesResult processReadReplies(final List nodeIdList, + final ByteArray keyAsByteArray, + final String keyInHexFormat, + final Map nodeIdToKeyValues) { + List> nodeValues = new ArrayList>(); + boolean exceptionsEncountered = false; + for(int nodeId: nodeIdList) { + QueryKeyResult keyValue; + if(nodeIdToKeyValues.containsKey(nodeId)) { + keyValue = nodeIdToKeyValues.get(nodeId); + + if(keyValue.hasException()) { + logger.debug("Exception encountered while fetching key " + keyInHexFormat + + " from node with nodeId " + nodeId + " : " + + keyValue.getException().getMessage()); + exceptionsEncountered = true; + } else { + if(keyValue.getValues().isEmpty()) { + Versioned versioned = new Versioned(null); + nodeValues.add(new NodeValue(nodeId, + keyValue.getKey(), + versioned)); + + } else { + for(Versioned value: keyValue.getValues()) { + nodeValues.add(new NodeValue(nodeId, + keyValue.getKey(), + value)); + } + } + } + } else { + logger.debug("No key-value returned from node with id:" + nodeId); + Versioned versioned = new Versioned(null); + nodeValues.add(new NodeValue(nodeId, keyAsByteArray, versioned)); + } + } + if(exceptionsEncountered) { + logger.info("Aborting fixKey because exceptions were encountered when fetching key-values."); + return new ProcessReadRepliesResult(Status.FETCH_EXCEPTION); + } + + if(logger.isDebugEnabled()) { + for(NodeValue nkv: nodeValues) { + logger.debug("\tRead NodeKeyValue : " + ByteUtils.toHexString(nkv.getKey().get()) + + " on node with id " + nkv.getNodeId() + " for version " + + nkv.getVersion()); + } + } + + return new ProcessReadRepliesResult(nodeValues); + } + + /** + * Decide on the specific key-value to write everywhere. + * + * @param nodeValues + * @return The subset of entries from nodeValues that need to be repaired. + */ + private List> resolveReadConflicts(final List> nodeValues) { + + if(logger.isTraceEnabled()) { + logger.trace("NodeValues passed into resolveReadConflicts."); + if(nodeValues.size() == 0) { + logger.trace("Empty nodeValues passed to resolveReadConflicts"); + } + for(NodeValue nodeValue: nodeValues) { + logger.trace("\t" + nodeValue.getNodeId() + " - " + nodeValue.getKey().toString() + + " - " + nodeValue.getVersion().toString()); + } + } + + // If orphaned values exist, add them to fake nodes to be processed by + // "getRepairs" + int currentFakeNodeId = fakeNodeID; + if(this.orphanedValues != null) { + for(Versioned value: this.orphanedValues.getValues()) { + nodeValues.add(new NodeValue(currentFakeNodeId, + this.orphanedValues.getKey(), + value)); + currentFakeNodeId++; + } + } + + // Some cut-paste-and-modify coding from + // store/routed/action/AbstractReadRepair.java and + // store/routed/ThreadPoolRoutedStore.java + ReadRepairer readRepairer = new ReadRepairer(); + List> nodeKeyValues = readRepairer.getRepairs(nodeValues); + + if(logger.isTraceEnabled()) { + if(nodeKeyValues.size() == 0) { + logger.trace("\treadRepairer returned an empty list."); + } + for(NodeValue nodeKeyValue: nodeKeyValues) { + logger.trace("\tNodeKeyValue result from readRepairer.getRepairs : " + + ByteUtils.toHexString(nodeKeyValue.getKey().get()) + + " on node with id " + nodeKeyValue.getNodeId() + " for version " + + nodeKeyValue.getVersion()); + } + } + + List> toReadRepair = Lists.newArrayList(); + for(NodeValue v: nodeKeyValues) { + if(v.getNodeId() > currentFakeNodeId) { + // Only copy repairs intended for real nodes. 
+ Versioned versioned = Versioned.value(v.getVersioned().getValue(), + ((VectorClock) v.getVersion()).clone()); + toReadRepair.add(new NodeValue(v.getNodeId(), + v.getKey(), + versioned)); + } else { + if(logger.isDebugEnabled()) { + logger.debug("\tIgnoring repair to fake node: " + + ByteUtils.toHexString(v.getKey().get()) + " on node with id " + + v.getNodeId() + " for version " + v.getVersion()); + } + } + + } + + if(logger.isTraceEnabled()) { + if(toReadRepair.size() == 0) { + logger.trace("\ttoReadRepair is empty."); + } + for(NodeValue nodeKeyValue: toReadRepair) { + logger.trace("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get()) + + " on node with id " + nodeKeyValue.getNodeId() + " for version " + + nodeKeyValue.getVersion()); + + } + } + return toReadRepair; + } + + /** + * Override this method to place whatever logic you have to handle the + * resolved value + * + * @param toReadRepair Effectively the output of this method. Must pass in a + * non-null object to be populated by this method. + * @return + */ + public abstract Status doRepairPut(final List> toReadRepair); +} \ No newline at end of file diff --git a/src/java/voldemort/utils/ClusterForkLiftTool.java b/src/java/voldemort/utils/ClusterForkLiftTool.java new file mode 100644 index 0000000000..827a859e41 --- /dev/null +++ b/src/java/voldemort/utils/ClusterForkLiftTool.java @@ -0,0 +1,574 @@ +package voldemort.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; + +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.client.ClientConfig; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.AdminClientConfig; +import voldemort.client.protocol.admin.QueryKeyResult; +import voldemort.client.protocol.admin.StreamingClient; +import voldemort.client.protocol.admin.StreamingClientConfig; +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.store.StoreDefinition; +import voldemort.store.StoreUtils; +import voldemort.versioning.ChainedResolver; +import voldemort.versioning.ObsoleteVersionException; +import voldemort.versioning.TimeBasedInconsistencyResolver; +import voldemort.versioning.VectorClock; +import voldemort.versioning.VectorClockInconsistencyResolver; +import voldemort.versioning.Versioned; + +import com.google.common.collect.Lists; + +/** + * Tool to fork lift data over from a source cluster to a destination cluster. + * When used in conjunction with a client that "double writes" to both the + * clusters, this can be used as a feasible store migration tool to move an + * existing store to a new cluster. + * + * There are two modes around how the divergent versions of a key are + * consolidated from the source cluster: + * + * 1) Primary only Resolution ({@link SinglePartitionPrimaryResolvingForkLiftTask}): The entries + * on the primary partition are moved over to the destination cluster with empty + * vector clocks. If any key has multiple versions on the primary, they are + * resolved. This approach is fast and is best suited when you deem the replicas + * to be largely in sync with each other.
This is the DEFAULT mode. + * + * 2) Global Resolution ({@link SinglePartitionGloballyResolvingForkLiftTask}): + * The keys belonging to a partition are fetched out of the primary replica, and + * for each such key, the corresponding values are obtained from all other + * replicas, using get(..) operations. These versions are then resolved and + * written back to the destination cluster as before. This approach is slow + * since it involves several roundtrips to the server for each key (some + * potentially cross colo) and hence should be used when thorough version + * resolution is necessary or the admin deems the replicas to be fairly + * out-of-sync. + * + * + * In both modes, the default chained resolver ( + * {@link VectorClockInconsistencyResolver} + + * {@link TimeBasedInconsistencyResolver}) is used to determine a final resolved + * version. + * + * NOTES: + * + * 1) If the tool fails for some reason in the middle, the admin can restart the + * tool for the failed partitions alone. The keys that were already written in + * the failed partitions will all experience {@link ObsoleteVersionException} + * and the un-inserted keys will be inserted. + * + * 2) Since the forklift writes are issued with empty vector clocks, they will + * always yield to online writes happening on the same key, before or during the + * forklift window. Of course, after the forklift window, the destination + * cluster resumes normal operation. + * + */ +public class ClusterForkLiftTool implements Runnable { + + private static Logger logger = Logger.getLogger(ClusterForkLiftTool.class); + private static final int DEFAULT_MAX_PUTS_PER_SEC = 500; + private static final int DEFAULT_PROGRESS_PERIOD_OPS = 100000; + private static final int DEFAULT_PARTITION_PARALLELISM = 8; + private static final int DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS = 5; + + private final AdminClient srcAdminClient; + private final StreamingClient dstStreamingClient; + private final List storesList; + private final ExecutorService workerPool; + private final int progressOps; + private final HashMap srcStoreDefMap; + private final List partitionList; + private final boolean globalResolution; + + public ClusterForkLiftTool(String srcBootstrapUrl, + String dstBootstrapUrl, + int maxPutsPerSecond, + int partitionParallelism, + int progressOps, + List storesList, + List partitions, + boolean globalResolution) { + // set up AdminClient on source cluster + this.srcAdminClient = new AdminClient(srcBootstrapUrl, + new AdminClientConfig(), + new ClientConfig()); + + // set up streaming client to the destination cluster + Props property = new Props(); + property.put("streaming.platform.bootstrapURL", dstBootstrapUrl); + property.put("streaming.platform.throttle.qps", maxPutsPerSecond); + StreamingClientConfig config = new StreamingClientConfig(property); + this.dstStreamingClient = new StreamingClient(config); + + // determine and verify final list of stores to be forklifted over + if(storesList != null) { + this.storesList = storesList; + } else { + this.storesList = StoreUtils.getStoreNames(srcAdminClient.metadataMgmtOps.getRemoteStoreDefList(0) + .getValue(), + true); + } + this.srcStoreDefMap = checkStoresOnBothSides(); + + // determine the partitions to be fetched + if(partitions != null) { + this.partitionList = partitions; + } else { + this.partitionList = new ArrayList(srcAdminClient.getAdminClientCluster() + .getNumberOfPartitions()); + for(Node node: srcAdminClient.getAdminClientCluster().getNodes()) +
this.partitionList.addAll(node.getPartitionIds()); + if(this.partitionList.size() > srcAdminClient.getAdminClientCluster() + .getNumberOfPartitions()) { + throw new VoldemortException("Incorrect partition mapping in source cluster"); + } + } + + // set up thread pool to forklift partitions in parallel + this.workerPool = Executors.newFixedThreadPool(partitionParallelism); + this.progressOps = progressOps; + this.globalResolution = globalResolution; + } + + private HashMap checkStoresOnBothSides() { + List srcStoreDefs = srcAdminClient.metadataMgmtOps.getRemoteStoreDefList(0) + .getValue(); + HashMap srcStoreDefMap = StoreUtils.getStoreDefsAsMap(srcStoreDefs); + List dstStoreDefs = dstStreamingClient.getAdminClient().metadataMgmtOps.getRemoteStoreDefList(0) + .getValue(); + HashMap dstStoreDefMap = StoreUtils.getStoreDefsAsMap(dstStoreDefs); + + Set storesToSkip = new HashSet(); + for(String store: storesList) { + if(!srcStoreDefMap.containsKey(store)) { + logger.warn("Store " + store + " does not exist in source cluster "); + storesToSkip.add(store); + } + if(!dstStoreDefMap.containsKey(store)) { + logger.warn("Store " + store + " does not exist in destination cluster "); + storesToSkip.add(store); + } + } + logger.warn("List of stores that will be skipped :" + storesToSkip); + storesList.removeAll(storesToSkip); + return srcStoreDefMap; + } + + abstract class SinglePartitionForkLiftTask { + + protected int partitionId; + protected CountDownLatch latch; + protected StoreInstance storeInstance; + protected String workName; + + SinglePartitionForkLiftTask(StoreInstance storeInstance, + int partitionId, + CountDownLatch latch) { + this.partitionId = partitionId; + this.latch = latch; + this.storeInstance = storeInstance; + workName = "[Store: " + storeInstance.getStoreDefinition().getName() + ", Partition: " + + this.partitionId + "] "; + } + } + + /** + * Fetches keys belonging to the primary partition, and then fetches values for + * each such key from all replicas in a non-streaming fashion, applies the + * default resolver and writes the resolved value back to the destination cluster. + * + * TODO a streaming N way merge is the more efficient & correct solution. + * Without this, the resolving can be very slow due to cross data center + * get(..) + */ + class SinglePartitionGloballyResolvingForkLiftTask extends SinglePartitionForkLiftTask + implements Runnable { + + SinglePartitionGloballyResolvingForkLiftTask(StoreInstance storeInstance, + int partitionId, + CountDownLatch latch) { + super(storeInstance, partitionId, latch); + } + + /** + * For now, we will fall back to fetching the key from the primary + * replica, fetch the values out manually, resolve and write it back. + * Pitfalls: the primary somehow does not have the key. + * + * Two scenarios. + * + * 1) Key active after double writes: the situation is the result of + * slop not propagating to the primary. But double writes would write + * the key back to destination cluster anyway. We are good. + * + * 2) Key inactive after double writes: This indicates a problem + * elsewhere. This is a base guarantee voldemort should offer.
+ * + */ + public void run() { + String storeName = this.storeInstance.getStoreDefinition().getName(); + long entriesForkLifted = 0; + try { + logger.info(workName + "Starting processing"); + ChainedResolver> resolver = new ChainedResolver>(new VectorClockInconsistencyResolver(), + new TimeBasedInconsistencyResolver()); + Iterator keyItr = srcAdminClient.bulkFetchOps.fetchKeys(storeInstance.getNodeIdForPartitionId(this.partitionId), + storeName, + Lists.newArrayList(this.partitionId), + null, + true); + List nodeList = storeInstance.getReplicationNodeList(this.partitionId); + while(keyItr.hasNext()) { + ByteArray keyToResolve = keyItr.next(); + Map valuesMap = doReads(nodeList, keyToResolve.get()); + List> values = new ArrayList>(valuesMap.size()); + for(Map.Entry entry: valuesMap.entrySet()) { + int nodeId = entry.getKey(); + QueryKeyResult result = entry.getValue(); + + if(result.hasException()) { + logger.error(workName + "key fetch failed for key " + + ByteUtils.toHexString(keyToResolve.get()) + + " on node " + nodeId, + result.getException()); + break; + } + values.addAll(result.getValues()); + } + + List> resolvedVersions = resolver.resolveConflicts(values); + // after timestamp based resolving there should be only one + // version. Insert that to the destination cluster with + // empty vector clock + if(resolvedVersions.size() > 1) { + throw new VoldemortException("More than one resolved versions, key: " + + ByteUtils.toHexString(keyToResolve.get()) + + " vals:" + resolvedVersions); + } + dstStreamingClient.streamingPut(keyToResolve, + new Versioned(resolvedVersions.get(0) + .getValue())); + + entriesForkLifted++; + if(entriesForkLifted % progressOps == 0) { + logger.info(workName + " fork lifted " + entriesForkLifted + + " entries successfully"); + } + } + logger.info(workName + "Completed processing " + entriesForkLifted + " records"); + } catch(Exception e) { + // all work should stop if we get here + logger.error(workName + "Error forklifting data ", e); + } finally { + latch.countDown(); + } + } + + /** + * + * @param nodeIdList + * @param keyInBytes + * @return + */ + private Map doReads(final List nodeIdList, + final byte[] keyInBytes) { + Map nodeIdToKeyValues = new HashMap(); + + ByteArray key = new ByteArray(keyInBytes); + for(int nodeId: nodeIdList) { + List> values = null; + try { + values = srcAdminClient.storeOps.getNodeKey(storeInstance.getStoreDefinition() + .getName(), + nodeId, + key); + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); + } catch(VoldemortException ve) { + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); + } + } + return nodeIdToKeyValues; + } + } + + /** + * Simply fetches the data for the partition from the primary replica and + * writes it into the destination cluster. Works well when the replicas are + * fairly consistent. 
+ * + */ + class SinglePartitionPrimaryResolvingForkLiftTask extends SinglePartitionForkLiftTask implements + Runnable { + + SinglePartitionPrimaryResolvingForkLiftTask(StoreInstance storeInstance, + int partitionId, + CountDownLatch latch) { + super(storeInstance, partitionId, latch); + } + + @Override + public void run() { + String storeName = this.storeInstance.getStoreDefinition().getName(); + long entriesForkLifted = 0; + ChainedResolver> resolver = new ChainedResolver>(new VectorClockInconsistencyResolver(), + new TimeBasedInconsistencyResolver()); + try { + logger.info(workName + "Starting processing"); + Iterator>> entryItr = srcAdminClient.bulkFetchOps.fetchEntries(storeInstance.getNodeIdForPartitionId(this.partitionId), + storeName, + Lists.newArrayList(this.partitionId), + null, + true); + ByteArray prevKey = null; + List> vals = new ArrayList>(); + + while(entryItr.hasNext()) { + Pair> record = entryItr.next(); + ByteArray key = record.getFirst(); + Versioned versioned = record.getSecond(); + + if(prevKey != null && !prevKey.equals(key)) { + // resolve and write, if you see a new key + List> resolvedVersions = resolver.resolveConflicts(vals); + if(resolvedVersions.size() > 1) { + throw new VoldemortException("More than one resolved versions, key: " + + ByteUtils.toHexString(prevKey.get()) + + " vals:" + resolvedVersions); + } + Versioned resolvedVersioned = resolvedVersions.get(0); + // an empty vector clock will ensure, online traffic + // will always win over the forklift writes + Versioned newEntry = new Versioned(resolvedVersioned.getValue(), + new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp())); + + dstStreamingClient.streamingPut(prevKey, newEntry); + entriesForkLifted++; + if(entriesForkLifted % progressOps == 0) { + logger.info(workName + " fork lifted " + entriesForkLifted + + " entries successfully"); + } + vals = new ArrayList>(); + } + vals.add(versioned); + prevKey = key; + } + + // process the last record + if(vals.size() > 0) { + List> resolvedVals = resolver.resolveConflicts(vals); + assert resolvedVals.size() == 1; + Versioned resolvedVersioned = resolvedVals.get(0); + Versioned newEntry = new Versioned(resolvedVersioned.getValue(), + new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp())); + dstStreamingClient.streamingPut(prevKey, newEntry); + entriesForkLifted++; + } + + logger.info(workName + "Completed processing " + entriesForkLifted + " records"); + } catch(Exception e) { + // if for some reason this partition fails, we will have retry + // again for those partitions alone. 
+ logger.error(workName + "Error forklifting data ", e); + } finally { + latch.countDown(); + } + } + } + + @Override + public void run() { + final Cluster srcCluster = srcAdminClient.getAdminClientCluster(); + try { + // process stores one-by-one + for(String store: storesList) { + logger.info("Processing store " + store); + dstStreamingClient.initStreamingSession(store, new Callable() { + + @Override + public Object call() throws Exception { + + return null; + } + }, new Callable() { + + @Override + public Object call() throws Exception { + + return null; + } + }, true); + + final CountDownLatch latch = new CountDownLatch(srcCluster.getNumberOfPartitions()); + StoreInstance storeInstance = new StoreInstance(srcCluster, + srcStoreDefMap.get(store)); + + // submit work on every partition that is to be forklifted + for(Integer partitionId: partitionList) { + if(this.globalResolution) { + // do thorough global resolution across replicas + SinglePartitionGloballyResolvingForkLiftTask work = new SinglePartitionGloballyResolvingForkLiftTask(storeInstance, + partitionId, + latch); + workerPool.submit(work); + } else { + // take the less clean, but much faster route + SinglePartitionPrimaryResolvingForkLiftTask work = new SinglePartitionPrimaryResolvingForkLiftTask(storeInstance, + partitionId, + latch); + workerPool.submit(work); + } + } + + // wait till all the partitions are processed + latch.await(); + dstStreamingClient.closeStreamingSession(); + logger.info("Finished processing store " + store); + } + } catch(Exception e) { + logger.error("Exception running forklift tool", e); + } finally { + workerPool.shutdown(); + try { + workerPool.awaitTermination(DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS, + TimeUnit.MINUTES); + } catch(InterruptedException ie) { + logger.error("InterruptedException while waiting for worker pool to shutdown", ie); + } + srcAdminClient.close(); + dstStreamingClient.getAdminClient().close(); + // TODO cleanly shut down the threadpool + System.exit(0); + } + } + + /** + * Return args parser + * + * @return program parser + * */ + private static OptionParser getParser() { + OptionParser parser = new OptionParser(); + parser.accepts("help", "print help information"); + parser.accepts("src-url", "[REQUIRED] bootstrap URL of source cluster") + .withRequiredArg() + .describedAs("source-bootstrap-url") + .ofType(String.class); + parser.accepts("dst-url", "[REQUIRED] bootstrap URL of destination cluster") + .withRequiredArg() + .describedAs("destination-bootstrap-url") + .ofType(String.class); + parser.accepts("stores", + "Store names to forklift. Comma delimited list or singleton. [Default: ALL SOURCE STORES]") + .withRequiredArg() + .describedAs("stores") + .withValuesSeparatedBy(',') + .ofType(String.class); + parser.accepts("partitions", + "Partitions to forklift. Comma delimited list or singleton. [Default: ALL SOURCE PARTITIONS]") + .withRequiredArg() + .describedAs("partitions") + .withValuesSeparatedBy(',') + .ofType(Integer.class); + parser.accepts("max-puts-per-second", + "Maximum number of put(...) operations issued against destination cluster per second. [Default: " + + DEFAULT_MAX_PUTS_PER_SEC + " ]") + .withRequiredArg() + .describedAs("maxPutsPerSecond") + .ofType(Integer.class); + parser.accepts("progress-period-ops", + "Number of operations between progress reports.
[Default: " + + DEFAULT_PROGRESS_PERIOD_OPS + " ]") + .withRequiredArg() + .describedAs("progressPeriodOps") + .ofType(Integer.class); + parser.accepts("parallelism", + "Number of partitions to fetch in parallel. [Default: " + + DEFAULT_PARTITION_PARALLELISM + " ]") + .withRequiredArg() + .describedAs("partitionParallelism") + .ofType(Integer.class); + parser.accepts("global-resolution", + "Determines if a thorough global resolution needs to be done, by comparing all replicas. [Default: Fetch from primary alone ]"); + return parser; + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + OptionParser parser = null; + OptionSet options = null; + try { + parser = getParser(); + options = parser.parse(args); + } catch(Exception oe) { + logger.error("Exception processing command line options", oe); + parser.printHelpOn(System.out); + return; + } + + /* validate options */ + if(options.has("help")) { + parser.printHelpOn(System.out); + return; + } + + if(!options.has("src-url") || !options.has("dst-url")) { + logger.error("Both 'src-url' and 'dst-url' options are mandatory"); + parser.printHelpOn(System.out); + return; + } + + String srcBootstrapUrl = (String) options.valueOf("src-url"); + String dstBootstrapUrl = (String) options.valueOf("dst-url"); + int maxPutsPerSecond = DEFAULT_MAX_PUTS_PER_SEC; + if(options.has("max-puts-per-second")) + maxPutsPerSecond = (Integer) options.valueOf("max-puts-per-second"); + List storesList = null; + if(options.has("stores")) { + storesList = (List) options.valuesOf("stores"); + } + List partitions = null; + if(options.has("partitions")) { + partitions = (List) options.valuesOf("partitions"); + } + + int partitionParallelism = DEFAULT_PARTITION_PARALLELISM; + if(options.has("parallelism")) { + partitionParallelism = (Integer) options.valueOf("parallelism"); + } + int progressOps = DEFAULT_PROGRESS_PERIOD_OPS; + if(options.has("progress-period-ops")) { + progressOps = (Integer) options.valueOf("progress-period-ops"); + } + + ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootstrapUrl, + dstBootstrapUrl, + maxPutsPerSecond, + partitionParallelism, + progressOps, + storesList, + partitions, + options.has("global-resolution")); + forkLiftTool.run(); + } +} diff --git a/src/java/voldemort/utils/ConsistencyFixWorker.java b/src/java/voldemort/utils/ConsistencyFixWorker.java index 63b6a0b97c..f04c4fe14d 100644 --- a/src/java/voldemort/utils/ConsistencyFixWorker.java +++ b/src/java/voldemort/utils/ConsistencyFixWorker.java @@ -16,10 +16,7 @@ package voldemort.utils; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import org.apache.log4j.Logger; @@ -27,25 +24,17 @@ import voldemort.VoldemortException; import voldemort.client.protocol.admin.QueryKeyResult; import voldemort.store.routed.NodeValue; -import voldemort.store.routed.ReadRepairer; import voldemort.utils.ConsistencyFix.BadKey; import voldemort.utils.ConsistencyFix.BadKeyStatus; import voldemort.utils.ConsistencyFix.Status; import voldemort.versioning.ObsoleteVersionException; -import voldemort.versioning.VectorClock; -import voldemort.versioning.Versioned; -import com.google.common.collect.Lists; - -class ConsistencyFixWorker implements Runnable { +class ConsistencyFixWorker extends AbstractConsistencyFixer implements Runnable { private static final Logger logger = Logger.getLogger(ConsistencyFixWorker.class); - private static final int fakeNodeID = 
Integer.MIN_VALUE; - private final BadKey badKey; private final ConsistencyFix consistencyFix; private final BlockingQueue badKeyQOut; - private final QueryKeyResult orphanedValues; /** * Normal use case constructor. @@ -74,10 +63,12 @@ class ConsistencyFixWorker implements Runnable { ConsistencyFix consistencyFix, BlockingQueue badKeyQOut, QueryKeyResult orphanedValues) { - this.badKey = badKey; + super(badKey, + consistencyFix.getStoreInstance(), + consistencyFix.getAdminClient(), + orphanedValues); this.consistencyFix = consistencyFix; this.badKeyQOut = badKeyQOut; - this.orphanedValues = orphanedValues; } private String myName() { @@ -87,7 +78,7 @@ private String myName() { @Override public void run() { logger.trace("About to process key " + badKey + " (" + myName() + ")"); - Status status = doConsistencyFix(badKey); + Status status = doConsistencyFix(); logger.trace("Finished processing key " + badKey + " (" + myName() + ")"); consistencyFix.getStats().incrementFixCount(); @@ -101,261 +92,13 @@ public void run() { } } - public Status doConsistencyFix(BadKey badKey) { - // Initialization. - byte[] keyInBytes; - List nodeIdList = null; - int masterPartitionId = -1; - try { - keyInBytes = ByteUtils.fromHexString(badKey.getKeyInHexFormat()); - masterPartitionId = consistencyFix.getStoreInstance().getMasterPartitionId(keyInBytes); - nodeIdList = consistencyFix.getStoreInstance() - .getReplicationNodeList(masterPartitionId); - } catch(Exception exception) { - logger.info("Aborting fixKey due to bad init."); - if(logger.isDebugEnabled()) { - exception.printStackTrace(); - } - return Status.BAD_INIT; - } - ByteArray keyAsByteArray = new ByteArray(keyInBytes); - - // Do the reads - Map nodeIdToKeyValues = doReads(nodeIdList, - keyInBytes, - badKey.getKeyInHexFormat()); - - // Process read replies (i.e., nodeIdToKeyValues) - ProcessReadRepliesResult result = processReadReplies(nodeIdList, - keyAsByteArray, - badKey.getKeyInHexFormat(), - nodeIdToKeyValues); - if(result.status != Status.SUCCESS) { - return result.status; - } - - // Resolve conflicts indicated in nodeValues - List> toReadRepair = resolveReadConflicts(result.nodeValues); - if(logger.isTraceEnabled()) { - if(toReadRepair.size() == 0) { - logger.trace("Nothing to repair"); - } - for(NodeValue nodeValue: toReadRepair) { - logger.trace(nodeValue.getNodeId() + " --- " + nodeValue.getKey().toString()); - } - } - - // Do the repairs - Status status = doRepairPut(toReadRepair); - - // return status of last operation (success or otherwise) - return status; - } - - /** - * - * @param nodeIdList - * @param keyInBytes - * @param keyInHexFormat - * @return - */ - private Map doReads(final List nodeIdList, - final byte[] keyInBytes, - final String keyInHexFormat) { - Map nodeIdToKeyValues = new HashMap(); - - ByteArray key = new ByteArray(keyInBytes); - for(int nodeId: nodeIdList) { - List> values = null; - try { - values = consistencyFix.getAdminClient().storeOps.getNodeKey(consistencyFix.getStoreName(), - nodeId, - key); - nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); - } catch(VoldemortException ve) { - nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); - } - } - - return nodeIdToKeyValues; - } - - /** - * Result of an invocation of processReadReplies - */ - private class ProcessReadRepliesResult { - - public final Status status; - public final List> nodeValues; - - /** - * Constructor for error status - */ - ProcessReadRepliesResult(Status status) { - this.status = status; - this.nodeValues = null; - } - - /** - * 
Constructor for success - */ - ProcessReadRepliesResult(List> nodeValues) { - this.status = Status.SUCCESS; - this.nodeValues = nodeValues; - } - } - - /** - * - * @param nodeIdList - * @param keyAsByteArray - * @param keyInHexFormat - * @param nodeIdToKeyValues - * @param nodeValues Effectively the output of this method. Must pass in a - * non-null object to be populated by this method. - * @return - */ - private ProcessReadRepliesResult processReadReplies(final List nodeIdList, - final ByteArray keyAsByteArray, - final String keyInHexFormat, - final Map nodeIdToKeyValues) { - List> nodeValues = new ArrayList>(); - boolean exceptionsEncountered = false; - for(int nodeId: nodeIdList) { - QueryKeyResult keyValue; - if(nodeIdToKeyValues.containsKey(nodeId)) { - keyValue = nodeIdToKeyValues.get(nodeId); - - if(keyValue.hasException()) { - logger.debug("Exception encountered while fetching key " + keyInHexFormat - + " from node with nodeId " + nodeId + " : " - + keyValue.getException().getMessage()); - exceptionsEncountered = true; - } else { - if(keyValue.getValues().isEmpty()) { - Versioned versioned = new Versioned(null); - nodeValues.add(new NodeValue(nodeId, - keyValue.getKey(), - versioned)); - - } else { - for(Versioned value: keyValue.getValues()) { - nodeValues.add(new NodeValue(nodeId, - keyValue.getKey(), - value)); - } - } - } - } else { - logger.debug("No key-value returned from node with id:" + nodeId); - Versioned versioned = new Versioned(null); - nodeValues.add(new NodeValue(nodeId, keyAsByteArray, versioned)); - } - } - if(exceptionsEncountered) { - logger.info("Aborting fixKey because exceptions were encountered when fetching key-values."); - return new ProcessReadRepliesResult(Status.FETCH_EXCEPTION); - } - - if(logger.isDebugEnabled()) { - for(NodeValue nkv: nodeValues) { - logger.debug("\tRead NodeKeyValue : " + ByteUtils.toHexString(nkv.getKey().get()) - + " on node with id " + nkv.getNodeId() + " for version " - + nkv.getVersion()); - } - } - - return new ProcessReadRepliesResult(nodeValues); - } - - /** - * Decide on the specific key-value to write everywhere. - * - * @param nodeValues - * @return The subset of entries from nodeValues that need to be repaired. 
- */ - private List> resolveReadConflicts(final List> nodeValues) { - - if(logger.isTraceEnabled()) { - logger.trace("NodeValues passed into resolveReadConflicts."); - if(nodeValues.size() == 0) { - logger.trace("Empty nodeValues passed to resolveReadConflicts"); - } - for(NodeValue nodeValue: nodeValues) { - logger.trace("\t" + nodeValue.getNodeId() + " - " + nodeValue.getKey().toString() - + " - " + nodeValue.getVersion().toString()); - } - } - - // If orphaned values exist, add them to fake nodes to be processed by - // "getRepairs" - int currentFakeNodeId = fakeNodeID; - if(this.orphanedValues != null) { - for(Versioned value: this.orphanedValues.getValues()) { - nodeValues.add(new NodeValue(currentFakeNodeId, - this.orphanedValues.getKey(), - value)); - currentFakeNodeId++; - } - } - - // Some cut-paste-and-modify coding from - // store/routed/action/AbstractReadRepair.java and - // store/routed/ThreadPoolRoutedStore.java - ReadRepairer readRepairer = new ReadRepairer(); - List> nodeKeyValues = readRepairer.getRepairs(nodeValues); - - if(logger.isTraceEnabled()) { - if(nodeKeyValues.size() == 0) { - logger.trace("\treadRepairer returned an empty list."); - } - for(NodeValue nodeKeyValue: nodeKeyValues) { - logger.trace("\tNodeKeyValue result from readRepairer.getRepairs : " - + ByteUtils.toHexString(nodeKeyValue.getKey().get()) - + " on node with id " + nodeKeyValue.getNodeId() + " for version " - + nodeKeyValue.getVersion()); - } - } - - List> toReadRepair = Lists.newArrayList(); - for(NodeValue v: nodeKeyValues) { - if(v.getNodeId() > currentFakeNodeId) { - // Only copy repairs intended for real nodes. - Versioned versioned = Versioned.value(v.getVersioned().getValue(), - ((VectorClock) v.getVersion()).clone()); - toReadRepair.add(new NodeValue(v.getNodeId(), - v.getKey(), - versioned)); - } else { - if(logger.isDebugEnabled()) { - logger.debug("\tIgnoring repair to fake node: " - + ByteUtils.toHexString(v.getKey().get()) + " on node with id " - + v.getNodeId() + " for version " + v.getVersion()); - } - } - - } - - if(logger.isTraceEnabled()) { - if(toReadRepair.size() == 0) { - logger.trace("\ttoReadRepair is empty."); - } - for(NodeValue nodeKeyValue: toReadRepair) { - logger.trace("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get()) - + " on node with id " + nodeKeyValue.getNodeId() + " for version " - + nodeKeyValue.getVersion()); - - } - } - return toReadRepair; - } - /** * * @param toReadRepair Effectively the output of this method. Must pass in a * non-null object to be populated by this method. * @return */ + @Override public Status doRepairPut(final List> toReadRepair) { if(this.consistencyFix.isDryRun()) { logger.debug("Returning success from ConsistencyFixWorker.doRepairPut because this is a dry run.");
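For anyone trying the tool out after applying this patch, a minimal usage sketch follows. The bootstrap URLs are placeholders, the numeric arguments simply restate the DEFAULT_* constants declared in ClusterForkLiftTool, and the null store/partition lists rely on the documented fallback of forklifting every store and partition in the source cluster; it is the programmatic equivalent of running the tool with only --src-url and --dst-url supplied.

    // Illustrative sketch only: construct and run the fork lift tool with the
    // same settings the command line would use when only the two URLs are given.
    ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool("tcp://src-host:6666", // source bootstrap URL (placeholder)
                                                               "tcp://dst-host:6666", // destination bootstrap URL (placeholder)
                                                               500,      // DEFAULT_MAX_PUTS_PER_SEC
                                                               8,        // DEFAULT_PARTITION_PARALLELISM
                                                               100000,   // DEFAULT_PROGRESS_PERIOD_OPS
                                                               null,     // stores: null means all source stores
                                                               null,     // partitions: null means all source partitions
                                                               false);   // primary-only resolution (the default mode)
    forkLiftTool.run();

Note that run() currently ends with System.exit(0) in its finally block (see the TODO above), so the tool is meant to be driven from its own main() rather than embedded in a longer-lived process.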