Permalink
Browse files

Added worker and listener prefetch options to Cloudist. You can now p…

…ass in :worker_prefetch => 2 and it will process two jobs at once
  • Loading branch information...
1 parent a616007 commit dfc4b658bc347edfb314327524ddb921b56843d0 @ivanvanderbyl committed May 28, 2011
@@ -47,6 +47,6 @@ def process
Cloudist.signal_trap!
-Cloudist.start(:logging => false) {
+Cloudist.start(:logging => false, :worker_prefetch => 2) {
Cloudist.handle('make.sandwich').with(SandwichWorker)
}
View
@@ -34,6 +34,9 @@ class << self
thread_local_accessor :listeners, :default => []
thread_local_accessor :listener_instances, :default => {}
+ thread_local_accessor :worker_prefetch, :default => 1
+ thread_local_accessor :listener_prefetch, :default => 1
+
# Start the Cloudist loop
#
# Cloudist.start {
@@ -53,6 +56,7 @@ class << self
#
def start(options_or_connection = {}, &block)
if options_or_connection.is_a?(Hash)
+ extract_cloudist_options!(options_or_connection)
config = settings.update(options_or_connection)
AMQP.start(config) do
self.instance_eval(&block) if block_given?
@@ -63,6 +67,11 @@ def start(options_or_connection = {}, &block)
end
end
+ def extract_cloudist_options!(options)
+ self.worker_prefetch = options.delete(:worker_prefetch) || 1
+ self.listener_prefetch = options.delete(:listener_prefetch) || 1
+ end
+
def connection
AMQP.connection
end
@@ -1,6 +1,15 @@
module Cloudist
class JobQueue < Cloudist::Queues::BasicQueue
+ def initialize(queue_name, options={})
+ options[:auto_delete] = false
+ options[:nowait] = false
+
+ @prefetch = Cloudist.worker_prefetch
+ puts "Prefetch: #{@prefetch}"
+ super(queue_name, options)
+ end
+
# def initialize(queue_name, options={})
# @prefetch = 1
# # opts[:auto_delete] = false
@@ -4,7 +4,7 @@ def initialize(queue_name, options={})
options[:auto_delete] = true
options[:nowait] = false
- @prefetch = 1
+ @prefetch = Cloudist.listener_prefetch
super(queue_name, options)
end

0 comments on commit dfc4b65

Please sign in to comment.