-
Notifications
You must be signed in to change notification settings - Fork 153
/
worker.rb
171 lines (151 loc) · 5.69 KB
/
worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# frozen_string_literal: true
require_relative 'queue'
require_relative 'conn_adapter'
module QC
# A Worker object can process jobs from one or many queues.
class Worker
attr_accessor :queues, :running
# Creates a new worker but does not start the worker. See Worker#start.
# This method takes a single hash argument. The following keys are read:
# fork_worker:: Worker forks each job execution.
# wait_interval:: Time to wait between failed lock attempts
# connection:: PG::Connection object.
# q_name:: Name of a single queue to process.
# q_names:: Names of queues to process. Will process left to right.
# top_bound:: Offset to the head of the queue. 1 == strict FIFO.
def initialize(args = {})
@fork_worker = args[:fork_worker] || QC.fork_worker?
@wait_interval = args[:wait_interval] || QC.wait_time
@conn_adapter = if args[:connection]
ConnAdapter.new(connection: args[:connection])
else
QC.default_conn_adapter
end
@queues = setup_queues(@conn_adapter,
(args[:q_name] || QC.queue),
(args[:q_names] || QC.queues),
(args[:top_bound] || QC.top_bound))
log(args.merge(at: 'worker_initialized'))
@running = true
end
# Commences the working of jobs.
# start() spins on @running –which is initialized as true.
# This method is the primary entry point to starting the worker.
# The canonical example of starting a worker is as follows:
# QC::Worker.new.start
def start
QC.unlock_jobs_of_dead_workers
@fork_worker ? fork_and_work : work while @running
end
# Signals the worker to stop taking new work.
# This method has no immediate effect. However, there are
# two loops in the worker (one in #start and another in #lock_job)
# which check the @running variable to determine if further progress
# is desirable. In the case that @running is false, the aforementioned
# methods will short circuit and cause the blocking call to #start
# to unblock.
def stop
@running = false
end
# Calls Worker#work but after the current process is forked.
# The parent process will wait on the child process to exit.
def fork_and_work
cpid = fork do
setup_child
work
end
log(at: :fork, pid: cpid)
Process.wait(cpid)
end
# Blocks on locking a job, and once a job is locked,
# it will process the job.
def work
queue, job = lock_job
return unless queue && job
QC.log_yield(at: 'work', job: job[:id]) do
process(queue, job)
end
end
# Attempt to lock a job in the queue's table.
# If a job can be locked, this method returns an array with
# 2 elements. The first element is the queue from which the job was locked
# and the second is a hash representation of the job.
# If a job is returned, its locked_at column has been set in the
# job's row. It is the caller's responsibility to delete the job row
# from the table when the job is complete.
def lock_job
log(at: 'lock_job')
job = nil
while @running
@queues.each do |queue|
if (job = queue.lock)
return [queue, job]
end
end
@conn_adapter.wait(@wait_interval, *@queues.map(&:name))
end
end
# A job is processed by evaluating the target code.
# if the job is evaluated with no exceptions
# then it is deleted from the queue.
# If the job has raised an exception the responsibility of what
# to do with the job is delegated to Worker#handle_failure.
# If the job is not finished and an INT signal is trapped,
# this method will unlock the job in the queue.
def process(queue, job)
start = Time.now
finished = false
begin
call(job).tap do
handle_success(queue, job)
finished = true
end
rescue StandardError, ScriptError, NoMemoryError => e
# We really only want to unlock the job for signal and system exit
# exceptions. If we encounter a ScriptError or a NoMemoryError any
# future run will likely encounter the same error.
handle_failure(job, e)
finished = true
ensure
queue.unlock(job[:id]) unless finished
ttp = Integer((Time.now - start) * 1000)
QC.measure("time-to-process=#{ttp} source=#{queue.name}")
end
end
# Each job includes a method column. We will use ruby's eval
# to grab the ruby object from memory. We send the method to
# the object and pass the args.
def call(job)
args = job[:args]
receiver_str, _, message = job[:method].rpartition('.')
receiver = eval(receiver_str) # rubocop:disable Security/Eval
receiver.send(message, *args)
end
def handle_success(queue, job)
queue.delete(job[:id])
end
# This method will be called when a StandardError, ScriptError or
# NoMemoryError is raised during the execution of the job.
def handle_failure(job, e)
warn("count#qc.job-error=1 job=#{job} error=#{e.inspect} at=#{e.backtrace.first}")
end
# This method should be overriden if
# your worker is forking and you need to
# re-establish database connections
def setup_child
log(at: 'setup_child')
end
def log(data)
QC.log(data)
end
private
def setup_queues(adapter, queue, queues, top_bound)
names = queues.length.positive? ? queues : [queue]
names.map do |name|
QC::Queue.new(name, top_bound).tap do |q|
q.conn_adapter = adapter
end
end
end
end
end