Skip to content
Browse files

Web Workers support

  • Loading branch information...
1 parent ff989fa commit e27ceeca6bdebb5597069ef25222857d02ad612b @mental committed
Showing with 338 additions and 16 deletions.
  1. +56 −1 README.md
  2. +73 −6 spec/WorkerSpec.coffee
  3. +39 −6 src/Actor.coffee
  4. +170 −3 src/Worker.coffee
View
57 README.md
@@ -353,7 +353,12 @@ function instead throws an exception, then the receiving
actor will die with that exception.
The callback should have no side-effects and should avoid
-state-modifying calls to the WebActors API.
+making state-modifying calls to the WebActors API.
+
+Note that kills don't, in themselves, break links. If an
+actor is sent a kill message by an actor it is linked to,
+the link will remain in place until the actor exits or
+it calls `unlink`.
## Pattern Matching
@@ -370,3 +375,53 @@ successful, or `null` otherwise.
### WebActors.ANY
When used in a pattern, `ANY` matches any value.
+
+## Web Workers
+
+In browsers that support it, WebActors offers support for
+true parallelism through the use of HTML5 Web Workers.
+
+Not only can WebActors be used within a worker, WebActors
+itself provides an actor-based API for managing Web
+Workers in a way that largely abstracts the difference
+between code running in a web worker.
+
+### WebActors.spawnWorker(scriptUrl)
+
+Starts a Web Worker running the given script and spawns
+an actor inside it, returning the new actor's id. This
+actor will run in parallel with the parent VM and can
+be used to perform background tasks without blocking the
+browser UI.
+
+In most respects, the spawned actor behaves like any
+other WebActors actor, and can interoperate with
+actors in the worker's parent VM. The main difference
+is that any messages crossing the worker/parent boundary
+are copied using `postMessage`, so some message features
+(such as classes) may not be preserved.
+
+### WebActors.spawnLinkedWorker(scriptUrl)
+
+Analagous to `spawnLinked`, but spawns a worker just as
+`spawnWorker` does. The spawned worker is immediately
+linked to the actor that spawned it.
+
+### WebActors.initWorker(function () {...})
+
+The "bottom half" of `spawnWorker`, `initWorker` MUST
+be called before any other WebActors API functions in
+a script started via `spawnWorker`.
+
+The passed-in function supplies the body of the actor
+returned by `spawnWorker`.
+
+When this actor exits, the worker will terminate, killing
+any other actors inside!
+
+### WebActors.terminateWorker(actorId)
+
+Forcibly terminates a worker spawned by `spawnWorker`,
+even if it is stuck in an infinite loop. This is only
+effective when called from the worker's parent VM;
+otherwise, it has no effect.
View
79 spec/WorkerSpec.coffee
@@ -23,9 +23,21 @@ describe "WebActors.spawnWorker", ->
waitsFor -> done
+describe "WebActor.spawnLinkedWorker", ->
+ it "should spawn a worker linked to the current one", ->
+ passed = false
+
+ WebActors.spawn ->
+ WebActors.trapKill (killer_id, reason) -> ["exit", killer_id, reason]
+ worker_id = spawn_helper 'spawnLinkedWorker', ->
+ WebActors.receive ["exit", worker_id, ANY], (message) ->
+ passed = true
+
+ waitsFor -> passed
+
describe "Workers", ->
- it "should handle array messages", ->
- done = false
+ it "should successfully receive messages with arrays in them", ->
+ passed = false
WebActors.spawn ->
worker_id = spawn_helper 'spawnWorker', ->
@@ -35,8 +47,63 @@ describe "Workers", ->
WebActors.send worker_id, [WebActors.self()]
- WebActors.receive ANY, (result) ->
- expect(result).toEqual("it works!")
- done = true
+ WebActors.receive "it works!", ->
+ passed = true
- waitsFor -> done
+ waitsFor -> passed
+
+describe "WebActors.terminateWorker", ->
+ it "should forcibly kill an idle worker", ->
+ passed = false
+
+ WebActors.spawn ->
+ WebActors.trapKill (killer_id, reason) -> [killer_id, reason]
+ worker_id = spawn_helper 'spawnLinkedWorker', ->
+ # render unkillable through normal means
+ WebActors.trapKill ->
+ WebActors.receive "beef", ->
+ WebActors.terminateWorker worker_id
+ WebActors.receive [worker_id, ANY], ->
+ passed = true
+
+ waitsFor -> passed
+
+ xit "should forcibly kill a worker stuck in a loop", ->
+ passed = false
+
+ WebActors.spawn ->
+ WebActors.trapKill (killer_id, reason) -> [killer_id, reason]
+ worker_id = spawn_helper 'spawnLinkedWorker', ->
+ null while true
+ WebActors.terminateWorker worker_id
+ WebActors.receive [worker_id, ANY], ->
+ passed = true
+
+ waitsFor -> passed
+
+ it "should properly kill other actors in the worker too", ->
+ passed = false
+
+ WebActors.spawn ->
+ WebActors.trapKill (killer_id, reason) -> [killer_id, reason]
+
+ worker_id = spawn_helper 'spawnWorker', ->
+ WebActors.receive WebActors.ANY, (parent_id) ->
+ WebActors.spawn ->
+ WebActors.send parent_id, WebActors.self()
+ WebActors.receive "ready", ->
+ WebActors.send parent_id, "go"
+ WebActors.receive "beef", ->
+ WebActors.receive "beef", ->
+
+ WebActors.send worker_id, WebActors.self()
+
+ WebActors.receive ANY, (grandchild_id) ->
+ WebActors.link grandchild_id
+ WebActors.send grandchild_id, "ready"
+ WebActors.receive "go", ->
+ WebActors.terminateWorker worker_id
+ WebActors.receive [grandchild_id, ANY], ->
+ passed = true
+
+ waitsFor -> passed
View
45 src/Actor.coffee
@@ -36,7 +36,7 @@ class DeadActor
unlink: (actor_id) ->
send: (message) ->
- console.error("Discarding message to actor #{@actor_id}")
+ reportError("Discarding message to actor #{@actor_id}")
kill: (killer_id, reason) ->
@@ -121,7 +121,7 @@ class LocalActor
cont.call(actor.state)
catch e
message = "Actor #{actor.actor_id}: #{e}"
- console.error(message)
+ reportError(message)
reason = e
finally
current_actor = NULL_ACTOR
@@ -138,16 +138,33 @@ class ForwardingActor
constructor: (@actor_id, @callback) ->
send: (message) ->
- @callback {target_id:@actor_id, event_name:"send", message:message}
+ event =
+ target_id: @actor_id
+ event_name: "send"
+ message: message
+ @callback event
link: (other_id) ->
- @callback {target_id:@actor_id, event_name:"link", peer_id:other_id}
+ event =
+ target_id: @actor_id
+ event_name: "link"
+ peer_id: other_id
+ @callback event
unlink: (other_id) ->
- @callback {target_id:@actor_id, event_name:"unlink", peer_id:other_id}
+ event =
+ target_id: @actor_id
+ event_name: "unlink"
+ peer_id: other_id
+ @callback event
kill: (killer_id, reason) ->
- @callback {target_id:@actor_id, event_name:"kill", killer_id:killer_id, reason:reason}
+ event =
+ target_id: @actor_id
+ event_name: "kill"
+ killer_id: killer_id
+ reason: reason
+ @callback event
next_actor_serial = 0
actors_by_id = {}
@@ -268,6 +285,19 @@ injectEvent = (event) ->
target.kill(event.killer_id, event.reason)
undefined
+error_handler = (message) ->
+ console.error(message)
+
+reportError = (message) ->
+ try
+ error_handler(message)
+ undefined
+ catch e
+ undefined
+
+setErrorHandler = (callback) ->
+ error_handler = callback
+
WebActors.spawn = spawn
WebActors.spawnLinked = spawnLinked
WebActors.send = send
@@ -278,6 +308,7 @@ WebActors.trapKill = trapKill
WebActors.kill = kill
WebActors.link = link
WebActors.unlink = unlink
+
WebActors.injectEvent = injectEvent
WebActors.registerGateway = registerGateway
WebActors.unregisterGateway = unregisterGateway
@@ -285,3 +316,5 @@ WebActors.setDefaultGateway = setDefaultGateway
WebActors.getLocalPrefix = getLocalPrefix
WebActors.setLocalPrefix = setLocalPrefix
WebActors.allocateChildPrefix = allocateChildPrefix
+WebActors.reportError = reportError
+WebActors.setErrorHandler = setErrorHandler
View
173 src/Worker.coffee
@@ -3,21 +3,181 @@ WebActors = if require? and exports?
else
@WebActors ?= {}
+class LinkMap
+ constructor: ->
+ @links = {}
+ @link_counts = {}
+
+ _link: (a, b) ->
+ links = @links[a]
+ unless links
+ links = {}
+ @links[a] = links
+ unless links[b]
+ links[b] = true
+ link_count = (@link_counts[a] or 0) + 1
+ @link_counts[a] = link_count
+
+ _unlink: (a, b) ->
+ links = @links[a]
+ if links and links[b]
+ delete links[b]
+ link_count = @link_counts[a] - 1
+ if link_count is 0
+ delete @links[a]
+ delete @link_counts[a]
+ else
+ @link_counts[a] = 0
+
+ link: (local_id, peer_id) ->
+ @_link(local_id, peer_id)
+ @_link(peer_id, local_id)
+
+ unlink: (local_id, peer_id) ->
+ @_unlink(local_id, peer_id)
+ @_unlink(peer_id, local_id)
+
+ has_links: (actor_id) ->
+ !!@links[actor_id]
+
+ remove: (actor_id) ->
+ links = @links[actor_id]
+ if links
+ delete @links[actor_id]
+ delete @link_counts[actor_id]
+ for peer_id, flag of links
+ @_unlink(peer_id, actor_id)
+
+synthesize_kill = (target_id, killer_id, reason) ->
+ event =
+ target_id: target_id
+ event_name: "kill"
+ killer_id: killer_id
+ reason: reason
+ WebActors.injectEvent event
+
+monitors_by_worker = {}
+
spawnWorker = (script_url) ->
worker_prefix = WebActors.allocateChildPrefix("worker")
worker_id = "#{worker_prefix}:0"
worker = new Worker(script_url)
- worker.postMessage(worker_prefix)
+ # launch a monitor to handle termination and cleanup
+ monitor_id = WebActors.spawn ->
+ WebActors.trapKill (killer_id, reason) ->
+ ["exited", killer_id]
+ WebActors.link worker_id
+
+ worker_links = new LinkMap()
+
+ track_link = (a, b) ->
+ unless worker_links.has_links(a)
+ WebActors.link a
+ unless worker_links.has_links(b)
+ WebActors.link b
+ worker_links.link(a, b)
+
+ track_unlink = (a, b) ->
+ worker_links.unlink(a, b)
+ unless worker_links.has_links(a)
+ WebActors.unlink a
+ unless worker_links.has_links(b)
+ WebActors.unlink b
+
+ track_link_events = (event) ->
+ if event.event_name is "link"
+ track_link(event.target_id, event.peer_id)
+ else if event.event_name is "unlink"
+ track_unlink(event.target_id, event.peer_id)
+
+ track_exit = (actor_id) ->
+ worker_links.remove(actor_id)
+ WebActors.unlink actor_id
+
+ # catch up on outstanding events, then synthesize kills and exit
+ termination_loop = ->
+ WebActors.receive ["from_worker", WebActors.ANY], (m) ->
+ event = m[1]
+ if event.event_name is "worker.error"
+ WebActors.reportError(event.message)
+ else
+ track_link_events(event)
+ WebActors.injectEvent(event)
+
+ WebActors.receive ["to_worker", WebActors.ANY], (m) ->
+ event = m[1]
+ track_link_events(event)
+ termination_loop()
+
+ WebActors.receive "terminated", ->
+ for actor_id, links of worker_links.links
+ for peer_id, flag of links
+ synthesize_kill(peer_id, actor_id, null)
+
+ # track outstanding links to actors in worker
+ monitor_loop = ->
+ WebActors.receive "terminate", ->
+ delete monitors_by_worker[worker_id]
+
+ # shut down routing to/from the worker
+ WebActors.unregisterGateway worker_prefix
+
+ # terminate the worker and enter cleanup phase
+ worker.terminate()
+ WebActors.sendSelf "terminated"
+ termination_loop()
+
+ WebActors.receive ["from_worker", WebActors.ANY], (m) ->
+ event = m[1]
+ if event.event_name is "worker.error"
+ WebActors.reportError(event.message)
+ else
+ track_link_events(event)
+ WebActors.injectEvent(event)
+ monitor_loop()
+
+ WebActors.receive ["to_worker", WebActors.ANY], (m) ->
+ event = m[1]
+ track_link_events(event)
+ worker.postMessage(event)
+ monitor_loop()
+
+ WebActors.receive ["exited", WebActors.ANY], (m) ->
+ actor_id = m[1]
+ track_exit(actor_id)
+ if actor_id is worker_id
+ WebActors.sendSelf "terminate"
+ monitor_loop()
+
+ monitor_loop()
+
+ monitors_by_worker[worker_id] = monitor_id
+
+ # set up event routing to/from the worker via the monitor
worker.onmessage = (event) ->
- WebActors.injectEvent(event.data)
+ WebActors.send monitor_id, ["from_worker", event.data]
WebActors.registerGateway worker_prefix, (event) ->
- worker.postMessage(event)
+ WebActors.send monitor_id, ["to_worker", event]
+
+ # kick off the worker
+ worker.postMessage(worker_prefix)
worker_id
+spawnLinkedWorker = (script_url) ->
+ worker_id = spawnWorker(script_url)
+ WebActors.link(worker_id)
+ worker_id
+
initWorker = (body) ->
+ WebActors.setErrorHandler (message) ->
+ error_event =
+ event_name: "worker.error"
+ message: message
+ self.postMessage(error_event)
+
self.onmessage = (event) ->
local_prefix = event.data
WebActors.setLocalPrefix local_prefix
@@ -30,5 +190,12 @@ initWorker = (body) ->
WebActors.spawn ->
body.apply(this)
+terminateWorker = (worker_id) ->
+ monitor_id = monitors_by_worker[worker_id]
+ if monitor_id
+ WebActors.send monitor_id, "terminate"
+
WebActors.spawnWorker = spawnWorker
+WebActors.spawnLinkedWorker = spawnLinkedWorker
WebActors.initWorker = initWorker
+WebActors.terminateWorker = terminateWorker

0 comments on commit e27ceec

Please sign in to comment.
Something went wrong with that request. Please try again.