Permalink
Browse files

Merge pull request #157 from goldmann/TORQUE-959-ttl-futures

[TORQUE-959] expose the message_ttl for backgroundable futures
  • Loading branch information...
2 parents 0fe98a4 + c4c691c commit 32a6098741ac2d5dd738358d7567be7f0ec39a6a @bbrowning bbrowning committed May 2, 2013
@@ -2244,6 +2244,13 @@ Widget.background(:ttl => 1000, :persistent => false).monetize
invocation.</para>
</example>
+ <para>
+ Additionally you can define the <varname>future_ttl</varname> option
+ which sets the time (in miliseconds) to wait before moving the
+ not received future to the expiry queue. By default it's set to
+ 600000 (10 minutes).
+ </para>
+
</section>
<section id="backgroundable-message-processor-options">
@@ -193,6 +193,7 @@ def publish_message(receiver, method, args, options = { })
queue.publish( {:receiver => receiver,
:future_id => future.correlation_id,
:future_queue => queue_name,
+ :future_ttl => options[:future_ttl],
:method => method,
:args => args}, options )
@@ -30,7 +30,7 @@ def self.log_newrelic_notice(klass)
end
def on_message(hash)
- FutureResponder.new( Queue.new( hash[:future_queue] ), hash[:future_id] ).respond do
+ FutureResponder.new( Queue.new( hash[:future_queue] ), hash[:future_id], hash[:future_ttl] ).respond do
klass = hash[:receiver].class
if klass.respond_to?( :__enable_backgroundable_newrelic_tracing )
klass.__enable_backgroundable_newrelic_tracing( hash[:method] )
@@ -28,10 +28,10 @@ class FutureResponder
# @param [Integer] message_ttl The time-to-live used on messages
# to prevent them from staying in the queue indefinately if
# the result is never accessed.
- def initialize(response_queue, correlation_id, message_ttl = 600_000)
+ def initialize(response_queue, correlation_id, message_ttl = nil)
@queue = response_queue
@correlation_id = correlation_id
- @message_ttl = message_ttl
+ @message_ttl = message_ttl || 600_000
end
# Signal that processing has started.
@@ -40,7 +40,8 @@ def self.async(method, payload = {}, options = {})
:method => method,
:payload => payload,
:future_id => future.correlation_id,
- :future_queue => queue_name
+ :future_queue => queue_name,
+ :future_ttl => options[:future_ttl]
}
options[:encoding] = :marshal
queue.publish( message, options )
@@ -52,7 +53,7 @@ def self.async(method, payload = {}, options = {})
def process!(message)
hash = message.decode
- FutureResponder.new( Queue.new( hash[:future_queue] ), hash[:future_id] ).respond do
+ FutureResponder.new( Queue.new( hash[:future_queue] ), hash[:future_id], hash[:future_ttl] ).respond do
self.send hash[:method].to_sym, hash[:payload]
end
end
@@ -176,6 +176,7 @@ def some_peeking;end
:future_id => '1234',
:future_queue => "/queues/torquebox//tasks/torquebox_backgroundable",
:method => '__sync_an_async_action',
+ :future_ttl => nil,
:args => [:a, :b]
},
anything)
@@ -221,6 +222,16 @@ def some_peeking;end
@object.background.foo(1,2)
end
+ it "should use default future ttl" do
+ @queue.should_receive(:publish).with(hash_including(:future_ttl => nil), anything)
+ @object.background.foo
+ end
+
+ it "should use custom future ttl" do
+ @queue.should_receive(:publish).with(hash_including(:future_ttl => 100_000), anything)
+ @object.background(:future_ttl => 100_000).foo
+ end
+
it "should raise when given a block" do
lambda {
@object.background.foo { }
@@ -270,6 +281,16 @@ def some_peeking;end
@object.background.foo(1,2)
end
+ it "should use default future ttl" do
+ @queue.should_receive(:publish).with(hash_including(:future_ttl => nil), anything)
+ @object.background.foo
+ end
+
+ it "should use custom future ttl" do
+ @queue.should_receive(:publish).with(hash_including(:future_ttl => 100_000), anything)
+ @object.background(:future_ttl => 100_000).foo
+ end
+
it "should raise when given a block" do
lambda {
@object.background.foo { }
@@ -29,12 +29,19 @@ def respond
end
it "should send payload correctly" do
- expectation = [{:method => :payload=, :payload => {:foo => 'bar'}, :future_id => '1234', :future_queue => MyTestTask.queue_name('my_test')}, anything]
+ expectation = [{:method => :payload=, :payload => {:foo => 'bar'}, :future_id => '1234', :future_queue => MyTestTask.queue_name('my_test'), :future_ttl => nil}, anything]
@send_queue.should_receive(:publish).with(*expectation)
MyTestTask.async(:payload=, :foo => 'bar')
end
+ it "should send payload correctly with custom future ttl" do
+ expectation = [{:method => :payload=, :payload => {:foo => 'bar'}, :future_id => '1234', :future_queue => MyTestTask.queue_name('my_test'), :future_ttl => 10_000}, anything]
+ @send_queue.should_receive(:publish).with(*expectation)
+
+ MyTestTask.async(:payload=, {:foo => 'bar'}, {:future_ttl => 10_000})
+ end
+
it "should handle nil payload as empty hash" do
@send_queue.should_receive(:publish).with(hash_including(:payload => {}), anything)
@@ -3,7 +3,7 @@
class InstanceMethodsModel
include TorqueBox::Messaging::Backgroundable
- always_background :foo
+ always_background :foo, :future_ttl => 100_000
def initialize
@foreground = TorqueBox.fetch("queue/foreground")

0 comments on commit 32a6098

Please sign in to comment.