
Fix the way we fetch jobs to work better for a large number of workers.

Instead of fetching 5 candidates and attempting to lock each one by one, just lock the next available job with a single query.

Example of the old system with 100 workers (a worst-case, but not an uncommon one; the flow is sketched in code below):
1) 100 workers wake up and each fetch 5 candidate jobs (they all get the same, or a very similar, set of 5 jobs), for a total of 100 SELECT calls
2) they all try to lock the first candidate (only 1 of the 100 succeeds); the 99 that fail move on to the next, and so on, for a total of roughly 500 UPDATE calls
3) in the end about 5 jobs get processed for roughly 600 database calls, and then the whole process starts over
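
For context, the per-worker flow under the old scheme looked roughly like this (a condensed, illustrative sketch built from the existing find_available / lock_exclusively! pair, assuming lock_exclusively!(max_run_time, worker_name) as in the existing backend; it is not the exact Delayed::Worker code):

# Old-style reservation, condensed for illustration only.
def self.reserve_old_style(worker, max_run_time = Worker.max_run_time)
  # 1 SELECT per worker -- under load every worker tends to get the same
  # (or nearly the same) set of 5 candidate rows.
  candidates = find_available(worker.name, 5, max_run_time)

  candidates.each do |job|
    # 1 UPDATE per attempt; only one worker's UPDATE actually matches the
    # row, the others fall through and retry the next candidate.
    return job if job.lock_exclusively!(max_run_time, worker.name)
  end

  nil # every candidate was grabbed by another worker first
end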

New system:
1) 100 workers wake up and each lock the next available job, so every worker gets a single, unique job (1 UPDATE and 1 SELECT call each; the generated SQL is sketched below)
2) a total of 100 jobs are processed with exactly 200 SQL calls
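
Under the new scheme the locking query is built from a scoped relation; the nested SELECT it issues can be inspected with something like the following (a sketch that ignores the optional min/max priority and queue filters; the worker name is made up, and the commented output shows only the approximate shape):

# Peek at the sub-select the new reserve nests inside its UPDATE.
next_scope = Delayed::Job.ready_to_run('host:web-1 pid:1234', Delayed::Worker.max_run_time)
next_scope = next_scope.by_priority.limit(1).lock(true).select('id')
puts next_scope.to_sql
# Roughly: SELECT id FROM "delayed_jobs" WHERE <ready_to_run conditions>
#          ORDER BY priority ASC, run_at ASC LIMIT 1 FOR UPDATE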

I've also included a commented-out example that does it all in 1 call, avoiding the extra round-trip (expanded below). It requires custom SQL and has only been tested with PostgreSQL.
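
For reference, a self-contained expansion of that commented-out one-call path could look like this (a hypothetical sketch, PostgreSQL only; it assumes the stock delayed_jobs table and the same scopes used in the diff below -- "RETURNING *" hands the locked row back from the UPDATE itself, so no follow-up SELECT is needed):

worker_name = 'host:web-1 pid:1234'
next_scope  = Delayed::Job.ready_to_run(worker_name, Delayed::Worker.max_run_time)
next_scope  = next_scope.by_priority.limit(1).lock(true).select('id')

# Quote the table name and run the UPDATE ... RETURNING * in one round-trip.
table = ::ActiveRecord::Base.connection.quote_table_name(Delayed::Job.table_name)
rows  = Delayed::Job.find_by_sql(["UPDATE #{table} SET locked_at = ?, locked_by = ? WHERE id IN (#{next_scope.to_sql}) RETURNING *", Delayed::Job.db_time_now, worker_name])
job = rows.first # nil when nothing is ready to run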
commit 940bf92f0a90f07661f8683996a425dd09572222 (parent f276ec2), authored by @scosman
Showing with 25 additions and 10 deletions.
  1. +25 −10 lib/delayed/backend/active_record.rb
lib/delayed/backend/active_record.rb
@@ -54,16 +54,31 @@ def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
- # Find a few candidate jobs to run (in case some immediately get locked by others).
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
- scope = self.ready_to_run(worker_name, max_run_time)
- scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
- scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
- scope = scope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any?
-
- ::ActiveRecord::Base.silence do
- scope.by_priority.all(:limit => limit)
- end
+ def self.reserve(worker, max_run_time = Worker.max_run_time)
+ # scope to filter to records that are "ready to run"
+ readyScope = self.ready_to_run(worker.name, max_run_time)
+
+ # scope to filter to the single next eligible job (locking it for update http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE)
+ nextScope = readyScope.scoped
+ nextScope = nextScope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
+ nextScope = nextScope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
+ nextScope = nextScope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any?
+ nextScope = nextScope.scoped.by_priority.limit(1).lock(true)
+ nextScope = nextScope.scoped.select('id')
+
+ now = self.db_time_now
+
+ # This works on any database and uses separate queries to lock and return the job
+ # Databases like PostgreSQL and MySQL that support "SELECT .. FOR UPDATE" (ActiveRecord Pessimistic locking) don't need the second application
+ # of 'readyScope' but it doesn't hurt and it ensures that the job being locked still meets ready_to_run criteria.
+ count = readyScope.where(:id => nextScope).update_all(:locked_at => now, :locked_by => worker.name)
+ return nil if count == 0
+ return self.where(:locked_at => now, :locked_by => worker.name).first
+
+ # This works on PostgreSQL and uses 1 fewer query, but uses SQL not natively supported through ActiveRecord
+ #quotedTableName = ::ActiveRecord::Base.connection.quote_column_name(self.table_name)
+ #reserved = self.find_by_sql(["UPDATE #{quotedTableName} SET locked_at = ?, locked_by = ? WHERE id IN (#{nextScope.to_sql}) RETURNING *",now,worker.name])
+ #return reserved[0]
end
# Lock this job for this worker.
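
For context on how the new entry point gets used, a minimal consumer could look like the sketch below (illustrative only -- the real Delayed::Worker loop also handles retries, failure hooks, and sleeping between polls; as in the diff above, the worker object only needs to respond to #name):

require 'socket'

# A stand-in worker object; real workers use the "host:<hostname> pid:<pid>" naming.
worker = Struct.new(:name).new("host:#{Socket.gethostname} pid:#{Process.pid}")

while (job = Delayed::Job.reserve(worker)) # 1 UPDATE + 1 SELECT per job
  job.invoke_job                           # run the payload object's #perform
  job.destroy                              # completed jobs are removed
end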