-
Notifications
You must be signed in to change notification settings - Fork 3
/
actor.rb
314 lines (255 loc) · 8.73 KB
/
actor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#--
# Copyright (C)2007 Tony Arcieri
# You can redistribute this under the terms of the Ruby license
# See file LICENSE for details
#++
require File.dirname(__FILE__) + '/../revactor'
require 'fiber'
# Raised when an operation that needs an Actor is attempted from a fiber
# which is not one (see Actor.current and Actor.receive)
class ActorError < StandardError
end
# Actors are lightweight concurrency primitives which communicate via message
# passing. Each actor has a mailbox which it scans for matching messages.
# An actor sleeps until it receives a message, at which time it scans messages
# against its filter set and then executes appropriate callbacks.
#
# The Actor class is defined in the global scope in hopes of being generally
# useful for Ruby 1.9 users while also attempting to be as compatible as
# possible with the Rubinius Actor implementation. In this way it should
# be possible to run programs written using Rev on top of Rubinius and hopefully
# get some better performance.
#
# Rev Actor implements some features that Rubinius does not, however, such as
# receive timeouts, receive filter-by-proc, arguments passed to spawn, and an
# actor dictionary (used for networking functionality). Hopefully these
# additional features will not get in the way of Rubinius / Rev compatibility.
#
class Actor < Fiber
include Enumerable
@@registered = {}
class << self
include Enumerable
# Create a new Actor with the given block and arguments
# The arguments are passed to the block when the actor first runs. The new
# actor is handed to the Scheduler before it is returned, so the scheduler
# starts running here if it is not running already.
#
# Raises ArgumentError if no block is given.
def new(*args, &block)
  raise ArgumentError, "no block given" unless block

  actor = super() do
    begin
      block.call(*args)
    ensure
      # Flag the actor as dead however its body exits. Without the ensure an
      # actor whose body raised would still report dead? == false and keep
      # accepting messages nobody will ever read.
      Actor.current.instance_eval { @_dead = true }
    end
  end

  # For whatever reason #initialize is never called in subclasses of Fiber,
  # so the per-actor state is set up here instead
  actor.instance_eval do
    @_dead = false
    @_mailbox = Mailbox.new
    @_dictionary = {}
  end

  Scheduler << actor
  actor
end
alias_method :spawn, :new
# This will be defined differently in the future, but now the two are the same
alias_method :start, :new
# Obtain a handle to the current Actor
# Delegates to the superclass implementation (Fiber.current) and raises
# ActorError when the fiber it returns is not an Actor.
def current
  super.tap do |fiber|
    raise ActorError, "current fiber is not an actor" unless fiber.is_a? Actor
  end
end
# Wait for messages matching a given filter. The filter object is yielded
# to the block passed to receive. You can then invoke the filter's when
# method, which takes a pattern and a block. Messages are compared (using ===)
# against the parameter, or if the parameter is a proc it is called with
# a message and matches if the proc returns true.
#
# The first filter to match a message in the mailbox is executed. If no
# filters match then the actor sleeps.
# Returns whatever the current actor's mailbox returns from its own receive.
#
# Raises ActorError when the calling fiber is not an Actor. The running fiber
# is looked up with Fiber.current rather than Actor.current, because the
# latter raises its own, more generic ActorError first and would make the
# receive-specific message below unreachable.
def receive(&filter)
  actor = Fiber.current
  unless actor.is_a?(Actor)
    raise ActorError, "receive must be called in the context of an Actor"
  end

  # _mailbox is a protected reader, hence __send__ (Actor#send is aliased to <<)
  actor.__send__(:_mailbox).receive(&filter)
end
# Look up an actor in the global dictionary
# Returns the actor stored under key in the class-wide @@registered hash,
# or nil when nothing is registered under that key.
def [](key)
@@registered[key]
end
# Register this actor in the global dictionary
# Anything that is not an Actor is rejected with an ArgumentError; otherwise
# the actor is stored under key, replacing any previous registration.
def []=(key, actor)
  raise ArgumentError, "only actors may be registered" unless actor.is_a?(Actor)

  @@registered[key] = actor
end
# Delete an actor from the global dictionary
# Both the key and the optional block are handed straight to Hash#delete on
# the registry, so the return value (and the block's role when the key is
# missing) are those of Hash#delete.
def delete(key, &block)
@@registered.delete(key, &block)
end
# Iterate over the actors in the global dictionary
# Forwards the block to Hash#each on the registry, so it is yielded each
# key/actor pair. This is the method the Enumerable mixin on the Actor class
# object is built on.
def each(&block)
@@registered.each(&block)
end
end
# Look up value in the actor's dictionary
# Reads from this actor's own @_dictionary hash (created in Actor.new);
# returns nil when the key has not been stored.
def [](key)
@_dictionary[key]
end
# Store a value in the actor's dictionary
# Writes to this actor's own @_dictionary hash, replacing any value already
# stored under key. Unlike the class-level registry, any value is accepted.
def []=(key, value)
@_dictionary[key] = value
end
# Delete a value from the actor's dictionary
# Both the key and the optional block are handed straight to Hash#delete on
# @_dictionary, so the return value is that of Hash#delete.
def delete(key, &block)
@_dictionary.delete(key, &block)
end
# Iterate over values in the actor's dictionary
# Forwards the block to Hash#each on @_dictionary, so it is yielded each
# key/value pair. This is the method the instance-level Enumerable mixin is
# built on.
def each(&block)
@_dictionary.each(&block)
end
# Is this actor dead?
# Returns the @_dead flag, which Actor.new initializes to false and the
# actor's fiber body sets to true once the actor's block has finished.
def dead?; @_dead; end
# Send a message to an actor
# The message is always returned, whether or not it was delivered.
def <<(message)
  # Following Erlang's lead, a message for a dead actor is silently dropped
  # rather than raising: surfacing errors from dead actors would complicate
  # overall error handling more than it is worth.
  unless dead?
    @_mailbox << message
    # Make sure the recipient gets a chance to look at its mailbox
    Scheduler << self
  end

  message
end
alias_method :send, :<<
#########
protected
#########
attr_reader :_mailbox
# Actor scheduler class, maintains a run queue of actors with outstanding
# messages who have not yet processed their mailbox. If all actors have
# processed their mailboxes then the scheduler waits for any outstanding
# Rev events. If there are no active Rev watchers then the scheduler exits.
class Scheduler
  @@queue = []       # actors waiting to be resumed, in scheduling order
  @@running = false  # true while .run is executing; makes .run non-reentrant

  class << self
    # Queue an actor to be resumed. Starts the scheduler unless it is already
    # running, in which case the running loop will pick the actor up.
    def <<(actor)
      @@queue << actor
      run unless @@running
    end

    # Resume queued actors until the run queue is empty and the default Rev
    # loop has no watchers left. Returns immediately when already running.
    #
    # Exceptions raised by an actor (other than FiberError) propagate to the
    # caller, but the scheduler is left in a usable state: the running flag
    # is always reset and the offending actor has already been dequeued.
    def run
      return if @@running
      @@running = true
      default_loop = Rev::Loop.default

      begin
        until @@queue.empty? && default_loop.watchers.empty?
          # Actors scheduled while this loop runs are appended to the same
          # queue and therefore handled in the same pass
          until @@queue.empty?
            actor = @@queue.shift
            begin
              actor.resume
            rescue FiberError # Fiber may have died since being scheduled
            end
          end

          default_loop.run_once unless default_loop.watchers.empty?
        end
      ensure
        # Without this an exception escaping an actor would leave the flag
        # set and every later Scheduler << would silently do nothing
        @@running = false
      end
    end
  end
end
# Actor mailbox. For purposes of efficiency the mailbox also handles
# suspending and resuming an actor when no messages match its filter set.
class Mailbox
attr_accessor :timer
attr_accessor :timed_out
attr_accessor :timeout_action
# Start with an empty message queue and no receive timeout pending.
# The other timeout fields (timed_out, timeout_action) are not set here;
# receive resets them at the start of every call.
def initialize
@timer = nil
@queue = []
end
# Add a message to the mailbox queue
# Messages are appended, so the queue stays in arrival order. This only
# stores the message; scheduling the owning actor is done by Actor#<<.
def <<(message)
@queue << message
end
# Attempt to receive a message
# The block is yielded a Filter on which patterns (Filter#when) and an
# optional timeout (Filter#after) are declared. Queued messages are tested in
# arrival order; the first one matching a pattern is removed from the queue
# and passed to that pattern's action, whose result is returned. If the
# timeout has fired and nothing matched, the timeout action is called with no
# arguments and its result is returned instead. While there is neither a
# match nor a timeout the calling actor is suspended with Actor.yield.
#
# Raises ArgumentError when no block is given or the filter has no patterns.
def receive
  raise ArgumentError, "no filter block given" unless block_given?

  # Clear timeout state left over from any previous receive
  @timed_out = false
  @timeout_action = nil

  action = matched_index = nil

  begin
    # Build the filter
    filter = Filter.new(self)
    yield filter
    raise ArgumentError, "empty filter" if filter.empty?

    scanned = 0 # how many queued messages have been tested so far

    until action
      # Only test messages which arrived since the previous pass
      while scanned < @queue.size
        candidate = scanned
        scanned += 1
        action = filter.match(@queue[candidate])
        next unless action

        # Record the absolute queue position so that the right message is
        # removed even when the match happens on a later pass
        matched_index = candidate
        break
      end

      # If we've timed out, run the timeout action unless another has been found
      action ||= @timeout_action if @timed_out

      # If we didn't find a matching action, yield until we get another message
      Actor.yield unless action
    end
  ensure
    # Always cancel a pending timer, including when the filter block raised
    # or the filter was empty; a stale timer would make every later
    # Filter#after fail with "timeout already specified"
    if @timer
      @timer.detach if @timer.attached?
      @timer = nil
    end
  end

  # Nothing matched, so the action in hand is the timeout action
  return action.call if matched_index.nil?

  # Otherwise we matched a message, so remove it and process it with the action
  action.(@queue.delete_at(matched_index))
end
# Timeout class, used to implement receive timeouts
class Timer < Rev::TimerWatcher
  # timeout is passed on to Rev::TimerWatcher; actor is the actor whose
  # mailbox should be flagged when the timer goes off
  def initialize(timeout, actor)
    @actor = actor
    super(timeout)
  end

  # Timer callback (presumably invoked by Rev when the interval elapses):
  # flag the actor's mailbox as timed out and schedule the actor so that its
  # pending receive notices the flag
  def on_timer
    mailbox = @actor.instance_variable_get(:@_mailbox)
    mailbox.timed_out = true
    Scheduler << @actor
  end
end
# Mailbox filterset. Takes patterns or procs to match messages with
# and returns the associated proc when a pattern matches.
class Filter
  # mailbox is the Mailbox whose receive call this filter is being built for;
  # timeouts declared with #after are recorded on it
  def initialize(mailbox)
    @mailbox = mailbox
    @ruleset = []
  end

  # Register an action to run for messages matching pattern. Rules are tried
  # in the order they were added. Raises ArgumentError without a block.
  def when(pattern, &action)
    raise ArgumentError, "no block given" unless action
    @ruleset << [pattern, action]
  end

  # Declare a timeout (zero or positive) for the receive, with an optional
  # action to run when it expires; without a block the timeout action does
  # nothing and returns nil.
  #
  # Raises ArgumentError when a timeout was already declared for this receive
  # or when timeout is negative.
  def after(timeout, &action)
    # A zero timeout never creates a timer, so also look at the action
    if @mailbox.timer || @mailbox.timeout_action
      raise ArgumentError, "timeout already specified"
    end
    raise ArgumentError, "must be zero or positive" if timeout < 0

    # Don't require an action: a nil action would never be picked up by the
    # mailbox and the timeout would silently be ignored
    @mailbox.timeout_action = action || proc {}

    if timeout > 0
      @mailbox.timer = Timer.new(timeout, Actor.current).attach(Rev::Loop.default)
    else
      # No need to actually set a timer if the timeout is zero, just mark the
      # mailbox (not this filter) as timed out. The receive in progress checks
      # the flag before suspending, so nothing needs to be scheduled.
      @mailbox.timed_out = true
    end
  end

  # Returns the action of the first rule matching message, or nil when no
  # rule matches. Proc patterns are called with the message; anything else
  # is compared with ===.
  def match(message)
    _, action = @ruleset.find do |pattern, _|
      if pattern.is_a? Proc
        pattern.(message)
      else
        pattern === message
      end
    end

    action
  end

  # True when no rules have been registered with #when
  def empty?
    @ruleset.empty?
  end
end
end
end