Skip to content

Commit

Permalink
Abstract out job persistence, implement in-memory and redis-backed.
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Apr 17, 2011
1 parent cbe84b7 commit 49141f1
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 11 deletions.
1 change: 0 additions & 1 deletion TODO.md
@@ -1,7 +1,6 @@
TODO
===============

- job persistence (push to redis list)
- clean shutdown (drain queues)
- web admin UI to surface status() metrics
- nicer project homepage
3 changes: 2 additions & 1 deletion lib/girl_friday.rb
Expand Up @@ -9,4 +9,5 @@

require 'girl_friday/version'
require 'girl_friday/work_queue'
require 'girl_friday/error_handler'
require 'girl_friday/error_handler'
require 'girl_friday/persistence'
8 changes: 5 additions & 3 deletions lib/girl_friday/persistence.rb
Expand Up @@ -27,12 +27,14 @@ def initialize(name, options)
end

def push(work)
redis.rpush(@key)
val = Marshal.dump(work)
redis.rpush(@key, val)
end
alias_method :<<, :push

def pop
redis.lpop(@key)
val = redis.lpop(@key)
Marshal.load(val) if val
end

def size
Expand All @@ -42,7 +44,7 @@ def size
private

def redis
@redis ||= Redis.new(*@opts)
@redis ||= ::Redis.new(*@opts)
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions lib/girl_friday/work_queue.rb
Expand Up @@ -16,10 +16,10 @@ def initialize(name, options={}, &block)
@error_handler = (options[:error_handler] || ErrorHandler.default).new

@ready_workers = []
@extra_work = []
@busy_workers = []
@started_at = Time.now.to_i
@total_processed = @total_errors = @total_queued = 0
@persister = (options[:store] || Persistence::InMemory).new(name, (options[:store_config] || []))
start
end

Expand All @@ -35,7 +35,7 @@ def status
:pool_size => @size,
:ready => @ready_workers.size,
:busy => @busy_workers.size,
:backlog => @extra_work.size,
:backlog => @persister.size,
:total_queued => @total_queued,
:total_processed => @total_processed,
:total_errors => @total_errors,
Expand Down Expand Up @@ -70,9 +70,9 @@ def start
Actor.receive do |f|
f.when(Ready) do |who|
@total_processed += 1
if work = @extra_work.pop
if work = @persister.pop
who.this << work
drain(@ready_workers, @extra_work)
drain(@ready_workers, @persister)
else
@busy_workers.delete(who.this)
@ready_workers << who.this
Expand All @@ -82,9 +82,9 @@ def start
if worker = @ready_workers.pop
@busy_workers << worker
worker << work
drain(@ready_workers, @extra_work)
drain(@ready_workers, @persister)
else
@extra_work << work
@persister << work
end
end
f.when(Actor::DeadActorError) do |exit|
Expand Down
31 changes: 31 additions & 0 deletions test/test_girl_friday.rb
Expand Up @@ -77,4 +77,35 @@ def test_should_provide_status
assert(metrics[:total_processed] > 0)
end
end

def test_should_persist_with_redis
begin
require 'redis'
redis = Redis.new
redis.flushdb
rescue LoadError
return puts 'Skipping redis test, "redis" gem not found'
rescue Errno::ECONNREFUSED
return puts 'Skipping redis test, not running locally'
end

mutex = Mutex.new
total = 100
count = 0
incr = Proc.new do
mutex.synchronize do
count += 1
end
end

async_test do |cb|
queue = GirlFriday::WorkQueue.new('test', :size => 2, :store => GirlFriday::Persistence::Redis) do |msg|
incr.call
cb.call if count == total
end
total.times do
queue.push(:text => 'foo')
end
end
end
end

0 comments on commit 49141f1

Please sign in to comment.