Skip to content

Commit

Permalink
Merge remote-tracking branch 'sj/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Aug 10, 2012
2 parents b647f8f + d7231a1 commit 9b2fa63
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 82 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -20,4 +20,5 @@ NANNY
_release
*.zip
.lein-deps-sum
*.iml

4 changes: 2 additions & 2 deletions src/jvm/storm/trident/state/OpaqueValue.java
Expand Up @@ -17,14 +17,14 @@ public OpaqueValue(Long currTxid, T val) {
this(currTxid, val, null);
}

public OpaqueValue update(Long batchTxid, T newVal) {
public OpaqueValue<T> update(Long batchTxid, T newVal) {
T prev;
if(batchTxid!=null && batchTxid.equals(this.currTxid)) {
prev = this.prev;
} else {
prev = this.curr;
}
return new OpaqueValue(batchTxid, newVal, prev);
return new OpaqueValue<T>(batchTxid, newVal, prev);
}

public T get(Long batchTxid) {
Expand Down
10 changes: 5 additions & 5 deletions src/jvm/storm/trident/state/map/CachedBatchReadsMap.java
Expand Up @@ -7,11 +7,11 @@
import storm.trident.state.ValueUpdater;

public class CachedBatchReadsMap<T> implements MapState<T> {
Map<List<Object>, Object> _cached = new HashMap();
Map<List<Object>, T> _cached = new HashMap<List<Object>, T>();

public MapState _delegate;
public MapState<T> _delegate;

public CachedBatchReadsMap(MapState delegate) {
public CachedBatchReadsMap(MapState<T> delegate) {
_delegate = delegate;
}

Expand All @@ -23,15 +23,15 @@ public List<T> multiGet(List<List<Object>> keys) {
for(int i=0; i<keys.size(); i++) {
List<Object> key = keys.get(i);
if(_cached.containsKey(key)) {
ret.set(i, (T) _cached.get(key));
ret.set(i, _cached.get(key));
}
}
}
return ret;
}

@Override
public List multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
List<T> vals = _delegate.multiUpdate(keys, updaters);
cache(keys, vals);
return vals;
Expand Down
38 changes: 19 additions & 19 deletions src/jvm/storm/trident/state/map/CachedMap.java
Expand Up @@ -9,24 +9,24 @@
/**
* Useful to layer over a map that communicates with a database. you generally layer opaque map over this over your database store
* @author nathan
* @param <T>
* @param <T>
*/
public class CachedMap<T> implements IBackingMap<T> {
LRUMap _cache;
IBackingMap _delegate;
public CachedMap(IBackingMap delegate, int cacheSize) {
_cache = new LRUMap(cacheSize);
LRUMap<List<Object>, T> _cache;
IBackingMap<T> _delegate;

public CachedMap(IBackingMap<T> delegate, int cacheSize) {
_cache = new LRUMap<List<Object>, T>(cacheSize);
_delegate = delegate;
}
}

@Override
public List<T> multiGet(List<List<Object>> keys) {
Map<List<Object>, T> results = new HashMap();
List<List<Object>> toGet = new ArrayList();
Map<List<Object>, T> results = new HashMap<List<Object>, T>();
List<List<Object>> toGet = new ArrayList<List<Object>>();
for(List<Object> key: keys) {
if(_cache.containsKey(key)) {
results.put(key, (T) _cache.get(key));
results.put(key, _cache.get(key));
} else {
toGet.add(key);
}
Expand All @@ -39,23 +39,23 @@ public List<T> multiGet(List<List<Object>> keys) {
_cache.put(key, val);
results.put(key, val);
}
List<T> ret = new ArrayList(keys.size());

List<T> ret = new ArrayList<T>(keys.size());
for(List<Object> key: keys) {
ret.add(results.get(key));
}
return ret;
}

@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
cache(keys, vals);
_delegate.multiPut(keys, vals);
public void multiPut(List<List<Object>> keys, List<T> values) {
cache(keys, values);
_delegate.multiPut(keys, values);
}
private void cache(List<List<Object>> keys, List<T> vals) {

private void cache(List<List<Object>> keys, List<T> values) {
for(int i=0; i<keys.size(); i++) {
_cache.put(keys.get(i), vals.get(i));
_cache.put(keys.get(i), values.get(i));
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/jvm/storm/trident/state/map/NonTransactionalMap.java
@@ -1,14 +1,14 @@
package storm.trident.state.map;

import storm.trident.state.ValueUpdater;

import java.util.ArrayList;
import java.util.List;
import storm.trident.state.TransactionalValue;
import storm.trident.state.ValueUpdater;


public class NonTransactionalMap<T> implements MapState<T> {
public static MapState build(IBackingMap<TransactionalValue> backing) {
return new NonTransactionalMap(backing);
public static <T> MapState<T> build(IBackingMap<T> backing) {
return new NonTransactionalMap<T>(backing);
}

IBackingMap<T> _backing;
Expand All @@ -25,11 +25,11 @@ public List<T> multiGet(List<List<Object>> keys) {
@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
List<T> curr = _backing.multiGet(keys);
List<T> ret = new ArrayList(curr.size());
List<T> ret = new ArrayList<T>(curr.size());
for(int i=0; i<curr.size(); i++) {
T currVal = curr.get(i);
ValueUpdater updater = updaters.get(i);
ret.add((T) updater.update(currVal));
ValueUpdater<T> updater = updaters.get(i);
ret.add(updater.update(currVal));
}
_backing.multiPut(keys, ret);
return ret;
Expand Down
57 changes: 29 additions & 28 deletions src/jvm/storm/trident/state/map/OpaqueMap.java
@@ -1,30 +1,31 @@
package storm.trident.state.map;

import java.util.ArrayList;
import java.util.List;
import storm.trident.state.OpaqueValue;
import storm.trident.state.ValueUpdater;

import java.util.ArrayList;
import java.util.List;


public class OpaqueMap<T> implements MapState<T> {
public static MapState build(IBackingMap<OpaqueValue> backing) {
return new CachedBatchReadsMap(new OpaqueMap(backing));
public static <T> MapState<T> build(IBackingMap<OpaqueValue<T>> backing) {
return new CachedBatchReadsMap<T>(new OpaqueMap<T>(backing));
}

IBackingMap<OpaqueValue> _backing;
IBackingMap<OpaqueValue<T>> _backing;
Long _currTx;

protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
protected OpaqueMap(IBackingMap<OpaqueValue<T>> backing) {
_backing = backing;
}

@Override
public List<T> multiGet(List<List<Object>> keys) {
List<OpaqueValue> curr = _backing.multiGet(keys);
List<T> ret = new ArrayList(curr.size());
for(OpaqueValue val: curr) {
List<OpaqueValue<T>> curr = _backing.multiGet(keys);
List<T> ret = new ArrayList<T>(curr.size());
for(OpaqueValue<T> val: curr) {
if(val!=null) {
ret.add((T)val.get(_currTx));
ret.add(val.get(_currTx));
} else {
ret.add(null);
}
Expand All @@ -34,23 +35,23 @@ public List<T> multiGet(List<List<Object>> keys) {

@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
List<OpaqueValue> curr = _backing.multiGet(keys);
List<OpaqueValue> newVals = new ArrayList(curr.size());
List<T> ret = new ArrayList();
List<OpaqueValue<T>> curr = _backing.multiGet(keys);
List<OpaqueValue<T>> newVals = new ArrayList<OpaqueValue<T>>(curr.size());
List<T> ret = new ArrayList<T>();
for(int i=0; i<curr.size(); i++) {
OpaqueValue val = curr.get(i);
ValueUpdater updater = updaters.get(i);
Object prev;
OpaqueValue<T> val = curr.get(i);
ValueUpdater<T> updater = updaters.get(i);
T prev;
if(val==null) {
prev = null;
} else {
prev = val.get(_currTx);
}
Object newVal = updater.update(prev);
ret.add((T)newVal);
OpaqueValue newOpaqueVal;
T newVal = updater.update(prev);
ret.add(newVal);
OpaqueValue<T> newOpaqueVal;
if(val==null) {
newOpaqueVal = new OpaqueValue(_currTx, newVal);
newOpaqueVal = new OpaqueValue<T>(_currTx, newVal);
} else {
newOpaqueVal = val.update(_currTx, newVal);
}
Expand All @@ -62,9 +63,9 @@ public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)

@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
List<ValueUpdater> updaters = new ArrayList(vals.size());
List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(vals.size());
for(T val: vals) {
updaters.add(new ReplaceUpdater(val));
updaters.add(new ReplaceUpdater<T>(val));
}
multiUpdate(keys, updaters);
}
Expand All @@ -79,16 +80,16 @@ public void commit(Long txid) {
_currTx = null;
}

static class ReplaceUpdater implements ValueUpdater {
Object _o;
static class ReplaceUpdater<T> implements ValueUpdater<T> {
T _t;

public ReplaceUpdater(Object o) {
_o = o;
public ReplaceUpdater(T t) {
_t = t;
}

@Override
public Object update(Object stored) {
return _o;
public T update(Object stored) {
return _t;
}
}
}
41 changes: 21 additions & 20 deletions src/jvm/storm/trident/state/map/TransactionalMap.java
@@ -1,30 +1,31 @@
package storm.trident.state.map;

import java.util.ArrayList;
import java.util.List;
import storm.trident.state.TransactionalValue;
import storm.trident.state.ValueUpdater;

import java.util.ArrayList;
import java.util.List;


public class TransactionalMap<T> implements MapState<T> {
public static MapState build(IBackingMap<TransactionalValue> backing) {
return new CachedBatchReadsMap(new TransactionalMap(backing));
public static <T> MapState<T> build(IBackingMap<TransactionalValue<T>> backing) {
return new CachedBatchReadsMap<T>(new TransactionalMap<T>(backing));
}

IBackingMap<TransactionalValue> _backing;
IBackingMap<TransactionalValue<T>> _backing;
Long _currTx;

protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
protected TransactionalMap(IBackingMap<TransactionalValue<T>> backing) {
_backing = backing;
}

@Override
public List<T> multiGet(List<List<Object>> keys) {
List<TransactionalValue> vals = _backing.multiGet(keys);
List<TransactionalValue<T>> vals = _backing.multiGet(keys);
List<T> ret = new ArrayList<T>(vals.size());
for(TransactionalValue v: vals) {
for(TransactionalValue<T> v: vals) {
if(v!=null) {
ret.add((T) v.getVal());
ret.add(v.getVal());
} else {
ret.add(null);
}
Expand All @@ -34,23 +35,23 @@ public List<T> multiGet(List<List<Object>> keys) {

@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
List<TransactionalValue> curr = _backing.multiGet(keys);
List<TransactionalValue> newVals = new ArrayList(curr.size());
List<T> ret = new ArrayList();
List<TransactionalValue<T>> curr = _backing.multiGet(keys);
List<TransactionalValue<T>> newVals = new ArrayList<TransactionalValue<T>>(curr.size());
List<T> ret = new ArrayList<T>();
for(int i=0; i<curr.size(); i++) {
TransactionalValue val = curr.get(i);
ValueUpdater updater = updaters.get(i);
TransactionalValue newVal;
TransactionalValue<T> val = curr.get(i);
ValueUpdater<T> updater = updaters.get(i);
TransactionalValue<T> newVal;
if(val==null) {
newVal = new TransactionalValue(_currTx, updater.update(null));
newVal = new TransactionalValue<T>(_currTx, updater.update(null));
} else {
if(_currTx!=null && _currTx.equals(val.getTxid())) {
newVal = val;
} else {
newVal = new TransactionalValue(_currTx, updater.update(val.getVal()));
newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
}
}
ret.add((T)newVal.getVal());
ret.add(newVal.getVal());
newVals.add(newVal);
}
_backing.multiPut(keys, newVals);
Expand All @@ -59,9 +60,9 @@ public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)

@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());
List<TransactionalValue<T>> newVals = new ArrayList<TransactionalValue<T>>(vals.size());
for(T val: vals) {
newVals.add(new TransactionalValue(_currTx, val));
newVals.add(new TransactionalValue<T>(_currTx, val));
}
_backing.multiPut(keys, newVals);
}
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/storm/trident/testing/LRUMemoryMapState.java
Expand Up @@ -50,7 +50,7 @@ public LRUMemoryMapState(int cacheSize, String id) {

@Override
public List<T> multiGet(List<List<Object>> keys) {
List<T> ret = new ArrayList();
List<T> ret = new ArrayList<T>();
for(List<Object> key: keys) {
ret.add(db.get(key));
}
Expand Down

0 comments on commit 9b2fa63

Please sign in to comment.