Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'dont_kill_wrong_job' into remove_exhausted_retries_error

  • Loading branch information...
commit e7c93898d9f529f7d820a4a0edd99d1bcf3d07ed 2 parents a103b1c + bfd3ce0
@myronmarston myronmarston authored
Showing with 48 additions and 7 deletions.
  1. +22 −5 lib/qless/worker.rb
  2. +26 −2 spec/integration/worker_spec.rb
View
27 lib/qless/worker.rb
@@ -336,7 +336,7 @@ def log!(message)
def start_parent_pub_sub_listener_for(client)
Subscriber.start(client, "ql:w:#{Qless.worker_name}") do |subscriber, message|
- if message["event"] == "lock_lost"
+ if message["event"] == "lock_lost" && message["jid"] == current_job_jid
fail_job_due_to_timeout
kill_child
end
@@ -358,13 +358,30 @@ def with_job(job)
@job = nil
end
+ # To prevent race conditions (with our listener thread),
+ # we cannot use a pattern like `use(@job) if @job` because
+ # the value of `@job` could change between the checking of
+ # it and the use of it. Here we use a pattern that avoids
+ # the issue -- get the job into a local, and yield that if
+ # it is set.
+ def access_current_job
+ if job = @job
+ yield job
+ end
+ end
+
+ def current_job_jid
+ access_current_job &:jid
+ end
+
JobLockLost = Class.new(StandardError)
def fail_job_due_to_timeout
- return unless job = @job
- error = JobLockLost.new
- error.set_backtrace(get_backtrace_from_child(job.client.redis))
- fail_job(job, error, caller)
+ access_current_job do |job|
+ error = JobLockLost.new
+ error.set_backtrace(get_backtrace_from_child(job.client.redis))
+ fail_job(job, error, caller)
+ end
end
def notify_parent_of_job_backtrace(client, list)
View
28 spec/integration/worker_spec.rb
@@ -35,6 +35,19 @@ def self.perform(job)
end
slow_job_line = __LINE__ - 4
+class BroadcastLockLostForDifferentJIDJob
+ def self.perform(job)
+ worker_name = job['worker_name']
+ redis = Redis.connect(url: job['redis_url'])
+ message = JSON.dump(jid: 'abc', event: 'lock_lost', worker: worker_name)
+ listener_count = redis.publish("ql:w:#{worker_name}", message)
+ raise "Worker is not listening, apparently" unless listener_count == 1
+
+ sleep 1
+ redis.set("broadcast_lock_lost_job_completed", "true")
+ end
+end
+
describe "Worker integration", :integration do
def start_worker(run_as_single_process)
unless @child = fork
@@ -91,14 +104,25 @@ def start_worker(run_as_single_process)
let(:queue) { client.queues["main"] }
let(:worker) { Qless::Worker.new(Qless::JobReservers::RoundRobin.new([queue])) }
- def enqueue_job_and_process
+ def enqueue_job_and_process(klass = SlowJob)
queue.heartbeat = 1
- jid = queue.put(SlowJob, "redis_url" => client.redis.client.id)
+ jid = queue.put(klass, "redis_url" => client.redis.client.id,
+ "worker_name" => Qless.worker_name)
worker.work(0)
jid
end
+ context 'when the lock_list message has the jid of a different job' do
+ it 'does not kill or fail the job' do
+ jid = enqueue_job_and_process(BroadcastLockLostForDifferentJIDJob)
+ expect(client.redis.get("broadcast_lock_lost_job_completed")).to eq("true")
+
+ job = client.jobs[jid]
+ expect(job.state).to eq("complete")
+ end
+ end
+
it 'kills the child process' do
enqueue_job_and_process
expect(client.redis.get("slow_job_completed")).to be_nil
Please sign in to comment.
Something went wrong with that request. Please try again.