Skip to content

Commit

Permalink
Merge pull request #103 from cristianbica/adapter-fixes
Browse files Browse the repository at this point in the history
Fixed qu, queue_classic, sneakers adapters
  • Loading branch information
dhh committed Aug 6, 2014
2 parents 38ee4fd + 6ff5972 commit 06ccd5f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
8 changes: 5 additions & 3 deletions lib/active_job/queue_adapters/qu_adapter.rb
Expand Up @@ -5,7 +5,9 @@ module QueueAdapters
class QuAdapter
class << self
def enqueue(job, *args)
Qu::Payload.new(klass: JobWrapper, args: [job, *args], queue: job.queue_name).push
Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end

def enqueue_at(job, timestamp, *args)
Expand All @@ -14,8 +16,8 @@ def enqueue_at(job, timestamp, *args)
end

class JobWrapper < Qu::Job
def initialize(job, *args)
@job = job
def initialize(job_name, *args)
@job = job_name.constantize
@args = args
end

Expand Down
6 changes: 3 additions & 3 deletions lib/active_job/queue_adapters/queue_classic_adapter.rb
Expand Up @@ -5,7 +5,7 @@ module QueueAdapters
class QueueClassicAdapter
class << self
def enqueue(job, *args)
QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job, *args)
QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args)
end

def enqueue_at(job, timestamp, *args)
Expand All @@ -14,8 +14,8 @@ def enqueue_at(job, timestamp, *args)
end

class JobWrapper
def self.perform(job, *args)
job.new.execute *args
def self.perform(job_name, *args)
job_name.constantize.new.execute *args
end
end
end
Expand Down
8 changes: 5 additions & 3 deletions lib/active_job/queue_adapters/sneakers_adapter.rb
Expand Up @@ -10,7 +10,7 @@ class << self
def enqueue(job, *args)
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
JobWrapper.enqueue [ job, *args ]
JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ])
end
end

Expand All @@ -22,8 +22,10 @@ def enqueue_at(job, timestamp, *args)
class JobWrapper
include Sneakers::Worker

def work(job, *args)
job.new.execute *args
def work(msg)
job_name, *args = ActiveSupport::JSON.decode(msg)
job_name.constantize.new.execute *args
ack!
end
end
end
Expand Down

0 comments on commit 06ccd5f

Please sign in to comment.