Permalink
Browse files

Fix TransactionalMap and OpaqueMap to correctly do multiple updates to the same key in the same batch
  • Loading branch information...
1 parent 141aad4 commit c876e719c2f8edf77e2c800a49586a1c048805fa @nathanmarz committed Feb 18, 2013
View
@@ -13,6 +13,7 @@
* Bug fix: Supervisor provides full path to workers to logging config rather than relative path (thanks revans2)
* Bug fix: Call ReducerAggregator#init properly when used within persistentAggregate (thanks lorcan)
* Bug fix: Set component-specific configs correctly for Trident spouts
+ * Bug fix: Fix TransactionalMap and OpaqueMap to correctly do multiple updates to the same key in the same batch
## 0.8.2
@@ -6,38 +6,45 @@
import java.util.Map;
import storm.trident.state.ValueUpdater;
-public class CachedBatchReadsMap<T> implements MapState<T> {
+
+public class CachedBatchReadsMap<T> {
+ public static class RetVal<T> { // result of one key lookup: the value plus a flag saying where it came from
+ public boolean cached; // true when val was taken from the _cached map, false when it came from the delegate's multiGet
+ public T val; // the looked-up value
+
+ public RetVal(T v, boolean c) { // v: the value to carry; c: whether it was served from the cache
+ val = v;
+ cached = c;
+ }
+ }
+
Map<List<Object>, T> _cached = new HashMap<List<Object>, T>();
- public MapState<T> _delegate;
+ public IBackingMap<T> _delegate;
- public CachedBatchReadsMap(MapState<T> delegate) {
+ public CachedBatchReadsMap(IBackingMap<T> delegate) {
_delegate = delegate;
}
+
+ public void reset() {
+ _cached.clear();
+ }
- @Override
- public List<T> multiGet(List<List<Object>> keys) {
- List<T> ret = _delegate.multiGet(keys);
- if(!_cached.isEmpty()) {
- ret = new ArrayList<T>(ret);
- for(int i=0; i<keys.size(); i++) {
- List<Object> key = keys.get(i);
- if(_cached.containsKey(key)) {
- ret.set(i, _cached.get(key));
- }
+ public List<RetVal<T>> multiGet(List<List<Object>> keys) {
+ // TODO: can optimize further by only querying backing map for keys not in the cache
+ List<T> vals = _delegate.multiGet(keys);
+ List<RetVal<T>> ret = new ArrayList(vals.size());
+ for(int i=0; i<keys.size(); i++) {
+ List<Object> key = keys.get(i);
+ if(_cached.containsKey(key)) {
+ ret.add(new RetVal(_cached.get(key), true));
+ } else {
+ ret.add(new RetVal(vals.get(i), false));
}
}
return ret;
}
- @Override
- public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- List<T> vals = _delegate.multiUpdate(keys, updaters);
- cache(keys, vals);
- return vals;
- }
-
- @Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
_delegate.multiPut(keys, vals);
cache(keys, vals);
@@ -51,16 +58,6 @@ private void cache(List<List<Object>> keys, List<T> vals) {
}
}
- @Override
- public void beginCommit(Long txid) {
- _cached.clear(); //if a commit was pending and failed, we need to make sure to clear the cache
- _delegate.beginCommit(txid);
- }
- @Override
- public void commit(Long txid) {
- _cached.clear();
- _delegate.commit(txid);
- }
}
@@ -9,21 +9,22 @@
public class OpaqueMap<T> implements MapState<T> {
public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {
- return new CachedBatchReadsMap<T>(new OpaqueMap<T>(backing));
+ return new OpaqueMap<T>(backing);
}
- IBackingMap<OpaqueValue> _backing;
+ CachedBatchReadsMap<OpaqueValue> _backing;
Long _currTx;
protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
- _backing = backing;
+ _backing = new CachedBatchReadsMap(backing);
}
@Override
public List<T> multiGet(List<List<Object>> keys) {
- List<OpaqueValue> curr = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
List<T> ret = new ArrayList<T>(curr.size());
- for(OpaqueValue val: curr) {
+ for(CachedBatchReadsMap.RetVal<OpaqueValue> retval: curr) {
+ OpaqueValue val = retval.val;
if(val!=null) {
ret.add((T) val.get(_currTx));
} else {
@@ -35,17 +36,22 @@ protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- List<OpaqueValue> curr = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
List<OpaqueValue> newVals = new ArrayList<OpaqueValue>(curr.size());
List<T> ret = new ArrayList<T>();
for(int i=0; i<curr.size(); i++) {
- OpaqueValue<T> val = curr.get(i);
+ CachedBatchReadsMap.RetVal<OpaqueValue> retval = curr.get(i);
+ OpaqueValue<T> val = retval.val;
ValueUpdater<T> updater = updaters.get(i);
T prev;
if(val==null) {
prev = null;
} else {
- prev = val.get(_currTx);
+ if(retval.cached) {
+ prev = val.getCurr();
+ } else {
+ prev = val.get(_currTx);
+ }
}
T newVal = updater.update(prev);
ret.add(newVal);
@@ -73,11 +79,13 @@ public void multiPut(List<List<Object>> keys, List<T> vals) {
@Override
public void beginCommit(Long txid) {
_currTx = txid;
+ _backing.reset();
}
@Override
public void commit(Long txid) {
_currTx = null;
+ _backing.reset();
}
static class ReplaceUpdater<T> implements ValueUpdater<T> {
@@ -9,21 +9,22 @@
public class TransactionalMap<T> implements MapState<T> {
public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {
- return new CachedBatchReadsMap<T>(new TransactionalMap<T>(backing));
+ return new TransactionalMap<T>(backing);
}
-
- IBackingMap<TransactionalValue> _backing;
+
+ CachedBatchReadsMap<TransactionalValue> _backing;
Long _currTx;
protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
- _backing = backing;
+ _backing = new CachedBatchReadsMap(backing);
}
@Override
public List<T> multiGet(List<List<Object>> keys) {
- List<TransactionalValue> vals = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);
List<T> ret = new ArrayList<T>(vals.size());
- for(TransactionalValue v: vals) {
+ for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {
+ TransactionalValue v = retval.val;
if(v!=null) {
ret.add((T) v.getVal());
} else {
@@ -35,26 +36,36 @@ protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- List<TransactionalValue> curr = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<TransactionalValue>> curr = _backing.multiGet(keys);
List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(curr.size());
+ List<List<Object>> newKeys = new ArrayList();
List<T> ret = new ArrayList<T>();
for(int i=0; i<curr.size(); i++) {
- TransactionalValue<T> val = curr.get(i);
+ CachedBatchReadsMap.RetVal<TransactionalValue> retval = curr.get(i);
+ TransactionalValue<T> val = retval.val;
ValueUpdater<T> updater = updaters.get(i);
TransactionalValue<T> newVal;
+ boolean changed = false;
if(val==null) {
newVal = new TransactionalValue<T>(_currTx, updater.update(null));
+ changed = true;
} else {
- if(_currTx!=null && _currTx.equals(val.getTxid())) {
+ if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {
newVal = val;
} else {
newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
- }
+ changed = true;
+ }
}
ret.add(newVal.getVal());
- newVals.add(newVal);
+ if(changed) {
+ newVals.add(newVal);
+ newKeys.add(keys.get(i));
+ }
+ }
+ if(!newKeys.isEmpty()) {
+ _backing.multiPut(newKeys, newVals);
}
- _backing.multiPut(keys, newVals);
return ret;
}
@@ -70,10 +81,12 @@ public void multiPut(List<List<Object>> keys, List<T> vals) {
@Override
public void beginCommit(Long txid) {
_currTx = txid;
+ _backing.reset();
}
@Override
public void commit(Long txid) {
_currTx = null;
- }
+ _backing.reset();
+ }
}
@@ -0,0 +1,30 @@
+package storm.trident.testing;
+
+import storm.trident.state.map.IBackingMap;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MemoryBackingMap implements IBackingMap<Object> { // IBackingMap implementation that keeps every value in an in-process HashMap
+ Map _vals = new HashMap(); // storage: key list -> value
+
+ @Override
+ public List<Object> multiGet(List<List<Object>> keys) { // returns one entry per requested key, in the same order as keys
+ List ret = new ArrayList();
+ for(List key: keys) {
+ ret.add(_vals.get(key)); // HashMap.get yields null when no value is stored for the key
+ }
+ return ret;
+ }
+
+ @Override
+ public void multiPut(List<List<Object>> keys, List<Object> vals) { // stores vals.get(i) under keys.get(i) for every index of keys
+ for(int i=0; i<keys.size(); i++) {
+ List key = keys.get(i);
+ Object val = vals.get(i);
+ _vals.put(key, val); // overwrites any value previously stored for this key
+ }
+ }
+}

0 comments on commit c876e71

Please sign in to comment.