Permalink
Browse files

Create an abstraction between Resque and Redis

I want to write code to access information about what's going on in
Resque, but the code needs to work for multiple Resque instances in the
same Ruby VM.  Because `Resque.redis` is global, it is very difficult
(impossible in some cases) to use the Resque API directly.

Provide an API that does not rely on a global variable that encapsulates
all the ways in which Resque interacts with Redis, namely the names of
keys and what sort of data structure is expected in those keys.

Consider a call like this:

```ruby
decode redis.lpop("queue:#{queue}")
```

This should mean "decode the job on queue `queue`", but it actually
means "decode whatever is in redis under the key `"queue:#{queue}"`
which just so happens to be how we store queues, but don't worry about
that right now, just go in Redis and do it".

With this PR, it turns into this:

```ruby
decode(@data_store.pop_from_queue(queue))
  ```
  which is saying "get me the job in queue `queue`, however that's done,
  and decode it.

  Which means that someone _else_ can do this without knowing how to
  construct the redis key for queue.

  And because that knowledge is now centralized in one class
  (`DataStore`) instead of littered throughout the codebase, one could
  perform these operations on multiple resque queues from the same Ruby
  VM, e.g. for monitoring:

  ```ruby
  resques = {
        www: '10.0.3.4:2345',
               admin: '10.1.4.5:8765',
                   ops: '10.1.4.5:8766',
  }

data_stores = Hash[resques.map { |name,location|
    [name,Resque::DataStore.new(Redis.new(location))]
}]

data_stores[:www].num_failed # => how many are failed in www's Resque
data_stores[:admin].num_failed # => what about admin?
stuck_workers = data_stores[:ops].workers.select { |worker|
    data_stores[:ops].worker_start_time(worker) > 1.hour.ago
}
```
And so forth.

This is not an ideal design, but it solves the problem without breaking
backwards compatibility and is better than what exists now, since it at
least centralizes how Resque's data structures are stored in Redis.  It
could also, in theory, allow a different backing store than Redis.

I hacked a `concerning` concept to demonstrate which calls were relevant
to what—this could be split into further classes.  It's also possible
that versions of the major objects (`Resque`, `Worker`, and `Job`) could
be created to not use a global for `redis`, but that is for another day.
  • Loading branch information...
1 parent 105a540 commit 4bb44413a045fca78d25e22ad535b3f5ea97fab8 @davetron5000 davetron5000 committed with chrisccerami Apr 27, 2014
View
@@ -8,3 +8,6 @@ test/stdout
*.swp
pkg
Gemfiles/Gemfile.1.8.7.lock
+.ruby-version
+.ruby-gemset
+*.sw?
View
@@ -18,6 +18,7 @@
require 'resque/job'
require 'resque/worker'
require 'resque/plugin'
+require 'resque/data_store'
require 'resque/vendor/utf8_util'
@@ -120,33 +121,27 @@ def redis=(server)
end
namespace ||= :resque
- @redis = Redis::Namespace.new(namespace, :redis => redis)
+ @data_store = Resque::DataStore.new(Redis::Namespace.new(namespace, :redis => redis))
when Redis::Namespace
- @redis = server
+ @data_store = server
when Hash
- @redis = Redis::Namespace.new(:resque, :redis => Redis.new(server))
+ @data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => Redis.new(server)))
else
- @redis = Redis::Namespace.new(:resque, :redis => server)
+ @data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => server))
end
end
# Returns the current Redis connection. If none has been created, will
# create a new one.
def redis
- return @redis if @redis
+ return @data_store if @data_store
self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
self.redis
end
+ alias :data_store :redis
def redis_id
- # support 1.x versions of redis-rb
- if redis.respond_to?(:server)
- redis.server
- elsif redis.respond_to?(:nodes) # distributed
- redis.nodes.map { |n| n.id }.join(', ')
- else
- redis.client.id
- end
+ @data_store.identifier
end
# Set or retrieve the current logger object
@@ -270,24 +265,20 @@ def to_s
#
# Returns nothing
def push(queue, item)
- encoded = encode(item)
- redis.pipelined do
- watch_queue(queue)
- redis.send((enqueue_front ? 'lpush' : 'rpush'), "queue:#{queue}", encoded)
- end
+ @data_store.push_to_queue(queue,encode(item))
end
# Pops a job off a queue. Queue name should be a string.
#
# Returns a Ruby object.
def pop(queue)
- decode redis.lpop("queue:#{queue}")
+ decode(@data_store.pop_from_queue(queue))
end
# Returns an integer representing the size of a queue.
# Queue name should be a string.
def size(queue)
- redis.llen("queue:#{queue}").to_i
+ @data_store.queue_size(queue)
end
# Returns an array of items currently queued. Queue name should be
@@ -299,38 +290,39 @@ def size(queue)
# To get the 3rd page of a 30 item, paginatied list one would use:
# Resque.peek('my_list', 59, 30)
def peek(queue, start = 0, count = 1)
- list_range("queue:#{queue}", start, count)
+ results = @data_store.peek_in_queue(queue,start,count)
+ if count == 1
+ decode(results)
+ else
+ results.map { |result| decode(result) }
+ end
end
# Does the dirty work of fetching a range of items from a Redis list
# and converting them into Ruby objects.
def list_range(key, start = 0, count = 1)
+ results = @data_store.list_range(key, start, count)
if count == 1
- decode redis.lindex(key, start)
+ decode(results)
else
- Array(redis.lrange(key, start, start+count-1)).map do |item|
- decode item
- end
+ results.map { |result| decode(result) }
end
end
# Returns an array of all known Resque queues as strings.
def queues
- Array(redis.smembers(:queues))
+ @data_store.queue_names
end
# Given a queue name, completely deletes the queue.
def remove_queue(queue)
- redis.pipelined do
- redis.srem(:queues, queue.to_s)
- redis.del("queue:#{queue}")
- end
+ @data_store.remove_queue(queue)
end
# Used internally to keep track of which queues we've created.
# Don't call this directly.
def watch_queue(queue)
- redis.sadd(:queues, queue.to_s)
+ @data_store.watch_queue(queue)
end
@@ -493,7 +485,7 @@ def info
:queues => queues.size,
:workers => workers.size.to_i,
:working => working.size,
- :failed => Resque.redis.llen(:failed).to_i,
+ :failed => @data_store.num_failed,
:servers => [redis_id],
:environment => ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
}
@@ -502,9 +494,7 @@ def info
# Returns an array of all known Resque keys in Redis. Redis' KEYS operation
# is O(N) for the keyspace, so be careful - this can be slow for big databases.
def keys
- redis.keys("*").map do |key|
- key.sub("#{redis.namespace}:", '')
- end
+ @data_store.all_resque_keys
end
# Returns a hash, mapping queue names to queue sizes
Oops, something went wrong.

0 comments on commit 4bb4441

Please sign in to comment.