Skip to content

Commit

Permalink
mostly complete RevParticipant impl
Browse files Browse the repository at this point in the history
Next steps : think about => and ~> ... The resolution scheme now is place is very absolute...
  • Loading branch information
jmettraux committed Apr 9, 2011
1 parent ced75bf commit 503d607
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 24 deletions.
7 changes: 2 additions & 5 deletions lib/ruote/exp/fe_participant.rb
Expand Up @@ -146,17 +146,14 @@ def apply
"no participant named #{h.participant_name.inspect}")
) if participant_info.nil?

#
# participant found, consider timeout

schedule_timeout(participant_info)

#
# dispatch to participant

h.applied_workitem['participant_name'] = h.participant_name
h.applied_workitem['fields']['params'] = compile_atts

schedule_timeout(participant_info)

persist_or_raise

@context.storage.put_msg(
Expand Down
59 changes: 45 additions & 14 deletions lib/ruote/part/rev_participant.rb
Expand Up @@ -31,12 +31,14 @@
module Ruote

#
# TODO
# This participant was born out of a suggestion from Jan Topiński in XXX
#
class RevParticipant

include LocalParticipant

# TODO : how to deal with >= and ~> ?

def initialize(opts=nil)

@dir = opts['dir']
Expand All @@ -46,14 +48,48 @@ def initialize(opts=nil)
) unless @dir
end

def consume(workitem)

lookup_code(workitem).consume(workitem)
end

def cancel(fei, flavour)

lookup_code(fei).cancel(fei, flavour)
end

#--
#def accept?(workitem)
# part = lookup_code(workitem)
# part.respond_to?(:accept?) ? part.accept?(workitem) : true
#end
#
# No operation : simply replies immediately to the engine.
# Can't do this at this level, since it isn't the rev_participant's
# own accept?, it has to go in lookup_code
#++

def on_reply(workitem)

part = lookup_code(workitem)
part.on_reply(workitem) if part.respond_to?(:on_reply)
end

def rtimeout(workitem)

part = lookup_code(workitem)

part.respond_to?(:rtimeout) ? part.rtimeout(workitem) : nil
end

protected

# Maybe "lookup_real_participant_code" would be a better name...
#
def consume(wi)
def lookup_code(wi_or_fei)

rev = wi.params['revision'] || wi.params['rev']
wi = wi_or_fei.is_a?(Ruote::Workitem) ? wi_or_fei : workitem(wi_or_fei)

consumed = false
rev = wi.params['revision'] || wi.params['rev']

[
[ wi.wf_name, wi.wf_revision, wi.participant_name, rev ],
Expand All @@ -72,20 +108,15 @@ def consume(wi)
part = cpart.new
part.context = @context

part.consume(wi)
consumed = true
break
next if part.respond_to?(:accept?) and (not part.accept?(wi))

return part
end

raise ArgumentError.new(
"couldn't find code for participant #{wi.participant_name} " +
"in dir #{@dir}"
) unless consumed
end

def cancel(fei, flavour)

# TODO
)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ruote/workitem.rb
Expand Up @@ -251,7 +251,7 @@ def error
#
def params

@h['fields']['params']
@h['fields']['params'] || {}
end

# (advanced)
Expand Down
195 changes: 191 additions & 4 deletions test/functional/ft_57_rev_participant.rb
Expand Up @@ -16,11 +16,9 @@
class FtRevParticipantTest < Test::Unit::TestCase
include FunctionalBase

def test_x
def test_consume

dir = [
'rev_participant', $$, Time.now.to_f
].collect { |e| e.to_s }.join('_')
dir = compute_dir_name

FileUtils.mkdir(dir)

Expand Down Expand Up @@ -61,5 +59,194 @@ def consume(workitem)
ensure
FileUtils.rm_rf(dir)
end

def test_cancel

dir = compute_dir_name

FileUtils.mkdir(dir)

$seen = false

File.open(dir + '/alpha.rb', 'wb') do |f|
f.write(%{
def consume(workitem)
# do nothing
end
def cancel(fei, flavour)
$seen = true
end
})
end

@engine.register do
catchall Ruote::RevParticipant, :dir => dir
end

pdef = Ruote.process_definition do
alpha
end

#@engine.noisy = true

wfid = @engine.launch(pdef)

@engine.wait_for(:alpha)

assert_equal false, $seen

@engine.cancel(wfid)

@engine.wait_for(wfid)

assert_equal true, $seen

ensure
FileUtils.rm_rf(dir)
end

def test_accept

dir = compute_dir_name

FileUtils.mkdir(dir)

File.open(dir + '/alpha.rb', 'wb') do |f|
f.write(%{
def consume(workitem)
(workitem.fields['seen'] ||= []) << 'alpha'
reply_to_engine(workitem)
end
})
end
File.open(dir + '/alpha__z.rb', 'wb') do |f|
f.write(%{
def consume(workitem)
raise "I should never get raised !"
end
def accept?(workitem)
false
end
})
end

@engine.register do
catchall Ruote::RevParticipant, :dir => dir
end

pdef = Ruote.process_definition do
alpha :rev => 'z'
end

#@engine.noisy = true

wfid = @engine.launch(pdef)

r = @engine.wait_for(wfid)

assert_equal %w[ alpha ], r['workitem']['fields']['seen']

ensure
FileUtils.rm_rf(dir)
end

def test_on_reply

dir = compute_dir_name

FileUtils.mkdir(dir)

File.open(dir + '/alpha.rb', 'wb') do |f|
f.write(%{
def consume(workitem)
reply_to_engine(workitem)
end
def on_reply(workitem)
(workitem.fields['seen'] ||= []) << 'on_reply'
end
})
end

@engine.register do
catchall Ruote::RevParticipant, :dir => dir
end

pdef = Ruote.process_definition do
alpha
end

#@engine.noisy = true

wfid = @engine.launch(pdef)

r = @engine.wait_for(wfid)

assert_equal %w[ on_reply ], r['workitem']['fields']['seen']

ensure
FileUtils.rm_rf(dir)
end

def test_rtimeout

dir = compute_dir_name

FileUtils.mkdir(dir)

File.open(dir + '/alpha.rb', 'wb') do |f|
f.write(%{
def consume(workitem)
#reply_to_engine(workitem)
end
})
end
File.open(dir + '/bravo.rb', 'wb') do |f|
f.write(%{
def consume(workitem)
#reply_to_engine(workitem)
end
def rtimeout(workitem)
'2d'
end
})
end

@engine.register do
catchall Ruote::RevParticipant, :dir => dir
end

pdef = Ruote.process_definition do
alpha
bravo
end

#@engine.noisy = true

wfid = @engine.launch(pdef)

r = @engine.wait_for(:alpha)
sleep 0.350

assert_equal 0, @engine.storage.get_many('schedules').size

wi = @engine.ps(wfid).workitems.first

@engine.receive(wi)

r = @engine.wait_for(:bravo)
sleep 0.350

assert_equal 1, @engine.storage.get_many('schedules').size

ensure
FileUtils.rm_rf(dir)
end

protected

def compute_dir_name

[ 'rev_participant', $$, Time.now.to_f ].collect { |e| e.to_s }.join('_')
end
end

0 comments on commit 503d607

Please sign in to comment.