Skip to content

Commit

Permalink
implementing multi threading, pushing processes into background (#72)
Browse files Browse the repository at this point in the history
* implementing multi threading, pushing processes into background

* added test for process runner, changed visibility of methods for testing

* reverted to older phpunit implementation

* Fix travis for PHP7 and dist trusty for HHHVM. (#74)

* Fix travis for PHP7 and dist trusty for HHHVM.

* Cache composer.

* Fix error "Only variables should be passed by reference"

* Fix PHPUnit reports

* Add codecov.

* implementing multi threading, pushing processes into background

* added test for process runner, changed visibility of methods for testing

* reverted to older phpunit implementation

* fixed issue with losing pid when process is finished but not cleaned up

* making sure the process is actually running and has pid before trying to send signal

* catching logical process exception
  • Loading branch information
marekpetras authored and petrabarus committed Sep 24, 2017
1 parent 14a258b commit 9007970
Show file tree
Hide file tree
Showing 4 changed files with 622 additions and 121 deletions.
109 changes: 15 additions & 94 deletions src/Console/Controller.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* 'queue' => 'UrbanIndo\Yii2\Queue\Console\QueueController'
* ],
* ];
*
*
* OR
*
* return [
Expand All @@ -51,7 +51,7 @@ class Controller extends \yii\console\Controller
* @var string|array|Queue the name of the queue component. default to 'queue'.
*/
public $queue = 'queue';

/**
* @var integer sleep timeout for infinite loop in second
*/
Expand All @@ -61,23 +61,24 @@ class Controller extends \yii\console\Controller
* @var string the name of the command.
*/
private $_name = 'queue';

/**
* @return void
*/
public function init()
{
parent::init();

if (!is_numeric($this->sleepTimeout)) {
throw new InvalidParamException('($sleepTimeout) must be an number');
}

if ($this->sleepTimeout < 0) {
throw new InvalidParamException('($sleepTimeout) must be greater or equal than 0');
}

$this->queue = \yii\di\Instance::ensure($this->queue, Queue::className());
$this->queue->processRunner->setScriptPath($this->getScriptPath());
}

/**
Expand Down Expand Up @@ -110,101 +111,21 @@ protected function getScriptPath()
* The format for each element is 'KEY=VAL'.
* @return void
*/
public function actionListen($cwd = null, $timeout = null, array $env = null)
{
$this->stdout("Listening to queue...\n");
$this->initSignalHandler();
$command = PHP_BINARY . " {$this->getScriptPath()} {$this->_name}/run";
declare(ticks = 1);

$queueSize = intval($this->queue->getSize());

while (true) {

if($this->queue->waitSecondsIfNoQueue > 0){
$this->stdout("Queue {$queueSize} \n");
if($queueSize == 0){
$this->stdout("NO Queue, Waiting {$this->queue->waitSecondsIfNoQueue}s to save cpu... \n");
sleep($this->queue->waitSecondsIfNoQueue);
$queueSize = $this->queue->getSize();
}else{
$queueSize--;
if($queueSize<=0){
$queueSize = $this->queue->getSize();
}
}
}

$this->stdout("Running new process...\n");
$this->runQueueFetching($command, $cwd, $timeout, $env);

if ($this->sleepTimeout > 0) {
sleep($this->sleepTimeout);
}
}
$this->stdout("Exiting...\n");
}

/**
* Run the queue fetching process.
* @param string $command The command.
* @param string $cwd The working directory.
* @param integer $timeout The timeout.
* @param array $env The environment to be passed.
* @return void
*/
protected function runQueueFetching(
$command,
public function actionListen(
$cwd = null,
$timeout = null,
$timeout = null, // moved to queue config
array $env = null
) {
$process = new \Symfony\Component\Process\Process(
$command,
isset($cwd) ? $cwd : getcwd(),
$env,
null,
$timeout
);
$process->setTimeout($timeout);
$process->setIdleTimeout(null);
$process->run();

$out = $process->getOutput();
$err = $process->getErrorOutput();
$this->stdout("Listening to queue...\n");

if ($process->isSuccessful()) {
$this->stdout($out . PHP_EOL);
$this->stdout($err . PHP_EOL);
} else {
$this->stderr($out . PHP_EOL);
$this->stderr($err . PHP_EOL);
Yii::warning($out, 'yii2queue');
Yii::warning($err, 'yii2queue');
try {
$this->queue->processRunner->listen($cwd,$timeout,$env);
}
catch (Exception $e) {
Yii::error($e->getMessage(),__METHOD__);
}

}

/**
* Initialize signal handler for the process.
* @return void
*/
protected function initSignalHandler()
{
$signalHandler = function ($signal) {
switch ($signal) {
case SIGTERM:
$this->stderr('Caught SIGTERM');
Yii::error('Caught SIGTERM', 'yii2queue');
exit;
case SIGINT:
$this->stderr('Caught SIGINT');
Yii::error('Caught SIGINT', 'yii2queue');
exit;
}
};
pcntl_signal(SIGTERM, $signalHandler);
pcntl_signal(SIGINT, $signalHandler);
$this->stdout("Exiting...\n");
}

/**
Expand Down
Loading

0 comments on commit 9007970

Please sign in to comment.