Skip to content

Commit

Permalink
added specs and made them pass for couchrest adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
nightshade427 authored and rbriank committed May 6, 2010
1 parent 7e41b4e commit 5594fcc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 22 deletions.
96 changes: 77 additions & 19 deletions lib/delayed/backend/couch_rest.rb
@@ -1,17 +1,23 @@
require 'couchrest'

#make sure database exists. If not create it.
CouchRest::Server.new.database!('delayed_job')

#extent couchrest to handle delayed_job serialization.
class CouchRest::ExtendedDocument
def self.load_for_delayed_job(id)
(id)? get(id) : super
end

end
def dump_for_delayed_job
"#{self.class};#{id}"
end
def self.find(id)
get id
end
def ==(other)
if other.is_a? ::CouchRest::ExtendedDocument
self['_id'] == other['_id']
else
super
end
end
end

#couchrest adapter
Expand All @@ -25,47 +31,99 @@ class Job < ::CouchRest::ExtendedDocument
property :priority
property :attempts
property :handler
property :run_at
property :locked_at
property :run_at, :cast_as => 'Time'
property :locked_at, :cast_as => 'Time'
property :locked_by
property :failed_at
property :failed_at, :cast_as => 'Time'
property :last_error
timestamps!

view_by(:locked_by, :run_at,
view_by(:failed_at, :locked_by, :run_at,
:map => "function(doc){" +
" if(doc['couchrest-type'] == 'Delayed::Backend::CouchRest::Job' && doc.run_at) {" +
" var locked_by = doc.locked_by || '';" +
" emit([locked_by, doc.run_at], null);}" +
" if(doc['couchrest-type'] == 'Delayed::Backend::CouchRest::Job') {" +
" emit([doc.failed_at, doc.locked_by, doc.run_at], null);}" +
" }")

view_by(:failed_at, :locked_at, :run_at,
:map => "function(doc){" +
" if(doc['couchrest-type'] == 'Delayed::Backend::CouchRest::Job') {" +
" emit([doc.failed_at, doc.locked_at, doc.run_at], null);}" +
" }")

set_callback :save, :before, :set_default_run_at
set_callback :save, :before, :set_default_attempts
set_callback :save, :before, :set_default_priority
set_callback :save, :before, :set_default_priority
set_callback :save, :before, :set_default_locked_by
set_callback :save, :before, :set_default_failed_at
set_callback :save, :before, :set_default_locked_at

def self.db_time_now; Time.now; end
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
by_locked_by_and_run_at :start_key => [''], :end_key => ['', db_time_now], :limit => limit
def self.find_available(worker_name, limit = 5, max_run_time = ::Delayed::Worker.max_run_time)
ready = ready_jobs worker_name, limit, max_run_time
mine = my_jobs worker_name, limit, max_run_time
expire = expired_jobs worker_name, limit, max_run_time
jobs = (ready + mine + expire)[0..limit-1].sort_by { |j| j.priority }
jobs = jobs.find_all { |j| j.priority >= Worker.min_priority } if Worker.min_priority
jobs = jobs.find_all { |j| j.priority <= Worker.max_priority } if Worker.max_priority
jobs
end
def self.clear_locks!(worker_name)
docs = by_locked_by_and_run_at :startkey => [worker_name], :endkey => [worker_name, {}]
docs.each { |doc| doc.locked_by, doc.locked_at = nil, nil; }
docs = by_failed_at_and_locked_by_and_run_at :startkey => ['', worker_name], :endkey => ['', worker_name, {}]
docs.each { |doc| doc.locked_by, doc.locked_at = '', ''; }
database.bulk_save docs
end
def self.delete_all
database.bulk_save all.each { |doc| doc['_deleted'] = true }
end
def self.db=(db_to_use)
use_database ::CouchRest::Server.new.database(db_to_use)
end

def lock_exclusively!(max_run_time, worker = worker_name)
self.locked_at, self.locked_by = self.class.db_time_now, worker
return false if locked_by_other?(worker) and not expired?(max_run_time)
case
when locked_by_me?(worker)
self.locked_at = self.class.db_time_now
when (unlocked? or (locked_by_other?(worker) and expired?(max_run_time)))
self.locked_at, self.locked_by = self.class.db_time_now, worker
end
save
rescue RestClient::Conflict
false
end
def set_default_priority
self.priority = 0 if priority.nil?
end
def set_default_attempts
self.attempts = 0 if attempts.nil?
end
def set_default_locked_by
self.locked_by = '' if locked_by.nil?
end
def set_default_failed_at
self.failed_at = '' if failed_at.nil?
end
def set_default_locked_at
self.locked_at = '' if locked_at.nil?
end
def reload; end

private
def self.ready_jobs(worker_name, limit, max_run_time)
options = {:startkey => ['', ''], :endkey => ['', '', db_time_now]}
by_failed_at_and_locked_by_and_run_at options
end
def self.my_jobs(worker_name, limit, max_run_time)
options = {:startkey => ['', worker_name], :endkey => ['', worker_name, {}]}
by_failed_at_and_locked_by_and_run_at options
end
def self.expired_jobs(worker_name, limit, max_run_time)
options = {:startkey => ['','0'], :endkey => ['', db_time_now - max_run_time, db_time_now]}
by_failed_at_and_locked_at_and_run_at options
end
def unlocked?; locked_by == ''; end
def expired?(time); locked_at < self.class.db_time_now - time; end
def locked_by_me?(worker); locked_by != '' and locked_by == worker; end
def locked_by_other?(worker); locked_by != '' and locked_by != worker; end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/backend/couch_rest_job_spec.rb
Expand Up @@ -8,7 +8,7 @@
end

before(:each) do
CouchRest.recreate!('delayed_job_spec')
@backend.delete_all
end

it_should_behave_like 'a backend'
Expand Down
5 changes: 3 additions & 2 deletions spec/setup/couch_rest.rb
@@ -1,5 +1,6 @@
require 'couchrest'
require 'delayed/backend/couch_rest'

CouchRest.logger = Delayed::Worker.logger
CouchRest::Server.new.database!('delayed_job_spec')
db = 'delayed_job_spec'
CouchRest::Server.new.database!(db)
Delayed::Backend::CouchRest::Job.db = db

0 comments on commit 5594fcc

Please sign in to comment.