Skip to content

Commit

Permalink
Fork resque-mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
ctrochalakis committed Jan 22, 2010
1 parent 81a5cc2 commit 896131d
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 113 deletions.
19 changes: 19 additions & 0 deletions README.markdown
@@ -1,3 +1,21 @@
Resque-mongo
============
Resque-mongo is a fork of [Resque][resque] that uses MongoDB as a
backend instead of Redis. This fork is a work in progress, all the
library functionality has been ported (all tests pass) and the
monitoring sinatra app works except the "stats" panel, although there are
a lot of details that need to been taken care of.

Resque-mongo uses a fairly new feature of mongo, [findAndModify()][fnr].
findAndModify is not yet supported by the ruby mongo driver because the
command's api might change a bit. You can use a [patched version
mongo-ruby-driver][fnr-ruby] that supports the current implementation.

Also, check your mongo version: 1.3.0 or higher

[fnr]: http://www.mongodb.org/display/DOCS/findandmodify+Command
[fnr-ruby]: http://github.com/ctrochalakis/mongo-ruby-driver/tree/find_replace

Resque
======

Expand Down Expand Up @@ -734,3 +752,4 @@ Chris Wanstrath :: chris@ozmm.org :: @defunkt
[1]: http://help.github.com/forking/
[2]: http://github.com/defunkt/resque/issues
[sv]: http://semver.org/
[resque]: http://github.com/defunct/resque
13 changes: 6 additions & 7 deletions Rakefile
Expand Up @@ -30,16 +30,15 @@ begin
require 'resque/version'

Jeweler::Tasks.new do |gemspec|
gemspec.name = "resque"
gemspec.name = "resque-mongo"
gemspec.summary = ""
gemspec.description = ""
gemspec.email = "chris@ozmm.org"
gemspec.homepage = "http://github.com/defunkt/resque"
gemspec.authors = ["Chris Wanstrath"]
gemspec.email = "yatiohi@ideopolis.gr"
gemspec.homepage = "http://github.com/ctrochalakis/resque-mongo"
gemspec.authors = ["Christos Trochalakis"]
gemspec.version = Resque::Version

gemspec.add_dependency "redis"
gemspec.add_dependency "redis-namespace"
gemspec.add_dependency "mongo"
gemspec.add_dependency "vegas", ">=0.1.2"
gemspec.add_dependency "sinatra", ">=0.9.2"
gemspec.add_development_dependency "jeweler"
Expand All @@ -60,7 +59,7 @@ task :publish => [ :test, :gemspec, :build ] do
system "git tag v#{Resque::Version}"
system "git push origin v#{Resque::Version}"
system "git push origin master"
system "gem push pkg/resque-#{Resque::Version}.gem"
system "gem push pkg/resque-mongo-#{Resque::Version}.gem"
system "git clean -fd"
exec "rake pages"
end
4 changes: 3 additions & 1 deletion deps.rip
@@ -1,4 +1,6 @@
git://github.com/ezmobius/redis-rb.git eed200ad
git://github.com/ctrochalakis/mongo-ruby-driver find_replace
mongo
mongo_ext
git://github.com/brianmario/yajl-ruby.git 0.6.3
git://github.com/sinatra/sinatra.git 0.9.4
git://github.com/rack/rack.git 1.0
Expand Down
102 changes: 69 additions & 33 deletions lib/resque.rb
@@ -1,4 +1,4 @@
require 'redis/namespace'
require 'mongo'

begin
require 'yajl'
Expand All @@ -21,32 +21,68 @@ module Resque
extend self

# Accepts a 'hostname:port' string or a Redis server.
def redis=(server)
def mongo=(server)
case server
when String
host, port = server.split(':')
redis = Redis.new(:host => host, :port => port, :thread_safe => true)
@redis = Redis::Namespace.new(:resque, :redis => redis)
when Redis
@redis = Redis::Namespace.new(:resque, :redis => server)
@con = Mongo::Connection.new
@db = @con.db('monque')
@mongo = @db.collection('monque')
@workers = @db.collection('workers')
@failures = @db.collection('failures')
@stats = @db.collection('stats')

add_indexes
else
raise "I don't know what to do with #{server.inspect}"
end
end


# Returns the current Redis connection. If none has been created, will
# create a new one.
def redis
return @redis if @redis
self.redis = 'localhost:6379'
self.redis
def mongo
return @mongo if @mongo
self.mongo = 'localhost:27017'
self.mongo
end

def mongo_workers
return @workers if @workers
self.mongo = 'localhost:27017'
@workers
end

def mongo_failures
return @failures if @failures
self.mongo = 'localhost:27017'
@failures
end

def mongo_stats
return @stats if @stats
self.mongo = 'localhost:27017'
@stats
end

def to_s
"Resque Client connected to #{redis.server}"
"Mongo Client connected to #{@con.host}"
end

def add_indexes
@mongo.create_index :queue
@workers.create_index :worker
@stats.create_index :stat
end

def drop
@mongo.drop
@workers.drop
@failures.drop
@stats.drop
@mongo = nil
end

#
# queue manipulation
#
Expand All @@ -55,20 +91,26 @@ def to_s
# item should be any JSON-able Ruby object.
def push(queue, item)
watch_queue(queue)
redis.rpush "queue:#{queue}", encode(item)
mongo << { :queue => queue.to_s, :item => encode(item) }
end

# Pops a job off a queue. Queue name should be a string.
#
# Returns a Ruby object.
def pop(queue)
decode redis.lpop("queue:#{queue}")
doc = mongo.find_modify( :query => { :queue => queue },
:sort => [:natural, :desc],
:remove => true )
decode doc['item']
rescue Mongo::OperationFailure => e
return nil if e.message =~ /No matching object/
raise e
end

# Returns an int representing the size of a queue.
# Queue name should be a string.
def size(queue)
redis.llen("queue:#{queue}").to_i
mongo.find(:queue => queue).count
end

# Returns an array of items currently queued. Queue name should be
Expand All @@ -80,36 +122,32 @@ def size(queue)
# To get the 3rd page of a 30 item, paginatied list one would use:
# Resque.peek('my_list', 59, 30)
def peek(queue, start = 0, count = 1)
list_range("queue:#{queue}", start, count)
end

# Does the dirty work of fetching a range of items from a Redis list
# and converting them into Ruby objects.
def list_range(key, start = 0, count = 1)
res = mongo.find(:queue => queue).sort([:natural, :desc]).skip(start).limit(count).to_a
res.collect! { |doc| decode(doc['item']) }

if count == 1
decode redis.lindex(key, start)
return nil if res.empty?
res.first
else
Array(redis.lrange(key, start, start+count-1)).map do |item|
decode item
end
return [] if res.empty?
res
end
end

# Returns an array of all known Resque queues as strings.
def queues
redis.smembers(:queues)
mongo.distinct(:queue)
end

# Given a queue name, completely deletes the queue.
def remove_queue(queue)
redis.srem(:queues, queue.to_s)
redis.del("queue:#{queue}")
mongo.remove(:queue => queue)
end

# Used internally to keep track of which queues we've created.
# Don't call this directly.
def watch_queue(queue)
redis.sadd(:queues, queue.to_s)
# redis.sadd(:queues, queue.to_s)
end


Expand Down Expand Up @@ -174,15 +212,13 @@ def info
:workers => workers.size.to_i,
:working => working.size,
:failed => Stat[:failed],
:servers => [redis.server]
:servers => ["#{@con.host}:#{@con.port}"]
}
end

# Returns an array of all known Resque keys in Redis. Redis' KEYS operation
# is O(N) for the keyspace, so be careful - this can be slow for big databases.
def keys
redis.keys("*").map do |key|
key.sub('resque:', '')
end
queues
end
end
4 changes: 2 additions & 2 deletions lib/resque/failure.rb
Expand Up @@ -32,8 +32,8 @@ def self.backend=(backend)
# back to `Resque::Failure::Redis`
def self.backend
return @backend if @backend
require 'resque/failure/redis'
@backend = Failure::Redis
require 'resque/failure/mongo'
@backend = Failure::Mongo
end

# Returns the int count of how many failures we have seen.
Expand Down
13 changes: 6 additions & 7 deletions lib/resque/failure/redis.rb → lib/resque/failure/mongo.rb
@@ -1,8 +1,8 @@
module Resque
module Failure
# A Failure backend that stores exceptions in Redis. Very simple but
# A Failure backend that stores exceptions in Mongo. Very simple but
# works out of the box, along with support in the Resque web app.
class Redis < Base
class Mongo < Base
def save
data = {
:failed_at => Time.now.strftime("%Y/%m/%d %H:%M:%S"),
Expand All @@ -12,20 +12,19 @@ def save
:worker => worker.to_s,
:queue => queue
}
data = Resque.encode(data)
Resque.redis.rpush(:failed, data)
Resque.mongo_failures << data
end

def self.count
Resque.redis.llen(:failed).to_i
Resque.mongo_failures.count
end

def self.all(start = 0, count = 1)
Resque.list_range(:failed, start, count)
Resque.mongo_failures.find().sort([:natural, :desc]).skip(start).limit(count).to_a
end

def self.clear
Resque.redis.delete('resque:failed')
Resque.mongo_failures.remove
end

end
Expand Down
12 changes: 10 additions & 2 deletions lib/resque/helpers.rb
Expand Up @@ -2,8 +2,16 @@ module Resque
# Methods used by various classes in Resque.
module Helpers
# Direct access to the Redis instance.
def redis
Resque.redis
def mongo
Resque.mongo
end

def mongo_workers
Resque.mongo_workers
end

def mongo_stats
Resque.mongo_stats
end

# Given a Ruby object, returns a string suitable for storage in a
Expand Down
4 changes: 2 additions & 2 deletions lib/resque/server/views/layout.erb
Expand Up @@ -31,8 +31,8 @@

<div id="footer">
<p>Powered by <a href="http://github.com/defunkt/resque">Resque</a> v<%=Resque::Version%></p>
<p>Connected to Redis on <%=Resque.redis.server%></p>
<p>Connected to Redis on ... </p>
</div>

</body>
</html>
</html>
10 changes: 6 additions & 4 deletions lib/resque/stat.rb
Expand Up @@ -11,7 +11,9 @@ module Stat

# Returns the int value of a stat, given a string stat name.
def get(stat)
redis.get("stat:#{stat}").to_i
res = mongo_stats.find_one(:stat => stat)
return 0 unless res
res['value'].to_i
end

# Alias of `get`
Expand All @@ -24,7 +26,7 @@ def [](stat)
# Can optionally accept a second int parameter. The stat is then
# incremented by that amount.
def incr(stat, by = 1)
redis.incr("stat:#{stat}", by)
mongo_stats.update({:stat => stat}, {'$inc' => {:value => by}}, :upsert => true)
end

# Increments a stat by one.
Expand All @@ -37,7 +39,7 @@ def <<(stat)
# Can optionally accept a second int parameter. The stat is then
# decremented by that amount.
def decr(stat, by = 1)
redis.decr("stat:#{stat}", by)
mongo_stats.update({:stat => stat}, {'$inc' => {:value => -by}})
end

# Decrements a stat by one.
Expand All @@ -47,7 +49,7 @@ def >>(stat)

# Removes a stat from Redis, effectively setting it to 0.
def clear(stat)
redis.del("stat:#{stat}")
mongo_stats.remove(:stat => stat)
end
end
end

0 comments on commit 896131d

Please sign in to comment.