Skip to content

Commit

Permalink
initial commit - new duplicate handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Oct 19, 2012
1 parent 5a021db commit 692b63f
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 273 deletions.
38 changes: 37 additions & 1 deletion src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -252,6 +252,14 @@ public static void main(String[] args) throws Exception {
.describedAs("query-keys")
.withValuesSeparatedBy(',')
.ofType(String.class);
parser.accepts("mirror-url", "Cluster url to mirror data from")
.withRequiredArg()
.describedAs("mirror-cluster-bootstrap-url")
.ofType(String.class);
parser.accepts("mirror-node", "Node id in the mirror cluster to mirror from")
.withRequiredArg()
.describedAs("id-of-mirror-node")
.ofType(Integer.class);

OptionSet options = parser.parse(args);

Expand All @@ -263,6 +271,8 @@ public static void main(String[] args) throws Exception {
Set<String> missing = CmdUtils.missing(options, "url", "node");
if(missing.size() > 0) {
// Not the most elegant way to do this:
// basically, check whether only "node" is missing for this set of
// options; all of these can work without an explicit node id
if(!(missing.equals(ImmutableSet.of("node"))
&& (options.has("add-stores") || options.has("delete-store")
|| options.has("ro-metadata") || options.has("set-metadata")
Expand Down Expand Up @@ -363,11 +373,20 @@ public static void main(String[] args) throws Exception {
ops += "q";
}

if(options.has("mirror-url")) {
if(!options.has("mirror-node")) {
Utils.croak("Specify the mirror node to fetch from");
}
if(!options.has("stores")) {
Utils.croak("Specify the list of stores to mirror");
}
ops += "h";
}
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, "
+ "repair-job, native-backup, rollback, reserve-memory, verify-metadata-version) must be specified");
+ "repair-job, native-backup, rollback, reserve-memory, mirror-url, verify-metadata-version) must be specified");
}

List<String> storeNames = null;
Expand Down Expand Up @@ -583,6 +602,21 @@ public static void main(String[] args) throws Exception {
}
executeQueryKeys(nodeId, adminClient, storeNames, keyList);
}
if(ops.contains("h")) {
if(nodeId == -1) {
System.err.println("Cannot run mirroring without node id");
System.exit(1);
}
Integer mirrorNodeId = CmdUtils.valueOf(options, "mirror-node", -1);
if(mirrorNodeId == -1) {
System.err.println("Cannot run mirroring without mirror node id");
System.exit(1);
}
adminClient.mirrorData(nodeId,
mirrorNodeId,
storeNames,
(String) options.valueOf("mirror-url"));
}
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
Expand Down Expand Up @@ -775,6 +809,8 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --update-entries [folder path from output of --fetch-entries --outdir] --url [url] --node [node-id] --stores [comma-separated list of store names]");
stream.println("\t10) Query stores for a set of keys on a specific node.");
stream.println("\t\t./bin/voldemort-admin-tool.sh --query-keys [comma-separated list of keys] --url [url] --node [node-id] --stores [comma-separated list of store names]");
stream.println("\t11) Mirror data from another voldemort server");
stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id] --stores [comma-separated-list-of-store-names]");
stream.println();
stream.println("READ-ONLY OPERATIONS");
stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores");
Expand Down
98 changes: 98 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -35,6 +35,7 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -2704,4 +2705,101 @@ public void reserveMemory(int nodeId, List<String> stores, long sizeInMB) {
logger.info("Finished reserving memory for store : " + storeName);
}
}

/**
 * Mirror data from another voldemort server.
 *
 * For every store in {@code stores}, all entries of the partitions hosted
 * by {@code mirrorNodeId} in the mirror cluster are fetched and written to
 * {@code nodeId} in the current cluster. Stores are mirrored in parallel,
 * one worker thread per store, and this method blocks until every worker
 * has finished (successfully or not).
 *
 * Validation problems (no stores given, unknown mirror node, a store
 * missing on either side) are logged as errors and the method returns
 * without mirroring anything. A failure while mirroring one store is
 * logged and does not abort the other stores.
 *
 * @param nodeId node in the current cluster to mirror to
 * @param mirrorNodeId node from which to mirror data
 * @param stores set of stores to be mirrored
 * @param mirrorUrl cluster bootstrap url to mirror from
 */
public void mirrorData(final int nodeId,
                       final int mirrorNodeId,
                       List<String> stores,
                       final String mirrorUrl) {
    // An empty list would otherwise reach Executors.newFixedThreadPool(0),
    // which rejects a zero-sized pool with an IllegalArgumentException.
    if(stores == null || stores.isEmpty()) {
        logger.error("Specify at least one store to mirror");
        return;
    }

    // NOTE(review): this secondary client is never released in this method;
    // if AdminClient exposes a shutdown/stop call, it should probably be
    // invoked once mirroring has finished -- confirm.
    final AdminClient mirrorAdminClient = new AdminClient(mirrorUrl, new AdminClientConfig());
    final AdminClient currentAdminClient = this;

    // determine the partitions residing on the mirror node
    Node mirrorNode = mirrorAdminClient.getAdminClientCluster().getNodeById(mirrorNodeId);

    if(mirrorNode == null) {
        logger.error("Mirror node specified does not exist in the mirror cluster");
        return;
    }

    // check that both the mirror-from and the mirrored-to node know about
    // every requested store
    List<StoreDefinition> currentStoreList = currentAdminClient.getRemoteStoreDefList(nodeId)
                                                               .getValue();
    int numStoresCurr = 0;
    for(StoreDefinition storeDef: currentStoreList) {
        if(stores.contains(storeDef.getName())) {
            numStoresCurr++;
        }
    }
    List<StoreDefinition> mirrorStoreList = mirrorAdminClient.getRemoteStoreDefList(mirrorNodeId)
                                                             .getValue();
    int numStoresMirror = 0;
    for(StoreDefinition storeDef: mirrorStoreList) {
        if(stores.contains(storeDef.getName())) {
            numStoresMirror++;
        }
    }
    if(numStoresCurr != stores.size() || numStoresMirror != stores.size()) {
        logger.error("Make sure the set of stores specified exist on both sides");
        return;
    }

    // one worker per store, so every store is mirrored concurrently
    ExecutorService executors = Executors.newFixedThreadPool(stores.size(),
                                                             new ThreadFactory() {

                                                                 public Thread newThread(Runnable r) {
                                                                     Thread thread = new Thread(r);
                                                                     thread.setName("mirror-data-thread");
                                                                     return thread;
                                                                 }
                                                             });

    final List<Integer> partitionIdList = mirrorNode.getPartitionIds();
    // parties = one per store worker + the calling thread, which waits on
    // the barrier until all workers are done
    final CyclicBarrier barrier = new CyclicBarrier(stores.size() + 1);
    try {
        for(final String storeName: stores) {
            executors.submit(new Runnable() {

                public void run() {
                    try {
                        logger.info("Mirroring data for store " + storeName + " from node "
                                    + mirrorNodeId + "(" + mirrorUrl + ") to node " + nodeId
                                    + " partitions:" + partitionIdList);

                        Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = mirrorAdminClient.fetchEntries(mirrorNodeId,
                                                                                                               storeName,
                                                                                                               partitionIdList,
                                                                                                               null,
                                                                                                               false);
                        currentAdminClient.updateEntries(nodeId, storeName, iterator, null);

                        logger.info("Mirroring data for store:" + storeName + " from node "
                                    + mirrorNodeId + " completed.");
                    } catch(Exception e) {
                        logger.error("Mirroring operation for store " + storeName
                                     + " from node " + mirrorNodeId + " failed.", e);
                    } finally {
                        // always arrive at the barrier, even on failure, so
                        // the calling thread is never left waiting
                        try {
                            barrier.await();
                        } catch(InterruptedException e) {
                            Thread.currentThread().interrupt();
                            logger.error("Interrupted waiting for barrier while mirroring for "
                                         + storeName, e);
                        } catch(Exception e) {
                            logger.error("Error waiting for barrier while mirroring for "
                                         + storeName, e);
                        }
                    }
                }
            });
        }
        barrier.await();
    } catch(InterruptedException e) {
        // preserve the interrupt for callers further up the stack
        Thread.currentThread().interrupt();
        logger.error("Mirroring operation interrupted.", e);
    } catch(Exception e) {
        logger.error("Mirroring operation failed.", e);
    } finally {
        executors.shutdown();
        logger.info("Finished mirroring data.");
    }

}
}
2 changes: 1 addition & 1 deletion src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -218,7 +218,7 @@ public VoldemortConfig(Props props) {
this.bdbBtreeFanout = props.getInt("bdb.btree.fanout", 512);
this.bdbCheckpointBytes = props.getLong("bdb.checkpoint.interval.bytes", 20 * 1024 * 1024);
this.bdbCheckpointMs = props.getLong("bdb.checkpoint.interval.ms", 30 * Time.MS_PER_SECOND);
this.bdbSortedDuplicates = props.getBoolean("bdb.enable.sorted.duplicates", true);
this.bdbSortedDuplicates = props.getBoolean("bdb.enable.sorted.duplicates", false);
this.bdbOneEnvPerStore = props.getBoolean("bdb.one.env.per.store", false);
this.bdbCleanerMinFileUtilization = props.getInt("bdb.cleaner.min.file.utilization", 5);
this.bdbCleanerMinUtilization = props.getInt("bdb.cleaner.minUtilization", 50);
Expand Down
83 changes: 83 additions & 0 deletions src/java/voldemort/store/StoreBinaryFormat.java
@@ -0,0 +1,83 @@
package voldemort.store;

import java.util.ArrayList;
import java.util.List;

import voldemort.VoldemortException;
import voldemort.utils.ByteUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

/**
 * Defines a generic on-disk data format for versioned voldemort data.
 *
 * All versions of a key are packed into a single byte array laid out as:
 *
 * <pre>
 * FORMAT_VERSION - 1 byte (currently always 0)
 * ------------ repeating, once per versioned value ------------
 * CLOCK - variable length, self delimiting (written by VectorClock)
 *     NUM_CLOCK_ENTRIES - 2 bytes (short)
 *     VERSION_SIZE - 1 byte
 *     --------------- repeating ----------
 *     NODE_ID - 2 bytes (short)
 *     VERSION - VERSION_SIZE bytes
 *     ------------------------------------
 * VALUE - variable length
 *     VALUE_SIZE - 4 bytes
 *     VALUE_BYTES - VALUE_SIZE bytes
 * --------------------------------------------------------------
 * </pre>
 */
public class StoreBinaryFormat {

    /* In the future we can use this to handle format changes */
    private static final byte VERSION = 0;

    /**
     * Serializes a list of versioned values into the on-disk format.
     *
     * @param values the versioned values to pack; every version is cast to
     *        a {@link VectorClock}
     * @return a new byte array holding the format version byte followed by
     *         each value's clock, length and bytes
     * @throws VoldemortException if the number of bytes written does not
     *         match the precomputed size
     */
    public static byte[] toByteArray(List<Versioned<byte[]>> values) {
        // first pass: compute the exact size so one array can be allocated
        int size = 1;
        for(Versioned<byte[]> v: values) {
            size += ((VectorClock) v.getVersion()).sizeInBytes();
            size += ByteUtils.SIZE_OF_INT;
            size += v.getValue().length;
        }
        byte[] bytes = new byte[size];
        int pos = 1;
        bytes[0] = VERSION;
        // second pass: write clock, value length and value bytes in place
        for(Versioned<byte[]> v: values) {
            pos += ((VectorClock) v.getVersion()).toBytes(bytes, pos);
            byte[] value = v.getValue();
            int len = value.length;
            ByteUtils.writeInt(bytes, len, pos);
            pos += ByteUtils.SIZE_OF_INT;
            System.arraycopy(value, 0, bytes, pos, len);
            pos += len;
        }
        if(pos != bytes.length)
            throw new VoldemortException((bytes.length - pos)
                                         + " straggling bytes found in value (this should not be possible)!");
        return bytes;
    }

    /**
     * Parses a byte array in the on-disk format back into versioned values.
     *
     * @param bytes the serialized data, starting with the format version
     *        byte
     * @return the versioned values in the order they appear in
     *         {@code bytes}; empty if only the version byte is present
     * @throws VoldemortException if {@code bytes} is empty, carries an
     *         unexpected format version, or is truncated / declares a
     *         value size that does not fit in the remaining bytes
     */
    public static List<Versioned<byte[]>> fromByteArray(byte[] bytes) {
        if(bytes.length < 1)
            throw new VoldemortException("Invalid value length: " + bytes.length);
        if(bytes[0] != VERSION)
            throw new VoldemortException("Unexpected version number in value: " + bytes[0]);
        int pos = 1;
        List<Versioned<byte[]>> vals = new ArrayList<Versioned<byte[]>>(2);
        while(pos < bytes.length) {
            VectorClock clock = new VectorClock(bytes, pos);
            pos += clock.sizeInBytes();
            // Validate before reading so corrupt or truncated data fails
            // with a descriptive VoldemortException rather than an
            // ArrayIndexOutOfBounds / NegativeArraySize error.
            if(bytes.length - pos < ByteUtils.SIZE_OF_INT)
                throw new VoldemortException("Truncated value: no room for value size at offset "
                                             + pos + " (total length " + bytes.length + ")");
            int valueSize = ByteUtils.readInt(bytes, pos);
            pos += ByteUtils.SIZE_OF_INT;
            if(valueSize < 0 || valueSize > bytes.length - pos)
                throw new VoldemortException("Invalid value size " + valueSize + " at offset "
                                             + pos + " (only " + (bytes.length - pos)
                                             + " bytes remaining)");
            byte[] val = new byte[valueSize];
            System.arraycopy(bytes, pos, val, 0, valueSize);
            pos += valueSize;
            vals.add(Versioned.value(val, clock));
        }
        // The bounds checks above guarantee pos == bytes.length here, so no
        // trailing "straggling bytes" check is needed.
        return vals;
    }
}

0 comments on commit 692b63f

Please sign in to comment.