Skip to content

Commit

Permalink
Merge branch 'dont_kill_wrong_job' into remove_exhausted_retries_error
Browse files Browse the repository at this point in the history
  • Loading branch information
myronmarston committed May 2, 2013
2 parents a103b1c + bfd3ce0 commit e7c9389
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
27 changes: 22 additions & 5 deletions lib/qless/worker.rb
Expand Up @@ -336,7 +336,7 @@ def log!(message)

def start_parent_pub_sub_listener_for(client)
Subscriber.start(client, "ql:w:#{Qless.worker_name}") do |subscriber, message|
if message["event"] == "lock_lost"
if message["event"] == "lock_lost" && message["jid"] == current_job_jid
fail_job_due_to_timeout
kill_child
end
Expand All @@ -358,13 +358,30 @@ def with_job(job)
@job = nil
end

# Yields the current job to the given block, if one is set.
#
# `@job` may change underneath us (we race with our listener
# thread), so a check-then-use sequence such as
# `use(@job) if @job` is unsafe: the value can differ between
# the check and the use. Instead, `@job` is read exactly once
# into a local, and only that local is tested and yielded.
#
# Returns the block's result, or nil when no job is set.
def access_current_job
  job = @job
  yield job if job
end

# Returns the jid of the job currently being worked on, or nil
# when no job is set.
def current_job_jid
  access_current_job(&:jid)
end

# Error recorded against a job by `fail_job_due_to_timeout`, which the
# pub/sub listener invokes when a "lock_lost" event is received.
JobLockLost = Class.new(StandardError)

# Fails the current job (if any) with a JobLockLost error.
#
# The error's backtrace is taken from the child process (via
# `get_backtrace_from_child`) so that the failure points at what
# the job was doing, while `caller` is passed along as the
# worker-side backtrace.
#
# The job is obtained through `access_current_job` so that `@job`
# is read only once; the job is failed exactly once, and nothing
# happens when no job is set.
def fail_job_due_to_timeout
  access_current_job do |job|
    error = JobLockLost.new
    error.set_backtrace(get_backtrace_from_child(job.client.redis))
    fail_job(job, error, caller)
  end
end

def notify_parent_of_job_backtrace(client, list)
Expand Down
28 changes: 26 additions & 2 deletions spec/integration/worker_spec.rb
Expand Up @@ -35,6 +35,19 @@ def self.perform(job)
end
slow_job_line = __LINE__ - 4

# Test job that publishes a "lock_lost" message carrying a jid
# ('abc') that is not its own, on the channel named after the
# worker, and then records that it ran to completion.
class BroadcastLockLostForDifferentJIDJob
  # Expects the job data to provide 'worker_name' and 'redis_url'.
  # Raises unless exactly one subscriber received the message.
  def self.perform(job)
    name = job['worker_name']
    connection = Redis.connect(url: job['redis_url'])
    payload = JSON.dump(jid: 'abc', event: 'lock_lost', worker: name)

    unless connection.publish("ql:w:#{name}", payload) == 1
      raise "Worker is not listening, apparently"
    end

    # Pause before recording completion; the completion key is only
    # written if this process survives the broadcast.
    sleep 1
    connection.set("broadcast_lock_lost_job_completed", "true")
  end
end

describe "Worker integration", :integration do
def start_worker(run_as_single_process)
unless @child = fork
Expand Down Expand Up @@ -91,14 +104,25 @@ def start_worker(run_as_single_process)
# The "main" queue that these examples put jobs onto.
let(:queue) { client.queues["main"] }
# Worker whose round-robin reserver draws from that single queue.
let(:worker) { Qless::Worker.new(Qless::JobReservers::RoundRobin.new([queue])) }

def enqueue_job_and_process
def enqueue_job_and_process(klass = SlowJob)
queue.heartbeat = 1

jid = queue.put(SlowJob, "redis_url" => client.redis.client.id)
jid = queue.put(klass, "redis_url" => client.redis.client.id,
"worker_name" => Qless.worker_name)
worker.work(0)
jid
end

context 'when the lock_lost message has the jid of a different job' do
  it 'does not kill or fail the job' do
    jid = enqueue_job_and_process(BroadcastLockLostForDifferentJIDJob)

    # The job writes this key as its final step, so its presence
    # shows the child was not killed part-way through.
    expect(client.redis.get("broadcast_lock_lost_job_completed")).to eq("true")

    job = client.jobs[jid]
    expect(job.state).to eq("complete")
  end
end

it 'kills the child process' do
enqueue_job_and_process
expect(client.redis.get("slow_job_completed")).to be_nil
Expand Down

0 comments on commit e7c9389

Please sign in to comment.