Skip to content

Commit

Permalink
Create an abstraction between Resque and Redis
Browse files Browse the repository at this point in the history
I want to write code to access information about what's going on in
Resque, but the code needs to work for multiple Resque instances in the
same Ruby VM.  Because `Resque.redis` is global, it is very difficult
(impossible in some cases) to use the Resque API directly.

Provide an API that does not rely on a global variable that encapsulates
all the ways in which Resque interacts with Redis, namely the names of
keys and what sort of data structure is expected in those keys.

Consider a call like this:

```ruby
decode redis.lpop("queue:#{queue}")
```

This should mean "decode the job on queue `queue`", but it actually
means "decode whatever is in redis under the key `"queue:#{queue}"`
which just so happens to be how we store queues, but don't worry about
that right now, just go in Redis and do it".

With this PR, it turns into this:

```ruby
decode(@data_store.pop_from_queue(queue))
  ```
  which is saying "get me the job in queue `queue`, however that's done,
  and decode it.

  Which means that someone _else_ can do this without knowing how to
  construct the redis key for queue.

  And because that knowledge is now centralized in one class
  (`DataStore`) instead of littered throughout the codebase, one could
  perform these operations on multiple resque queues from the same Ruby
  VM, e.g. for monitoring:

  ```ruby
  resques = {
        www: '10.0.3.4:2345',
               admin: '10.1.4.5:8765',
                   ops: '10.1.4.5:8766',
  }

data_stores = Hash[resques.map { |name,location|
    [name,Resque::DataStore.new(Redis.new(location))]
}]

data_stores[:www].num_failed # => how many are failed in www's Resque
data_stores[:admin].num_failed # => what about admin?
stuck_workers = data_stores[:ops].workers.select { |worker|
    data_stores[:ops].worker_start_time(worker) > 1.hour.ago
}
```
And so forth.

This is not an ideal design, but it solves the problem without breaking
backwards compatibility and is better than what exists now, since it at
least centralizes how Resque's data structures are stored in Redis.  It
could also, in theory, allow a different backing store than Redis.

I hacked a `concerning` concept to demonstrate which calls were relevant
to what—this could be split into further classes.  It's also possible
that versions of the major objects (`Resque`, `Worker`, and `Job`) could
be created to not use a global for `redis`, but that is for another day.
  • Loading branch information
davetron5000 committed Feb 10, 2016
1 parent db4d5d0 commit 5143054
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 87 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ Gemfile.lock
test/dump.rdb
test/dump-cluster.rdb
test/stdout
*.swp
.ruby-version
.ruby-gemset
*.sw?
60 changes: 25 additions & 35 deletions lib/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'resque/job'
require 'resque/worker'
require 'resque/plugin'
require 'resque/data_store'

require 'resque/vendor/utf8_util'

Expand Down Expand Up @@ -120,33 +121,27 @@ def redis=(server)
end
namespace ||= :resque

@redis = Redis::Namespace.new(namespace, :redis => redis)
@data_store = Resque::DataStore.new(Redis::Namespace.new(namespace, :redis => redis))
when Redis::Namespace
@redis = server
@data_store = server
when Hash
@redis = Redis::Namespace.new(:resque, :redis => Redis.new(server))
@data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => Redis.new(server)))
else
@redis = Redis::Namespace.new(:resque, :redis => server)
@data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => server))
end
end

# Returns the current Redis connection. If none has been created, will
# create a new one.
def redis
return @redis if @redis
return @data_store if @data_store
self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
self.redis
end
alias :data_store :redis

def redis_id
# support 1.x versions of redis-rb
if redis.respond_to?(:server)
redis.server
elsif redis.respond_to?(:nodes) # distributed
redis.nodes.map { |n| n.id }.join(', ')
else
redis.client.id
end
@data_store.identifier
end

# Set or retrieve the current logger object
Expand Down Expand Up @@ -264,24 +259,20 @@ def to_s
#
# Returns nothing
def push(queue, item)
encoded = encode(item)
redis.pipelined do
watch_queue(queue)
redis.rpush "queue:#{queue}", encoded
end
@data_store.push_to_queue(queue,encode(item))
end

# Pops a job off a queue. Queue name should be a string.
#
# Returns a Ruby object.
def pop(queue)
decode redis.lpop("queue:#{queue}")
decode(@data_store.pop_from_queue(queue))
end

# Returns an integer representing the size of a queue.
# Queue name should be a string.
def size(queue)
redis.llen("queue:#{queue}").to_i
@data_store.queue_size(queue)
end

# Returns an array of items currently queued. Queue name should be
Expand All @@ -293,38 +284,39 @@ def size(queue)
# To get the 3rd page of a 30 item, paginatied list one would use:
# Resque.peek('my_list', 59, 30)
def peek(queue, start = 0, count = 1)
list_range("queue:#{queue}", start, count)
results = @data_store.peek_in_queue(queue,start,count)
if count == 1
decode(results)
else
results.map { |result| decode(result) }
end
end

# Does the dirty work of fetching a range of items from a Redis list
# and converting them into Ruby objects.
def list_range(key, start = 0, count = 1)
results = @data_store.list_range(key, start, count)
if count == 1
decode redis.lindex(key, start)
decode(results)
else
Array(redis.lrange(key, start, start+count-1)).map do |item|
decode item
end
results.map { |result| decode(result) }
end
end

# Returns an array of all known Resque queues as strings.
def queues
Array(redis.smembers(:queues))
@data_store.queue_names
end

# Given a queue name, completely deletes the queue.
def remove_queue(queue)
redis.pipelined do
redis.srem(:queues, queue.to_s)
redis.del("queue:#{queue}")
end
@data_store.remove_queue(queue)
end

# Used internally to keep track of which queues we've created.
# Don't call this directly.
def watch_queue(queue)
redis.sadd(:queues, queue.to_s)
@data_store.watch_queue(queue)
end


Expand Down Expand Up @@ -487,7 +479,7 @@ def info
:queues => queues.size,
:workers => workers.size.to_i,
:working => working.size,
:failed => Resque.redis.llen(:failed).to_i,
:failed => @data_store.num_failed,
:servers => [redis_id],
:environment => ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
}
Expand All @@ -496,9 +488,7 @@ def info
# Returns an array of all known Resque keys in Redis. Redis' KEYS operation
# is O(N) for the keyspace, so be careful - this can be slow for big databases.
def keys
redis.keys("*").map do |key|
key.sub("#{redis.namespace}:", '')
end
@data_store.all_resque_keys
end

# Returns a hash, mapping queue names to queue sizes
Expand Down
Loading

0 comments on commit 5143054

Please sign in to comment.