Skip to content

Commit

Permalink
Merge pull request nathanmarz#1 from nathanmarz/master
Browse files Browse the repository at this point in the history
merge changes from nathanmarz/storm
  • Loading branch information
anfeng committed Feb 21, 2013
2 parents 01c4147 + e6eea02 commit d104640
Show file tree
Hide file tree
Showing 18 changed files with 282 additions and 133 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,16 @@
* Logs are now limited to 1GB per worker (configurable via logging configuration file) * Logs are now limited to 1GB per worker (configurable via logging configuration file)
* Build upgraded to leiningen 2.0 * Build upgraded to leiningen 2.0
* Revamped Trident spout interfaces to support more dynamic spouts, such as a spout who reads from a changing set of brokers * Revamped Trident spout interfaces to support more dynamic spouts, such as a spout who reads from a changing set of brokers
* How tuples are serialized is now pluggable (thanks anfeng)
* Added blowfish encryption based tuple serialization (thanks anfeng)
* Have storm fall back to installed storm.yaml (thanks revans2)
* Improve error message when Storm detects bundled storm.yaml to show the URL's for offending resources (thanks revans2)
* Nimbus throws NotAliveException instead of FileNotFoundException from various query methods when topology is no longer alive (thanks revans2)
* Bug fix: Supervisor provides full path to workers to logging config rather than relative path (thanks revans2)
* Bug fix: Call ReducerAggregator#init properly when used within persistentAggregate (thanks lorcan)
* Bug fix: Set component-specific configs correctly for Trident spouts
* Bug fix: Fix TransactionalMap and OpaqueMap to correctly do multiple updates to the same key in the same batch
* Bug fix: Fix race condition between supervisor and Nimbus that could lead to stormconf.ser errors and infinite crashing of supervisor


## 0.8.2 ## 0.8.2


Expand Down
4 changes: 4 additions & 0 deletions NOTICE
@@ -0,0 +1,4 @@
Storm
Copyright 2011-2013 Nathan Marz


3 changes: 3 additions & 0 deletions README.markdown
Expand Up @@ -65,6 +65,9 @@ You must not remove this notice, or any other, from this software.
* Bryan Peterson ([@Lazyshot](https://github.com/Lazyshot)) * Bryan Peterson ([@Lazyshot](https://github.com/Lazyshot))
* Sam Ritchie ([@sritchie](https://github.com/sritchie)) * Sam Ritchie ([@sritchie](https://github.com/sritchie))
* Stuart Anderson ([@emblem](https://github.com/emblem)) * Stuart Anderson ([@emblem](https://github.com/emblem))
* Robert Evans ([@revans2](https://github.com/revans2))
* Andy Feng ([@anfeng](https://github.com/anfeng))
* Lorcan Coyle ([@lorcan](https://github.com/lorcan))


## Acknowledgements ## Acknowledgements


Expand Down
40 changes: 22 additions & 18 deletions bin/storm
Expand Up @@ -22,11 +22,15 @@ if sys.platform == "cygwin":
else: else:
normclasspath = identity normclasspath = identity


CONF_DIR = os.path.expanduser("~/.storm")
STORM_DIR = "/".join(os.path.realpath( __file__ ).split("/")[:-2]) STORM_DIR = "/".join(os.path.realpath( __file__ ).split("/")[:-2])
USER_CONF_DIR = os.path.expanduser("~/.storm")
CLUSTER_CONF_DIR = STORM_DIR + "/conf"
if (not os.path.isfile(USER_CONF_DIR + "/storm.yaml")):
USER_CONF_DIR = CLUSTER_CONF_DIR
CONFIG_OPTS = [] CONFIG_OPTS = []
CONFFILE = "" CONFFILE = ""



def get_config_opts(): def get_config_opts():
global CONFIG_OPTS global CONFIG_OPTS
return "-Dstorm.options=" + (','.join(CONFIG_OPTS)).replace(' ', "%%%%") return "-Dstorm.options=" + (','.join(CONFIG_OPTS)).replace(' ', "%%%%")
Expand Down Expand Up @@ -73,7 +77,7 @@ def print_localconfvalue(name):
The local Storm configs are the ones in ~/.storm/storm.yaml merged The local Storm configs are the ones in ~/.storm/storm.yaml merged
in with the configs in defaults.yaml. in with the configs in defaults.yaml.
""" """
print name + ": " + confvalue(name, [CONF_DIR]) print name + ": " + confvalue(name, [USER_CONF_DIR])


def print_remoteconfvalue(name): def print_remoteconfvalue(name):
"""Syntax: [storm remoteconfvalue conf-name] """Syntax: [storm remoteconfvalue conf-name]
Expand All @@ -84,7 +88,7 @@ def print_remoteconfvalue(name):
This command must be run on a cluster machine. This command must be run on a cluster machine.
""" """
print name + ": " + confvalue(name, [STORM_DIR + "/conf"]) print name + ": " + confvalue(name, [CLUSTER_CONF_DIR])


def parse_args(string): def parse_args(string):
r"""Takes a string of whitespace-separated tokens and parses it into a list. r"""Takes a string of whitespace-separated tokens and parses it into a list.
Expand Down Expand Up @@ -132,7 +136,7 @@ def jar(jarfile, klass, *args):
exec_storm_class( exec_storm_class(
klass, klass,
jvmtype="-client", jvmtype="-client",
extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"], extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
args=args, args=args,
jvmopts=["-Dstorm.jar=" + jarfile]) jvmopts=["-Dstorm.jar=" + jarfile])


Expand All @@ -150,7 +154,7 @@ def kill(*args):
"backtype.storm.command.kill_topology", "backtype.storm.command.kill_topology",
args=args, args=args,
jvmtype="-client", jvmtype="-client",
extrajars=[CONF_DIR, STORM_DIR + "/bin"]) extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])


def activate(*args): def activate(*args):
"""Syntax: [storm activate topology-name] """Syntax: [storm activate topology-name]
Expand All @@ -161,7 +165,7 @@ def activate(*args):
"backtype.storm.command.activate", "backtype.storm.command.activate",
args=args, args=args,
jvmtype="-client", jvmtype="-client",
extrajars=[CONF_DIR, STORM_DIR + "/bin"]) extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])


def listtopos(*args): def listtopos(*args):
"""Syntax: [storm list] """Syntax: [storm list]
Expand All @@ -172,7 +176,7 @@ def listtopos(*args):
"backtype.storm.command.list", "backtype.storm.command.list",
args=args, args=args,
jvmtype="-client", jvmtype="-client",
extrajars=[CONF_DIR, STORM_DIR + "/bin"]) extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])


def deactivate(*args): def deactivate(*args):
"""Syntax: [storm deactivate topology-name] """Syntax: [storm deactivate topology-name]
Expand All @@ -183,7 +187,7 @@ def deactivate(*args):
"backtype.storm.command.deactivate", "backtype.storm.command.deactivate",
args=args, args=args,
jvmtype="-client", jvmtype="-client",
extrajars=[CONF_DIR, STORM_DIR + "/bin"]) extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])


def rebalance(*args): def rebalance(*args):
"""Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*] """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]
Expand All @@ -210,7 +214,7 @@ def rebalance(*args):
"backtype.storm.command.rebalance", "backtype.storm.command.rebalance",
args=args, args=args,
jvmtype="-client", jvmtype="-client",
extrajars=[CONF_DIR, STORM_DIR + "/bin"]) extrajars=[USER_CONF_DIR, STORM_DIR + "/bin"])


def shell(resourcesdir, command, *args): def shell(resourcesdir, command, *args):
tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar" tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
Expand All @@ -221,7 +225,7 @@ def shell(resourcesdir, command, *args):
"backtype.storm.command.shell_submission", "backtype.storm.command.shell_submission",
args=runnerargs, args=runnerargs,
jvmtype="-client", jvmtype="-client",
extrajars=[CONF_DIR], extrajars=[USER_CONF_DIR],
fork=True) fork=True)
os.system("rm " + tmpjarpath) os.system("rm " + tmpjarpath)


Expand All @@ -231,7 +235,7 @@ def repl():
Opens up a Clojure REPL with the storm jars and configuration Opens up a Clojure REPL with the storm jars and configuration
on the classpath. Useful for debugging. on the classpath. Useful for debugging.
""" """
cppaths = [STORM_DIR + "/conf"] cppaths = [CLUSTER_CONF_DIR]
exec_storm_class("clojure.lang.Repl", jvmtype="-client", extrajars=cppaths) exec_storm_class("clojure.lang.Repl", jvmtype="-client", extrajars=cppaths)


def nimbus(klass="backtype.storm.daemon.nimbus"): def nimbus(klass="backtype.storm.daemon.nimbus"):
Expand All @@ -243,7 +247,7 @@ def nimbus(klass="backtype.storm.daemon.nimbus"):
See Setting up a Storm cluster for more information. See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster) (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
""" """
cppaths = [STORM_DIR + "/conf"] cppaths = [CLUSTER_CONF_DIR]
jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
"-Dlogfile.name=nimbus.log", "-Dlogfile.name=nimbus.log",
"-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml", "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
Expand All @@ -263,7 +267,7 @@ def supervisor(klass="backtype.storm.daemon.supervisor"):
See Setting up a Storm cluster for more information. See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster) (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
""" """
cppaths = [STORM_DIR + "/conf"] cppaths = [CLUSTER_CONF_DIR]
jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [ jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
"-Dlogfile.name=supervisor.log", "-Dlogfile.name=supervisor.log",
"-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml", "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
Expand All @@ -284,7 +288,7 @@ def ui():
See Setting up a Storm cluster for more information. See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster) (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
""" """
cppaths = [STORM_DIR + "/conf"] cppaths = [CLUSTER_CONF_DIR]
jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [ jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
"-Dlogfile.name=ui.log", "-Dlogfile.name=ui.log",
"-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml", "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
Expand All @@ -293,7 +297,7 @@ def ui():
"backtype.storm.ui.core", "backtype.storm.ui.core",
jvmtype="-server", jvmtype="-server",
jvmopts=jvmopts, jvmopts=jvmopts,
extrajars=[STORM_DIR, STORM_DIR + "/conf"]) extrajars=[STORM_DIR, CLUSTER_CONF_DIR])


def drpc(): def drpc():
"""Syntax: [storm drpc] """Syntax: [storm drpc]
Expand All @@ -312,7 +316,7 @@ def drpc():
"backtype.storm.daemon.drpc", "backtype.storm.daemon.drpc",
jvmtype="-server", jvmtype="-server",
jvmopts=jvmopts, jvmopts=jvmopts,
extrajars=[STORM_DIR + "/conf"]) extrajars=[CLUSTER_CONF_DIR])


def dev_zookeeper(): def dev_zookeeper():
"""Syntax: [storm dev-zookeeper] """Syntax: [storm dev-zookeeper]
Expand All @@ -321,11 +325,11 @@ def dev_zookeeper():
"storm.zookeeper.port" as its port. This is only intended for development/testing, the "storm.zookeeper.port" as its port. This is only intended for development/testing, the
Zookeeper instance launched is not configured to be used in production. Zookeeper instance launched is not configured to be used in production.
""" """
cppaths = [STORM_DIR + "/conf"] cppaths = [CLUSTER_CONF_DIR]
exec_storm_class( exec_storm_class(
"backtype.storm.command.dev_zookeeper", "backtype.storm.command.dev_zookeeper",
jvmtype="-server", jvmtype="-server",
extrajars=[STORM_DIR + "/conf"]) extrajars=[CLUSTER_CONF_DIR])


def version(): def version():
"""Syntax: [storm version] """Syntax: [storm version]
Expand Down
2 changes: 1 addition & 1 deletion conf/storm.yaml.example
Expand Up @@ -23,7 +23,7 @@
# - "server2" # - "server2"


## Metrics Consumers ## Metrics Consumers
# topology.metrics.consumers.register: # topology.metrics.consumer.register:
# - class: "org.mycompany.MyMetricsConsumer" # - class: "org.mycompany.MyMetricsConsumer"
# argument: # argument:
# - endpoint: "metrics-collector.mycompany.org" # - endpoint: "metrics-collector.mycompany.org"
Expand Down
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject storm/storm "0.9.0-wip15" (defproject storm/storm "0.9.0-wip16"
:url "http://storm-project.clj" :url "http://storm-project.clj"
:description "Distributed and fault-tolerant realtime computation" :description "Distributed and fault-tolerant realtime computation"
:license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"} :license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"}
Expand Down
25 changes: 21 additions & 4 deletions src/clj/backtype/storm/daemon/nimbus.clj
Expand Up @@ -4,6 +4,7 @@
(:import [org.apache.thrift7 TException]) (:import [org.apache.thrift7 TException])
(:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket]) (:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [java.nio ByteBuffer]) (:import [java.nio ByteBuffer])
(:import [java.io FileNotFoundException])
(:import [java.nio.channels Channels WritableByteChannel]) (:import [java.nio.channels Channels WritableByteChannel])
(:use [backtype.storm.scheduler.DefaultScheduler]) (:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Expand Down Expand Up @@ -856,6 +857,22 @@
(throw (InvalidTopologyException. (throw (InvalidTopologyException.
(str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS)))))) (str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))))


(defn- try-read-storm-conf [conf storm-id]
(try-cause
(read-storm-conf conf storm-id)
(catch FileNotFoundException e
(throw (NotAliveException. storm-id)))
)
)

(defn- try-read-storm-topology [conf storm-id]
(try-cause
(read-storm-topology conf storm-id)
(catch FileNotFoundException e
(throw (NotAliveException. storm-id)))
)
)

(defserverfn service-handler [conf inimbus] (defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf)) (.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf) (log-message "Starting Nimbus with conf " conf)
Expand Down Expand Up @@ -1014,13 +1031,13 @@
(to-json (:conf nimbus))) (to-json (:conf nimbus)))


(^String getTopologyConf [this ^String id] (^String getTopologyConf [this ^String id]
(to-json (read-storm-conf conf id))) (to-json (try-read-storm-conf conf id)))


(^StormTopology getTopology [this ^String id] (^StormTopology getTopology [this ^String id]
(system-topology! (read-storm-conf conf id) (read-storm-topology conf id))) (system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))


(^StormTopology getUserTopology [this ^String id] (^StormTopology getUserTopology [this ^String id]
(read-storm-topology conf id)) (try-read-storm-topology conf id))


(^ClusterSummary getClusterInfo [this] (^ClusterSummary getClusterInfo [this]
(let [storm-cluster-state (:storm-cluster-state nimbus) (let [storm-cluster-state (:storm-cluster-state nimbus)
Expand Down Expand Up @@ -1063,7 +1080,7 @@


(^TopologyInfo getTopologyInfo [this ^String storm-id] (^TopologyInfo getTopologyInfo [this ^String storm-id]
(let [storm-cluster-state (:storm-cluster-state nimbus) (let [storm-cluster-state (:storm-cluster-state nimbus)
task->component (storm-task-info (read-storm-topology conf storm-id) (read-storm-conf conf storm-id)) task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id))
base (.storm-base storm-cluster-state storm-id nil) base (.storm-base storm-cluster-state storm-id nil)
assignment (.assignment-info storm-cluster-state storm-id nil) assignment (.assignment-info storm-cluster-state storm-id nil)
beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment)) beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
Expand Down
50 changes: 23 additions & 27 deletions src/clj/backtype/storm/daemon/supervisor.clj
Expand Up @@ -20,11 +20,17 @@
(shutdown-all-workers [this]) (shutdown-all-workers [this])
) )


(defn- assignments-snapshot [storm-cluster-state callback]
(let [storm-ids (.assignments storm-cluster-state callback)]
(->> (dofor [sid storm-ids] {sid (.assignment-info storm-cluster-state sid callback)})
(apply merge)
(filter-val not-nil?)
)))


(defn- read-my-executors [storm-cluster-state storm-id assignment-id callback] (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
(let [assignment (.assignment-info storm-cluster-state storm-id callback) (let [assignment (get assignments-snapshot storm-id)
my-executors (filter (fn [[_ [node _]]] (= node assignment-id)) my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
(:executor->node+port assignment)) (:executor->node+port assignment))
port-executors (apply merge-with port-executors (apply merge-with
concat concat
(for [[executor [_ port]] my-executors] (for [[executor [_ port]] my-executors]
Expand All @@ -34,29 +40,18 @@
;; need to cast to int b/c it might be a long (due to how yaml parses things) ;; need to cast to int b/c it might be a long (due to how yaml parses things)
;; doall is to avoid serialization/deserialization problems with lazy seqs ;; doall is to avoid serialization/deserialization problems with lazy seqs
[(Integer. port) (LocalAssignment. storm-id (doall executors))] [(Integer. port) (LocalAssignment. storm-id (doall executors))]
)) ))))
))


(defn- read-assignments (defn- read-assignments
"Returns map from port to struct containing :storm-id and :executors" "Returns map from port to struct containing :storm-id and :executors"
[storm-cluster-state assignment-id callback] [assignments-snapshot assignment-id]
(let [storm-ids (.assignments storm-cluster-state callback)] (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
(apply merge-with (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
(fn [& ignored]
(throw (RuntimeException.
"Should not have multiple topologies assigned to one port")))
(dofor [sid storm-ids] (read-my-executors storm-cluster-state sid assignment-id callback))
)))


(defn- read-storm-code-locations (defn- read-storm-code-locations
[storm-cluster-state callback] [assignments-snapshot]
(let [storm-ids (.assignments storm-cluster-state callback)] (map-val :master-code-dir assignments-snapshot))
(into {}
(dofor [sid storm-ids]
[sid (:master-code-dir (.assignment-info storm-cluster-state sid callback))]
))
))



(defn- read-downloaded-storm-ids [conf] (defn- read-downloaded-storm-ids [conf]
(map #(java.net.URLDecoder/decode %) (read-dir-contents (supervisor-stormdist-root conf))) (map #(java.net.URLDecoder/decode %) (read-dir-contents (supervisor-stormdist-root conf)))
Expand Down Expand Up @@ -265,12 +260,12 @@
^ISupervisor isupervisor (:isupervisor supervisor) ^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor) ^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this)) sync-callback (fn [& ignored] (.add event-manager this))
storm-code-map (read-storm-code-locations storm-cluster-state sync-callback) assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf)) downloaded-storm-ids (set (read-downloaded-storm-ids conf))
all-assignment (read-assignments all-assignment (read-assignments
storm-cluster-state assignments-snapshot
(:assignment-id supervisor) (:assignment-id supervisor))
sync-callback)
new-assignment (->> all-assignment new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %))) (filter-key #(.confirmAssigned isupervisor %)))
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
Expand Down Expand Up @@ -409,6 +404,7 @@
(defmethod launch-worker (defmethod launch-worker
:distributed [supervisor storm-id port worker-id] :distributed [supervisor storm-id port worker-id]
(let [conf (:conf supervisor) (let [conf (:conf supervisor)
storm-home (System/getProperty "storm.home")
stormroot (supervisor-stormdist-root conf storm-id) stormroot (supervisor-stormdist-root conf storm-id)
stormjar (supervisor-stormjar-path stormroot) stormjar (supervisor-stormjar-path stormroot)
storm-conf (read-supervisor-storm-conf conf storm-id) storm-conf (read-supervisor-storm-conf conf storm-id)
Expand All @@ -420,8 +416,8 @@
command (str "java -server " childopts command (str "java -server " childopts
" -Djava.library.path=" (conf JAVA-LIBRARY-PATH) " -Djava.library.path=" (conf JAVA-LIBRARY-PATH)
" -Dlogfile.name=" logfilename " -Dlogfile.name=" logfilename
" -Dstorm.home=" (System/getProperty "storm.home") " -Dstorm.home=" storm-home
" -Dlogback.configurationFile=logback/cluster.xml" " -Dlogback.configurationFile=" storm-home "/logback/cluster.xml"
" -cp " classpath " backtype.storm.daemon.worker " " -cp " classpath " backtype.storm.daemon.worker "
(java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor) (java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor)
" " port " " worker-id)] " " port " " worker-id)]
Expand Down

0 comments on commit d104640

Please sign in to comment.