Skip to content

Commit

Permalink
Participant #timers, #rtimers and #rtimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
jmettraux committed Sep 12, 2011
1 parent d4ec728 commit edd797e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.txt
Expand Up @@ -4,6 +4,7 @@

== ruote - 2.2.1 not yet released

- participant: #rtimers, #rtimeout and #timers
- lose and forget expressions: aligned 'multi' behaviour
- concurrence and citerator: :merge_type => 'ignore'
- :timers error, redo/retry, undo/pass and commands
Expand Down
38 changes: 22 additions & 16 deletions lib/ruote/exp/fe_participant.rb
Expand Up @@ -154,7 +154,7 @@ def apply
h.applied_workitem['fields']['params']['__children__'] = dsub(tree.last)
end

schedule_timers(h.participant)
consider_participant_timers(h.participant)

persist_or_raise

Expand Down Expand Up @@ -230,28 +230,34 @@ def do_dispatched(msg)
# Note that process definition timeout has priority over participant
# specified timeout.
#
def schedule_timers(p_info)
def consider_participant_timers(p_info)

return if h.has_timers
# process definition takes precedence over participant defined timers.

# TODO timers/rtimers ?
timers = nil

pa = @context.plist.instantiate(p_info, :if_respond_to? => :rtimeout)
[ :rtimers, :timers, :rtimeout ].each do |meth|

timeout = Ruote.participant_send(
pa, :rtimeout, 'workitem' => Ruote::Workitem.new(h.applied_workitem)
) if pa
pa = @context.plist.instantiate(p_info, :if_respond_to? => meth)

next unless pa

timers = Ruote.participant_send(
pa, meth, 'workitem' => Ruote::Workitem.new(h.applied_workitem))

break if timers
end

return unless timers

return unless timeout
timers = if timers.index(':')
timers.split(/,/)
else
[ "#{timers}: timeout" ]
end

(h.timers ||= []) << [
@context.storage.put_schedule(
'at',
h.fei,
timeout,
{ 'action' => 'cancel', 'fei' => h.fei, 'flavour' => 'timeout' }),
'timeout'
]
schedule_timers(timers)
end

def do_pause(msg)
Expand Down
13 changes: 11 additions & 2 deletions lib/ruote/exp/flow_expression.rb
Expand Up @@ -892,10 +892,19 @@ def consider_timers
timers = (attribute(:timers) || '').split(/,/)

if to = attribute(:timeout)
to = to.strip
timers << "#{to}: timeout" unless to == ''
timers << "#{to}: timeout" unless to.strip == ''
end

schedule_timers(timers)
end

# Used by #consider_timers and
# ParticipantExpression#consider_participant_timers
#
# Takes care of registering the timers/timeout for an expression.
#
def schedule_timers(timers)

timers.each do |t|

after, action = if t.is_a?(String)
Expand Down
31 changes: 31 additions & 0 deletions test/functional/ft_65_timers.rb
Expand Up @@ -268,5 +268,36 @@ def test_jump_and_other_commands

@engine.wait_for(:charly)
end

class MyParticipant
include Ruote::LocalParticipant

def consume(workitem)
# do nothing
end

def rtimers(workitem)
"2d: reminder"
end
end

def test_participant_defined_timers

@engine.register 'alpha', MyParticipant

#@engine.noisy = true

wfid = @engine.launch(Ruote.define do
alpha
end)

@engine.wait_for(:alpha)

alpha = @engine.ps(wfid).expressions.last

pp alpha.h.timers.size

assert_equal 1, alpha.h.timers.size
end
end

2 comments on commit edd797e

@peterpunk
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timers are awesome, I was implementing this kind of pattern manually, now is simpler, waiting for the release ;-) thanks!

@jmettraux
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thanks ! I have to hurry up with the release

Please sign in to comment.