Skip to content

Commit

Permalink
Switch over to connection_pool
Browse files Browse the repository at this point in the history
The Redis backend no longer accepts a Redis instance or connects for
you. You have to set up a connection_pool and pass it in the
store_config param.

Also, for the time being, we're removing support for Rubinius until we can get
connection_pool working, as a recent change to connection_pool
(see mperham/connection_pool@3ac11a9206) now uses BasicObject.
  • Loading branch information
jc00ke committed Sep 26, 2011
1 parent 7d6695d commit c3fe33c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 84 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Expand Up @@ -2,8 +2,7 @@ rvm:
- 1.9.2
- 1.9.3
- jruby
- rbx-2.0
branches:
only:
- master
env: JRUBY_OPTS='--1.9' RBXOPT=-X19
env: JRUBY_OPTS='--1.9'
7 changes: 7 additions & 0 deletions History.md
@@ -1,6 +1,13 @@
Changes
================

HEAD
---------

* Completely switch to connection\_pool for Redis backend.
The Redis store no longer connects for you automatically, so it's mandatory to use
connection\_pool.

0.9.5
---------

Expand Down
14 changes: 4 additions & 10 deletions README.md
Expand Up @@ -33,22 +33,16 @@ In your Rails app, create a `config/initializers/girl_friday.rb` which defines y
ImageCrawler.process(msg)
end

SCRAPE_QUEUE = GirlFriday::WorkQueue.new(:scrape_sites, :size => 4, :store => GirlFriday::Store::Redis, :store_config => [{ :host => 'host' }]) do |msg|
Page.scrape(msg)
end

TRANSCODE_QUEUE = GirlFriday::WorkQueue.new(:scrape_sites, :size => 4, :store => GirlFriday::Store::Redis, :store_config => [{ :redis => $redis }]) do |msg|
VideoProcessor.transcode(msg)
end

:size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database you'll want to ensure that the connection pool is large enough by modifying `config/database.yml`.

You can use a connection pool to share a set of Redis connections with
In order to use the Redis backend, you must use a connection pool to share a set of Redis connections with
other threads and GirlFriday queues using the `connection\_pool` gem:

require 'connection_pool'

redis_pool = ConnectionPool.new(:size => 5, :timeout => 5) { Redis.new }
CLEAN_FILTER_QUEUE = GirlFriday::WorkQueue.new(:clean_filter, :store => GirlFriday::Store::Redis, :store_config => [{ :redis => redis_pool}]) do |msg|

CLEAN_FILTER_QUEUE = GirlFriday::WorkQueue.new(:clean_filter, :store => GirlFriday::Store::Redis, :store_config => [{ :pool => redis_pool}]) do |msg|
Filter.clean(msg)
end

Expand Down
13 changes: 10 additions & 3 deletions lib/girl_friday/persistence.rb
Expand Up @@ -23,17 +23,20 @@ def size
class Redis
def initialize(name, options)
@opts = options
unless @opts[:pool]
raise ArgumentError, "you must pass in a :pool"
end
@key = "girl_friday-#{name}-#{environment}"
end

def push(work)
val = Marshal.dump(work)
redis.rpush(@key, val)
redis{ |r| r.rpush(@key, val) }
end
alias_method :<<, :push

def pop
val = redis.lpop(@key)
val = redis{ |r| r.lpop(@key) }
Marshal.load(val) if val
end

Expand All @@ -48,8 +51,12 @@ def environment
end

def redis
@redis ||= (@opts.delete(:redis) || ::Redis.connect(*@opts))
@redis ||= @opts.delete(:pool)
@redis.with do |pooled|
yield pooled
end
end
end

end
end
2 changes: 1 addition & 1 deletion lib/girl_friday/work_queue.rb
Expand Up @@ -18,7 +18,7 @@ def initialize(name, options={}, &block)
@ready_workers = nil
@created_at = Time.now.to_i
@total_processed = @total_errors = @total_queued = 0
@persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || []))
@persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || {}))
@weakref = WeakRef.new(self)
start
GirlFriday.add_queue @weakref
Expand Down
78 changes: 10 additions & 68 deletions test/test_girl_friday_queue.rb
Expand Up @@ -87,44 +87,12 @@ def test_should_provide_status
assert(metrics[:total_processed] > 0)
end

def test_should_persist_with_redis
begin
require 'redis'
redis = Redis.new
redis.flushdb
rescue LoadError
return puts "Skipping redis test, 'redis' gem not found: #{$!.message}"
rescue Errno::ECONNREFUSED
return puts 'Skipping redis test, not running locally'
end

mutex = Mutex.new
total = 100
count = 0
incr = Proc.new do
mutex.synchronize do
count += 1
end
end

async_test(1.0) do |cb|
queue = GirlFriday::WorkQueue.new('redis', :size => 2, :store => GirlFriday::Store::Redis) do |msg|
incr.call
queue.shutdown do
cb.call
end if count == total
end
total.times do
queue.push(:text => 'foo')
end
end
end

def test_should_persist_with_redis_instance
def test_should_persist_with_redis_connection_pool
begin
require 'redis'
redis = Redis.new
redis.flushdb
require 'connection_pool'
pool = ConnectionPool.new(:size => 5, :timeout => 2){ Redis.new }
pool.flushdb
rescue LoadError
return puts "Skipping redis test, 'redis' gem not found: #{$!.message}"
rescue Errno::ECONNREFUSED
Expand All @@ -140,8 +108,8 @@ def test_should_persist_with_redis_instance
end
end

async_test(1.0) do |cb|
queue = GirlFriday::WorkQueue.new('redis-instance', :size => 2, :store => GirlFriday::Store::Redis, :store_config => [{ :redis => redis }]) do |msg|
async_test(2.0) do |cb|
queue = GirlFriday::WorkQueue.new('redis-pool', :size => 2, :store => GirlFriday::Store::Redis, :store_config => { :pool => pool }) do |msg|
incr.call
queue.shutdown do
cb.call
Expand All @@ -153,36 +121,10 @@ def test_should_persist_with_redis_instance
end
end

def test_should_persist_with_redis_connection_pool
begin
require 'redis'
require 'connection_pool'
redis = ConnectionPool.new(:size => 5, :timeout => 5){ Redis.new }
redis.flushdb
rescue LoadError
return puts "Skipping redis test, 'redis' gem not found: #{$!.message}"
rescue Errno::ECONNREFUSED
return puts 'Skipping redis test, not running locally'
end

mutex = Mutex.new
total = 100
count = 0
incr = Proc.new do
mutex.synchronize do
count += 1
end
end

async_test(1.0) do |cb|
queue = GirlFriday::WorkQueue.new('redis-pool', :size => 2, :store => GirlFriday::Store::Redis, :store_config => [{ :redis => redis }]) do |msg|
incr.call
queue.shutdown do
cb.call
end if count == total
end
total.times do
queue.push(:text => 'foo')
def test_should_raise_if_no_store_config_passed_in_for_redis_backend
assert_raises(ArgumentError) do
GirlFriday::WorkQueue.new('raise-test', :store => GirlFriday::Store::Redis) do |msg|
# doing work
end
end
end
Expand Down

0 comments on commit c3fe33c

Please sign in to comment.