Skip to content
This repository


Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

file 177 lines (136 sloc) 9.959 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
Use cases:

1. number of steps between 2 people in a graph (topology with cycles?)


* Repackage jzmq and zmq as a leiningen "native dep"
       - this might be good, since the native dep can package builds for all different systems/os's?

* Deploy design:

- storm swap {name} {jar} {class}
- it's allowed to use resources equal to current running topology plus number of free resources
- starts in deactivated mode
- add TOPOLOGY_STARTUP_TIME config for the delay until nimbus activates a topology after launching it
- for swap, after the startup time, deactivate the other topology, wait the TOPOLOGY_MESSAGE_TIMEOUT_SECS, and then activate the other topology
- should be able to decrease the message timeout for killing or swapping (add optional thrift parameter) -- or just make it part of the config?
- add killWithOptions, swap, swapWithOptions

* Storm UI, stats, debugging, diagnosis tools
-- need to be able to hide system streams/components from the calculations (another query param and should be default)
-- need to optimize (slowness is probably on nimbus end of querying zk, consider adding heartbeat caching into nimbus)
-- add margins
-- add titles so its easier to distinguish the various pages
-- right align all table columns except for the leftmost

* Unit test the core pieces that have stabilized their APIs

- process simulator
- virtual ports
- supervisor
- utils
- test worker/tasks

* implement pseudo-distributed mode -- this is for testing the distributed parts of the code
  - perhaps i can use pallet/vmfest for this

* Need integration tests that run on an actual storm cluster (scp code/process code/zookeeper code not tested in unit tests)

* bolts with none grouping can be pushed into a bolt. e.g. A -> B -> C
     A -> D -> E
If A -> B and A -> D are shuffle grouping = none, and B -> C and D -> E are not, then both can be run in A, b's branch goes to C and D's branch goes to E

* Failure design

Add fail method to outputcollector
Fail sends fail message to Acker for those anchors, which sends fail message back to spout.
Whenever spout fails a tuple, it emits it in its failure stream...

Add fail method to drpc... Causes blocked thread to throw exception

* Have worker heartbeat with its task ids, nimbus verifies - if wrong, reassign tasks?
- detect and ignore stray tasks
Each worker can choose a unique id for itself when heart beating
- nimbus deletes those that aren't in topology

* Subscriptions design

-- new kind of spout: "subscription spout"
   --> goal is to sync it's data across the tasks that subscribe to its streams
   --> after doing a grouping, remembers what task it sent the tuple to (regardless of grouping). if a task dies, it knows its subscriptions and asks to be resynced
   --> normal operation is to push to tasks, but pull done when a task starts up (b/c previous task died or something)
   --> need to be able to add tuples to subscription or take tuples away (this is protocol with who you're subscribing to - e.g. rocket)
   --> subscriptions can only happen in a spout because it requires persistent state
   --> when subscription spout task dies, it polls the source (e.g. rocket) for all the subscription info
   --> ideally you'd set things up to have one subscription spout per rocket server
   --> TODO: Need some way to delete subscriptions -> part of tuple or extra metadata on tuple (extra metadata seems cleaner)
        --> add isSubscription() method to Tuple as well as a getSubscriptionType() [which returns ADD or REMOVE]
   --> when a spout starts up, it also needs to push all of its subscription info
   --> acks are irrelevant for subscription tuples -- how should acks be managed as an abstraction?
        -- maybe the synchronized state is done for you -- you just access the state directly and receive a callback whenever it changes?
        -- so don't use tuples...
   --> subscriptions break all the abstractions, perhaps I should generalize spouts and factor acking as a library on top of storm. subscriptions would just be another kind of library? -> no, it seems to break abstractions anyway (like keeping task -> tuples in memory)
   --> maybe call it "syncspout"
   --> if just do syncing (don't expose tuples directly?)
   --> have a "SubscribedState" class that takes care of indexing/etc. --> expose it through topologycontext?
      -- need a way to distinguish between states of different streams
      -- has "add" and "remove" methods
      -- bolt can give a statemanager object that implements add and remove in the prepare method
      -- add(Tuple tuple)
      -- remove(Tuple tuple)
   --> synchronize protocol (when spout or source of data dies):
      --> send how many tuples are going to be sent
      --> send the tuples
      --> OR: pack everything together into a single message (could be hard b/c where tuples are supposed to go is abstracted away)
      --> tie everything together with a unique ID
      --> once task receives everything, has info needed to remove tuples
   --> statespout should do long-polling with timeout
   --> to do subscriptions, the state should contain something like [url, subscriber]. some bolt appends subscriber to tuples, group by subscriber, and send info back
        --> how to to fields grouping with an even distribution?
   --> ********* tasks need to block on startup until they're synchronized *********
          --> send sync messages in a loop until it's synchronized
          --> add a task.synchronize.poll.freq.secs config (default to 10 seconds)
          --> need to buffer other messages as topology is waiting for synchronization messages (use disk?)
   --> could use acking system to know if a piece of state gets fully synchronized and communicate this with user
      --> perhaps expose this through a special stream? (the state status stream -> similar to failure streams)
   --> should be able to do updates of existing state
      --> use case: have a knob that you can set externally
      --> this isn't really any better than just using zookeeper directly
_myState = context.setSubscribedState(_myState)

StateSpout {
  //does a timeout long poll and emits new add or remove state tuples (add and remove on the output collector)
  nextTuple(StateSpoutOutputCollector) //collector has add and remove methods add(id, tuple). remove(id)
  //emits all the tuples into the output collector (in the background, will also send ids and counts to tasks so they know how to synchronize)
  //called on startup
  //collector can have a synchronize method in case the source of data (e.g., rocket) craps out
  synchronize(SynchronizationOutputCollector) //collector only has add(id, tuple) method

//task startup (in prepare method) [this is automatic]
for(int taskId: statespoutids) {
  emitDirect(SYNC_STREAM, tuple())

statespout synchronization():
  id = uuid()
  //getAlLStateTuples calls synchronize on the spout to get the tuples
  for(Tuple t: getAllStateTuplesFromSource()) {
    List tasks = emit(cons(id, t));
    .. keep track of id -> tasks -> count
    for(task: all output tasks) {
      emitDirect(task, id, count)

for synchronization to work, task needs to keep track of which tasks sent it tuples, and compare against only that set on synchronization

Need a way to propogate information back up the topology - "subscriptions"
e.g. browser -> rocket -> bolt -> bolt -> bolt.

example: #retweets for a subscribed set of tweet ids

storm topology

 -> tweet spout (A) -> group on original id -> count (B) -> rocket

subscriptions: rocket -> count (B) tweet id (need to group) -> spout (need to go to all)

-- how does it work when stuff dies downstream or upstream? do people ask what the subscriptions are? or do you push your subscriptions up? a combination?

-- maybe subscriptions are a "constant" spout? e..g, continuously emits and refreshes to make sure every task has the tuple. this seem amporphous and hard to implement... nimbus would need to refire all constant spouts whenever there's a reassignment that affects the flow of data. subscriptions seem more natural

-- subscriptions are a special kind of stream that are driven by being asked to send it. e..g, rocket is a spout that emits subscription/unsubscription tuples. they only send it when they get something new, or are asked as to what all the subscriptions are

-- maybe you just need a system stream to know when tasks are created. when you see that a downstream task is created, you know to fire subscriptions to it if its subscribed to your subscriptions stream? - how does this interplay with all the grouping types... you almost want to do a grouping and only send what to tasks that would have received. spouts would need to be able to subscribe to streams as well

(use 'backtype.storm.testing)
(def cluster (mk-local-storm-cluster))
(use 'backtype.storm.bootstrap) (bootstrap)
(import '[backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(def spout (feeder-spout ["word"]))
(def topology (thrift/mk-topology
                    {1 (thrift/mk-spout-spec spout :parallelism-hint 3)}
                    {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
                     3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
                     4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4 TOPOLOGY-DEBUG true} topology)

* clean up project
  - remove log4j dir and instead generate it in the deploy (it's only used in bin/storm -> create a console one and put into bin/)
  - include system component / stream information in the topologycontext and clean up system specific code all over the place

* Very rare errors

weird nullptr exceptions:
(tasks i) on send-fn
no virtual port socket for outbound task (in worker)
Something went wrong with that request. Please try again.