-
Notifications
You must be signed in to change notification settings - Fork 4
/
worker.py
111 lines (103 loc) · 4.4 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import os, sys, signal, traceback
import time
from contextlib import contextmanager
from db import DBConnector
from job import Job
from logger import Logger
from util import get_current_time, get_time_delta
class TimeoutException(Exception):
    """Signals that a job exceeded the time it was allowed to run."""
class TerminatedException(Exception):
    """Signals that the process was asked to stop via SIGINT or SIGTERM."""
class Worker(object):
    """Poll the ``delayed_jobs`` table and run the jobs it finds.

    Attributes set by ``__init__`` (all may be overridden after construction):
        sleep_delay: Seconds to sleep after each polling iteration.
        max_attempts: Forwarded to ``Job.from_row``.
        max_run_time: Seconds a job may run before it is interrupted; also
            the age after which another worker's lock is treated as expired.
        queue_names: Comma-separated queue names to poll.
        name: ``'host:<hostname> pid:<pid>'``, written to ``locked_by``.
    """

    def __init__(self, dbstring, logger=None):
        """Create the logger wrapper, the DB connector and default settings.

        Args:
            dbstring: Passed unchanged to ``DBConnector``.
            logger: Passed unchanged to ``Logger``.
        """
        super(Worker, self).__init__()
        self.logger = Logger(logger)
        self.logger.info('Starting pyworker...')
        self.database = DBConnector(dbstring, self.logger)
        self.sleep_delay = 10
        self.max_attempts = 3
        self.max_run_time = 3600
        self.queue_names = 'default'
        hostname = os.uname()[1]
        pid = os.getpid()
        self.name = 'host:%s pid:%d' % (hostname, pid)

    @staticmethod
    def _restore_handler(signum, previous):
        """Reinstall ``previous`` as the handler for ``signum`` if possible."""
        # signal.signal() returns None when the prior handler was not
        # installed from Python; None cannot be passed back to it.
        if previous is not None:
            signal.signal(signum, previous)

    @contextmanager
    def _time_limit(self, seconds):
        """Raise ``TimeoutException`` in the body after ``seconds`` seconds.

        Uses SIGALRM. On exit the alarm is cancelled and the SIGALRM handler
        that was active before entry is put back.
        """
        def signal_handler(signum, frame):
            raise TimeoutException(
                'Execution expired. Either do '
                'the job faster or raise max_run_time > %d seconds'
                % self.max_run_time)

        previous = signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(seconds)
        try:
            yield
        finally:
            signal.alarm(0)
            self._restore_handler(signal.SIGALRM, previous)

    @contextmanager
    def _terminatable(self):
        """Turn SIGINT/SIGTERM into ``TerminatedException`` inside the body.

        The handlers that were active before entry are restored on exit.
        """
        def signal_handler(signum, frame):
            signal_name = 'SIGTERM' if signum == signal.SIGTERM else 'SIGINT'
            self.logger.info('Received signal: %s' % signal_name)
            raise TerminatedException(signal_name)

        previous = {}
        try:
            for signum in (signal.SIGINT, signal.SIGTERM):
                previous[signum] = signal.signal(signum, signal_handler)
            yield
        finally:
            for signum, handler in previous.items():
                self._restore_handler(signum, handler)

    def run(self):
        """Poll for jobs and run them until SIGINT/SIGTERM is received.

        Each iteration fetches one job via ``get_job``, runs its
        ``before``/``run``/``after`` hooks under the ``max_run_time`` limit,
        then calls ``success`` and ``remove``. Any ``Exception`` is recorded
        on the job with ``set_error_unlock``; a ``TerminatedException`` also
        ends the loop. The database is disconnected on every exit path.
        """
        # continuously check for new jobs on specified queue from db
        self._cursor = self.database.connect().cursor()
        try:
            with self._terminatable():
                while True:
                    self.logger.debug('Picking up jobs...')
                    job = self.get_job()
                    self._current_job = job  # used in signal handlers
                    start_time = time.time()
                    finished_at = None
                    try:
                        # Exact type check on purpose: an instance of the
                        # bare Job class is rejected here. NOTE(review):
                        # presumably supported handlers come back as Job
                        # subclasses - confirm against Job.from_row.
                        if type(job) is Job:
                            raise ValueError(
                                ('Unsupported Job: %s, please import it '
                                 + 'before you can handle it')
                                % job.class_name)
                        elif job is not None:
                            self.logger.info('Running Job %d' % job.job_id)
                            # Only the job's own hooks run under the limit.
                            with self._time_limit(self.max_run_time):
                                job.before()
                                job.run()
                                job.after()
                            job.success()
                            job.remove()
                        # Stamp completion before idling so the logged
                        # duration does not include the sleep.
                        finished_at = time.time()
                        time.sleep(self.sleep_delay)
                    except Exception as exception:
                        if job is not None:
                            error_str = traceback.format_exc()
                            job.set_error_unlock(error_str)
                        if isinstance(exception, TerminatedException):
                            break
                    finally:
                        if job is not None:
                            if finished_at is None:
                                finished_at = time.time()
                            time_diff = finished_at - start_time
                            self.logger.info(
                                'Job %d finished in %d seconds'
                                % (job.job_id, time_diff))
        finally:
            # Runs even when an error escapes the loop (e.g. from get_job).
            self.database.disconnect()

    def get_job(self):
        """Lock the next runnable row in ``delayed_jobs`` and wrap it.

        A row qualifies when ``failed_at`` is NULL, its queue is listed in
        ``queue_names``, and either it is already locked by this worker or
        its ``run_at`` has passed and it is unlocked or its lock is older
        than ``max_run_time`` seconds. Lowest ``priority`` value wins, then
        earliest ``run_at``.

        Returns:
            The result of ``Job.from_row`` for the locked row, or ``None``
            when no row qualified.
        """
        def get_job_row():
            now = get_current_time()
            expired = now - get_time_delta(seconds=self.max_run_time)
            now, expired = str(now), str(expired)
            queues = self.queue_names.split(',')
            queues = ', '.join(["'%s'" % q for q in queues])
            # NOTE(review): values are interpolated straight into the SQL
            # text, so queue names and the worker name must not contain
            # quotes. Consider driver-side parameter binding.
            query = '''
                UPDATE delayed_jobs SET locked_at = '%s', locked_by = '%s'
                WHERE id IN (SELECT delayed_jobs.id FROM delayed_jobs
                    WHERE ((run_at <= '%s'
                    AND (locked_at IS NULL OR locked_at < '%s')
                    OR locked_by = '%s') AND failed_at IS NULL)
                    AND delayed_jobs.queue IN (%s)
                ORDER BY priority ASC, run_at ASC LIMIT 1 FOR UPDATE) RETURNING
                    id, attempts, handler
            ''' % (now, self.name, now, expired, self.name, queues)
            self.logger.debug('query: %s' % query)
            self._cursor.execute(query)
            return self._cursor.fetchone()

        job_row = get_job_row()
        if job_row:
            return Job.from_row(job_row, max_attempts=self.max_attempts,
                                database=self.database, logger=self.logger)
        return None