Skip to content

Commit

Permalink
Addressing code review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Aug 29, 2013
1 parent af695ed commit 06b450d
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 355 deletions.
230 changes: 115 additions & 115 deletions clients/python/voldemort/protocol/voldemort_admin_pb2.py

Large diffs are not rendered by default.

45 changes: 35 additions & 10 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -2097,7 +2097,31 @@ public void updateEntries(int nodeId,
String storeName, String storeName,
Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator, Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator,
VoldemortFilter filter) { VoldemortFilter filter) {
updateEntries(nodeId, storeName, entryIterator, filter, false); streamingUpdateEntries(nodeId, storeName, entryIterator, filter, false);
}

/**
* Update a stream of key/value entries at the given node in the same
* way as
* {@link StreamingOperations#updateEntries(int, String, Iterator, VoldemortFilter)}
*
* The only difference being the resolving on the server will happen
* based on timestamp and not the vector clock.
*
* @param nodeId Id of the remote node (where we wish to update the
* entries)
* @param storeName Store name for the entries
* @param entryIterator Iterator of key-value pairs for the entries
* @param filter Custom filter implementation to filter out entries
* which should not be updated.
*
* @throws VoldemortException
*/
public void updateEntriesTimeBased(int nodeId,
String storeName,
Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator,
VoldemortFilter filter) {
streamingUpdateEntries(nodeId, storeName, entryIterator, filter, true);
} }


/** /**
Expand All @@ -2121,15 +2145,16 @@ public void updateEntries(int nodeId,
* @param entryIterator Iterator of key-value pairs for the entries * @param entryIterator Iterator of key-value pairs for the entries
* @param filter Custom filter implementation to filter out entries * @param filter Custom filter implementation to filter out entries
* which should not be updated. * which should not be updated.
* @param overWriteIfLatest overwrite the existing value if the supplied * @param overWriteIfLatestTs if true overwrite the existing value if
* version has greater timestamp * the supplied version has greater timestamp; else use vector
* clocks
* @throws VoldemortException * @throws VoldemortException
*/ */
public void updateEntries(int nodeId, private void streamingUpdateEntries(int nodeId,
String storeName, String storeName,
Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator, Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator,
VoldemortFilter filter, VoldemortFilter filter,
boolean overWriteIfLatest) { boolean overWriteIfLatestTs) {
Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId); Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
SocketDestination destination = new SocketDestination(node.getHost(), SocketDestination destination = new SocketDestination(node.getHost(),
node.getAdminPort(), node.getAdminPort(),
Expand All @@ -2151,11 +2176,11 @@ public void updateEntries(int nodeId,
.build(); .build();
VAdminProto.UpdatePartitionEntriesRequest.Builder updateRequest = null; VAdminProto.UpdatePartitionEntriesRequest.Builder updateRequest = null;


if(overWriteIfLatest) { if(overWriteIfLatestTs) {
updateRequest = VAdminProto.UpdatePartitionEntriesRequest.newBuilder() updateRequest = VAdminProto.UpdatePartitionEntriesRequest.newBuilder()
.setStore(storeName) .setStore(storeName)
.setPartitionEntry(partitionEntry) .setPartitionEntry(partitionEntry)
.setOverwriteIfLatest(overWriteIfLatest); .setOverwriteIfLatestTs(overWriteIfLatestTs);
} else { } else {
updateRequest = VAdminProto.UpdatePartitionEntriesRequest.newBuilder() updateRequest = VAdminProto.UpdatePartitionEntriesRequest.newBuilder()
.setStore(storeName) .setStore(storeName)
Expand Down
422 changes: 211 additions & 211 deletions src/java/voldemort/client/protocol/pb/VAdminProto.java

Large diffs are not rendered by default.

Expand Up @@ -558,7 +558,7 @@ public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdateParti
StorageEngine<ByteArray, byte[], byte[]> storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository, StorageEngine<ByteArray, byte[], byte[]> storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository,
request.getStore()); request.getStore());


if(request.hasOverwriteIfLatest() && request.getOverwriteIfLatest()) { if(request.hasOverwriteIfLatestTs() && request.getOverwriteIfLatestTs()) {
// Resolve based on timestamp if specified. // Resolve based on timestamp if specified.
return new TimeBasedUpdatePartitionEntriesStreamRequestHandler(request, return new TimeBasedUpdatePartitionEntriesStreamRequestHandler(request,
errorCodeMapper, errorCodeMapper,
Expand Down
Expand Up @@ -82,10 +82,8 @@ protected FetchPartitionFileStreamRequestHandler(VAdminProto.FetchPartitionFiles
StoreRepository storeRepository) { StoreRepository storeRepository) {
this.request = request; this.request = request;
// TODO (Sid) : Confirm if keeping metadatastore as a class property is // TODO (Sid) : Confirm if keeping metadatastore as a class property is
// the best // the best way to do this. metadataStore is used later in the
// way to do this. metadataStore is used later in the // handleNextPartition() to create object plan.
// handleNextPartition() to create
// object plan.
this.metadataStore = metadataStore; this.metadataStore = metadataStore;


StoreDefinition storeDef = metadataStore.getStoreDef(request.getStoreName()); StoreDefinition storeDef = metadataStore.getStoreDef(request.getStoreName());
Expand Down
Expand Up @@ -9,7 +9,6 @@
import voldemort.server.VoldemortConfig; import voldemort.server.VoldemortConfig;
import voldemort.server.storage.KeyLockHandle; import voldemort.server.storage.KeyLockHandle;
import voldemort.store.ErrorCodeMapper; import voldemort.store.ErrorCodeMapper;
import voldemort.store.PersistenceFailureException;
import voldemort.store.StorageEngine; import voldemort.store.StorageEngine;
import voldemort.utils.ByteArray; import voldemort.utils.ByteArray;
import voldemort.utils.NetworkClassLoader; import voldemort.utils.NetworkClassLoader;
Expand Down Expand Up @@ -76,12 +75,12 @@ protected void processEntry(ByteArray key, Versioned<byte[]> value) throws IOExc
// greater than what we are trying to write in // greater than what we are trying to write in
storageEngine.releaseLock(handle); storageEngine.releaseLock(handle);
} }
} catch(PersistenceFailureException pfe) { } catch(Exception e) {
if(handle != null && !handle.isClosed()) { if(handle != null && !handle.isClosed()) {
storageEngine.releaseLock(handle); storageEngine.releaseLock(handle);
} }
logger.error("Error in time based update entries", pfe); logger.error("Error in time based update entries", e);
throw new IOException(pfe); throw new IOException(e);
} }
} }


Expand Down
Expand Up @@ -26,10 +26,7 @@
import voldemort.utils.ByteUtils; import voldemort.utils.ByteUtils;
import voldemort.utils.EventThrottler; import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader; import voldemort.utils.NetworkClassLoader;
<<<<<<< HEAD
=======
import voldemort.utils.Time; import voldemort.utils.Time;
>>>>>>> Timestamp based resolving for UpdateEntries
import voldemort.utils.Utils; import voldemort.utils.Utils;
import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Versioned; import voldemort.versioning.Versioned;
Expand All @@ -38,8 +35,14 @@
* UpdatePartitionEntriesStreamRequestHandler implements the streaming logic for * UpdatePartitionEntriesStreamRequestHandler implements the streaming logic for
* updating partition entries. * updating partition entries.
* *
* This is the base class, which simply reads a versioned entry from network and * This is the base class, which abstracts network IO to get an entry off the
* issues a storage engine put. * wire, and provides a hook
* {@link UpdatePartitionEntriesStreamRequestHandler#processEntry(ByteArray, Versioned)}
* to implement custom logic (if needed) to manage how the entry will be written
* to storage.
*
* The default implementation of processEntry(..) simply issues a storage engine
* put.
* *
*/ */


Expand Down
11 changes: 8 additions & 3 deletions src/java/voldemort/store/StorageEngine.java
Expand Up @@ -138,10 +138,14 @@ public interface StorageEngine<K, V, T> extends Store<K, V, T> {
/** /**
* Returns the list of versions stored for the key, at the same time locking * Returns the list of versions stored for the key, at the same time locking
* the key for any writes until * the key for any writes until
* {@link StorageEngine#putAndUnlock(Object, KeyLockHandle)} is called with * {@link StorageEngine#putAndUnlock(Object, KeyLockHandle)} or
* the same lock handle. The idea here is to facilitate custom atomic * {@link StorageEngine#releaseLock(KeyLockHandle)} is called with the same
* lock handle. The idea here is to facilitate custom atomic
* Read-Modify-Write logic outside the storage engine * Read-Modify-Write logic outside the storage engine
* *
* NOTE : An invocation of getAndLock should be followed by EXACTLY ONE call
* to either putAndLock or releaseLock, for resources to be freed properly
*
* @param key * @param key
* @return * @return
*/ */
Expand All @@ -160,7 +164,8 @@ public interface StorageEngine<K, V, T> extends Store<K, V, T> {


/** /**
* Release any lock held by a prior * Release any lock held by a prior
* {@link AbstractStorageEngine#getAndLock(Object)} call * {@link AbstractStorageEngine#getAndLock(Object)} call. Helpful for
* exception handling during a read-modify-cycle
* *
* @param handle * @param handle
*/ */
Expand Down
3 changes: 3 additions & 0 deletions src/java/voldemort/store/bdb/BdbStorageEngine.java
Expand Up @@ -690,6 +690,9 @@ public KeyLockHandle<byte[]> getAndLock(ByteArray key) {
handle = new KeyLockHandle<byte[]>(vals, transaction); handle = new KeyLockHandle<byte[]>(vals, transaction);
} catch(DatabaseException e) { } catch(DatabaseException e) {
this.bdbEnvironmentStats.reportException(e); this.bdbEnvironmentStats.reportException(e);
// Unless we return out properly from this method, we need to ensure
// the transaction handle is closed on exception..
attemptAbort(transaction);
logger.error("Error in getAndLock for store " + this.getName(), e); logger.error("Error in getAndLock for store " + this.getName(), e);
throw new PersistenceFailureException(e); throw new PersistenceFailureException(e);
} finally { } finally {
Expand Down
2 changes: 1 addition & 1 deletion src/proto/voldemort-admin.proto
Expand Up @@ -43,7 +43,7 @@ message UpdatePartitionEntriesRequest {
required string store = 1; required string store = 1;
required PartitionEntry partition_entry = 2; required PartitionEntry partition_entry = 2;
optional VoldemortFilter filter = 3; optional VoldemortFilter filter = 3;
optional bool overwriteIfLatest = 4; optional bool overwriteIfLatestTs = 4;
} }


message UpdatePartitionEntriesResponse { message UpdatePartitionEntriesResponse {
Expand Down
2 changes: 1 addition & 1 deletion test/unit/voldemort/client/AdminServiceBasicTest.java
Expand Up @@ -1587,7 +1587,7 @@ protected Pair<ByteArray, Versioned<byte[]>> computeNext() {
} }
}; };


getAdminClient().streamingOps.updateEntries(0, storeName, iterator, null, true); getAdminClient().streamingOps.updateEntriesTimeBased(0, storeName, iterator, null);


// check updated values // check updated values
for(int i = 0; i < keys.size(); i++) { for(int i = 0; i < keys.size(); i++) {
Expand Down

0 comments on commit 06b450d

Please sign in to comment.