Skip to content

Commit

Permalink
minor code changes to enhance readability
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanlecompte committed Sep 26, 2011
1 parent 5c025f4 commit f99d969
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -4,3 +4,5 @@ Gemfile.lock
pkg/*
rbxdb/
*.rdb
.idea/*

2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -36,7 +36,7 @@ In your Rails app, create a `config/initializers/girl_friday.rb` which defines y
:size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database you'll want to ensure that the connection pool is large enough by modifying `config/database.yml`.

In order to use the Redis backend, you must use a connection pool to share a set of Redis connections with
other threads and GirlFriday queues using the `connection\_pool` gem:
other threads and GirlFriday queues using the `connection_pool` gem:

require 'connection_pool'

Expand Down
10 changes: 4 additions & 6 deletions lib/girl_friday/persistence.rb
Expand Up @@ -23,25 +23,23 @@ def size
class Redis
def initialize(name, options)
@opts = options
unless @opts[:pool]
raise ArgumentError, "you must pass in a :pool"
end
raise ArgumentError, "you must pass in a :pool" unless @opts[:pool]
@key = "girl_friday-#{name}-#{environment}"
end

def push(work)
val = Marshal.dump(work)
redis{ |r| r.rpush(@key, val) }
redis { |r| r.rpush(@key, val) }
end
alias_method :<<, :push

def pop
val = redis{ |r| r.lpop(@key) }
val = redis { |r| r.lpop(@key) }
Marshal.load(val) if val
end

def size
redis.llen(@key)
redis { |r| r.llen(@key) }
end

private
Expand Down
48 changes: 24 additions & 24 deletions lib/girl_friday/work_queue.rb
Expand Up @@ -70,60 +70,61 @@ def wait_for_empty
end
end

def shutdown
def shutdown(&block)
# Runtime state should never be modified by caller thread,
# only the Supervisor thread.
@supervisor << Shutdown[block_given? ? Proc.new : nil]
@supervisor << Shutdown[block]
end

private

def running?
!@shutdown
end

def handle_error(ex)
# Redis network error? Log and ignore.
@error_handlers.each { |handler| handler.handle(ex) }
end

def on_ready(who)
@total_processed += 1
if !@shutdown && work = @persister.pop
if running? && work = @persister.pop
who.this << work
drain
else
@busy_workers.delete(who.this)
ready_workers << who.this
end
rescue => ex
# Redis network error? Log and ignore.
@error_handlers.each { |handler| handler.handle(ex) }
handle_error(ex)
end

def shutdown_complete
begin
@when_shutdown.call(self) if @when_shutdown
rescue Exception => ex
@error_handlers.each { |handler| handler.handle(ex) }
handle_error(ex)
end
end

def on_work(work)
@total_queued += 1

if !@shutdown && worker = ready_workers.pop
if running? && worker = ready_workers.pop
@busy_workers << worker
worker << work
drain
else
@persister << work
end
rescue => ex
# Redis network error? Log and ignore.
@error_handlers.each { |handler| handler.handle(ex) }
handle_error(ex)
end

def ready_workers
@ready_workers ||= begin
workers = []
@size.times do
# start N workers
workers << Actor.spawn_link(&@work_loop)
end
workers
end
# start N workers
@ready_workers ||= Array.new(@size) { Actor.spawn_link(&@work_loop) }
end

def start
Expand All @@ -132,9 +133,9 @@ def start
supervisor = Actor.current
@work_loop = Proc.new do
Thread.current[:label] = "#{name}-worker"
while !@shutdown do
while running? do
work = Actor.receive
if !@shutdown
if running?
result = @processor.call(work.msg)
work.callback.call(result) if work.callback
supervisor << Ready[Actor.current]
Expand All @@ -155,8 +156,7 @@ def start

def drain
# give as much work to as many ready workers as possible
ps = @persister.size
todo = ready_workers.size < ps ? ready_workers.size : ps
todo = [@persister.size, ready_workers.size].min
todo.times do
worker = ready_workers.pop
@busy_workers << worker
Expand All @@ -183,18 +183,18 @@ def supervisor_loop
return
end
f.when(Actor::DeadActorError) do |ex|
if !@shutdown
if running?
# TODO Provide current message contents as error context
@total_errors += 1
@busy_workers.delete(ex.actor)
ready_workers << Actor.spawn_link(&@work_loop)
@error_handlers.each { |handler| handler.handle(ex.reason) }
handle_error(ex.reason)
end
end
end
end
end

end

Queue = WorkQueue
end

0 comments on commit f99d969

Please sign in to comment.