Permalink
Browse files

initial commit - new duplicate handling

  • Loading branch information...
1 parent 5a021db commit 692b63fc5ada7dd00a949763d69e739e5ca4d33d @vinothchandar vinothchandar committed Jul 27, 2012
View
38 src/java/voldemort/VoldemortAdminTool.java
@@ -252,6 +252,14 @@ public static void main(String[] args) throws Exception {
.describedAs("query-keys")
.withValuesSeparatedBy(',')
.ofType(String.class);
+ parser.accepts("mirror-url", "Cluster url to mirror data from")
+ .withRequiredArg()
+ .describedAs("mirror-cluster-bootstrap-url")
+ .ofType(String.class);
+ parser.accepts("mirror-node", "Node id in the mirror cluster to mirror from")
+ .withRequiredArg()
+ .describedAs("id-of-mirror-node")
+ .ofType(Integer.class);
OptionSet options = parser.parse(args);
@@ -263,6 +271,8 @@ public static void main(String[] args) throws Exception {
Set<String> missing = CmdUtils.missing(options, "url", "node");
if(missing.size() > 0) {
// Not the most elegant way to do this
+ // basically check if only "node" is missing for these set of
+ // options; all these can live without explicit node ids
if(!(missing.equals(ImmutableSet.of("node"))
&& (options.has("add-stores") || options.has("delete-store")
|| options.has("ro-metadata") || options.has("set-metadata")
@@ -363,11 +373,20 @@ public static void main(String[] args) throws Exception {
ops += "q";
}
+ if(options.has("mirror-url")) {
+ if(!options.has("mirror-node")) {
+ Utils.croak("Specify the mirror node to fetch from");
+ }
+ if(!options.has("stores")) {
+ Utils.croak("Specify the list of stores to mirror");
+ }
+ ops += "h";
+ }
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, "
- + "repair-job, native-backup, rollback, reserve-memory, verify-metadata-version) must be specified");
+ + "repair-job, native-backup, rollback, reserve-memory, mirror-url, verify-metadata-version) must be specified");
}
List<String> storeNames = null;
@@ -583,6 +602,21 @@ public static void main(String[] args) throws Exception {
}
executeQueryKeys(nodeId, adminClient, storeNames, keyList);
}
+ if(ops.contains("h")) {
+ if(nodeId == -1) {
+ System.err.println("Cannot run mirroring without node id");
+ System.exit(1);
+ }
+ Integer mirrorNodeId = CmdUtils.valueOf(options, "mirror-node", -1);
+ if(mirrorNodeId == -1) {
+ System.err.println("Cannot run mirroring without mirror node id");
+ System.exit(1);
+ }
+ adminClient.mirrorData(nodeId,
+ mirrorNodeId,
+ storeNames,
+ (String) options.valueOf("mirror-url"));
+ }
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
@@ -775,6 +809,8 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --update-entries [folder path from output of --fetch-entries --outdir] --url [url] --node [node-id] --stores [comma-separated list of store names]");
stream.println("\t10) Query stores for a set of keys on a specific node.");
stream.println("\t\t./bin/voldemort-admin-tool.sh --query-keys [comma-separated list of keys] --url [url] --node [node-id] --stores [comma-separated list of store names]");
+ stream.println("\t11) Mirror data from another voldemort server");
+ stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id] --stores [comma-separated-list-of-store-names]");
stream.println();
stream.println("READ-ONLY OPERATIONS");
stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores");
View
98 src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -35,6 +35,7 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -2704,4 +2705,101 @@ public void reserveMemory(int nodeId, List<String> stores, long sizeInMB) {
logger.info("Finished reserving memory for store : " + storeName);
}
}
+
+ /**
+ * Mirror data from another voldemort server
+ *
+ * @param nodeId node in the current cluster to mirror to
+ * @param mirrorNodeId node from which to mirror data
+ * @param stores set of stores to be mirrored
+ * @param mirrorUrl cluster bootstrap url to mirror from
+ */
+ public void mirrorData(final int nodeId,
+ final int mirrorNodeId,
+ List<String> stores,
+ final String mirrorUrl) {
+ final AdminClient mirrorAdminClient = new AdminClient(mirrorUrl, new AdminClientConfig());
+ final AdminClient currentAdminClient = this;
+
+ // determine the partitions residing on the mirror node
+ Node mirrorNode = mirrorAdminClient.getAdminClientCluster().getNodeById(mirrorNodeId);
+
+ if(mirrorNode == null) {
+ logger.error("Mirror node specified does not exist in the mirror cluster");
+ return;
+ }
+
+ // compare the mirror-from and mirrored-to nodes have same set of stores
+ List<StoreDefinition> currentStoreList = currentAdminClient.getRemoteStoreDefList(nodeId)
+ .getValue();
+ int numStoresCurr = 0;
+ for(StoreDefinition storeDef: currentStoreList) {
+ if(stores.contains(storeDef.getName()))
+ numStoresCurr++;
+ }
+ List<StoreDefinition> mirrorStoreList = mirrorAdminClient.getRemoteStoreDefList(mirrorNodeId)
+ .getValue();
+ int numStoresMirror = 0;
+ for(StoreDefinition storeDef: mirrorStoreList) {
+ if(stores.contains(storeDef.getName()))
+ numStoresMirror++;
+ }
+ if(numStoresCurr != stores.size() || numStoresMirror != stores.size()) {
+ logger.error("Make sure the set of stores specified exist on both sides");
+ return;
+ }
+
+ ExecutorService executors = Executors.newFixedThreadPool(stores.size(),
+ new ThreadFactory() {
+
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName("mirror-data-thread");
+ return thread;
+ }
+ });
+
+ final List<Integer> partitionIdList = mirrorNode.getPartitionIds();
+ final CyclicBarrier barrier = new CyclicBarrier(stores.size() + 1);
+ try {
+ for(final String storeName: stores)
+ executors.submit(new Runnable() {
+
+ public void run() {
+ try {
+ logger.info("Mirroring data for store " + storeName + " from node "
+ + mirrorNodeId + "(" + mirrorUrl + ") to node " + nodeId
+ + " partitions:" + partitionIdList);
+
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = mirrorAdminClient.fetchEntries(mirrorNodeId,
+ storeName,
+ partitionIdList,
+ null,
+ false);
+ currentAdminClient.updateEntries(nodeId, storeName, iterator, null);
+
+ logger.info("Mirroring data for store:" + storeName + " from node "
+ + mirrorNodeId + " completed.");
+ } catch(Exception e) {
+ logger.error("Mirroring operation for store " + storeName
+ + "from node " + mirrorNodeId + " failed.", e);
+ } finally {
+ try {
+ barrier.await();
+ } catch(Exception e) {
+ logger.error("Error waiting for barrier while mirroring for "
+ + storeName, e);
+ }
+ }
+ }
+ });
+ barrier.await();
+ } catch(Exception e) {
+ logger.error("Mirroring operation failed.", e);
+ } finally {
+ executors.shutdown();
+ logger.info("Finished mirroring data.");
+ }
+
+ }
}
View
2 src/java/voldemort/server/VoldemortConfig.java
@@ -218,7 +218,7 @@ public VoldemortConfig(Props props) {
this.bdbBtreeFanout = props.getInt("bdb.btree.fanout", 512);
this.bdbCheckpointBytes = props.getLong("bdb.checkpoint.interval.bytes", 20 * 1024 * 1024);
this.bdbCheckpointMs = props.getLong("bdb.checkpoint.interval.ms", 30 * Time.MS_PER_SECOND);
- this.bdbSortedDuplicates = props.getBoolean("bdb.enable.sorted.duplicates", true);
+ this.bdbSortedDuplicates = props.getBoolean("bdb.enable.sorted.duplicates", false);
this.bdbOneEnvPerStore = props.getBoolean("bdb.one.env.per.store", false);
this.bdbCleanerMinFileUtilization = props.getInt("bdb.cleaner.min.file.utilization", 5);
this.bdbCleanerMinUtilization = props.getInt("bdb.cleaner.minUtilization", 50);
View
83 src/java/voldemort/store/StoreBinaryFormat.java
@@ -0,0 +1,83 @@
+package voldemort.store;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import voldemort.VoldemortException;
+import voldemort.utils.ByteUtils;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Versioned;
+
+/**
+ * Defines a generic on-disk data format for versioned voldemort data
+ *
+ * The format of the values stored on disk. The format is
+ * VERSION - 1 byte
+ * ------------repeating-------------------
+ * CLOCK - variable length, self delimiting
+ * NUM_CLOCK_ENTRIES - 2 bytes (short)
+ * VERSION_SIZE - 1 byte
+ * --------------- repeating ----------
+ * NODE_ID - 2 bytes (short)
+ * VERSION - VERSION_SIZE bytes
+ * ------------------------------------
+ * VALUE - variable length
+ * VALUE_SIZE - 4 bytes
+ * VALUE_BYTES - VALUE_SIZE bytes
+ * ----------------------------------------
+ */
+public class StoreBinaryFormat {
+
+ /* In the future we can use this to handle format changes */
+ private static final byte VERSION = 0;
+
+ public static byte[] toByteArray(List<Versioned<byte[]>> values) {
+ int size = 1;
+ for(Versioned<byte[]> v: values) {
+ size += ((VectorClock) v.getVersion()).sizeInBytes();
+ size += 4;
+ size += v.getValue().length;
+ }
+ byte[] bytes = new byte[size];
+ int pos = 1;
+ bytes[0] = VERSION;
+ for(Versioned<byte[]> v: values) {
+ //byte[] clock = ((VectorClock) v.getVersion()).toBytes();
+ //System.arraycopy(clock, 0, bytes, pos, clock.length);
+ pos += ((VectorClock) v.getVersion()).toBytes(bytes,pos);
+ //pos += clock.length;
+ int len = v.getValue().length;
+ ByteUtils.writeInt(bytes, len, pos);
+ pos += ByteUtils.SIZE_OF_INT;
+ System.arraycopy(v.getValue(), 0, bytes, pos, len);
+ pos += len;
+ }
+ if(pos != bytes.length)
+ throw new VoldemortException((bytes.length - pos)
+ + " straggling bytes found in value (this should not be possible)!");
+ return bytes;
+ }
+
+ public static List<Versioned<byte[]>> fromByteArray(byte[] bytes) {
+ if(bytes.length < 1)
+ throw new VoldemortException("Invalid value length: " + bytes.length);
+ if(bytes[0] != VERSION)
+ throw new VoldemortException("Unexpected version number in value: " + bytes[0]);
+ int pos = 1;
+ List<Versioned<byte[]>> vals = new ArrayList<Versioned<byte[]>>(2);
+ while(pos < bytes.length) {
+ VectorClock clock = new VectorClock(bytes, pos);
+ pos += clock.sizeInBytes();
+ int valueSize = ByteUtils.readInt(bytes, pos);
+ pos += ByteUtils.SIZE_OF_INT;
+ byte[] val = new byte[valueSize];
+ System.arraycopy(bytes, pos, val, 0, valueSize);
+ pos += valueSize;
+ vals.add(Versioned.value(val, clock));
+ }
+ if(pos != bytes.length)
+ throw new VoldemortException((bytes.length - pos)
+ + " straggling bytes found in value (this should not be possible)!");
+ return vals;
+ }
+}
View
482 src/java/voldemort/store/bdb/BdbStorageEngine.java
@@ -17,24 +17,26 @@
package voldemort.store.bdb;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.codec.binary.Hex;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxOperation;
-import voldemort.serialization.IdentitySerializer;
-import voldemort.serialization.Serializer;
-import voldemort.serialization.VersionedSerializer;
import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.store.NoSuchCapabilityException;
import voldemort.store.PersistenceFailureException;
import voldemort.store.StorageEngine;
import voldemort.store.StorageInitializationException;
import voldemort.store.Store;
+import voldemort.store.StoreBinaryFormat;
import voldemort.store.StoreCapabilityType;
import voldemort.store.StoreUtils;
import voldemort.store.backup.NativeBackupable;
@@ -46,11 +48,9 @@
import voldemort.utils.Utils;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Occurred;
-import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
-import com.google.common.collect.Lists;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
@@ -65,7 +65,6 @@
/**
* A store that uses BDB for persistence
*
- *
*/
public class BdbStorageEngine implements StorageEngine<ByteArray, byte[], byte[]>, NativeBackupable {
@@ -75,10 +74,8 @@
private final String name;
private Database bdbDatabase;
private final Environment environment;
- private final VersionedSerializer<byte[]> versionedSerializer;
private final AtomicBoolean isOpen;
private final LockMode readLockMode;
- private final Serializer<Version> versionSerializer;
private final BdbEnvironmentStats bdbEnvironmentStats;
private final AtomicBoolean isTruncating = new AtomicBoolean(false);
@@ -89,17 +86,6 @@ public BdbStorageEngine(String name,
this.name = Utils.notNull(name);
this.bdbDatabase = Utils.notNull(database);
this.environment = Utils.notNull(environment);
- this.versionedSerializer = new VersionedSerializer<byte[]>(new IdentitySerializer());
- this.versionSerializer = new Serializer<Version>() {
-
- public byte[] toBytes(Version object) {
- return ((VectorClock) object).toBytes();
- }
-
- public Version toObject(byte[] bytes) {
- return versionedSerializer.getVersion(bytes);
- }
- };
this.isOpen = new AtomicBoolean(true);
this.readLockMode = config.getLockMode();
this.bdbEnvironmentStats = new BdbEnvironmentStats(environment,
@@ -151,9 +137,7 @@ public void truncate() {
throw new VoldemortException("Failed to truncate Bdb store " + getName(), e);
} finally {
-
commitOrAbort(succeeded, transaction);
-
// reopen the bdb database for future queries.
if(reopenBdbDatabase()) {
isTruncating.compareAndSet(true, false);
@@ -196,37 +180,46 @@ private boolean reopenBdbDatabase() {
}
}
+ /**
+ * truncate() operation mandates that all opened Database be closed before
+ * attempting truncation.
+ * <p>
+ * This method throws an exception while truncation is happening to any
+ * request attempting in parallel with store truncation.
+ *
+ * @return
+ */
+ private Database getBdbDatabase() {
+ if(isTruncating.get()) {
+ throw new VoldemortException("Bdb Store " + getName()
+ + " is currently truncating cannot serve any request.");
+ }
+ return bdbDatabase;
+ }
+
public List<Version> getVersions(ByteArray key) {
- return get(key, null, readLockMode, versionSerializer);
+ return StoreUtils.getVersions(get(key, null));
}
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms)
throws PersistenceFailureException {
- return get(key, transforms, readLockMode, versionedSerializer);
- }
-
- private <T> List<T> get(ByteArray key,
- @SuppressWarnings("unused") byte[] transforms,
- LockMode lockMode,
- Serializer<T> serializer) throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
+ DatabaseEntry keyEntry = new DatabaseEntry(key.get());
+ DatabaseEntry valueEntry = new DatabaseEntry();
long startTimeNs = -1;
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
- Cursor cursor = null;
try {
- cursor = getBdbDatabase().openCursor(null, null);
- List<T> result = get(cursor, key, lockMode, serializer);
-
- // If null, try again in different locking mode to
- // avoid null result due to gap between delete and new write
- if(result.size() == 0 && lockMode != LockMode.DEFAULT) {
- return get(cursor, key, LockMode.DEFAULT, serializer);
+ // uncommitted reads are perfectly fine now, since we have no
+ // je-delete() in put()
+ OperationStatus status = getBdbDatabase().get(null, keyEntry, valueEntry, readLockMode);
+ if(OperationStatus.SUCCESS == status) {
+ return StoreBinaryFormat.fromByteArray(valueEntry.getData());
} else {
- return result;
+ return Collections.emptyList();
}
} catch(DatabaseException e) {
logger.error(e);
@@ -238,142 +231,90 @@ private boolean reopenBdbDatabase() {
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}
-
- attemptClose(cursor);
}
}
- /**
- * truncate() operation mandates that all opened Database be closed before
- * attempting truncation.
- * <p>
- * This method throws an exception while truncation is happening to any
- * request attempting in parallel with store truncation.
- *
- * @return
- */
- private Database getBdbDatabase() {
- if(isTruncating.get()) {
- throw new VoldemortException("Bdb Store " + getName()
- + " is currently truncating cannot serve any request.");
- }
-
- return bdbDatabase;
- }
-
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
-
+ StoreUtils.assertValidKeys(keys);
+ Map<ByteArray, List<Versioned<byte[]>>> results = null;
long startTimeNs = -1;
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
-
- StoreUtils.assertValidKeys(keys);
- Map<ByteArray, List<Versioned<byte[]>>> result = StoreUtils.newEmptyHashMap(keys);
- Cursor cursor = null;
-
- String keyStr = "";
-
try {
- cursor = getBdbDatabase().openCursor(null, null);
- for(ByteArray key: keys) {
-
- if(logger.isTraceEnabled())
- keyStr += ByteUtils.toHexString(key.get()) + " ";
-
- List<Versioned<byte[]>> values = get(cursor, key, readLockMode, versionedSerializer);
- if(!values.isEmpty())
- result.put(key, values);
- }
- } catch(DatabaseException e) {
- logger.error(e);
- throw new PersistenceFailureException(e);
+ results = StoreUtils.getAll(this, keys, transforms);
+ } catch(PersistenceFailureException pfe) {
+ throw pfe;
} finally {
- attemptClose(cursor);
- }
-
- if(logger.isTraceEnabled())
- logger.trace("Completed GETALL from keys " + keyStr + " in "
- + (System.nanoTime() - startTimeNs) + " ns at "
- + System.currentTimeMillis());
-
- return result;
- }
-
- private static <T> List<T> get(Cursor cursor,
- ByteArray key,
- LockMode lockMode,
- Serializer<T> serializer) throws DatabaseException {
- StoreUtils.assertValidKey(key);
-
- long startTimeNs = -1;
-
- if(logger.isTraceEnabled())
- startTimeNs = System.nanoTime();
-
- DatabaseEntry keyEntry = new DatabaseEntry(key.get());
- DatabaseEntry valueEntry = new DatabaseEntry();
- List<T> results = Lists.newArrayList();
-
- for(OperationStatus status = cursor.getSearchKey(keyEntry, valueEntry, lockMode); status == OperationStatus.SUCCESS; status = cursor.getNextDup(keyEntry,
- valueEntry,
- lockMode)) {
- results.add(serializer.toObject(valueEntry.getData()));
- }
-
- if(logger.isTraceEnabled()) {
- logger.trace("Completed GET from key " + ByteUtils.toHexString(key.get()) + " in "
- + (System.nanoTime() - startTimeNs) + " ns at "
- + System.currentTimeMillis());
+ if(logger.isTraceEnabled()) {
+ String keyStr = "";
+ for(ByteArray key: keys)
+ keyStr += key + " ";
+ logger.trace("Completed GETALL from keys " + keyStr + " in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+ }
}
return results;
}
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws PersistenceFailureException {
- StoreUtils.assertValidKey(key);
long startTimeNs = -1;
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
+ StoreUtils.assertValidKey(key);
DatabaseEntry keyEntry = new DatabaseEntry(key.get());
+ DatabaseEntry valueEntry = new DatabaseEntry();
+
boolean succeeded = false;
Transaction transaction = null;
- Cursor cursor = null;
- try {
- transaction = this.environment.beginTransaction(null, null);
+ List<Versioned<byte[]>> vals = null;
- // Check existing values
- // if there is a version obsoleted by this value delete it
- // if there is a version later than this one, throw an exception
- DatabaseEntry valueEntry = new DatabaseEntry();
- cursor = getBdbDatabase().openCursor(transaction, null);
- for(OperationStatus status = cursor.getSearchKey(keyEntry, valueEntry, LockMode.RMW); status == OperationStatus.SUCCESS; status = cursor.getNextDup(keyEntry,
- valueEntry,
- LockMode.RMW)) {
- VectorClock clock = new VectorClock(valueEntry.getData());
- Occurred occurred = value.getVersion().compare(clock);
- if(occurred == Occurred.BEFORE)
- throw new ObsoleteVersionException("Key "
- + new String(hexCodec.encode(key.get()))
- + " "
- + value.getVersion().toString()
- + " is obsolete, it is no greater than the current version of "
- + clock + ".");
- else if(occurred == Occurred.AFTER)
- // best effort delete of obsolete previous value!
- cursor.delete();
+ try {
+ transaction = environment.beginTransaction(null, null);
+
+ // do a get for the existing values
+ OperationStatus status = getBdbDatabase().get(transaction,
+ keyEntry,
+ valueEntry,
+ LockMode.RMW);
+ if(OperationStatus.SUCCESS == status) {
+ // update
+ vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+ // compare vector clocks and throw out old ones, for updates
+
+ Iterator<Versioned<byte[]>> iter = vals.iterator();
+ while(iter.hasNext()) {
+ Versioned<byte[]> curr = iter.next();
+ Occurred occurred = value.getVersion().compare(curr.getVersion());
+ if(occurred == Occurred.BEFORE)
+ throw new ObsoleteVersionException("Key "
+ + new String(hexCodec.encode(key.get()))
+ + " "
+ + value.getVersion().toString()
+ + " is obsolete, it is no greater than the current version of "
+ + curr.getVersion().toString() + ".");
+ else if(occurred == Occurred.AFTER)
+ iter.remove();
+ }
+ } else {
+ // insert
+ vals = new ArrayList<Versioned<byte[]>>();
}
- // Okay so we cleaned up all the prior stuff, so now we are good to
- // insert the new thing
- valueEntry = new DatabaseEntry(versionedSerializer.toBytes(value));
- OperationStatus status = cursor.put(keyEntry, valueEntry);
+ // update the new value
+ vals.add(value);
+
+ valueEntry.setData(StoreBinaryFormat.toByteArray(vals));
+ status = getBdbDatabase().put(transaction, keyEntry, valueEntry);
+
if(status != OperationStatus.SUCCESS)
throw new PersistenceFailureException("Put operation failed with status: " + status);
succeeded = true;
@@ -382,66 +323,91 @@ else if(occurred == Occurred.AFTER)
logger.error(e);
throw new PersistenceFailureException(e);
} finally {
- attemptClose(cursor);
if(succeeded)
attemptCommit(transaction);
else
attemptAbort(transaction);
- }
-
- if(logger.isTraceEnabled()) {
- logger.trace("Completed PUT to key " + ByteUtils.toHexString(key.get()) + " (keyRef: "
- + System.identityHashCode(key) + " value " + value + " in "
- + (System.nanoTime() - startTimeNs) + " ns at "
- + System.currentTimeMillis());
+ if(logger.isTraceEnabled()) {
+ logger.trace("Completed PUT to key " + key + " (keyRef: "
+ + System.identityHashCode(key) + " value " + value + " in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+ }
}
}
public boolean delete(ByteArray key, Version version) throws PersistenceFailureException {
+
StoreUtils.assertValidKey(key);
long startTimeNs = -1;
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
- boolean deletedSomething = false;
- Cursor cursor = null;
Transaction transaction = null;
try {
transaction = this.environment.beginTransaction(null, null);
DatabaseEntry keyEntry = new DatabaseEntry(key.get());
- DatabaseEntry valueEntry = new DatabaseEntry();
- cursor = getBdbDatabase().openCursor(transaction, null);
- OperationStatus status = cursor.getSearchKey(keyEntry,
- valueEntry,
- LockMode.READ_UNCOMMITTED);
- while(status == OperationStatus.SUCCESS) {
- // if version is null no comparison is necessary
- if(new VectorClock(valueEntry.getData()).compare(version) == Occurred.BEFORE) {
- cursor.delete();
- deletedSomething = true;
+
+ if(version == null) {
+ // unversioned delete. Just blow away the whole thing
+ OperationStatus status = getBdbDatabase().delete(transaction, keyEntry);
+ if(OperationStatus.SUCCESS == status)
+ return true;
+ else
+ return false;
+ } else {
+ // versioned deletes; need to determine what to delete
+ DatabaseEntry valueEntry = new DatabaseEntry();
+
+ // do a get for the existing values
+ OperationStatus status = getBdbDatabase().get(transaction,
+ keyEntry,
+ valueEntry,
+ readLockMode);
+ // key does not exist to begin with.
+ if(OperationStatus.NOTFOUND == status)
+ return false;
+
+ List<Versioned<byte[]>> vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+ Iterator<Versioned<byte[]>> iter = vals.iterator();
+ int numVersions = vals.size();
+ int numDeletedVersions = 0;
+
+ // go over the versions and remove everything before the
+ // supplied version
+ while(iter.hasNext()) {
+ Versioned<byte[]> curr = iter.next();
+ Version currentVersion = curr.getVersion();
+ if(currentVersion.compare(version) == Occurred.BEFORE) {
+ iter.remove();
+ numDeletedVersions++;
+ }
}
- status = cursor.getNextDup(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
+
+ if(numDeletedVersions < numVersions) {
+ // we still have some valid versions
+ valueEntry.setData(StoreBinaryFormat.toByteArray(vals));
+ getBdbDatabase().put(transaction, keyEntry, valueEntry);
+ } else {
+ // we have deleted all the versions; so get rid of the entry
+ // in the database
+ getBdbDatabase().delete(transaction, keyEntry);
+ }
+ return numDeletedVersions > 0;
}
- return deletedSomething;
} catch(DatabaseException e) {
logger.error(e);
throw new PersistenceFailureException(e);
} finally {
-
+ attemptCommit(transaction);
if(logger.isTraceEnabled()) {
logger.trace("Completed DELETE of key " + ByteUtils.toHexString(key.get())
+ " (keyRef: " + System.identityHashCode(key) + ") in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}
-
- try {
- attemptClose(cursor);
- } finally {
- attemptCommit(transaction);
- }
}
}
@@ -491,16 +457,6 @@ private void attemptCommit(Transaction transaction) {
}
}
- private static void attemptClose(Cursor cursor) {
- try {
- if(cursor != null)
- cursor.close();
- } catch(DatabaseException e) {
- logger.error("Error closing cursor.", e);
- throw new PersistenceFailureException(e.getMessage(), e);
- }
- }
-
public DatabaseStats getStats(boolean setFast) {
try {
StatsConfig config = new StatsConfig();
@@ -530,121 +486,127 @@ public BdbEnvironmentStats getBdbEnvironmentStats() {
private static abstract class BdbIterator<T> implements ClosableIterator<T> {
- private final boolean noValues;
- final Cursor cursor;
-
- private T current;
private volatile boolean isOpen;
+ final Cursor cursor;
- public BdbIterator(Cursor cursor, boolean noValues) {
+ BdbIterator(Cursor cursor) {
this.cursor = cursor;
isOpen = true;
- this.noValues = noValues;
- DatabaseEntry keyEntry = new DatabaseEntry();
- DatabaseEntry valueEntry = new DatabaseEntry();
- if(noValues)
- valueEntry.setPartial(true);
- try {
- cursor.getFirst(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
- } catch(DatabaseException e) {
- logger.error(e);
- throw new PersistenceFailureException(e);
- }
- if(keyEntry.getData() != null)
- current = get(keyEntry, valueEntry);
}
- protected abstract T get(DatabaseEntry key, DatabaseEntry value);
-
- protected abstract void moveCursor(DatabaseEntry key, DatabaseEntry value)
- throws DatabaseException;
-
- public final boolean hasNext() {
- return current != null;
- }
-
- public final T next() {
- if(!isOpen)
- throw new PersistenceFailureException("Call to next() on a closed iterator.");
-
- DatabaseEntry keyEntry = new DatabaseEntry();
- DatabaseEntry valueEntry = new DatabaseEntry();
- if(noValues)
- valueEntry.setPartial(true);
+ public final void close() {
try {
- moveCursor(keyEntry, valueEntry);
+ if(isOpen) {
+ cursor.close();
+ isOpen = false;
+ }
} catch(DatabaseException e) {
logger.error(e);
- throw new PersistenceFailureException(e);
}
- T previous = current;
- if(keyEntry.getData() == null)
- current = null;
- else
- current = get(keyEntry, valueEntry);
-
- return previous;
}
public final void remove() {
- throw new UnsupportedOperationException("No removal y'all.");
- }
-
- public final void close() {
- try {
- cursor.close();
- isOpen = false;
- } catch(DatabaseException e) {
- logger.error(e);
- }
+ throw new UnsupportedOperationException("No removal");
}
@Override
protected final void finalize() {
if(isOpen) {
- logger.error("Failure to close cursor, will be forcably closed.");
+ logger.error("Failure to close cursor, will be forcibly closed.");
close();
}
-
}
}
- private static class BdbKeysIterator extends BdbIterator<ByteArray> {
+ private static class BdbEntriesIterator extends BdbIterator<Pair<ByteArray, Versioned<byte[]>>> {
- public BdbKeysIterator(Cursor cursor) {
- super(cursor, true);
+ private List<Pair<ByteArray, Versioned<byte[]>>> cache;
+
+ public BdbEntriesIterator(Cursor cursor) {
+ super(cursor);
+ this.cache = new ArrayList<Pair<ByteArray, Versioned<byte[]>>>();
}
- @Override
- protected ByteArray get(DatabaseEntry key, DatabaseEntry value) {
- return new ByteArray(key.getData());
+ public boolean hasNext() {
+ // we have a next element if there is at least one cached
+ // element or we can make more
+ return cache.size() > 0 || makeMore();
}
- @Override
- protected void moveCursor(DatabaseEntry key, DatabaseEntry value) throws DatabaseException {
- cursor.getNextNoDup(key, value, LockMode.READ_UNCOMMITTED);
+ public Pair<ByteArray, Versioned<byte[]>> next() {
+ if(cache.size() == 0) {
+ if(!makeMore())
+ throw new NoSuchElementException("Iterated to end.");
+ }
+ // must now have at least one thing in the cache
+ return cache.remove(cache.size() - 1);
}
+ protected boolean makeMore() {
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+ try {
+ OperationStatus status = cursor.getNext(keyEntry,
+ valueEntry,
+ LockMode.READ_UNCOMMITTED);
+
+ if(OperationStatus.NOTFOUND == status) {
+ // we have reached the end of the cursor
+ return false;
+ }
+
+ ByteArray key = new ByteArray(keyEntry.getData());
+ for(Versioned<byte[]> val: StoreBinaryFormat.fromByteArray(valueEntry.getData()))
+ this.cache.add(Pair.create(key, val));
+ return true;
+ } catch(DatabaseException e) {
+ logger.error(e);
+ throw new PersistenceFailureException(e);
+ }
+ }
}
- private static class BdbEntriesIterator extends BdbIterator<Pair<ByteArray, Versioned<byte[]>>> {
+ private static class BdbKeysIterator extends BdbIterator<ByteArray> {
- public BdbEntriesIterator(Cursor cursor) {
- super(cursor, false);
+ ByteArray current = null;
+
+ public BdbKeysIterator(Cursor cursor) {
+ super(cursor);
}
- @Override
- protected Pair<ByteArray, Versioned<byte[]>> get(DatabaseEntry key, DatabaseEntry value) {
- VectorClock clock = new VectorClock(value.getData());
- byte[] bytes = ByteUtils.copy(value.getData(),
- clock.sizeInBytes(),
- value.getData().length);
- return Pair.create(new ByteArray(key.getData()), new Versioned<byte[]>(bytes, clock));
+ public boolean hasNext() {
+ return current != null || fetchNextKey();
}
- @Override
- protected void moveCursor(DatabaseEntry key, DatabaseEntry value) throws DatabaseException {
- cursor.getNext(key, value, LockMode.READ_UNCOMMITTED);
+ public ByteArray next() {
+ ByteArray result = null;
+ if(current == null) {
+ if(!fetchNextKey())
+ throw new NoSuchElementException("Iterated to end.");
+ }
+ result = current;
+ current = null;
+ return result;
+ }
+
+ private boolean fetchNextKey() {
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+ valueEntry.setPartial(true);
+ try {
+ OperationStatus status = cursor.getNext(keyEntry,
+ valueEntry,
+ LockMode.READ_UNCOMMITTED);
+ if(OperationStatus.NOTFOUND == status) {
+ // we have reached the end of the cursor
+ return false;
+ }
+ current = new ByteArray(keyEntry.getData());
+ return true;
+ } catch(DatabaseException e) {
+ logger.error(e);
+ throw new PersistenceFailureException(e);
+ }
}
}
View
23 src/java/voldemort/versioning/VectorClock.java
@@ -114,24 +114,26 @@ public VectorClock(byte[] bytes, int offset) {
public byte[] toBytes() {
byte[] serialized = new byte[sizeInBytes()];
+ toBytes(serialized, 0);
+ return serialized;
+ }
+
+ public int toBytes(byte[] buf, int offset) {
// write the number of versions
- ByteUtils.writeShort(serialized, (short) versions.size(), 0);
+ ByteUtils.writeShort(buf, (short) versions.size(), offset);
// write the size of each version in bytes
byte versionSize = ByteUtils.numberOfBytesRequired(getMaxVersion());
- serialized[2] = versionSize;
+ buf[offset + 2] = versionSize;
int clockEntrySize = ByteUtils.SIZE_OF_SHORT + versionSize;
- int start = 3;
+ int start = offset + 3;
for(ClockEntry v: versions) {
- ByteUtils.writeShort(serialized, v.getNodeId(), start);
- ByteUtils.writeBytes(serialized,
- v.getVersion(),
- start + ByteUtils.SIZE_OF_SHORT,
- versionSize);
+ ByteUtils.writeShort(buf, v.getNodeId(), start);
+ ByteUtils.writeBytes(buf, v.getVersion(), start + ByteUtils.SIZE_OF_SHORT, versionSize);
start += clockEntrySize;
}
- ByteUtils.writeLong(serialized, this.timestamp, start);
- return serialized;
+ ByteUtils.writeLong(buf, this.timestamp, start);
+ return sizeInBytes();
}
public int sizeInBytes() {
@@ -225,6 +227,7 @@ public String toString() {
builder.append(this.versions.get(this.versions.size() - 1));
}
builder.append(")");
+ builder.append(" ts:" + timestamp);
return builder.toString();
}
View
2 test/unit/voldemort/store/bdb/BdbStorageEngineTest.java
@@ -71,7 +71,7 @@ protected void setUp() throws Exception {
this.databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
databaseConfig.setTransactional(true);
- databaseConfig.setSortedDuplicates(true);
+ databaseConfig.setSortedDuplicates(false);
this.database = environment.openDatabase(null, "test", databaseConfig);
this.runtimeConfig = new BdbRuntimeConfig();
runtimeConfig.setLockMode(LOCK_MODE);

0 comments on commit 692b63f

Please sign in to comment.