Skip to content

Commit 3cf3be7

Browse files
committed
Add :restart! directive
as :reset but it also drops all messages
1 parent 8339106 commit 3cf3be7

File tree

3 files changed

+58
-16
lines changed

3 files changed

+58
-16
lines changed

lib/concurrent/actor/behaviour/buffer.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ def process_envelope
4141
end
4242

4343
def on_event(event)
44-
if event == :terminated
44+
case event
45+
when :terminated, :restarted
4546
@buffer.each { |envelope| reject_envelope envelope }
4647
@buffer.clear
4748
end

lib/concurrent/actor/behaviour/pausing.rb

+23-15
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ def on_envelope(envelope)
2222
from_supervisor?(envelope) { resume! }
2323
when :reset!
2424
from_supervisor?(envelope) { reset! }
25-
# when :restart! TODO
26-
# from_supervisor?(envelope) { reset! }
25+
when :restart!
26+
from_supervisor?(envelope) { restart! }
2727
else
2828
if @paused
2929
@buffer << envelope
@@ -34,40 +34,48 @@ def on_envelope(envelope)
3434
end
3535
end
3636

37+
def from_supervisor?(envelope)
38+
if behaviour!(Supervised).supervisor == envelope.sender
39+
yield
40+
else
41+
false
42+
end
43+
end
44+
3745
def pause!(error = nil)
3846
@paused = true
3947
broadcast(error || :paused)
4048
true
4149
end
4250

4351
def resume!(broadcast = true)
44-
@buffer.each { |envelope| core.schedule_execution { pass envelope } }
45-
@buffer.clear
4652
@paused = false
4753
broadcast(:resumed) if broadcast
4854
true
4955
end
5056

51-
def from_supervisor?(envelope)
52-
if behaviour!(Supervised).supervisor == envelope.sender
53-
yield
54-
else
55-
false
56-
end
57-
end
58-
59-
def reset!
57+
def reset!(broadcast = true)
6058
core.allocate_context
6159
core.build_context
6260
resume!(false)
63-
broadcast(:reset)
61+
broadcast(:reset) if broadcast
62+
true
63+
end
64+
65+
def restart!
66+
reset! false
67+
broadcast(:restarted)
6468
true
6569
end
6670

6771
def on_event(event)
68-
if event == :terminated
72+
case event
73+
when :terminated, :restarted
6974
@buffer.each { |envelope| reject_envelope envelope }
7075
@buffer.clear
76+
when :resumed, :reset
77+
@buffer.each { |envelope| core.schedule_execution { pass envelope } }
78+
@buffer.clear
7179
end
7280
super event
7381
end

spec/concurrent/actor_spec.rb

+33
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,39 @@ def on_message(message)
336336
expect(queue.pop).to eq :init # rebuilds context
337337
expect(queue.pop).to eq :reset
338338
terminate_actors test
339+
340+
queue = Queue.new
341+
resuming_behaviour = Behaviour.restarting_behaviour_definition.map do |c, args|
342+
if Behaviour::Supervising == c
343+
[c, [:restart!, :one_for_one]]
344+
else
345+
[c, args]
346+
end
347+
end
348+
349+
test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do
350+
actor = AdHoc.spawn name: :pausing,
351+
behaviour_definition: Behaviour.restarting_behaviour_definition do
352+
queue << :init
353+
-> m { m == :add ? 1 : pass }
354+
end
355+
356+
actor << :supervise
357+
queue << actor.ask!(:supervisor)
358+
actor << nil
359+
queue << actor.ask(:add)
360+
361+
-> m do
362+
queue << m
363+
end
364+
end
365+
366+
expect(queue.pop).to eq :init
367+
expect(queue.pop).to eq test
368+
expect(queue.pop.wait.reason).to be_a_kind_of(ActorTerminated)
369+
expect(queue.pop).to eq :init
370+
expect(queue.pop).to eq :restarted
371+
terminate_actors test
339372
end
340373

341374
end

0 commit comments

Comments
 (0)