Skip to content

Commit 838c4dc

Browse files
committed
Add atomic linking
`AnActor.spawn name: 'an', link: true` will be linked before any message is received.
1 parent c3efc2f commit 838c4dc

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

lib/concurrent/actor/core.rb

+16-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class Core
3333
# @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour
3434
# @option opts [Array<Object>] args arguments for actor_class instantiation
3535
# @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool`
36+
# @option opts [true, false] link, atomically link the actor to its parent
3637
# @option opts [IVar, nil] initialized, if present it'll be set or failed after {Context} initialization
3738
# @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params,
3839
# can be used to hook actor instance to any logging system
@@ -69,11 +70,18 @@ def initialize(opts = {}, &block)
6970
args = opts.fetch(:args, [])
7071
initialized = Type! opts[:initialized], IVar, NilClass
7172

73+
messages = []
74+
messages << :link if opts[:link]
75+
7276
schedule_execution do
7377
begin
7478
@context.send :initialize_core, self
7579
@context.send :initialize, *args, &block
7680

81+
messages.each do |message|
82+
handle_envelope Envelope.new(message, nil, parent, reference)
83+
end
84+
7785
initialized.set true if initialized
7886
rescue => ex
7987
log ERROR, ex
@@ -117,10 +125,7 @@ def remove_child(child)
117125
# can be called from other alternative Reference implementations
118126
# @param [Envelope] envelope
119127
def on_envelope(envelope)
120-
schedule_execution do
121-
log DEBUG, "received #{envelope.message.inspect} from #{envelope.sender}"
122-
@first_behaviour.on_envelope envelope
123-
end
128+
schedule_execution { handle_envelope envelope }
124129
nil
125130
end
126131

@@ -178,6 +183,13 @@ def behaviour(klass)
178183
def behaviour!(klass)
179184
@behaviours.fetch klass
180185
end
186+
187+
private
188+
189+
def handle_envelope(envelope)
190+
log DEBUG, "received #{envelope.message.inspect} from #{envelope.sender}"
191+
@first_behaviour.on_envelope envelope
192+
end
181193
end
182194
end
183195
end

spec/concurrent/actor_spec.rb

+19-1
Original file line numberDiff line numberDiff line change
@@ -246,15 +246,33 @@ def on_message(message)
246246

247247
it 'links' do
248248
queue = Queue.new
249-
failure = AdHoc.spawn(:failure) { -> m { m } }
249+
failure = nil
250250
# failure = AdHoc.spawn(:failure) { -> m { terminate! } } # FIXME this leads to weird message processing ordering
251251
monitor = AdHoc.spawn(:monitor) do
252+
failure = AdHoc.spawn(:failure) { -> m { m } }
252253
failure << :link
253254
-> m { queue << [m, envelope.sender] }
254255
end
255256
failure << :hehe
256257
failure << :terminate!
257258
expect(queue.pop).to eq [:terminated, failure]
259+
260+
terminate_actors monitor
261+
end
262+
263+
it 'links atomically' do
264+
queue = Queue.new
265+
failure = nil
266+
monitor = AdHoc.spawn(:monitor) do
267+
failure = AdHoc.spawn(name: :failure, link: true) { -> m { m } }
268+
-> m { queue << [m, envelope.sender] }
269+
end
270+
271+
failure << :hehe
272+
failure << :terminate!
273+
expect(queue.pop).to eq [:terminated, failure]
274+
275+
terminate_actors monitor
258276
end
259277

260278
end

0 commit comments

Comments
 (0)