-
Notifications
You must be signed in to change notification settings - Fork 0
/
Worker.php
127 lines (105 loc) · 3.22 KB
/
Worker.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
<?php
namespace Silverslice\QuickRabbit;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Consumes jobs from the connection's queue and executes them one at a time.
 *
 * A job that throws is either republished with a delay (when the job reports
 * itself as retryable) or handed to the optional failed-job callback.
 */
class Worker
{
    /** @var Connection Connection that provides the channel and the queue name */
    protected $connection;

    /** @var Queue Queue used to republish failed jobs with a delay */
    protected $queue;

    /** @var bool Set to true by the signal handler to leave the consume loop */
    protected $shouldExit = false;

    /** @var bool Whether debug messages are printed */
    protected $debug = false;

    /** @var callable|null Called with ($job, $exception) when a job fails and is not retryable */
    protected $failedCallback;

    /**
     * @param Connection $connection
     */
    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
        $this->queue = new Queue($connection);
    }

    /**
     * Starts consuming messages and blocks until the channel stops consuming
     * or a termination signal has been received.
     */
    public function run()
    {
        $this->debug('Worker started');
        $this->registerSignalHandler();

        // A closure is used (rather than [$this, 'handle']) because handle() is
        // protected and the callback is invoked from library code.
        $callback = function (AMQPMessage $msg) {
            $this->handle($msg);
        };

        $channel = $this->connection->getChanel();
        // Prefetch count of 1: take one unacknowledged message at a time.
        $channel->basic_qos(null, 1, null);
        // Fourth argument (no_ack) is false: messages are acknowledged manually in handle().
        $channel->basic_consume($this->connection->queueName, '', false, false, false, false, $callback);

        while ($channel->is_consuming() && !$this->shouldExit) {
            $channel->wait();
        }
    }

    /**
     * Sets callback for failed jobs.
     * Will be executed if job is not retryable
     *
     * @param callable $callback Receives the job and the thrown exception
     */
    public function setFailedCallback(callable $callback)
    {
        $this->failedCallback = $callback;
    }

    /**
     * Enables or disables debug messages
     *
     * @param bool $val
     */
    public function setDebug($val)
    {
        $this->debug = $val;
    }

    /**
     * Installs handlers for SIGINT, SIGTERM and SIGHUP that ask the consume
     * loop in run() to stop.
     *
     * When the pcntl extension is not available the worker still runs, but
     * without graceful shutdown.
     */
    protected function registerSignalHandler()
    {
        if (!function_exists('pcntl_async_signals') || !function_exists('pcntl_signal')) {
            $this->debug('pcntl extension is not available, graceful shutdown is disabled');

            return;
        }

        pcntl_async_signals(true);

        $stop = function () {
            $this->shouldExit = true;
            $this->debug("Worker stopped");
        };

        foreach ([SIGINT, SIGTERM, SIGHUP] as $sig) {
            pcntl_signal($sig, $stop);
        }
    }

    /**
     * Processes a single delivered message: restores the job, executes it and
     * acknowledges the message. On failure the job is republished with a delay
     * or passed to the failed-job callback.
     *
     * @param AMQPMessage $msg
     */
    protected function handle(AMQPMessage $msg)
    {
        $this->debug('Received job: ' . $msg->body);

        // NOTE(review): unserialize() can instantiate arbitrary classes; the queue
        // must only ever be written to by trusted producers.
        $job = unserialize($msg->body);

        // A malformed payload yields false (or a non-job value). Calling job
        // methods on it would raise an uncaught Error inside the catch block
        // below and kill the worker, so discard such messages up front.
        if (!$job instanceof AbstractJob) {
            $msg->ack();
            $this->debug('Invalid job payload, message discarded');

            return;
        }

        $retries = 0;
        if ($msg->has('application_headers')) {
            $headers = $msg->get('application_headers')->getNativeData();
            $retries = $headers['retries'] ?? 0;
        }

        try {
            $this->debug('Execute job, retries=' . $retries);
            $job->execute();
        } catch (\Throwable $e) {
            $retries = $retries + 1;

            if ($job->isRetryable($retries)) {
                // Compute the delay once so the logged value is the one actually used.
                $delay = $job->getRetryDelay($retries);
                $this->debug("Job failed. Redeliver with delay $delay, retry $retries");

                // Republish before acknowledging: if publishing fails the original
                // message stays unacknowledged instead of the job being lost.
                $this->queue->pushWithDelay($job, $delay, ['retries' => $retries]);
                $msg->ack();
            } else { // not retryable
                $this->debug('Job failed. Not retryable, reject');
                $msg->ack();

                if ($this->failedCallback !== null) {
                    ($this->failedCallback)($job, $e);
                }
            }

            return;
        }

        // Acknowledge outside the try block so that an ack failure is not
        // mistaken for a job failure and does not trigger a retry.
        $msg->ack();
        $this->debug('Job is done');
    }

    /**
     * Prints a timestamped message prefixed with the process id when debug
     * output is enabled.
     *
     * @param string $msg
     */
    protected function debug($msg)
    {
        if ($this->debug) {
            $date = date('Y-m-d H:i:s');
            $pid = getmypid();
            echo "[$date] [$pid] $msg" . PHP_EOL;
        }
    }
}