Skip to content

Commit

Permalink
Refactored jobs:work, added jobs:clear.
Browse files Browse the repository at this point in the history
  - added Rake descriptions
  - factored jobs:work into Delayed::Worker for reuse in scripts
  - updated the README to suggest using Delayed::Worker.new.start
  • Loading branch information
jbarnette committed Oct 1, 2008
1 parent 3e5c373 commit 450908d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 70 deletions.
47 changes: 9 additions & 38 deletions README.textile
Expand Up @@ -14,7 +14,7 @@ It is a direct extraction from Shopify where the job table is responsible for a

h2. Changes

* 1.6 Renamed locked_until to locked_at. We now store when we start a given task instead of how long it will be locked by the worker. This allows us to get a reading on how long a task took to execute.
* 1.6 Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
* 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
* 1.0 Initial release

Expand Down Expand Up @@ -56,48 +56,19 @@ This will simply create a Delayed::PerformableMethod job in the jobs table which
which are stored as their text representation and loaded from the database fresh when the job is actually run later.


h2. Running the tasks
h2. Running the jobs

You can invoke @rake jobs:work@ which will start working off jobs. You can cancel the rake task by @CTRL-C@.
You can invoke @rake jobs:work@ which will start working off jobs. You can cancel the rake task with @CTRL-C@.

At Shopify we run the the tasks from a simple script/job_runner which is being invoked by runnit:
You can also run by writing a simple @script/job_runner@, and invoking it externally:

<pre><code>
#!/usr/bin/env ruby
require File.dirname(__FILE__) + '/../config/environment'

SLEEP = 5

trap('TERM') { puts 'Exiting...'; $exit = true }
trap('INT') { puts 'Exiting...'; $exit = true }

puts "*** Starting job worker #{Delayed::Job.worker_name}"

begin

loop do
result = nil

realtime = Benchmark.realtime do
result = Delayed::Job.work_off
end

count = result.sum

break if $exit

if count.zero?
sleep(SLEEP)
puts 'Waiting for more jobs...'
else
status = "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
RAILS_DEFAULT_LOGGER.info status
puts status
end

break if $exit
end
ensure
Delayed::Job.clear_locks!
end
Delayed::Worker.new.start
</code></pre>

h3. Cleaning up

You can invoke @rake jobs:clear@ to delete all jobs in the queue.
39 changes: 39 additions & 0 deletions lib/delayed/worker.rb
@@ -0,0 +1,39 @@
module Delayed
class Worker
SLEEP = 5

def initialize(options={})
@quiet = options[:quiet]
end

def start
puts "*** Starting job worker #{Delayed::Job.worker_name}" unless @quiet

trap('TERM') { puts 'Exiting...' unless @quiet; $exit = true }
trap('INT') { puts 'Exiting...' unless @quiet; $exit = true }

loop do
result = nil

realtime = Benchmark.realtime do
result = Delayed::Job.work_off
end

count = result.sum

break if $exit

if count.zero?
sleep(SLEEP)
puts 'Waiting for more jobs...' unless @quiet
else
status = "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
RAILS_DEFAULT_LOGGER.info status
puts status unless @quiet
end

break if $exit
end
end
end
end
39 changes: 7 additions & 32 deletions tasks/jobs.rake
@@ -1,36 +1,11 @@
namespace :jobs do
desc "Clear the delayed_job queue."
task :clear => :environment do
Delayed::Job.delete_all
end

desc "Start a delayed_job worker."
task :work => :environment do

puts "*** Starting job worker #{Delayed::Job.worker_name}"

SLEEP = 5

trap('TERM') { puts 'Exiting...'; $exit = true }
trap('INT') { puts 'Exiting...'; $exit = true }

loop do
result = nil

realtime = Benchmark.realtime do
result = Delayed::Job.work_off
end

count = result.sum

break if $exit

if count.zero?
sleep(SLEEP)
puts 'Waiting for more jobs...'
else
status = "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
RAILS_DEFAULT_LOGGER.info status
puts status
end

break if $exit
end
Delayed::Worker.new.start
end
end
end

0 comments on commit 450908d

Please sign in to comment.