Skip to content

Commit

Permalink
Code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Oct 19, 2012
1 parent 692b63f commit 0d449a7
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 84 deletions.
17 changes: 8 additions & 9 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -252,7 +252,7 @@ public static void main(String[] args) throws Exception {
.describedAs("query-keys")
.withValuesSeparatedBy(',')
.ofType(String.class);
parser.accepts("mirror-url", "Cluster url to mirror data from")
parser.accepts("mirror-from-url", "Cluster url to mirror data from")
.withRequiredArg()
.describedAs("mirror-cluster-bootstrap-url")
.ofType(String.class);
Expand Down Expand Up @@ -373,13 +373,10 @@ public static void main(String[] args) throws Exception {
ops += "q";
}

if(options.has("mirror-url")) {
if(options.has("mirror-from-url")) {
if(!options.has("mirror-node")) {
Utils.croak("Specify the mirror node to fetch from");
}
if(!options.has("stores")) {
Utils.croak("Specify the list of stores to mirror");
}
ops += "h";
}
if(ops.length() < 1) {
Expand Down Expand Up @@ -614,8 +611,8 @@ public static void main(String[] args) throws Exception {
}
adminClient.mirrorData(nodeId,
mirrorNodeId,
storeNames,
(String) options.valueOf("mirror-url"));
(String) options.valueOf("mirror-from-url"),
storeNames);
}
} catch(Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -809,8 +806,10 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --update-entries [folder path from output of --fetch-entries --outdir] --url [url] --node [node-id] --stores [comma-separated list of store names]");
stream.println("\t10) Query stores for a set of keys on a specific node.");
stream.println("\t\t./bin/voldemort-admin-tool.sh --query-keys [comma-separated list of keys] --url [url] --node [node-id] --stores [comma-separated list of store names]");
stream.println("\t11) Mirror data from another voldemort server");
stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id] --stores [comma-separated-list-of-store-names]");
stream.println("\t11) Mirror data from another voldemort server (possibly in another cluster) for specified stores");
stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id] --stores [comma-separated-list-of-store-names]");
stream.println("\t12) Mirror data from another voldemort server (possibly in another cluster) for all stores in current cluster");
stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id]");
stream.println();
stream.println("READ-ONLY OPERATIONS");
stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores");
Expand Down
78 changes: 41 additions & 37 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -35,7 +35,7 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -68,6 +68,7 @@
import voldemort.store.ErrorCodeMapper;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreUtils;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.store.mysql.MysqlStorageConfiguration;
Expand Down Expand Up @@ -2710,42 +2711,51 @@ public void reserveMemory(int nodeId, List<String> stores, long sizeInMB) {
* Mirror data from another voldemort server
*
* @param nodeId node in the current cluster to mirror to
* @param mirrorNodeId node from which to mirror data
* @param nodeIdToMirrorFrom node from which to mirror data
* @param urlToMirrorFrom cluster bootstrap url to mirror from
* @param stores set of stores to be mirrored
* @param mirrorUrl cluster bootstrap url to mirror from
*
*/
public void mirrorData(final int nodeId,
final int mirrorNodeId,
List<String> stores,
final String mirrorUrl) {
final AdminClient mirrorAdminClient = new AdminClient(mirrorUrl, new AdminClientConfig());
final int nodeIdToMirrorFrom,
final String urlToMirrorFrom,
List<String> stores) {
final AdminClient mirrorAdminClient = new AdminClient(urlToMirrorFrom,
new AdminClientConfig());
final AdminClient currentAdminClient = this;

// determine the partitions residing on the mirror node
Node mirrorNode = mirrorAdminClient.getAdminClientCluster().getNodeById(mirrorNodeId);
Node mirrorNode = mirrorAdminClient.getAdminClientCluster().getNodeById(nodeIdToMirrorFrom);
Node currentNode = currentAdminClient.getAdminClientCluster().getNodeById(nodeId);

if(mirrorNode == null) {
logger.error("Mirror node specified does not exist in the mirror cluster");
return;
}

// compare the mirror-from and mirrored-to nodes have same set of stores
List<StoreDefinition> currentStoreList = currentAdminClient.getRemoteStoreDefList(nodeId)
.getValue();
int numStoresCurr = 0;
for(StoreDefinition storeDef: currentStoreList) {
if(stores.contains(storeDef.getName()))
numStoresCurr++;
if(currentNode == null) {
logger.error("node specified does not exist in the current cluster");
return;
}
List<StoreDefinition> mirrorStoreList = mirrorAdminClient.getRemoteStoreDefList(mirrorNodeId)
.getValue();
int numStoresMirror = 0;
for(StoreDefinition storeDef: mirrorStoreList) {
if(stores.contains(storeDef.getName()))
numStoresMirror++;

// compare the mirror-from and mirrored-to nodes have same set of stores
List<String> currentStoreList = StoreUtils.getStoreNames(currentAdminClient.getRemoteStoreDefList(nodeId)
.getValue(),
true);
List<String> mirrorStoreList = StoreUtils.getStoreNames(mirrorAdminClient.getRemoteStoreDefList(nodeIdToMirrorFrom)
.getValue(),
true);
if(stores == null)
stores = currentStoreList;

if(!currentStoreList.containsAll(stores) || !mirrorStoreList.containsAll(stores)) {
logger.error("Make sure the set of stores match on both sides");
return;
}
if(numStoresCurr != stores.size() || numStoresMirror != stores.size()) {
logger.error("Make sure the set of stores specified exist on both sides");

// check if the partitions are same on both the nodes
if(!currentNode.getPartitionIds().equals(mirrorNode.getPartitionIds())) {
logger.error("Make sure the same set of partitions exist on both sides");
return;
}

Expand All @@ -2760,46 +2770,40 @@ public Thread newThread(Runnable r) {
});

final List<Integer> partitionIdList = mirrorNode.getPartitionIds();
final CyclicBarrier barrier = new CyclicBarrier(stores.size() + 1);
final CountDownLatch waitLatch = new CountDownLatch(stores.size());
try {
for(final String storeName: stores)
executors.submit(new Runnable() {

public void run() {
try {
logger.info("Mirroring data for store " + storeName + " from node "
+ mirrorNodeId + "(" + mirrorUrl + ") to node " + nodeId
+ " partitions:" + partitionIdList);
+ nodeIdToMirrorFrom + "(" + urlToMirrorFrom + ") to node "
+ nodeId + " partitions:" + partitionIdList);

Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = mirrorAdminClient.fetchEntries(mirrorNodeId,
Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = mirrorAdminClient.fetchEntries(nodeIdToMirrorFrom,
storeName,
partitionIdList,
null,
false);
currentAdminClient.updateEntries(nodeId, storeName, iterator, null);

logger.info("Mirroring data for store:" + storeName + " from node "
+ mirrorNodeId + " completed.");
+ nodeIdToMirrorFrom + " completed.");
} catch(Exception e) {
logger.error("Mirroring operation for store " + storeName
+ "from node " + mirrorNodeId + " failed.", e);
+ "from node " + nodeIdToMirrorFrom + " failed.", e);
} finally {
try {
barrier.await();
} catch(Exception e) {
logger.error("Error waiting for barrier while mirroring for "
+ storeName, e);
}
waitLatch.countDown();
}
}
});
barrier.await();
waitLatch.await();
} catch(Exception e) {
logger.error("Mirroring operation failed.", e);
} finally {
executors.shutdown();
logger.info("Finished mirroring data.");
}

}
}
10 changes: 0 additions & 10 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -66,7 +66,6 @@ public class VoldemortConfig implements Serializable {
private long bdbCacheSize;
private boolean bdbWriteTransactions;
private boolean bdbFlushTransactions;
private boolean bdbSortedDuplicates;
private String bdbDataDirectory;
private long bdbMaxLogFileSize;
private int bdbBtreeFanout;
Expand Down Expand Up @@ -218,7 +217,6 @@ public VoldemortConfig(Props props) {
this.bdbBtreeFanout = props.getInt("bdb.btree.fanout", 512);
this.bdbCheckpointBytes = props.getLong("bdb.checkpoint.interval.bytes", 20 * 1024 * 1024);
this.bdbCheckpointMs = props.getLong("bdb.checkpoint.interval.ms", 30 * Time.MS_PER_SECOND);
this.bdbSortedDuplicates = props.getBoolean("bdb.enable.sorted.duplicates", false);
this.bdbOneEnvPerStore = props.getBoolean("bdb.one.env.per.store", false);
this.bdbCleanerMinFileUtilization = props.getInt("bdb.cleaner.min.file.utilization", 5);
this.bdbCleanerMinUtilization = props.getInt("bdb.cleaner.minUtilization", 50);
Expand Down Expand Up @@ -1267,14 +1265,6 @@ public void setBdbWriteTransactions(boolean bdbWriteTransactions) {
this.bdbWriteTransactions = bdbWriteTransactions;
}

public boolean isBdbSortedDuplicatesEnabled() {
return this.bdbSortedDuplicates;
}

public void setBdbSortedDuplicates(boolean enable) {
this.bdbSortedDuplicates = enable;
}

public void setBdbOneEnvPerStore(boolean bdbOneEnvPerStore) {
this.bdbOneEnvPerStore = bdbOneEnvPerStore;
}
Expand Down
Expand Up @@ -356,7 +356,8 @@ protected Versioned<Slop> computeNext() {

writeThrottler.maybeThrottle(writtenLast);
writtenLast = slopSize(head);
deleteBatch.add(Pair.create(head.getValue().makeKey(), head.getVersion()));
deleteBatch.add(Pair.create(head.getValue().makeKey(),
(Version) head.getVersion()));
return head;
}
}
Expand Down
29 changes: 13 additions & 16 deletions src/java/voldemort/store/StoreBinaryFormat.java
Expand Up @@ -14,16 +14,16 @@
* The format of the values stored on disk. The format is
* VERSION - 1 byte
* ------------repeating-------------------
* CLOCK - variable length, self delimiting
* NUM_CLOCK_ENTRIES - 2 bytes (short)
* VERSION_SIZE - 1 byte
* --------------- repeating ----------
* NODE_ID - 2 bytes (short)
* VERSION - VERSION_SIZE bytes
* ------------------------------------
* VALUE - variable length
* VALUE_SIZE - 4 bytes
* VALUE_BYTES - VALUE_SIZE bytes
* CLOCK - variable length, self delimiting
* NUM_CLOCK_ENTRIES - 2 bytes (short)
* VERSION_SIZE - 1 byte
* --------------- repeating ----------
* NODE_ID - 2 bytes (short)
* VERSION - VERSION_SIZE bytes
* ------------------------------------
* VALUE - variable length
* VALUE_SIZE - 4 bytes
* VALUE_BYTES - VALUE_SIZE bytes
* ----------------------------------------
*/
public class StoreBinaryFormat {
Expand All @@ -34,18 +34,15 @@ public class StoreBinaryFormat {
public static byte[] toByteArray(List<Versioned<byte[]>> values) {
int size = 1;
for(Versioned<byte[]> v: values) {
size += ((VectorClock) v.getVersion()).sizeInBytes();
size += 4;
size += v.getVersion().sizeInBytes();
size += ByteUtils.SIZE_OF_INT;
size += v.getValue().length;
}
byte[] bytes = new byte[size];
int pos = 1;
bytes[0] = VERSION;
for(Versioned<byte[]> v: values) {
//byte[] clock = ((VectorClock) v.getVersion()).toBytes();
//System.arraycopy(clock, 0, bytes, pos, clock.length);
pos += ((VectorClock) v.getVersion()).toBytes(bytes,pos);
//pos += clock.length;
pos += v.getVersion().toBytes(bytes, pos);
int len = v.getValue().length;
ByteUtils.writeInt(bytes, len, pos);
pos += ByteUtils.SIZE_OF_INT;
Expand Down
16 changes: 16 additions & 0 deletions src/java/voldemort/store/StoreUtils.java
Expand Up @@ -18,6 +18,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -199,4 +200,19 @@ public static StoreDefinition getStoreDef(List<StoreDefinition> list, String nam
return def;
return null;
}

/**
* Get the list of store names from a list of store definitions
*
* @param list
* @param ignoreViews
* @return list of store names
*/
public static List<String> getStoreNames(List<StoreDefinition> list, boolean ignoreViews) {
List<String> storeNameSet = new ArrayList<String>();
for(StoreDefinition def: list)
if(!def.isView() || !ignoreViews)
storeNameSet.add(def.getName());
return storeNameSet;
}
}
2 changes: 1 addition & 1 deletion src/java/voldemort/store/bdb/BdbStorageConfiguration.java
Expand Up @@ -111,7 +111,7 @@ public BdbStorageConfiguration(VoldemortConfig config) {
environmentConfig.setLockTimeout(config.getBdbLockTimeoutMs(), TimeUnit.MILLISECONDS);
databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled());
databaseConfig.setSortedDuplicates(false);
databaseConfig.setNodeMaxEntries(config.getBdbBtreeFanout());
databaseConfig.setTransactional(true);
bdbMasterDir = config.getBdbDataDirectory();
Expand Down
5 changes: 3 additions & 2 deletions src/java/voldemort/store/bdb/BdbStorageEngine.java
Expand Up @@ -365,7 +365,7 @@ public boolean delete(ByteArray key, Version version) throws PersistenceFailureE
OperationStatus status = getBdbDatabase().get(transaction,
keyEntry,
valueEntry,
readLockMode);
LockMode.RMW);
// key does not exist to begin with.
if(OperationStatus.NOTFOUND == status)
return false;
Expand Down Expand Up @@ -449,7 +449,8 @@ private void attemptAbort(Transaction transaction) {

private void attemptCommit(Transaction transaction) {
try {
transaction.commit();
if(transaction != null)
transaction.commit();
} catch(DatabaseException e) {
logger.error("Transaction commit failed!", e);
attemptAbort(transaction);
Expand Down
13 changes: 7 additions & 6 deletions src/java/voldemort/versioning/VectorClock.java
Expand Up @@ -121,18 +121,19 @@ public byte[] toBytes() {
public int toBytes(byte[] buf, int offset) {
// write the number of versions
ByteUtils.writeShort(buf, (short) versions.size(), offset);
offset += ByteUtils.SIZE_OF_SHORT;
// write the size of each version in bytes
byte versionSize = ByteUtils.numberOfBytesRequired(getMaxVersion());
buf[offset + 2] = versionSize;
buf[offset] = versionSize;
offset++;

int clockEntrySize = ByteUtils.SIZE_OF_SHORT + versionSize;
int start = offset + 3;
for(ClockEntry v: versions) {
ByteUtils.writeShort(buf, v.getNodeId(), start);
ByteUtils.writeBytes(buf, v.getVersion(), start + ByteUtils.SIZE_OF_SHORT, versionSize);
start += clockEntrySize;
ByteUtils.writeShort(buf, v.getNodeId(), offset);
ByteUtils.writeBytes(buf, v.getVersion(), offset + ByteUtils.SIZE_OF_SHORT, versionSize);
offset += clockEntrySize;
}
ByteUtils.writeLong(buf, this.timestamp, start);
ByteUtils.writeLong(buf, this.timestamp, offset);
return sizeInBytes();
}

Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/versioning/Versioned.java
Expand Up @@ -44,7 +44,7 @@ public Versioned(T object, Version version) {
this.object = object;
}

public Version getVersion() {
public VectorClock getVersion() {
return version;
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/voldemort/CatBdbStore.java
Expand Up @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception {
DatabaseConfig databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
databaseConfig.setTransactional(config.isBdbWriteTransactionsEnabled());
databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled());
databaseConfig.setSortedDuplicates(false);
Database database = environment.openDatabase(null, storeName, databaseConfig);
StorageEngine<ByteArray, byte[], byte[]> store = new BdbStorageEngine(storeName,
environment,
Expand Down

0 comments on commit 0d449a7

Please sign in to comment.