Skip to content

Commit

Permalink
Added #before_fork and #after_fork on the backends that is called bef…
Browse files Browse the repository at this point in the history
…ore and after forking the background process
  • Loading branch information
bkeepers committed Mar 26, 2010
1 parent 800755a commit 855f1ba
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 1 deletion.
4 changes: 4 additions & 0 deletions lib/delayed/backend/active_record.rb
Expand Up @@ -27,6 +27,10 @@ class Job < ::ActiveRecord::Base
{:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
}
named_scope :by_priority, :order => 'priority ASC, run_at ASC'

def self.after_fork
ActiveRecord::Base.connection.reconnect!
end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
Expand Down
8 changes: 8 additions & 0 deletions lib/delayed/backend/base.rb
Expand Up @@ -20,6 +20,14 @@ def enqueue(*args)
run_at = args[1]
self.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end

# Hook method that is called before a new worker is forked
def before_fork
end

# Hook method that is called after a new worker is forked
def after_fork
end
end

ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
Expand Down
8 changes: 8 additions & 0 deletions lib/delayed/backend/mongo_mapper.rb
Expand Up @@ -38,6 +38,14 @@ class Job

ensure_index [[:priority, 1], [:run_at, 1]]

def self.before_fork
::MongoMapper.connection.close
end

def self.after_fork
::MongoMapper.connection.connect_to_master
end

def self.db_time_now
::MongoMapper.time_class.now.utc
end
Expand Down
4 changes: 3 additions & 1 deletion lib/delayed/command.rb
Expand Up @@ -36,6 +36,8 @@ def initialize(args)
end

def daemonize
Delayed::Worker.backend.before_fork

ObjectSpace.each_object(File) do |file|
@files_to_reopen << file unless file.closed?
end
Expand Down Expand Up @@ -64,7 +66,7 @@ def run(worker_name = nil)
if Delayed::Worker.logger.respond_to? :auto_flushing=
Delayed::Worker.logger.auto_flushing = true
end
ActiveRecord::Base.connection.reconnect!
Delayed::Worker.backend.after_fork

worker = Delayed::Worker.new(@options)
worker.name_prefix = "#{worker_name} "
Expand Down

0 comments on commit 855f1ba

Please sign in to comment.