diff --git a/.gitignore b/.gitignore index 9d85e39..ad85316 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ Gemfile.lock pkg/* rbxdb/ *.rdb +.idea/* + diff --git a/README.md b/README.md index c718293..8a2db24 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ In your Rails app, create a `config/initializers/girl_friday.rb` which defines y :size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database you'll want to ensure that the connection pool is large enough by modifying `config/database.yml`. In order to use the Redis backend, you must use a connection pool to share a set of Redis connections with -other threads and GirlFriday queues using the `connection\_pool` gem: +other threads and GirlFriday queues using the `connection_pool` gem: require 'connection_pool' diff --git a/lib/girl_friday/persistence.rb b/lib/girl_friday/persistence.rb index 32527d3..c0816f4 100644 --- a/lib/girl_friday/persistence.rb +++ b/lib/girl_friday/persistence.rb @@ -23,25 +23,23 @@ def size class Redis def initialize(name, options) @opts = options - unless @opts[:pool] - raise ArgumentError, "you must pass in a :pool" - end + raise ArgumentError, "you must pass in a :pool" unless @opts[:pool] @key = "girl_friday-#{name}-#{environment}" end def push(work) val = Marshal.dump(work) - redis{ |r| r.rpush(@key, val) } + redis { |r| r.rpush(@key, val) } end alias_method :<<, :push def pop - val = redis{ |r| r.lpop(@key) } + val = redis { |r| r.lpop(@key) } Marshal.load(val) if val end def size - redis.llen(@key) + redis { |r| r.llen(@key) } end private diff --git a/lib/girl_friday/work_queue.rb b/lib/girl_friday/work_queue.rb index 6972d09..0378c78 100644 --- a/lib/girl_friday/work_queue.rb +++ b/lib/girl_friday/work_queue.rb @@ -70,17 +70,26 @@ def wait_for_empty end end - def shutdown + def shutdown(&block) # Runtime state should never be modified by caller thread, # only the Supervisor thread. - @supervisor << Shutdown[block_given? ? Proc.new : nil] + @supervisor << Shutdown[block] end private + def running? + !@shutdown + end + + def handle_error(ex) + # Redis network error? Log and ignore. + @error_handlers.each { |handler| handler.handle(ex) } + end + def on_ready(who) @total_processed += 1 - if !@shutdown && work = @persister.pop + if running? && work = @persister.pop who.this << work drain else @@ -88,22 +97,21 @@ def on_ready(who) ready_workers << who.this end rescue => ex - # Redis network error? Log and ignore. - @error_handlers.each { |handler| handler.handle(ex) } + handle_error(ex) end def shutdown_complete begin @when_shutdown.call(self) if @when_shutdown rescue Exception => ex - @error_handlers.each { |handler| handler.handle(ex) } + handle_error(ex) end end def on_work(work) @total_queued += 1 - if !@shutdown && worker = ready_workers.pop + if running? && worker = ready_workers.pop @busy_workers << worker worker << work drain @@ -111,19 +119,12 @@ def on_work(work) @persister << work end rescue => ex - # Redis network error? Log and ignore. - @error_handlers.each { |handler| handler.handle(ex) } + handle_error(ex) end def ready_workers - @ready_workers ||= begin - workers = [] - @size.times do - # start N workers - workers << Actor.spawn_link(&@work_loop) - end - workers - end + # start N workers + @ready_workers ||= Array.new(@size) { Actor.spawn_link(&@work_loop) } end def start @@ -132,9 +133,9 @@ def start supervisor = Actor.current @work_loop = Proc.new do Thread.current[:label] = "#{name}-worker" - while !@shutdown do + while running? do work = Actor.receive - if !@shutdown + if running? result = @processor.call(work.msg) work.callback.call(result) if work.callback supervisor << Ready[Actor.current] @@ -155,8 +156,7 @@ def start def drain # give as much work to as many ready workers as possible - ps = @persister.size - todo = ready_workers.size < ps ? ready_workers.size : ps + todo = [@persister.size, ready_workers.size].min todo.times do worker = ready_workers.pop @busy_workers << worker @@ -183,18 +183,18 @@ def supervisor_loop return end f.when(Actor::DeadActorError) do |ex| - if !@shutdown + if running? # TODO Provide current message contents as error context @total_errors += 1 @busy_workers.delete(ex.actor) ready_workers << Actor.spawn_link(&@work_loop) - @error_handlers.each { |handler| handler.handle(ex.reason) } + handle_error(ex.reason) end end end end end - end + Queue = WorkQueue end