Browse files

Change driver from Mongo to Moped.

  • Loading branch information...
1 parent 18b7159 commit cca937be47b9c39b521e7f2e3e2d7331dc3d303e @dbackeus dbackeus committed Feb 5, 2013
View
52 lib/resque.rb
@@ -1,4 +1,4 @@
-require 'mongo'
+require 'moped'
require 'resque/version'
@@ -18,28 +18,28 @@ module Resque
extend self
@delayed_queues = []
- # Set the queue database. Expects a Mongo::DB object.
+ # Set the queue database. Expects a Moped::Session object with a default database.
def mongo=(database)
- if database.is_a?(Mongo::DB)
+ if database.is_a?(Moped::Session)
@mongo = database
initialize_mongo
else
- raise ArgumentError, "Resque.mongo= expects a Mongo::DB database, not a #{database.class}."
+ raise ArgumentError, "Resque.mongo= expects a Moped::Session, not a #{database.class}."
end
end
# Returns the current Mongo::DB. If none has been created, it will
# create a new one called 'resque'.
def mongo
return @mongo if @mongo
- self.mongo = Mongo::Connection.new.db("resque")
+ self.mongo = Moped::Session.new(["127.0.0.1:27017"]).with(database: "resque")
@mongo
end
def initialize_mongo
- mongo_workers.create_index :worker
- mongo_stats.create_index :stat
- delayed_queues = mongo_stats.find_one(:stat => 'Delayed Queues')
+ mongo_workers.indexes.create(worker: 1)
+ mongo_stats.indexes.create(stat: 1)
+ delayed_queues = mongo_stats.find(stat: 'Delayed Queues').first
@delayed_queues = delayed_queues['value'] if delayed_queues
end
@@ -103,8 +103,9 @@ def after_fork=(after_fork)
end
def to_s
- connection_info = mongo.connection.primary_pool
- "Resque Client connected to #{connection_info.host}:#{connection_info.port}/#{mongo.name}"
+ mongo.cluster.with_primary do |node|
+ "Resque Client connected to #{node.resolved_address}/#{mongo.options[:database]}"
+ end
end
def delayed_job?(klass)
@@ -120,7 +121,7 @@ def enable_delay(queue)
queue = namespace_queue(queue)
unless delayed_queue? queue
@delayed_queues << queue
- mongo_stats.update({:stat => 'Delayed Queues'}, {'$addToSet' => {'value' => queue}}, {:upsert => true})
+ mongo_stats.find(stat: 'Delayed Queues').upsert('$addToSet' => {'value' => queue})
end
end
@@ -158,7 +159,8 @@ def inline=(inline)
def push(queue, item)
queue = namespace_queue(queue)
item[:resque_enqueue_timestamp] = Time.now
- mongo[queue] << item
+ mongo[queue].insert item
+ item
end
# Pops a job off a queue. Queue name should be a string.
@@ -168,35 +170,31 @@ def pop(queue)
queue = namespace_queue(queue)
query = {}
query['delay_until'] = { '$lt' => Time.now } if delayed_queue?(queue)
- #sorting will result in significant performance penalties for large queues, you have been warned.
- item = mongo[queue].find_and_modify(:query => query, :remove => true, :sort => [[:_id, :asc]])
- rescue Mongo::OperationFailure => e
- return nil if e.message =~ /No matching object/
- raise e
+ mongo[queue].find(query).sort(_id: 1).modify({}, remove: true)
end
# Returns an integer representing the size of a queue.
# Queue name should be a string.
def size(queue)
queue = namespace_queue(queue)
- mongo[queue].count
+ mongo[queue].find.count
end
def delayed_size(queue)
queue = namespace_queue(queue)
if delayed_queue? queue
- mongo[queue].find({'delay_until' => { '$gt' => Time.now }}).count
+ mongo[queue].find(delay_until: { '$gt' => Time.now }).count
else
- mongo[queue].count
+ mongo[queue].find.count
end
end
def ready_size(queue)
queue = namespace_queue(queue)
if delayed_queue? queue
- mongo[queue].find({'delay_until' => { '$lt' => Time.now }}).count
+ mongo[queue].find(delay_until: { '$lt' => Time.now }).count
else
- mongo[queue].count
+ mongo[queue].find.count
end
end
@@ -216,23 +214,23 @@ def peek(queue, start = 0, count = 1, mode = :ready)
# Does the dirty work of fetching a range of items from a Redis list
# and converting them into Ruby objects.
def list_range(key, start = 0, count = 1, mode = :ready)
- query = { }
- sort = []
+ query = {}
+ sort = {}
if delayed_queue? key
if mode == :ready
query['delay_until'] = { '$not' => { '$gt' => Time.new}}
elsif mode == :delayed
query['delay_until'] = { '$gt' => Time.new}
elsif mode == :delayed_sorted
query['delay_until'] = { '$gt' => Time.new}
- sort << ['delay_until', 1]
+ sort << { delay_until: 1 }
elsif mode == :all_sorted
query = {}
- sort << ['delay_until', 1]
+ sort << { delay_until: 1 }
end
end
queue = namespace_queue(key)
- items = mongo[queue].find(query, { :limit => count, :skip => start, :sort => sort}).to_a.map{ |i| i}
+ items = mongo[queue].find(query).limit(count).skip(start).sort(sort).to_a
count > 1 ? items : items.first
end
View
10 lib/resque/delayed.rb
@@ -24,7 +24,7 @@ def self.extended(base)
end
def initialize_delayed
- delayed_queues = mongo_stats.find_one(:stat => 'Delayed Queues')
+ delayed_queues = mongo_stats.find(stat: 'Delayed Queues').first
@delayed_queues = delayed_queues['value'] if delayed_queues
end
@@ -48,18 +48,18 @@ def enable_delay(queue)
def delayed_size(queue)
queue = namespace_queue(queue)
if delayed_queue? queue
- mongo[queue].find({'delay_until' => { '$gt' => Time.now }}).count
+ mongo[queue].find(delay_until: { '$gt' => Time.now }).count
else
- mongo[queue].count
+ mongo[queue].find.count
end
end
def ready_size(queue)
queue = namespace_queue(queue)
if delayed_queue? queue
- mongo[queue].find({'delay_until' => { '$lt' => Time.now }}).count
+ mongo[queue].find(delay_until: { '$lt' => Time.now }).count
else
- mongo[queue].count
+ mongo[queue].find.count
end
end
end
View
12 lib/resque/failure/mongo.rb
@@ -13,32 +13,32 @@ def save
:worker => worker.to_s,
:queue => queue
}
- Resque.mongo_failures << data
+ Resque.mongo_failures.insert data
end
def self.count
- Resque.mongo_failures.count
+ Resque.mongo_failures.find.count
end
def self.all(start = 0, count = 1)
- all_failures = Resque.mongo_failures.find().skip(start.to_i).limit(count.to_i).to_a
+ all_failures = Resque.mongo_failures.find.skip(start.to_i).limit(count.to_i).to_a
all_failures.size == 1 ? all_failures.first : all_failures
end
def self.clear
- Resque.mongo_failures.remove
+ Resque.mongo_failures.find.remove
end
def self.requeue(index)
item = all(index)
item['retried_at'] = Time.now.strftime("%Y/%m/%d %H:%M:%S")
- Resque.mongo_failures.update({ :_id => item['_id']}, item)
+ Resque.mongo_failures.find(_id: item['_id']).update(item)
Job.create(item['queue'], item['payload']['class'], *item['payload']['args'])
end
def self.remove(index)
item = all(index)
- Resque.mongo_failures.remove(:_id => item['_id'])
+ Resque.mongo_failures.find(_id: item['_id']).remove
end
end
end
View
2 lib/resque/failure/redis.rb
@@ -17,7 +17,7 @@ def save
end
def self.count
- Resque.mongo_failures.count
+ Resque.mongo_failures.find.count
end
def self.all(start = 0, count = 1)
View
4 lib/resque/job.rb
@@ -97,7 +97,7 @@ def self.destroy(queue, klass, *args)
selector = {'class' => klass.to_s}
selector['args'] = args unless args.empty?
destroyed = collection.find(selector).count
- collection.remove(selector, :safe => true)
+ collection.find(selector).remove_all
destroyed
end
@@ -198,7 +198,7 @@ def recreate
# String representation
def inspect
obj = @payload
- "(Job{%s} | %s | %s)" % [ @queue, obj['class'], obj['args'].inspect ]
+ "(Job{%s} | %s | %s)" % [@queue, obj['class'], obj['args'].inspect]
end
# Equality
View
8 lib/resque/server.rb
@@ -58,7 +58,7 @@ def tabs
end
def mongo_get_size(key)
- Resque.mongo[key].count
+ Resque.mongo[key].find.count
end
def mongo_get_value_as_array(key, start=0)
@@ -110,8 +110,10 @@ def show(page, layout = true)
response["Cache-Control"] = "max-age=0, private, must-revalidate"
begin
erb page.to_sym, {:layout => layout}, :resque => Resque
- rescue Mongo::ConnectionError, Mongo::ConnectionFailure
- erb :error, {:layout => false}, :error => "Can't connect to MongoDB!"
+ rescue Moped::Errors::ConnectionFailure
+ erb :error, {:layout => false}, :error => "Could not connect to mongodb!"
+ rescue Moped::Errors::AuthenticationFailure
+ erb :error, {:layout => false}, :error => "Could not authenticate against mongodb!"
end
end
View
2 lib/resque/server/views/stats.erb
@@ -63,7 +63,7 @@
<th>
<a href="<%=u "/stats/keys/#{key}" %>"><%= key %></a>
</th>
- <td><%= resque.mongo[key].count %></td>
+ <td><%= resque.mongo[key].find.count %></td>
</tr>
<% end %>
</table>
View
6 lib/resque/stat.rb
@@ -11,7 +11,7 @@ module Stat
# Returns the int value of a stat, given a string stat name.
def get(stat)
- value = mongo_stats.find_one :stat => stat
+ value = mongo_stats.find(stat: stat).first
value.nil? ? 0 : value['value']
end
@@ -25,7 +25,7 @@ def [](stat)
# Can optionally accept a second int parameter. The stat is then
# incremented by that amount.
def incr(stat, by = 1)
- mongo_stats.update({:stat => stat}, {'$inc' => {:value => by}}, :upsert => true)
+ mongo_stats.find(stat: stat).upsert('$inc' => { value: by })
end
# Increments a stat by one.
@@ -48,7 +48,7 @@ def >>(stat)
# Removes a stat from Redis, effectively setting it to 0.
def clear(stat)
- mongo_stats.remove({:stat => stat})
+ mongo_stats.find(stat: stat).remove
end
end
end
View
22 lib/resque/worker.rb
@@ -24,7 +24,7 @@ class Worker
# Returns an array of all worker objects.
def self.all
- mongo_workers.distinct(:worker).map { |worker| find(worker) }.compact
+ mongo_workers.find.distinct(:worker).map { |worker| find(worker) }.compact
end
# Returns an array of all worker objects currently processing
@@ -53,7 +53,7 @@ def self.attach(worker_id)
# Given a string worker id, return a boolean indicating whether the
# worker exists
def self.exists?(worker_id)
- mongo_workers.find(:worker => worker_id.to_s).count > 0
+ mongo_workers.find(worker: worker_id.to_s).count > 0
end
# Workers should be initialized with an array of string queue
@@ -333,7 +333,7 @@ def prune_dead_workers
# Registers ourself as a worker. Useful when entering the worker
# lifecycle on startup.
def register_worker
- mongo_workers << { :worker => self.to_s}
+ mongo_workers.insert(worker: self.to_s)
started!
end
@@ -359,7 +359,7 @@ def unregister_worker
job.fail(DirtyExit.new)
end
- mongo_workers.remove :worker => self.to_s
+ mongo_workers.find(worker: self.to_s).remove
Stat.clear("processed:#{self}")
Stat.clear("failed:#{self}")
@@ -371,14 +371,14 @@ def working_on(job)
data = { :queue => job.queue,
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"),
:payload => job.payload }
- mongo_workers.update({:worker => self.to_s}, { '$set' => { 'working_on' => data}}, :upsert => true)
+ mongo_workers.find(worker: self.to_s).upsert('$set' => { 'working_on' => data })
end
# Called when we are done working - clears our `working_on` state
# and tells Mongo we processed a job.
def done_working
processed!
- mongo_workers.remove({ :worker => self.to_s})
+ mongo_workers.find(worker: self.to_s).remove
end
# How many jobs has this worker processed? Returns an int.
@@ -405,19 +405,19 @@ def failed!
# What time did this worker start? Returns an instance of `Time`
def started
- worker = mongo_workers.find_one(:worker => self.to_s)
+ worker = mongo_workers.find(worker: self.to_s).first
worker.nil? ? nil : worker['started']
end
# Tell Mongo we've started
def started!
- mongo_workers.update({ :worker => self.to_s}, { '$set' => { :started => Time.now.to_s}})
+ mongo_workers.find(worker: self.to_s).update('$set' => { started: Time.now.to_s })
end
# Returns a hash explaining the Job we're currently processing, if any.
def job
- worker = mongo_workers.find_one :worker => self.to_s
- worker.nil? ? { } : worker['working_on'] #decode(worker['working_on'])
+ worker = mongo_workers.find(worker: self.to_s).first
+ worker.nil? ? {} : worker['working_on']
end
alias_method :processing, :job
@@ -434,7 +434,7 @@ def idle?
# Returns a symbol representing the current worker state,
# which can be either :working or :idle
def state
- mongo_workers.find_one(:worker => self.to_s) ? :working : :idle
+ mongo_workers.find(worker: self.to_s).first ? :working : :idle
end
# Is this worker the same as another worker?
View
4 mongo-resque.gemspec
@@ -5,7 +5,7 @@ Gem::Specification.new do |s|
s.name = "mongo-resque"
s.version = Resque::Version
s.authors = ["David Backeus"]
- s.email = ["david@streamio.se"]
+ s.email = ["david@streamio.com"]
s.homepage = "https://github.com/streamio/mongo-resque"
s.summary = "Mongo-Resque is a mongo-backed queueing system"
s.description = <<-description
@@ -16,7 +16,7 @@ Gem::Specification.new do |s|
without the work of defunkt and ctrochalakis on github.
description
- s.add_dependency "mongo", "~> 1.6.2"
+ s.add_dependency "moped", "~> 1.3"
s.add_dependency "vegas", "~> 0.1.2"
s.add_dependency "sinatra", ">= 0.9.2"
s.add_dependency "multi_json", "~> 1.0"
View
12 test/resque_test.rb
@@ -10,12 +10,12 @@
end
test "uses database called resque by default" do
- assert 'resque', Resque.mongo.name
+ assert 'resque', Resque.mongo[:asdf].database.name
end
test "can set a database with a Mongo::DB" do
- Resque.mongo = Mongo::Connection.new.db('resque-test-with-specific-database')
- assert_equal 'resque-test-with-specific-database', Resque.mongo.name
+ Resque.mongo = Moped::Session.new(["127.0.0.1:27017"]).with(database: 'resque-test-with-specific-database')
+ assert_equal 'resque-test-with-specific-database', Resque.mongo[:asdf].database.name
end
test "can not set a database with a uri string" do
@@ -206,7 +206,7 @@ def pop_no_id(queue)
end
test "does not confuse normal collections on the same database with queues" do
- Resque.mongo["some_other_collection"] << {:foo => 'bar'}
+ Resque.mongo["some_other_collection"].insert(foo: 'bar')
Resque.push(:cars, { 'make' => 'bmw' })
assert_equal %w( cars people ), Resque.queues.sort
end
@@ -218,8 +218,8 @@ def pop_no_id(queue)
test "can get the collection for a queue" do
collection = Resque.collection_for_queue(:people)
- assert_equal Mongo::Collection, collection.class
- assert_equal 3, collection.count
+ assert_equal Moped::Collection, collection.class
+ assert_equal 3, collection.find.count
end
test "can delete a queue" do

0 comments on commit cca937b

Please sign in to comment.