Skip to content

Commit

Permalink
Rebalancing patch 2.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkreps committed Apr 26, 2009
1 parent 2d1b21b commit 5ba2b99
Show file tree
Hide file tree
Showing 37 changed files with 647 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@
import org.mongodb.driver.ts.DBCollection;
import org.mongodb.driver.ts.DBCursor;
import org.mongodb.driver.ts.Doc;
import org.mongodb.driver.ts.IndexInfo;
import org.mongodb.driver.ts.Mongo;
import org.mongodb.driver.ts.MongoSelector;
import org.mongodb.driver.ts.IndexInfo;
import org.mongodb.driver.util.BSONObject;
import org.mongodb.driver.util.types.BSONBytes;

import voldemort.VoldemortException;
import voldemort.store.NoSuchCapabilityException;
import voldemort.store.StorageEngine;
import voldemort.store.StoreCapabilityType;
import voldemort.store.StoreUtils;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
Expand Down Expand Up @@ -105,14 +107,14 @@ public MongoDBStorageEngine(String name) throws MongoDBException {
init();
}

protected final void init() throws MongoDBException {
protected final void init() throws MongoDBException {
_mongoDB = new Mongo("127.0.0.1", 27017);
_db = _mongoDB.getDB(DB_NAME);
_coll = _db.getCollection(_collectionName);

_coll.createIndex(new IndexInfo("k_1", KEY));
}

public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries() {
try {
return new MongoDBClosableIterator();
Expand Down Expand Up @@ -142,11 +144,10 @@ public List<Versioned<byte[]>> get(ByteArray key) throws VoldemortException {
new VectorClock(d.getBytes(CLOCK)));
list.add(val);
}
} catch (MongoDBIOException mioe) {
} catch(MongoDBIOException mioe) {
try {
init();
}
catch(MongoDBException ee) {
} catch(MongoDBException ee) {
ee.printStackTrace();
}
throw new VoldemortException(mioe);
Expand Down Expand Up @@ -203,8 +204,7 @@ public void put(ByteArray key, Versioned<byte[]> value) throws VoldemortExceptio

if(occured == Occured.BEFORE) {
throw new ObsoleteVersionException("Key '" + strKey + " is obsolete.");
}
else if(occured == Occured.AFTER) {
} else if(occured == Occured.AFTER) {
_coll.remove(new MongoSelector(d));
}

Expand All @@ -220,11 +220,10 @@ else if(occured == Occured.AFTER) {
newData.put(CLOCK, ((VectorClock) value.getVersion()).toBytes());

_coll.insert(newData);
} catch (MongoDBIOException mioe) {
} catch(MongoDBIOException mioe) {
try {
init();
}
catch(MongoDBException ee) {
} catch(MongoDBException ee) {
ee.printStackTrace();
}
throw new VoldemortException(mioe);
Expand Down Expand Up @@ -255,11 +254,10 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
}

return deleted;
} catch (MongoDBIOException mioe) {
} catch(MongoDBIOException mioe) {
try {
init();
}
catch(MongoDBException ee) {
} catch(MongoDBException ee) {
ee.printStackTrace();
}
throw new VoldemortException(mioe);
Expand Down Expand Up @@ -316,7 +314,7 @@ private void closeCursor(DBCursor cur) {
* Driver was designed w/ a set of expectations for threading and therefore
* buffer management (for perf reasons) that don't exactly align w/ the use
* case here
*
*
* @return the current TLS
*/
private DirectBufferTLS getTLS() {
Expand All @@ -329,14 +327,19 @@ private DirectBufferTLS getTLS() {
return tls;
}

public Object getCapability(StoreCapabilityType capability) {
throw new NoSuchCapabilityException(capability, getName());
}

public class MongoDBClosableIterator implements
ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {

BSONObject _bo = new BSONObject();
protected DBCursor _cursor;

public MongoDBClosableIterator() throws MongoDBException {
getTLS(); // TODO - will be a problem if someone hands this iterator across threads
getTLS(); // TODO - will be a problem if someone hands this iterator
// across threads
_cursor = _coll.find();
}

Expand Down
22 changes: 12 additions & 10 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,15 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
return getStoreClient(storeName, null);
}

@SuppressWarnings("unchecked")
public <K, V> StoreClient<K, V> getStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> inconsistencyResolver) {
InconsistencyResolver<Versioned<V>> resolver) {

return new DefaultStoreClient<K, V>(storeName, resolver, this, 3);
}

@SuppressWarnings("unchecked")
public <K, V> Store<K, V> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
// Get cluster and store metadata
String clusterXml = bootstrapMetadata(MetadataStore.CLUSTER_KEY, bootstrapUrls);
Cluster cluster = clusterMapper.readCluster(new StringReader(clusterXml));
Expand All @@ -117,7 +123,7 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
node.getHost(),
getPort(node));
if(enableVerboseLogging)
store = new LoggingStore(store);
store = new LoggingStore<ByteArray, byte[]>(store);
clientMapping.put(node.getId(), store);
}

Expand All @@ -144,16 +150,12 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,

// Add inconsistency resolving decorator, using their inconsistency
// resolver (if they gave us one)
InconsistencyResolver<Versioned<V>> secondaryResolver = inconsistencyResolver == null ? new TimeBasedInconsistencyResolver()
: inconsistencyResolver;
InconsistencyResolver<Versioned<V>> secondaryResolver = resolver == null ? new TimeBasedInconsistencyResolver()
: resolver;
Store<K, V> resolvingStore = new InconsistencyResolvingStore<K, V>(serializingStore,
new ChainedResolver<Versioned<V>>(new VectorClockInconsistencyResolver(),
secondaryResolver));

return new DefaultStoreClient<K, V>(resolvingStore,
keySerializer,
valueSerializer,
routingStrategy);
return resolvingStore;
}

private String bootstrapMetadata(String key, URI[] urls) {
Expand Down
107 changes: 75 additions & 32 deletions src/java/voldemort/client/DefaultStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.serialization.Serializer;
import voldemort.store.InvalidMetadataException;
import voldemort.store.Store;
import voldemort.store.StoreCapabilityType;
import voldemort.utils.Utils;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.InconsistentDataException;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
Expand All @@ -49,34 +52,53 @@ public class DefaultStoreClient<K, V> implements StoreClient<K, V> {

private final Versioned<V> NOT_FOUND = new Versioned<V>(null, null);

private final Store<K, V> store;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final RoutingStrategy routingStragy;
private final StoreClientFactory storeFactory;

public DefaultStoreClient(Store<K, V> store,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
RoutingStrategy routingStrategy) {
this.store = Utils.notNull(store);
this.keySerializer = Utils.notNull(keySerializer);
this.valueSerializer = Utils.notNull(valueSerializer);
this.routingStragy = routingStrategy;
private final int metadataRefreshAttempts;
private final String storeName;
private final InconsistencyResolver<Versioned<V>> resolver;
private volatile Store<K, V> store;

public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
StoreClientFactory storeFactory,
int maxMetadataRefreshAttempts) {
this.storeName = Utils.notNull(storeName);
this.resolver = resolver;
this.storeFactory = Utils.notNull(storeFactory);
this.metadataRefreshAttempts = maxMetadataRefreshAttempts;
reinit();
}

private void reinit() {
this.store = storeFactory.getRawStore(storeName, resolver);
}

public boolean delete(K key) {
Versioned<V> versioned = get(key);
if(versioned == null)
return false;
return store.delete(key, versioned.getVersion());
return delete(key, versioned.getVersion());
}

public boolean delete(K key, Version version) {
return store.delete(key, version);
for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
try {
return store.delete(key, version);
} catch(InvalidMetadataException e) {
reinit();
}
}
throw new InvalidMetadataException(this.metadataRefreshAttempts
+ " metadata refresh attempts failed.");
}

public V getValue(K key, V defaultValue) {
return get(key).getValue();
Versioned<V> versioned = get(key);
if(versioned == null)
return null;
else
return versioned.getValue();
}

public V getValue(K key) {
Expand All @@ -88,8 +110,16 @@ public V getValue(K key) {
}

public Versioned<V> get(K key, Versioned<V> defaultValue) {
List<Versioned<V>> items = store.get(key);
return getItemOrThrow(key, defaultValue, items);
for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
try {
List<Versioned<V>> items = store.get(key);
return getItemOrThrow(key, defaultValue, items);
} catch(InvalidMetadataException e) {
reinit();
}
}
throw new InvalidMetadataException(this.metadataRefreshAttempts
+ " metadata refresh attempts failed.");
}

private Versioned<V> getItemOrThrow(K key, Versioned<V> defaultValue, List<Versioned<V>> items) {
Expand All @@ -107,7 +137,18 @@ public Versioned<V> get(K key) {
}

public Map<K, Versioned<V>> getAll(Iterable<K> keys) {
Map<K, List<Versioned<V>>> items = store.getAll(keys);
Map<K, List<Versioned<V>>> items = null;
for(int attempts = 0;; attempts++) {
if(attempts < this.metadataRefreshAttempts)
throw new InvalidMetadataException(this.metadataRefreshAttempts
+ " metadata refresh attempts failed.");
try {
items = store.getAll(keys);
break;
} catch(InvalidMetadataException e) {
reinit();
}
}
Map<K, Versioned<V>> result = Maps.newHashMapWithExpectedSize(items.size());

for(Entry<K, List<Versioned<V>>> mapEntry: items.entrySet()) {
Expand All @@ -122,20 +163,29 @@ public void put(K key, V value) {
if(versioned == NOT_FOUND)
versioned = new Versioned<V>(value, new VectorClock());
versioned.setObject(value);
store.put(key, versioned);
put(key, versioned);
}

public boolean putIfNotObsolete(K key, Versioned<V> versioned) {
try {
store.put(key, versioned);
put(key, versioned);
return true;
} catch(ObsoleteVersionException e) {
return false;
}
}

public void put(K key, Versioned<V> versioned) throws ObsoleteVersionException {
store.put(key, versioned);
for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
try {
store.put(key, versioned);
return;
} catch(InvalidMetadataException e) {
reinit();
}
}
throw new InvalidMetadataException(this.metadataRefreshAttempts
+ " metadata refresh attempts failed.");
}

public boolean applyUpdate(UpdateAction<K, V> action) {
Expand Down Expand Up @@ -164,18 +214,11 @@ public boolean applyUpdate(UpdateAction<K, V> action, int maxTries) {
return false;
}

public Serializer<K> getKeySerializer() {
return this.keySerializer;
}

public Serializer<V> getValueSerializer() {
return this.valueSerializer;
}

public List<Node> getResponsibleNodes(K key) {
if(this.routingStragy == null)
throw new UnsupportedOperationException("This store client has no routing strategy.");
return this.routingStragy.routeRequest(keySerializer.toBytes(key));
RoutingStrategy strategy = (RoutingStrategy) store.getCapability(StoreCapabilityType.ROUTING_STRATEGY);
@SuppressWarnings("unchecked")
Serializer<K> keySerializer = (Serializer<K>) store.getCapability(StoreCapabilityType.KEY_SERIALIZER);
return strategy.routeRequest(keySerializer.toBytes(key));
}

}
Loading

0 comments on commit 5ba2b99

Please sign in to comment.