Merge branch 'master' into derekd-blowfishtupleserializer-test

commit c9f31839f428df9da4f47c9bd3d259b35136cfcb (2 parents: 2f81fe0 + e6eea02)
authored by d2r
5 CHANGELOG.md
@@ -7,9 +7,14 @@
* Revamped Trident spout interfaces to support more dynamic spouts, such as a spout that reads from a changing set of brokers
* How tuples are serialized is now pluggable (thanks anfeng)
* Added blowfish encryption based tuple serialization (thanks anfeng)
+ * Have storm fall back to installed storm.yaml (thanks revans2)
 + * Improve error message when Storm detects a bundled storm.yaml to show the URLs of the offending resources (thanks revans2)
+ * Nimbus throws NotAliveException instead of FileNotFoundException from various query methods when topology is no longer alive (thanks revans2)
* Bug fix: Supervisor provides full path to workers to logging config rather than relative path (thanks revans2)
* Bug fix: Call ReducerAggregator#init properly when used within persistentAggregate (thanks lorcan)
* Bug fix: Set component-specific configs correctly for Trident spouts
+ * Bug fix: Fix TransactionalMap and OpaqueMap to correctly do multiple updates to the same key in the same batch
+ * Bug fix: Fix race condition between supervisor and Nimbus that could lead to stormconf.ser errors and infinite crashing of supervisor
## 0.8.2
40 bin/storm
@@ -22,11 +22,15 @@ if sys.platform == "cygwin":
else:
normclasspath = identity
-CONF_DIR = os.path.expanduser("~/.storm")
STORM_DIR = "/".join(os.path.realpath( __file__ ).split("/")[:-2])
+USER_CONF_DIR = os.path.expanduser("~/.storm")
+CLUSTER_CONF_DIR = STORM_DIR + "/conf"
+if (not os.path.isfile(USER_CONF_DIR + "/storm.yaml")):
+ USER_CONF_DIR = CLUSTER_CONF_DIR
CONFIG_OPTS = []
CONFFILE = ""
+
def get_config_opts():
global CONFIG_OPTS
return "-Dstorm.options=" + (','.join(CONFIG_OPTS)).replace(' ', "%%%%")
@@ -73,7 +77,7 @@ def print_localconfvalue(name):
The local Storm configs are the ones in ~/.storm/storm.yaml merged
in with the configs in defaults.yaml.
"""
- print name + ": " + confvalue(name, [CONF_DIR])
+ print name + ": " + confvalue(name, [USER_CONF_DIR])
def print_remoteconfvalue(name):
"""Syntax: [storm remoteconfvalue conf-name]
@@ -84,7 +88,7 @@ def print_remoteconfvalue(name):
This command must be run on a cluster machine.
"""
- print name + ": " + confvalue(name, [STORM_DIR + "/conf"])
+ print name + ": " + confvalue(name, [CLUSTER_CONF_DIR])
def parse_args(string):
r"""Takes a string of whitespace-separated tokens and parses it into a list.
@@ -132,7 +136,7 @@ def jar(jarfile, klass, *args):
exec_storm_class(
klass,
jvmtype="-client",
- extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
+ extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
args=args,
jvmopts=["-Dstorm.jar=" + jarfile])
@@ -150,7 +154,7 @@ def kill(*args):
"backtype.storm.command.kill_topology",
args=args,
jvmtype="-client",
- extrajars=[CONF_DIR, STORM_DIR + "/bin"])
+ extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])
def activate(*args):
"""Syntax: [storm activate topology-name]
@@ -161,7 +165,7 @@ def activate(*args):
"backtype.storm.command.activate",
args=args,
jvmtype="-client",
- extrajars=[CONF_DIR, STORM_DIR + "/bin"])
+ extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])
def listtopos(*args):
"""Syntax: [storm list]
@@ -172,7 +176,7 @@ def listtopos(*args):
"backtype.storm.command.list",
args=args,
jvmtype="-client",
- extrajars=[CONF_DIR, STORM_DIR + "/bin"])
+ extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])
def deactivate(*args):
"""Syntax: [storm deactivate topology-name]
@@ -183,7 +187,7 @@ def deactivate(*args):
"backtype.storm.command.deactivate",
args=args,
jvmtype="-client",
- extrajars=[CONF_DIR, STORM_DIR + "/bin"])
+ extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])
def rebalance(*args):
"""Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]
@@ -210,7 +214,7 @@ def rebalance(*args):
"backtype.storm.command.rebalance",
args=args,
jvmtype="-client",
- extrajars=[CONF_DIR, STORM_DIR + "/bin"])
+ extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])
def shell(resourcesdir, command, *args):
tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
@@ -221,7 +225,7 @@ def shell(resourcesdir, command, *args):
"backtype.storm.command.shell_submission",
args=runnerargs,
jvmtype="-client",
- extrajars=[CONF_DIR],
+ extrajars=[USER_CONF_DIR],
fork=True)
os.system("rm " + tmpjarpath)
@@ -231,7 +235,7 @@ def repl():
Opens up a Clojure REPL with the storm jars and configuration
on the classpath. Useful for debugging.
"""
- cppaths = [STORM_DIR + "/conf"]
+ cppaths = [CLUSTER_CONF_DIR]
exec_storm_class("clojure.lang.Repl", jvmtype="-client", extrajars=cppaths)
def nimbus(klass="backtype.storm.daemon.nimbus"):
@@ -243,7 +247,7 @@ def nimbus(klass="backtype.storm.daemon.nimbus"):
See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
- cppaths = [STORM_DIR + "/conf"]
+ cppaths = [CLUSTER_CONF_DIR]
jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
"-Dlogfile.name=nimbus.log",
"-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
@@ -263,7 +267,7 @@ def supervisor(klass="backtype.storm.daemon.supervisor"):
See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
- cppaths = [STORM_DIR + "/conf"]
+ cppaths = [CLUSTER_CONF_DIR]
jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
"-Dlogfile.name=supervisor.log",
"-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
@@ -284,7 +288,7 @@ def ui():
See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
- cppaths = [STORM_DIR + "/conf"]
+ cppaths = [CLUSTER_CONF_DIR]
jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
"-Dlogfile.name=ui.log",
"-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
@@ -293,7 +297,7 @@ def ui():
"backtype.storm.ui.core",
jvmtype="-server",
jvmopts=jvmopts,
- extrajars=[STORM_DIR, STORM_DIR + "/conf"])
+ extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
def drpc():
"""Syntax: [storm drpc]
@@ -312,7 +316,7 @@ def drpc():
"backtype.storm.daemon.drpc",
jvmtype="-server",
jvmopts=jvmopts,
- extrajars=[STORM_DIR + "/conf"])
+ extrajars=[CLUSTER_CONF_DIR])
def dev_zookeeper():
"""Syntax: [storm dev-zookeeper]
@@ -321,11 +325,11 @@ def dev_zookeeper():
"storm.zookeeper.port" as its port. This is only intended for development/testing, the
Zookeeper instance launched is not configured to be used in production.
"""
- cppaths = [STORM_DIR + "/conf"]
+ cppaths = [CLUSTER_CONF_DIR]
exec_storm_class(
"backtype.storm.command.dev_zookeeper",
jvmtype="-server",
- extrajars=[STORM_DIR + "/conf"])
+ extrajars=[CLUSTER_CONF_DIR])
def version():
"""Syntax: [storm version]
2  project.clj
@@ -1,4 +1,4 @@
-(defproject storm/storm "0.9.0-wip15"
+(defproject storm/storm "0.9.0-wip16"
:url "http://storm-project.clj"
:description "Distributed and fault-tolerant realtime computation"
:license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"}
25 src/clj/backtype/storm/daemon/nimbus.clj
@@ -4,6 +4,7 @@
(:import [org.apache.thrift7 TException])
(:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [java.nio ByteBuffer])
+ (:import [java.io FileNotFoundException])
(:import [java.nio.channels Channels WritableByteChannel])
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
@@ -856,6 +857,22 @@
(throw (InvalidTopologyException.
(str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))))
+(defn- try-read-storm-conf [conf storm-id]
+ (try-cause
+ (read-storm-conf conf storm-id)
+ (catch FileNotFoundException e
+ (throw (NotAliveException. storm-id)))
+ )
+)
+
+(defn- try-read-storm-topology [conf storm-id]
+ (try-cause
+ (read-storm-topology conf storm-id)
+ (catch FileNotFoundException e
+ (throw (NotAliveException. storm-id)))
+ )
+)
+
(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
@@ -1014,13 +1031,13 @@
(to-json (:conf nimbus)))
(^String getTopologyConf [this ^String id]
- (to-json (read-storm-conf conf id)))
+ (to-json (try-read-storm-conf conf id)))
(^StormTopology getTopology [this ^String id]
- (system-topology! (read-storm-conf conf id) (read-storm-topology conf id)))
+ (system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))
(^StormTopology getUserTopology [this ^String id]
- (read-storm-topology conf id))
+ (try-read-storm-topology conf id))
(^ClusterSummary getClusterInfo [this]
(let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -1063,7 +1080,7 @@
(^TopologyInfo getTopologyInfo [this ^String storm-id]
(let [storm-cluster-state (:storm-cluster-state nimbus)
- task->component (storm-task-info (read-storm-topology conf storm-id) (read-storm-conf conf storm-id))
+ task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id))
base (.storm-base storm-cluster-state storm-id nil)
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
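
The try-read-storm-conf and try-read-storm-topology helpers above convert a FileNotFoundException (the topology's files are gone because it is no longer alive) into the Thrift-level NotAliveException that clients of getTopologyConf, getTopology, getUserTopology, and getTopologyInfo already expect. A hedged Java equivalent of the wrapping pattern (readStormConf below is a hypothetical stand-in for the Clojure read-storm-conf helper):

    import java.io.FileNotFoundException;
    import java.util.Map;
    import backtype.storm.generated.NotAliveException;

    public class NimbusReadHelpers {
        // Hypothetical stand-in for the Clojure read-storm-conf helper.
        static Map readStormConf(Map conf, String stormId) throws FileNotFoundException {
            throw new FileNotFoundException("stormconf.ser missing for " + stormId);
        }

        // Mirrors try-read-storm-conf: a dead topology's missing files surface
        // to Thrift clients as NotAliveException, not FileNotFoundException.
        static Map tryReadStormConf(Map conf, String stormId) throws NotAliveException {
            try {
                return readStormConf(conf, stormId);
            } catch (FileNotFoundException e) {
                throw new NotAliveException(stormId);
            }
        }
    }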
45 src/clj/backtype/storm/daemon/supervisor.clj
@@ -20,11 +20,17 @@
(shutdown-all-workers [this])
)
+(defn- assignments-snapshot [storm-cluster-state callback]
+ (let [storm-ids (.assignments storm-cluster-state callback)]
+ (->> (dofor [sid storm-ids] {sid (.assignment-info storm-cluster-state sid callback)})
+ (apply merge)
+ (filter-val not-nil?)
+ )))
-(defn- read-my-executors [storm-cluster-state storm-id assignment-id callback]
- (let [assignment (.assignment-info storm-cluster-state storm-id callback)
+(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
+ (let [assignment (get assignments-snapshot storm-id)
my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
- (:executor->node+port assignment))
+ (:executor->node+port assignment))
port-executors (apply merge-with
concat
(for [[executor [_ port]] my-executors]
@@ -34,29 +40,18 @@
;; need to cast to int b/c it might be a long (due to how yaml parses things)
;; doall is to avoid serialization/deserialization problems with lazy seqs
[(Integer. port) (LocalAssignment. storm-id (doall executors))]
- ))
- ))
+ ))))
+
(defn- read-assignments
"Returns map from port to struct containing :storm-id and :executors"
- [storm-cluster-state assignment-id callback]
- (let [storm-ids (.assignments storm-cluster-state callback)]
- (apply merge-with
- (fn [& ignored]
- (throw (RuntimeException.
- "Should not have multiple topologies assigned to one port")))
- (dofor [sid storm-ids] (read-my-executors storm-cluster-state sid assignment-id callback))
- )))
+ [assignments-snapshot assignment-id]
+ (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
+ (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
(defn- read-storm-code-locations
- [storm-cluster-state callback]
- (let [storm-ids (.assignments storm-cluster-state callback)]
- (into {}
- (dofor [sid storm-ids]
- [sid (:master-code-dir (.assignment-info storm-cluster-state sid callback))]
- ))
- ))
-
+ [assignments-snapshot]
+ (map-val :master-code-dir assignments-snapshot))
(defn- read-downloaded-storm-ids [conf]
(map #(java.net.URLDecoder/decode %) (read-dir-contents (supervisor-stormdist-root conf)))
@@ -265,12 +260,12 @@
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
- storm-code-map (read-storm-code-locations storm-cluster-state sync-callback)
+ assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
+ storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
all-assignment (read-assignments
- storm-cluster-state
- (:assignment-id supervisor)
- sync-callback)
+ assignments-snapshot
+ (:assignment-id supervisor))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
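
The supervisor rework above closes the race behind the stormconf.ser crash loop: code locations and executor assignments were previously read from ZooKeeper in separate queries, so Nimbus could delete a topology between them; both views are now derived from a single assignments snapshot, with entries whose assignment-info has vanished filtered out up front. A hedged sketch of the snapshot pattern (the ClusterState interface below is illustrative):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class AssignmentsSnapshotSketch {
        // Illustrative stand-in for the storm-cluster-state operations used above.
        interface ClusterState {
            List<String> assignments();
            Map<String, Object> assignmentInfo(String stormId); // null once a topology is removed
        }

        // Read everything once; every later view (code locations, my executors)
        // is computed from this one map, so the views can never disagree.
        static Map<String, Map<String, Object>> snapshot(ClusterState state) {
            Map<String, Map<String, Object>> snap = new HashMap<String, Map<String, Object>>();
            for (String stormId : state.assignments()) {
                Map<String, Object> info = state.assignmentInfo(stormId);
                if (info != null) {  // drop topologies deleted mid-read
                    snap.put(stormId, info);
                }
            }
            return snap;
        }
    }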
52 src/jvm/backtype/storm/security/serialization/BlowfishTupleSerializer.java
@@ -30,45 +30,45 @@
private BlowfishSerializer _serializer;
public BlowfishTupleSerializer(Kryo kryo, Map storm_conf) {
- String encryption_key = null;
- try {
- encryption_key = (String)storm_conf.get(SECRET_KEY);
- LOG.debug("Blowfish serializer being constructed ...");
- if (encryption_key == null) {
- LOG.error("Encryption key not specified");
- throw new RuntimeException("Blowfish encryption key not specified");
- }
- byte[] bytes = Hex.decodeHex(encryption_key.toCharArray());
- _serializer = new BlowfishSerializer(new ListDelegateSerializer(), bytes);
- } catch (org.apache.commons.codec.DecoderException ex) {
- LOG.error("Invalid encryption key");
- throw new RuntimeException("Blowfish encryption key invalid");
- }
+ String encryption_key = null;
+ try {
+ encryption_key = (String)storm_conf.get(SECRET_KEY);
+ LOG.debug("Blowfish serializer being constructed ...");
+ if (encryption_key == null) {
+ LOG.error("Encryption key not specified");
+ throw new RuntimeException("Blowfish encryption key not specified");
+ }
+ byte[] bytes = Hex.decodeHex(encryption_key.toCharArray());
+ _serializer = new BlowfishSerializer(new ListDelegateSerializer(), bytes);
+ } catch (org.apache.commons.codec.DecoderException ex) {
+ LOG.error("Invalid encryption key", ex);
+ throw new RuntimeException("Blowfish encryption key invalid", ex);
+ }
}
@Override
public void write(Kryo kryo, Output output, ListDelegate object) {
- _serializer.write(kryo, output, object);
+ _serializer.write(kryo, output, object);
}
@Override
public ListDelegate read(Kryo kryo, Input input, Class<ListDelegate> type) {
- return (ListDelegate)_serializer.read(kryo, input, type);
+ return (ListDelegate)_serializer.read(kryo, input, type);
}
/**
* Produce a blowfish key to be used in "Storm jar" command
*/
public static void main(String[] args) {
- try{
- KeyGenerator kgen = KeyGenerator.getInstance("Blowfish");
- SecretKey skey = kgen.generateKey();
- byte[] raw = skey.getEncoded();
- String keyString = new String(Hex.encodeHex(raw));
- System.out.println("storm -c "+SECRET_KEY+"="+keyString+" -c "+Config.TOPOLOGY_TUPLE_SERIALIZER+"="+BlowfishTupleSerializer.class.getName() + " ..." );
- } catch (Exception ex) {
- LOG.error(ex.getMessage());
- ex.printStackTrace();
- }
+ try{
+ KeyGenerator kgen = KeyGenerator.getInstance("Blowfish");
+ SecretKey skey = kgen.generateKey();
+ byte[] raw = skey.getEncoded();
+ String keyString = new String(Hex.encodeHex(raw));
+ System.out.println("storm -c "+SECRET_KEY+"="+keyString+" -c "+Config.TOPOLOGY_TUPLE_SERIALIZER+"="+BlowfishTupleSerializer.class.getName() + " ..." );
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage());
+ ex.printStackTrace();
+ }
}
}
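
To enable the serializer, the key printed by main() goes into the topology configuration under the two constants referenced in the code above. A hedged usage sketch, assuming SECRET_KEY is publicly accessible (the key string is a placeholder, not a real key):

    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.Config;
    import backtype.storm.security.serialization.BlowfishTupleSerializer;

    public class BlowfishConfigExample {
        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<String, Object>();
            conf.put(Config.TOPOLOGY_TUPLE_SERIALIZER, BlowfishTupleSerializer.class.getName());
            // Placeholder only; generate a real key with BlowfishTupleSerializer.main().
            conf.put(BlowfishTupleSerializer.SECRET_KEY, "0123456789abcdef");
            System.out.println(conf);
        }
    }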
3  src/jvm/backtype/storm/utils/Utils.java
@@ -112,7 +112,8 @@ public static Map findAndReadConfigFile(String name, boolean mustExist) {
else return new HashMap();
}
if(resources.size() > 1) {
- throw new RuntimeException("Found multiple " + name + " resources. You're probably bundling the Storm jars with your topology jar.");
+ throw new RuntimeException("Found multiple " + name + " resources. You're probably bundling the Storm jars with your topology jar. "
+ + resources);
}
URL resource = resources.get(0);
Yaml yaml = new Yaml();
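
With the resource list appended, the message now names the jar(s) that bundle a stray storm.yaml. A standalone sketch of how such duplicates can be enumerated on any classpath, using only the JDK:

    import java.io.IOException;
    import java.net.URL;
    import java.util.Collections;
    import java.util.List;

    public class FindDuplicateResources {
        public static void main(String[] args) throws IOException {
            // Lists every copy of storm.yaml visible to the context classloader;
            // more than one entry is exactly what triggers the error above.
            List<URL> resources = Collections.list(
                Thread.currentThread().getContextClassLoader().getResources("storm.yaml"));
            for (URL url : resources) {
                System.out.println(url);
            }
        }
    }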
59 src/jvm/storm/trident/state/map/CachedBatchReadsMap.java
@@ -6,38 +6,45 @@
import java.util.Map;
import storm.trident.state.ValueUpdater;
-public class CachedBatchReadsMap<T> implements MapState<T> {
+
+public class CachedBatchReadsMap<T> {
+ public static class RetVal<T> {
+ public boolean cached;
+ public T val;
+
+ public RetVal(T v, boolean c) {
+ val = v;
+ cached = c;
+ }
+ }
+
Map<List<Object>, T> _cached = new HashMap<List<Object>, T>();
- public MapState<T> _delegate;
+ public IBackingMap<T> _delegate;
- public CachedBatchReadsMap(MapState<T> delegate) {
+ public CachedBatchReadsMap(IBackingMap<T> delegate) {
_delegate = delegate;
}
+
+ public void reset() {
+ _cached.clear();
+ }
- @Override
- public List<T> multiGet(List<List<Object>> keys) {
- List<T> ret = _delegate.multiGet(keys);
- if(!_cached.isEmpty()) {
- ret = new ArrayList<T>(ret);
- for(int i=0; i<keys.size(); i++) {
- List<Object> key = keys.get(i);
- if(_cached.containsKey(key)) {
- ret.set(i, _cached.get(key));
- }
+ public List<RetVal<T>> multiGet(List<List<Object>> keys) {
+ // TODO: can optimize further by only querying backing map for keys not in the cache
+ List<T> vals = _delegate.multiGet(keys);
+ List<RetVal<T>> ret = new ArrayList(vals.size());
+ for(int i=0; i<keys.size(); i++) {
+ List<Object> key = keys.get(i);
+ if(_cached.containsKey(key)) {
+ ret.add(new RetVal(_cached.get(key), true));
+ } else {
+ ret.add(new RetVal(vals.get(i), false));
}
}
return ret;
}
- @Override
- public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- List<T> vals = _delegate.multiUpdate(keys, updaters);
- cache(keys, vals);
- return vals;
- }
-
- @Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
_delegate.multiPut(keys, vals);
cache(keys, vals);
@@ -51,16 +58,6 @@ private void cache(List<List<Object>> keys, List<T> vals) {
}
}
- @Override
- public void beginCommit(Long txid) {
- _cached.clear(); //if a commit was pending and failed, we need to make sure to clear the cache
- _delegate.beginCommit(txid);
- }
- @Override
- public void commit(Long txid) {
- _cached.clear();
- _delegate.commit(txid);
- }
}
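
CachedBatchReadsMap is no longer a MapState itself: it wraps the raw IBackingMap and reports, via RetVal, whether each value came from the intra-batch cache. A hedged consumption sketch (MemoryBackingMap is the test helper added by this commit):

    import java.util.Arrays;
    import java.util.List;
    import storm.trident.state.map.CachedBatchReadsMap;
    import storm.trident.testing.MemoryBackingMap;

    public class CachedReadsExample {
        public static void main(String[] args) {
            CachedBatchReadsMap<Object> cache =
                new CachedBatchReadsMap<Object>(new MemoryBackingMap());
            List<List<Object>> keys = Arrays.asList(Arrays.<Object>asList("a"));
            cache.multiPut(keys, Arrays.<Object>asList(1));  // write populates the cache
            for (CachedBatchReadsMap.RetVal<Object> rv : cache.multiGet(keys)) {
                // rv.cached is true here: the value was written earlier in this batch.
                System.out.println(rv.val + " cached=" + rv.cached);
            }
        }
    }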
24 src/jvm/storm/trident/state/map/OpaqueMap.java
@@ -9,21 +9,22 @@
public class OpaqueMap<T> implements MapState<T> {
public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {
- return new CachedBatchReadsMap<T>(new OpaqueMap<T>(backing));
+ return new OpaqueMap<T>(backing);
}
- IBackingMap<OpaqueValue> _backing;
+ CachedBatchReadsMap<OpaqueValue> _backing;
Long _currTx;
protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
- _backing = backing;
+ _backing = new CachedBatchReadsMap(backing);
}
@Override
public List<T> multiGet(List<List<Object>> keys) {
- List<OpaqueValue> curr = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
List<T> ret = new ArrayList<T>(curr.size());
- for(OpaqueValue val: curr) {
+ for(CachedBatchReadsMap.RetVal<OpaqueValue> retval: curr) {
+ OpaqueValue val = retval.val;
if(val!=null) {
ret.add((T) val.get(_currTx));
} else {
@@ -35,17 +36,22 @@ protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- List<OpaqueValue> curr = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
List<OpaqueValue> newVals = new ArrayList<OpaqueValue>(curr.size());
List<T> ret = new ArrayList<T>();
for(int i=0; i<curr.size(); i++) {
- OpaqueValue<T> val = curr.get(i);
+ CachedBatchReadsMap.RetVal<OpaqueValue> retval = curr.get(i);
+ OpaqueValue<T> val = retval.val;
ValueUpdater<T> updater = updaters.get(i);
T prev;
if(val==null) {
prev = null;
} else {
- prev = val.get(_currTx);
+ if(retval.cached) {
+ prev = val.getCurr();
+ } else {
+ prev = val.get(_currTx);
+ }
}
T newVal = updater.update(prev);
ret.add(newVal);
@@ -73,11 +79,13 @@ public void multiPut(List<List<Object>> keys, List<T> vals) {
@Override
public void beginCommit(Long txid) {
_currTx = txid;
+ _backing.reset();
}
@Override
public void commit(Long txid) {
_currTx = null;
+ _backing.reset();
}
static class ReplaceUpdater<T> implements ValueUpdater<T> {
39 src/jvm/storm/trident/state/map/TransactionalMap.java
@@ -9,21 +9,22 @@
public class TransactionalMap<T> implements MapState<T> {
public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {
- return new CachedBatchReadsMap<T>(new TransactionalMap<T>(backing));
+ return new TransactionalMap<T>(backing);
}
-
- IBackingMap<TransactionalValue> _backing;
+
+ CachedBatchReadsMap<TransactionalValue> _backing;
Long _currTx;
protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
- _backing = backing;
+ _backing = new CachedBatchReadsMap(backing);
}
@Override
public List<T> multiGet(List<List<Object>> keys) {
- List<TransactionalValue> vals = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);
List<T> ret = new ArrayList<T>(vals.size());
- for(TransactionalValue v: vals) {
+ for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {
+ TransactionalValue v = retval.val;
if(v!=null) {
ret.add((T) v.getVal());
} else {
@@ -35,26 +36,36 @@ protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- List<TransactionalValue> curr = _backing.multiGet(keys);
+ List<CachedBatchReadsMap.RetVal<TransactionalValue>> curr = _backing.multiGet(keys);
List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(curr.size());
+ List<List<Object>> newKeys = new ArrayList();
List<T> ret = new ArrayList<T>();
for(int i=0; i<curr.size(); i++) {
- TransactionalValue<T> val = curr.get(i);
+ CachedBatchReadsMap.RetVal<TransactionalValue> retval = curr.get(i);
+ TransactionalValue<T> val = retval.val;
ValueUpdater<T> updater = updaters.get(i);
TransactionalValue<T> newVal;
+ boolean changed = false;
if(val==null) {
newVal = new TransactionalValue<T>(_currTx, updater.update(null));
+ changed = true;
} else {
- if(_currTx!=null && _currTx.equals(val.getTxid())) {
+ if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {
newVal = val;
} else {
newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
- }
+ changed = true;
+ }
}
ret.add(newVal.getVal());
- newVals.add(newVal);
+ if(changed) {
+ newVals.add(newVal);
+ newKeys.add(keys.get(i));
+ }
+ }
+ if(!newKeys.isEmpty()) {
+ _backing.multiPut(newKeys, newVals);
}
- _backing.multiPut(keys, newVals);
return ret;
}
@@ -70,10 +81,12 @@ public void multiPut(List<List<Object>> keys, List<T> vals) {
@Override
public void beginCommit(Long txid) {
_currTx = txid;
+ _backing.reset();
}
@Override
public void commit(Long txid) {
_currTx = null;
- }
+ _backing.reset();
+ }
}
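
The !retval.cached test is the heart of the fix: previously, when a stored value's txid matched the current transaction, the updater was skipped entirely, so a second update to the same key within one batch was silently lost. A value cached in this batch is now always re-updated, and only keys that actually changed are written back. A hedged Java rendering of the first batch of the new test-transactional-map, using the two-argument CombinerValueUpdater the Clojure test uses (raw casts assumed for the generic mismatch between the test helper and build()):

    import java.util.Arrays;
    import java.util.List;
    import storm.trident.operation.builtin.Count;
    import storm.trident.state.CombinerValueUpdater;
    import storm.trident.state.ValueUpdater;
    import storm.trident.state.map.IBackingMap;
    import storm.trident.state.map.MapState;
    import storm.trident.state.map.TransactionalMap;
    import storm.trident.testing.MemoryBackingMap;

    public class IntraBatchUpdateExample {
        @SuppressWarnings({"unchecked", "rawtypes"})
        public static void main(String[] args) {
            // Raw cast: MemoryBackingMap is IBackingMap<Object> while build()
            // expects IBackingMap<TransactionalValue>; the Clojure test relies
            // on the same loose typing.
            MapState<Long> map = TransactionalMap.build((IBackingMap) new MemoryBackingMap());
            List<List<Object>> keys = Arrays.asList(Arrays.<Object>asList("a"));
            map.beginCommit(1L);
            List<Long> first = map.multiUpdate(keys, updaters(1));   // -> [1]
            List<Long> second = map.multiUpdate(keys, updaters(2));  // -> [3]: the second
            map.commit(1L);                                          // update now applies
            System.out.println(first + " " + second);
        }

        static List<ValueUpdater> updaters(long amt) {
            return Arrays.<ValueUpdater>asList(new CombinerValueUpdater(new Count(), amt));
        }
    }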
30 src/jvm/storm/trident/testing/MemoryBackingMap.java
@@ -0,0 +1,30 @@
+package storm.trident.testing;
+
+import storm.trident.state.map.IBackingMap;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MemoryBackingMap implements IBackingMap<Object> {
+ Map _vals = new HashMap();
+
+ @Override
+ public List<Object> multiGet(List<List<Object>> keys) {
+ List ret = new ArrayList();
+ for(List key: keys) {
+ ret.add(_vals.get(key));
+ }
+ return ret;
+ }
+
+ @Override
+ public void multiPut(List<List<Object>> keys, List<Object> vals) {
+ for(int i=0; i<keys.size(); i++) {
+ List key = keys.get(i);
+ Object val = vals.get(i);
+ _vals.put(key, val);
+ }
+ }
+}
10 test/clj/backtype/storm/nimbus_test.clj
@@ -144,6 +144,16 @@
(is (not-nil? ((:executor->start-time-secs assignment) e))))
))
+(deftest test-bogusId
+ (with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
+ (let [state (:storm-cluster-state cluster)
+ nimbus (:nimbus cluster)]
+ (is (thrown? NotAliveException (.getTopologyConf nimbus "bogus-id")))
+ (is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
+ (is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
+ (is (thrown? NotAliveException (.getTopologyInfo nimbus "bogus-id")))
+ )))
+
(deftest test-assignment
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
56 test/clj/storm/trident/state_test.clj
@@ -0,0 +1,56 @@
+(ns storm.trident.state-test
+ (:use [clojure test])
+ (:require [backtype.storm [testing :as t]])
+ (:import [storm.trident.operation.builtin Count])
+ (:import [storm.trident.state CombinerValueUpdater])
+ (:import [storm.trident.state.map TransactionalMap OpaqueMap])
+ (:import [storm.trident.testing MemoryBackingMap])
+ (:use [storm.trident testing])
+ (:use [backtype.storm util]))
+
+(defn single-get [map key]
+ (-> map (.multiGet [[key]]) first))
+
+(defn single-update [map key amt]
+ (-> map (.multiUpdate [[key]] [(CombinerValueUpdater. (Count.) amt)]) first))
+
+(deftest test-opaque-map
+ (let [map (OpaqueMap/build (MemoryBackingMap.))]
+ (.beginCommit map 1)
+ (is (= nil (single-get map "a")))
+ ;; tests that intra-batch caching works
+ (is (= 1 (single-update map "a" 1)))
+ (is (= 3 (single-update map "a" 2)))
+ (.commit map 1)
+ (.beginCommit map 1)
+ (is (= nil (single-get map "a")))
+ (is (= 2 (single-update map "a" 2)))
+ (.commit map 1)
+ (.beginCommit map 2)
+ (is (= 2 (single-get map "a")))
+ (is (= 5 (single-update map "a" 3)))
+ (is (= 6 (single-update map "a" 1)))
+ (.commit map 2)
+ ))
+
+(deftest test-transactional-map
+ (let [map (TransactionalMap/build (MemoryBackingMap.))]
+ (.beginCommit map 1)
+ (is (= nil (single-get map "a")))
+ ;; tests that intra-batch caching works
+ (is (= 1 (single-update map "a" 1)))
+ (is (= 3 (single-update map "a" 2)))
+ (.commit map 1)
+ (.beginCommit map 1)
+ (is (= 3 (single-get map "a")))
+ ;; tests that intra-batch caching has no effect if it's the same commit as previous commit
+ (is (= 3 (single-update map "a" 1)))
+ (is (= 3 (single-update map "a" 2)))
+ (.commit map 1)
+ (.beginCommit map 2)
+ (is (= 3 (single-get map "a")))
+ (is (= 6 (single-update map "a" 3)))
+ (is (= 7 (single-update map "a" 1)))
+ (.commit map 2)
+ ))
+