diff --git a/src/Console/Controller.php b/src/Console/Controller.php index 0726b06..9cedd85 100644 --- a/src/Console/Controller.php +++ b/src/Console/Controller.php @@ -24,7 +24,7 @@ * 'queue' => 'UrbanIndo\Yii2\Queue\Console\QueueController' * ], * ]; - * + * * OR * * return [ @@ -51,7 +51,7 @@ class Controller extends \yii\console\Controller * @var string|array|Queue the name of the queue component. default to 'queue'. */ public $queue = 'queue'; - + /** * @var integer sleep timeout for infinite loop in second */ @@ -61,14 +61,14 @@ class Controller extends \yii\console\Controller * @var string the name of the command. */ private $_name = 'queue'; - + /** * @return void */ public function init() { parent::init(); - + if (!is_numeric($this->sleepTimeout)) { throw new InvalidParamException('($sleepTimeout) must be an number'); } @@ -76,8 +76,9 @@ public function init() if ($this->sleepTimeout < 0) { throw new InvalidParamException('($sleepTimeout) must be greater or equal than 0'); } - + $this->queue = \yii\di\Instance::ensure($this->queue, Queue::className()); + $this->queue->processRunner->setScriptPath($this->getScriptPath()); } /** @@ -110,101 +111,21 @@ protected function getScriptPath() * The format for each element is 'KEY=VAL'. * @return void */ - public function actionListen($cwd = null, $timeout = null, array $env = null) - { - $this->stdout("Listening to queue...\n"); - $this->initSignalHandler(); - $command = PHP_BINARY . " {$this->getScriptPath()} {$this->_name}/run"; - declare(ticks = 1); - - $queueSize = intval($this->queue->getSize()); - - while (true) { - - if($this->queue->waitSecondsIfNoQueue > 0){ - $this->stdout("Queue {$queueSize} \n"); - if($queueSize == 0){ - $this->stdout("NO Queue, Waiting {$this->queue->waitSecondsIfNoQueue}s to save cpu... \n"); - sleep($this->queue->waitSecondsIfNoQueue); - $queueSize = $this->queue->getSize(); - }else{ - $queueSize--; - if($queueSize<=0){ - $queueSize = $this->queue->getSize(); - } - } - } - - $this->stdout("Running new process...\n"); - $this->runQueueFetching($command, $cwd, $timeout, $env); - - if ($this->sleepTimeout > 0) { - sleep($this->sleepTimeout); - } - } - $this->stdout("Exiting...\n"); - } - - /** - * Run the queue fetching process. - * @param string $command The command. - * @param string $cwd The working directory. - * @param integer $timeout The timeout. - * @param array $env The environment to be passed. - * @return void - */ - protected function runQueueFetching( - $command, + public function actionListen( $cwd = null, - $timeout = null, + $timeout = null, // moved to queue config array $env = null ) { - $process = new \Symfony\Component\Process\Process( - $command, - isset($cwd) ? $cwd : getcwd(), - $env, - null, - $timeout - ); - $process->setTimeout($timeout); - $process->setIdleTimeout(null); - $process->run(); - - $out = $process->getOutput(); - $err = $process->getErrorOutput(); + $this->stdout("Listening to queue...\n"); - if ($process->isSuccessful()) { - $this->stdout($out . PHP_EOL); - $this->stdout($err . PHP_EOL); - } else { - $this->stderr($out . PHP_EOL); - $this->stderr($err . PHP_EOL); - Yii::warning($out, 'yii2queue'); - Yii::warning($err, 'yii2queue'); + try { + $this->queue->processRunner->listen($cwd,$timeout,$env); + } + catch (Exception $e) { + Yii::error($e->getMessage(),__METHOD__); } - } - - /** - * Initialize signal handler for the process. - * @return void - */ - protected function initSignalHandler() - { - $signalHandler = function ($signal) { - switch ($signal) { - case SIGTERM: - $this->stderr('Caught SIGTERM'); - Yii::error('Caught SIGTERM', 'yii2queue'); - exit; - case SIGINT: - $this->stderr('Caught SIGINT'); - Yii::error('Caught SIGINT', 'yii2queue'); - exit; - } - }; - pcntl_signal(SIGTERM, $signalHandler); - pcntl_signal(SIGINT, $signalHandler); + $this->stdout("Exiting...\n"); } /** diff --git a/src/ProcessRunner.php b/src/ProcessRunner.php new file mode 100644 index 0000000..81c8a0d --- /dev/null +++ b/src/ProcessRunner.php @@ -0,0 +1,450 @@ + + * @since 2017.08.01 + */ + +namespace UrbanIndo\Yii2\Queue; + +use IteratorAggregate; +use ArrayIterator; +use Symfony\Component\Process\Process; +use Yii; +use yii\helpers\Console; +use yii\base\InvalidConfigException; + +/** + * The process runner is responsible for all the threads management + * Listens to the queue, based on the config launches x number of processes + * Cleans zombies after they are done, in single threaded mode, runs processes in foreground + * + * @author Marek Petras + * @since 2017.08.01 + */ +class ProcessRunner extends \yii\base\Component implements IteratorAggregate +{ + /** + * @var string $cwd working directory to launch the sub processes in; default to current + */ + protected $cwd = null; + + /** + * @var array $env enviromental vars to be passed to the sub process + */ + protected $env = []; + + /** + * @var string $scriptPath the yii executable + */ + private $_scriptPath = null; + + /** + * @var Queue $queue queue + */ + private $_queue; + + /** + * @var array $procs current processes + */ + private $procs = []; + + /** + * queue setter + * @param Queue $queue the job queue + * @return self + */ + public function setQueue( Queue $queue ) + { + $this->_queue = $queue; + return $this; + } + + /** + * queue getter + * @return Queue + */ + public function getQueue() + { + return $this->_queue; + } + + /** + * set yii executable + * @param string $scriptPath real path to the file + * @return self + * @throws InvalidConfigException on non existent file + */ + public function setScriptPath( $scriptPath ) + { + if ( !is_executable($scriptPath) ) { + throw new InvalidConfigException('Invalid script path:' . $scriptPath); + } + + $this->_scriptPath = $scriptPath; + return $this; + } + + /** + * retreive current script path + * @return string script path + * @throws InvalidConfigException on non existent file + */ + public function getScriptPath() + { + if ( !is_executable($this->_scriptPath) ) { + throw new InvalidConfigException('Invalid script path:' . $this->_scriptPath); + } + + return $this->_scriptPath; + } + + /** + * IteratorAggregate implementation + * @return ArrayIterator running processes + */ + public function getIterator() + { + return new ArrayIterator($this->procs); + } + + /** + * listen to the queue, launch processes based on queue settings + * clean up, catch signals, propagate to sub processes if required or wait for completion + * launches new jobs from the queue when current < maxprocs + * @param string $cwd current working dir + * @param int $timeout timeout to be passed on to the sub processes + * @param array $env enviromental variables for the sub proc + * @return void + */ + public function listen( $cwd = null, $timeout = 0, array $env = null ) + { + $this->cwd = $cwd; + $this->env = $env; + + $this->initSignalHandler(); + + declare(ticks = 1); + + while (true) { + + // determine the size of the queue + $queueSize = $this->getQueueSize(); + + $this->stdout(sprintf('queueSize: %d , opened: %d , limit: %d ', + $queueSize,$this->getOpenedProcsCount(),$this->getMaxProcesses()).PHP_EOL); + + // check for defunct processes + $this->cleanUpProcs(); + + // if we have queue and open spots, launch new ones + if ( $queueSize ) { + if ( $this->getCanOpenNew() ) { + $this->stdout("Running new process...\n"); + $this->runProcess( + $this->buildCommand() + ); + } + else { + $this->stdout(sprintf('Nothing to do, Waiting for processes to finish; queueSize: %d , opened: %d , limit: %d ', + $queueSize,$this->getOpenedProcsCount(),$this->getMaxProcesses()).PHP_EOL); + sleep($this->queue->waitSecondsIfNoProcesses); // wait x seconds then try cleaning up + } + } + else { + if ( $this->queue->waitSecondsIfNoQueue > 0 ) { + $this->stdout('NO Queue, Waiting '.$this->queue->waitSecondsIfNoQueue.' to save cpu...' . PHP_EOL); + sleep($this->queue->waitSecondsIfNoQueue); + } + } + + // sleep if we want to between lanuching new processes + if ($this->getSleepTimeout() > 0) { + sleep($this->getSleepTimeout()); + } + } + } + + /** + * run the sub process, register it with others, + * if we are in single threaded mode, wait for it to finish before moving on + * @param string $command the command to exec + * @param string $cwd + * @return void + */ + public function runProcess( $command ) + { + $process = new Process( + $command, + $this->cwd ? $this->cwd : getcwd(), + $this->env + ); + + $this->stdout('Running ' . $command . ' (mode: ' . ($this->getIsSingleThreaded() ? 'single' : 'multi') . ')' . PHP_EOL); + + $process->setTimeout($this->getTimeout()); + $process->setIdleTimeout($this->getIdleTimeout()); + $process->start(); + + $this->addProcess($process); + + if ( $this->getIsSingleThreaded() ) { + + $this->stdout('Running in sync mode' . PHP_EOL); + + $pid = $process->getPid(); + + $process->wait(function($type,$data){ + $method = 'std'.$type; + $this->{$method}($data); + }); + + $this->stdout('Done, cleaning:' . $pid . PHP_EOL); + + $this->cleanUpProc($process, $pid); + } + } + + /** + * add the process to the currently running to be cleaned up after finish + * @param Process $process the process object + * @return self + */ + public function addProcess( Process $process ) + { + $this->procs[$process->getPid()] = $process; + return $this; + } + + /** + * clean up defunct processes running in background + * @return void + */ + public function cleanUpProcs() + { + if ( is_array($this->procs) && ($cntProcs = count($this->procs)) > 0 ) { + + $this->stdout('Currently see ' . $cntProcs . ' processes' . PHP_EOL); + + foreach ( $this->procs as $pid => $proc) { + $this->cleanUpProc($proc,$pid); + } + } + } + + /** + * build the command to launch sub process + * @return string command + */ + protected function buildCommand() + { + // using setsid to stop signal propagation to allow background processes to finish even if we receive a signal + return "setsid " . PHP_BINARY . " {$this->scriptPath} {$this->getCommand()}"; + } + + /** + * check if process is still running, if not get stdout/error and clean up process + * @param Process $process the background process + * @param int $pid process pid + * @return void + */ + public function cleanUpProc(Process $process, $pid) + { + $process->checkTimeout(); + + if ( !$process->isRunning() ) { + + $this->stdout('Cleanning up ' . $pid . PHP_EOL); + + $process->stop(); + + $out = $process->getOutput(); + $err = $process->getErrorOutput(); + + if ($process->isSuccessful()) { + $this->stdout('Success' . PHP_EOL); + + // we already display output as it is piped in in single threaded mode + if ( !$this->getIsSingleThreaded() ) { + $this->stdout($out . PHP_EOL); + $this->stdout($err . PHP_EOL); + } + + } else { + $this->stdout('Error' . PHP_EOL); + $this->stderr($out . PHP_EOL); + $this->stderr($err . PHP_EOL); + Yii::warning($out, 'yii2queue'); + Yii::warning($err, 'yii2queue'); + } + + unset($this->procs[$pid]); + } + } + + /** + * wait for all to finish + * @return void + */ + public function cleanUpAll( $signal = null, $propagate = false ) + { + $this->stdout('Cleaning processes: ' . $this->getOpenedProcsCount() . PHP_EOL); + + while ( $this->getOpenedProcsCount() ) { + + foreach ( $this->procs as $pid => $process ) { + + if ( $process->isRunning() + && $process->getPid() + && $propagate + && $signal ) + { + $this->stdout(sprintf('Sending signal %d to pid %d', $signal, $pid) . PHP_EOL); + + try { + $process->signal($signal); + } catch ( \Symfony\Component\Process\Exception\LogicException $e ) { + $this->stdout('Process was already stopped.'); + } + } + + $this->cleanUpProc($process, $pid); + } + + sleep(1); + } + } + + /** + * Initialize signal handler for the process. + * @return void + */ + protected function initSignalHandler() + { + $signalHandler = function ($signal) { + switch ($signal) { + case SIGTERM: + // wait for procs to finish then quit + $this->stderr('Caught SIGTERM, cleaning up'.PHP_EOL); + $this->cleanUpAll($signal, $this->getPropagateSignals()); + Yii::error('Caught SIGTERM', 'yii2queue'); + exit; + case SIGINT: + // wait for procs to finish then quit + $this->stderr('Caught SIGINT, cleaning up'.PHP_EOL); + $this->cleanUpAll($signal, $this->getPropagateSignals()); + Yii::error('Caught SIGINT', 'yii2queue'); + exit; + } + }; + pcntl_signal(SIGTERM, $signalHandler); + pcntl_signal(SIGINT, $signalHandler); + } + + /** + * get the idle timeout to be passed on to Process + * @return ?int idle timeout + */ + protected function getIdleTimeout() + { + return $this->getQueue()->idleTimeout; + } + + /** + * get the timeout from the queue, seconds after which the process will timeout + * @return ?int timeout in seconds + */ + protected function getTimeout() + { + return $this->getQueue()->timeout; + } + + /** + * get sleep timeout to be slept after eaech process is launched + * @return ?int sleep timeout in seconds + */ + protected function getSleepTimeout() + { + return $this->getQueue()->sleepTimeout; + } + + /** + * check if we are running in single thread mode + * @return bool + */ + protected function getIsSingleThreaded() + { + return $this->getMaxProcesses() === 1; + } + + /** + * retrieve the size of the queue + * @return int size + */ + protected function getQueueSize() + { + return intval($this->getQueue()->getSize()); + } + + /** + * retrieve number of opened processes + * @return int + */ + protected function getOpenedProcsCount() + { + return is_array($this->procs) ? count($this->procs) : 0; + } + + /** + * retrieve max processes from the queue + * @return int maximum number of concurent processes + */ + protected function getMaxProcesses() + { + return $this->getQueue()->maxProcesses; + } + + /** + * check if we can open new ones + * @return bool + */ + protected function getCanOpenNew() + { + return $this->getOpenedProcsCount() < $this->getMaxProcesses(); + } + + /** + * if we should propagate signals to children + * @return bool + */ + protected function getPropagateSignals() + { + return $this->getQueue()->propagateSignals; + } + + /** + * get the command to launch the process + * @return string command + */ + protected function getCommand() + { + return $this->getQueue()->command; + } + + /** + * @inheritdoc + */ + protected function stdout($string) + { + return Console::stdout($string); + } + + /** + * @inheritdoc + */ + protected function stderr($string) + { + return Console::stderr($string); + } +} diff --git a/src/Queue.php b/src/Queue.php index 4d9bfa1..d22a949 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -22,7 +22,7 @@ abstract class Queue extends \yii\base\Component * Json serializer. */ const SERIALIZER_JSON = 'json'; - + /** * PHP serializer. */ @@ -37,17 +37,17 @@ abstract class Queue extends \yii\base\Component * Event executed before a job is posted to the queue. */ const EVENT_AFTER_POST = 'afterPost'; - + /** * Event executed before a job is being fetched from the queue. */ const EVENT_BEFORE_FETCH = 'beforeFetch'; - + /** * Event executed after a job is being fetched from the queue. */ const EVENT_AFTER_FETCH = 'afterFetch'; - + /** * Event executed before a job is being deleted from the queue. */ @@ -57,7 +57,7 @@ abstract class Queue extends \yii\base\Component * Event executed after a job is being deleted from the queue. */ const EVENT_AFTER_DELETE = 'afterDelete'; - + /** * Event executed before a job is being released from the queue. */ @@ -67,7 +67,7 @@ abstract class Queue extends \yii\base\Component * Event executed after a job is being released from the queue. */ const EVENT_AFTER_RELEASE = 'afterRelease'; - + /** * Event executed before a job is being executed. */ @@ -101,13 +101,13 @@ abstract class Queue extends \yii\base\Component * @var \yii\base\Module */ public $module; - + /** * Choose the serializer. * @var string */ public $serializer = 'json'; - + /** * This will release automatically on execution failure. i.e. when * the `run` method returns false or catch exception. @@ -122,6 +122,60 @@ abstract class Queue extends \yii\base\Component */ public $waitSecondsIfNoQueue = 0; + /** + * @var int multi threading + */ + public $maxProcesses = 1; + + /** + * @var int interval to check if processes have finished + */ + public $waitSecondsIfNoProcesses = 5; + + /** + * @var bool whether we want a kill signal to propagate to threads + * or if we want them to finish on their own, treat carefully + */ + public $propagateSignals = false; + + /** + * @var int idle timeout on each single process + * e.g. timeout process after x seconds if there is no output + */ + public $idleTimeout = null; + + /** + * @var int $sleepTimeout sleep timeout to be slept after each process is launched + * forced to a minimum of one in order to register the process after its run + */ + public $sleepTimeout = 1; + + /** + * @var int $timeout seconds after which the process will timeout + */ + public $timeout = null; + + /** + * @var obj ProcessRunner the process runner dependency + */ + public $processRunner = null; + + /** + * @var str $command command to run single jobs process + */ + public $command = 'queue/run'; + + /** + * register the process runner depndency + * @param ProcessRunner $processRunner + * @param array $config additional params + */ + public function __construct(ProcessRunner $processRunner, array $config = []) + { + parent::__construct($config); + $this->processRunner = $processRunner->setQueue($this); + } + /** * Initializes the module. * @return void @@ -145,16 +199,16 @@ public function post(Job $job) if (!$beforeEvent->isValid) { return false; } - + $return = $this->postJob($job); if (!$return) { return false; } - + $this->trigger(self::EVENT_AFTER_POST, new Event(['job' => $job])); return true; } - + /** * Post new job to the queue. Override this for queue implementation. * @@ -172,16 +226,16 @@ abstract protected function postJob(Job $job); public function fetch() { $this->trigger(self::EVENT_BEFORE_FETCH); - + $job = $this->fetchJob(); if ($job == false) { return false; } - + $this->trigger(self::EVENT_AFTER_FETCH, new Event(['job' => $job])); return $job; } - + /** * Return next job from the queue. Override this for queue implementation. * @return Job|boolean the job or false if not found. @@ -233,9 +287,9 @@ public function run(Job $job) 500 ); } - + $this->trigger(self::EVENT_AFTER_RUN, new Event(['job' => $job, 'returnValue' => $retval])); - + if ($retval !== false) { \Yii::info("Deleting job #: {$job->id}", 'yii2queue'); $this->delete($job); @@ -257,16 +311,16 @@ public function delete(Job $job) if (!$beforeEvent->isValid) { return false; } - + $return = $this->deleteJob($job); if (!$return) { return false; } - + $this->trigger(self::EVENT_AFTER_DELETE, new Event(['job' => $job])); return true; } - + /** * Delete the job. Override this for the queue implementation. * @@ -274,7 +328,7 @@ public function delete(Job $job) * @return boolean whether the operation succeed. */ abstract protected function deleteJob(Job $job); - + /** * Release the job. This will trigger event EVENT_BEFORE_RELEASE and * EVENT_AFTER_RELEASE. @@ -288,16 +342,16 @@ public function release(Job $job) if (!$beforeEvent->isValid) { return false; } - + $return = $this->releaseJob($job); if (!$return) { return false; } - + $this->trigger(self::EVENT_AFTER_RELEASE, new Event(['job' => $job])); return true; } - + /** * Release the job. Override this for the queue implementation. * @@ -334,7 +388,7 @@ protected function deserialize($message) $obj->header['signature'] = $signature; return $obj; } - + /** * @param array $array The message to be deserialize. * @return array @@ -355,7 +409,7 @@ protected function deserializeMessage($array) } return $data; } - + /** * Pack job so that it can be send. * @@ -397,13 +451,13 @@ protected function serializeMessage($array) } return $data; } - + /** * Returns the number of queue size. * @return integer */ abstract public function getSize(); - + /** * Purge the whole queue. * @return boolean diff --git a/tests/ProcessRunnerTest.php b/tests/ProcessRunnerTest.php new file mode 100644 index 0000000..2e00447 --- /dev/null +++ b/tests/ProcessRunnerTest.php @@ -0,0 +1,76 @@ + /dev/null'; + public function testRunnerRunSingle() + { + $runner = $this->getRunner(); + $this->assertEquals($runner->getIterator()->count(),0); + + $runner->runProcess(self::CMD); + + $this->assertEquals($runner->getIterator()->count(),0); + + $runner->cleanUpAll(); + + $this->assertEquals($runner->getIterator()->count(),0); + } + public function testRunnerRunMultiple() + { + $runner = $this->getRunner(2); + + $this->assertEquals($runner->getIterator()->count(),0); + + $runner->runProcess(self::CMD); + $runner->runProcess(self::CMD); + + $this->assertEquals($runner->getIterator()->count(),2); + + $runner->cleanUpAll(); + $this->assertEquals($runner->getIterator()->count(),0); + } + public function testRunnerCleanUpProcess() + { + $runner = $this->getRunner(); + + $process = new Process(self::CMD); + $process->run(); + + $runner->addProcess($process); + $this->assertEquals($runner->getIterator()->count(),1); + + $runner->cleanUpProc($process, $process->getPid()); + + $this->assertEquals($runner->getIterator()->count(),0); + } + public function testRunnerPropagateSignals() + { + $runner = $this->getRunner(2); + $start = time(); + + $runner->runProcess('setsid php -r "sleep(10);" > /dev/null'); + $this->assertEquals($runner->getIterator()->count(),1); + $runner->runProcess('setsid php -r "sleep(10);" > /dev/null'); + $this->assertEquals($runner->getIterator()->count(),2); + + $runner->cleanUpAll(SIGKILL, true); + $duration = time() - $start; + $this->assertEquals($runner->getIterator()->count(),0); + $this->assertLessThan(10, $duration); + } + protected function getRunner($proc = 1) + { + $queue = Yii::createObject([ + 'class' => '\UrbanIndo\Yii2\Queue\Queues\MemoryQueue', + 'maxProcesses' => $proc, + ]); + $runner = new ProcessRunner(); + $runner->setQueue($queue); + + return $runner; + } +}