Skip to content

Commit

Permalink
Merge 84882df into d74b3ce
Browse files Browse the repository at this point in the history
  • Loading branch information
atsheehan committed Jul 6, 2017
2 parents d74b3ce + 84882df commit 8989b59
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
5 changes: 5 additions & 0 deletions lib/delayed/heartbeat/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def unlock_jobs(mark_attempt_failed: true)
orphaned_jobs
end

# Unlocks jobs held by the worker as an atomic operation.
def clear_locks
jobs.update_all(locked_at: nil, locked_by: nil)
end

def self.dead_workers(timeout_seconds)
where('last_heartbeat_at < ?', Time.now.utc - timeout_seconds)
end
Expand Down
17 changes: 14 additions & 3 deletions lib/delayed/heartbeat/worker_heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,20 @@ def run_heartbeat_loop
exit(false)
ensure
@stop_reader.close
@worker_model.delete
# Note: The built-in Delayed::Plugins::ClearLocks will unlock the jobs for us
Delayed::Backend::ActiveRecord::Job.clear_active_connections!

# Note: The built-in Delayed::Plugins::ClearLocks will try to
# unlock the jobs for us, but in some cases this may fail
# (e.g. the worker is interrupted in a critical section and
# the database connection is in an unusable state). Here we
# first close all existing connections to release any locks on
# the delayed jobs/workers table, then reconnect with a known
# good connection to clear locks on jobs held by this worker.
Delayed::Backend::ActiveRecord::Job.clear_all_connections!

Delayed::Backend::ActiveRecord::Job.transaction do
@worker_model.clear_locks
@worker_model.delete
end
end

def update_heartbeat
Expand Down
5 changes: 5 additions & 0 deletions spec/db/database.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@ sqlite3:
adapter: sqlite3
pool: 5
timeout: 15000
# The checkout_timeout determines how long we wait before forcing a
# disconnect on an in-use connection. We rely on this behavior when
# testing shutting down workers, so this is set to a small value to
# ensure the tests run fast.
checkout_timeout: 0.01
database: tmp/sqlite3.db
5 changes: 5 additions & 0 deletions spec/delayed/heartbeat/worker_heartbeat_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
it "destroys the worker model" do
expect(find_worker_model(worker_name)).not_to be_present
end

it "unlocks the worker jobs" do
expect(job.reload.locked_by).to be_nil
expect(job.reload.locked_at).to be_nil
end
end

context "when the heartbeat times out" do
Expand Down

0 comments on commit 8989b59

Please sign in to comment.