Permalink
Browse files

Merge branch 'upstream2' of github.com:salimane/php-resque into salim…

…ane-upstream2
  • Loading branch information...
2 parents 67aece0 + 2030742 commit 3e73f71b6be8ce34b291187d30d78aa50207f5fa @pilif pilif committed Feb 14, 2012
Showing with 71 additions and 24 deletions.
  1. +33 −1 lib/Resque.php
  2. +3 −0 lib/Resque/Worker.php
  3. +35 −23 resque.php
View
@@ -20,6 +20,16 @@ class Resque
public static $redis = null;
/**
+ * @var redis backend server address
+ */
+ public static $server = null;
+
+ /**
+ * @var integer Db on which the Redis backend server is selected
+ */
+ public static $database = null;
+
+ /**
* Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to.
*
@@ -28,6 +38,10 @@ class Resque
*/
public static function setBackend($server, $database = 0)
{
+ //save the params for later use
+ self::$server = $server;
+ self::$database = $database;
+
if(is_array($server)) {
require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
self::$redis = new Resque_RedisCluster($server);
@@ -48,13 +62,31 @@ public static function setBackend($server, $database = 0)
}
/**
+ * Reconnect to the redis backend specified in during __construct
+ * @return Resque_Redis Instance of Resque_Redis. (redis ressource handle)
+ */
+ public static function ResetBackend()
+ {
+ if(is_array(self::$server)) {
+ self::$redis = new Resque_RedisCluster(self::$server);
+ }
+ else {
+ list($host, $port) = explode(':', self::$server);
+ self::$redis = new Resque_Redis($host, $port);
+ }
+ self::$redis->select(self::$database);
+ return self::$redis;
+ }
+
+ /**
* Return an instance of the Resque_Redis class instantiated for Resque.
*
* @return Resque_Redis Instance of Resque_Redis.
*/
public static function redis()
{
- if(is_null(self::$redis)) {
+ //try to reset the backend specified during __construct
+ if(is_null(self::$redis) && is_null(self::ResetBackend())) {
self::setBackend('localhost:6379');
}
View
@@ -206,6 +206,9 @@ public function work($interval = 5)
// We're the child. Run the job.
if($this->child === 0 || $this->child === false) {
+ //reconnect to Redis in child to avoid sharing same connection with parent process
+ // leading to race condition reading from the same socket, thus errors
+ Resque::ResetBackend();
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
View
@@ -44,36 +44,48 @@
$count = $COUNT;
}
-$perChild = 1;
-$PERCHILD = getenv('PERCHILD');
-if(!empty($PERCHILD) && $PERCHILD > 1) {
- $perChild = $PERCHILD;
-}
+$PIDFILE = getenv('PIDFILE');
+if($count > 1) {//start multiple workers by forking this process
+ $pids = '';
+ for($i = 0; $i < $count; ++$i) {
+ $pid = pcntl_fork();
+ if($pid === -1) {
+ die("Could not fork worker ".$i."\n");
+ }
+ // Child, start the worker
+ else if($pid === 0) {
+ //reconnect to Redis in child to avoid sharing same connection with parent process
+ // leading to race condition reading from the same socket, thus errors
+ Resque::ResetBackend();
+ $queues = explode(',', $QUEUE);
+ $worker = new Resque_Worker($queues);
+ $worker->logLevel = $logLevel;
+ fwrite(STDOUT, '*** Starting worker '.$worker."\n");
+ //to kill self before exit()
+ register_shutdown_function(create_function('$pars', 'posix_kill(getmypid(), SIGKILL);'), array());
+ $worker->work($interval);
+ //to avoid foreach loop in child process, exit with its order of creation $i
+ exit($i);
+ }
+ }
-if($count > 1) {
- for($i = 0; $i < $count; ++$i) {
- $pid = pcntl_fork();
- if($pid == -1) {
- die("Could not fork worker ".$i."\n");
- }
- // Child, start the worker
- else if(!$pid) {
- $queues = explode(',', $QUEUE);
- $worker = new Resque_Worker($queues, $perChild);
- $worker->logLevel = $logLevel;
- fwrite(STDOUT, '*** Starting worker '.$worker."\n");
- $worker->work($interval);
- break;
- }
- }
+ if ($PIDFILE) {
+ file_put_contents($PIDFILE, getmypid()) or
+ die('Could not write PID information to ' . $PIDFILE);
+ }
+
+ //in parent, wait for all child processes to terminate
+ while (pcntl_waitpid(0, $status) != -1) {
+ $status = pcntl_wexitstatus($status);
+ fwrite(STDOUT, '*** Worker '.$status." terminated\n");
+ }
}
// Start a single worker
else {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues, $perChild);
$worker->logLevel = $logLevel;
-
- $PIDFILE = getenv('PIDFILE');
+
if ($PIDFILE) {
file_put_contents($PIDFILE, getmypid()) or
die('Could not write PID information to ' . $PIDFILE);

0 comments on commit 3e73f71

Please sign in to comment.