Permalink
Browse files

timeout for everybody

  • Loading branch information...
1 parent bf2622d commit 389fad75c4898290c6c7a6c8e1ff4e52556a2032 @jmettraux committed Jun 28, 2009
Showing with 101 additions and 8 deletions.
  1. +8 −3 TODO.txt
  2. +18 −4 lib/ruote/exp/flowexpression.rb
  3. +2 −0 lib/ruote/log/logger.rb
  4. +6 −1 lib/ruote/time/scheduler.rb
  5. +67 −0 test/functional/ft_15_timeout.rb
View
@@ -75,9 +75,6 @@ restart tests :
[ ] participants stop
-[ ] part/sub timeout (with Andrew's at)
-[ ] part/sub conditional (all expressions ? no, there's 'if')
-
[o] stalled [participant] expressions restart (apply/reply ?)
"re-apply on reload"
http://groups.google.com/group/openwferu-users/browse_thread/thread/c2aa4b53d1664d45
@@ -106,3 +103,11 @@ restart tests :
[o] wfidgen.shutdown (close wfidgen.last)
+[ ] part/sub timeout (with Andrew's at)
+[o] timeout for everybody
+[ ] __timeout__ = true ? (wi.fields)
+[ ] part/sub conditional (all expressions ? no, there's 'if')
+[ ] conditional for everybody
+
+[ ] do_apply instead of if self.class == FlowExp
+
@@ -73,12 +73,18 @@ def initialize (context, fei, parent_id, tree, variables, workitem)
@created_time = Time.now
@modified_time = @created_time
- @applied_workitem = workitem.dup
+ @applied_workitem = workitem
+
+ return if self.class == Ruote::FlowExpression
+ # do not continue if it's only a temporary expression
+
+ @applied_workitem = workitem.dup # now, we need a dup
@on_cancel = attribute(:on_cancel)
@on_error = attribute(:on_error)
consider_tag
+ consider_timeout
end
# Returns the parent expression of this expression instance.
@@ -481,16 +487,20 @@ def unpersist
#
def consider_tag
- return if self.class == Ruote::FlowExpression
- # do not consider tag if this expression is only a temp exp
-
if @tagname = attribute(:tag)
set_variable(@tagname, @fei)
wqueue.emit(:expressions, :entered_tag, :tag => @tagname, :fei => @fei)
end
end
+ def consider_timeout
+
+ if timeout = attribute(:timeout)
+ @timeout_job_id = scheduler.in(timeout, @fei, :cancel).job_id
+ end
+ end
+
VAR_PREFIX_REGEX = /^(\/*)/
# Used by lookup_variable and set_variable to extract the
@@ -520,6 +530,10 @@ def reply_to_parent (workitem)
wqueue.emit(:expressions, :left_tag, :tag => @tagname, :fei => @fei)
end
+ if @timeout_job_id
+ scheduler.unschedule(@timeout_job_id)
+ end
+
if @in_error
trigger_on_error(workitem)
View
@@ -72,6 +72,8 @@ def value_to_s (k, v)
when FlowExpression then "#{k}/#{v.fei.to_s}"
when FlowExpressionId then v.to_s
when NilClass then nil
+ when TrueClass then true
+ when FalseClass then false
else v.class
end
end
@@ -47,7 +47,7 @@ def call (rufus_job)
context = rufus_job.scheduler.options[:context]
- opts = { :fei => @fei }
+ opts = { :fei => @fei, :scheduler => true }
if @method == :reply
@@ -86,6 +86,11 @@ def at (t, fei, method)
@scheduler.at(t, :schedulable => RuoteSchedulable.new(fei, method))
end
+ def in (t, fei, method)
+
+ @scheduler.in(t, :schedulable => RuoteSchedulable.new(fei, method))
+ end
+
def unschedule (job_id)
@scheduler.unschedule(job_id)
@@ -0,0 +1,67 @@
+
+#
+# Testing Ruote (OpenWFEru)
+#
+# Sun Jun 28 16:45:57 JST 2009
+#
+
+require File.dirname(__FILE__) + '/base'
+
+require 'ruote/part/hash_participant'
+
+
+class FtTimeoutTest < Test::Unit::TestCase
+ include FunctionalBase
+
+ def test_timeout
+
+ pdef = Ruote.process_definition do
+ sequence do
+ alpha :timeout => '1s'
+ bravo
+ end
+ end
+
+ alpha = @engine.register_participant :alpha, Ruote::HashParticipant
+ bravo = @engine.register_participant :bravo, Ruote::HashParticipant
+
+ #noisy
+
+ wfid = @engine.launch(pdef)
+ sleep 1.5
+
+ assert_equal 0, alpha.size
+ assert_equal 1, bravo.size
+ assert_equal 1, logger.log.select { |e| e[2][:scheduler] == true }.size
+ assert_equal 0, @engine.scheduler.jobs.size
+ end
+
+ def test_cancel_timeout
+
+ pdef = Ruote.process_definition do
+ sequence do
+ alpha :timeout => '2s'
+ bravo
+ end
+ end
+
+ alpha = @engine.register_participant :alpha, Ruote::HashParticipant
+ bravo = @engine.register_participant :bravo, Ruote::HashParticipant
+
+ #noisy
+
+ wfid = @engine.launch(pdef)
+ sleep 1
+
+ assert_equal 1, alpha.size
+
+ @engine.cancel_expression(alpha.first.fei)
+
+ sleep 0.5
+
+ assert_equal 0, alpha.size
+ assert_equal 1, bravo.size
+ assert_equal 0, @engine.scheduler.jobs.size
+ end
+end
+

0 comments on commit 389fad7

Please sign in to comment.