Skip to content

Commit

Permalink
Resolved
Browse files Browse the repository at this point in the history
  • Loading branch information
bubba-h57 committed May 20, 2018
2 parents 9a75ae9 + c49283b commit c041ddd
Show file tree
Hide file tree
Showing 4 changed files with 336 additions and 3 deletions.
6 changes: 3 additions & 3 deletions config/tunneler.php
Expand Up @@ -7,14 +7,14 @@
'bash_path' => env('TUNNELER_BASH_PATH', 'bash'),
'ssh_path' => env('TUNNELER_SSH_PATH', 'ssh'),
'nohup_path' => env('TUNNELER_NOHUP_PATH', 'nohup'),

'local_address' => env('TUNNELER_LOCAL_ADDRESS', '127.0.0.1'),
'local_port' => env('TUNNELER_LOCAL_PORT'),
'identity_file' => env('TUNNELER_IDENTITY_FILE'),

'bind_address' => env('TUNNELER_BIND_ADDRESS', '127.0.0.1'),
'bind_port' => env('TUNNELER_BIND_PORT'),

'user' => env('TUNNELER_USER'),
'hostname' => env('TUNNELER_HOSTNAME'),
'port' => env('TUNNELER_PORT'),
Expand Down
152 changes: 152 additions & 0 deletions src/Console/DaemonCommand.php
@@ -0,0 +1,152 @@
<?php namespace STS\Supervisor\Console;

use Illuminate\Console\Command;
use STS\Supervisor\Daemon\Listener;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;

class DaemonCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:daemon';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Listens to a given queue and gracefully handles PCNTL Signals';

/**
* The queue listener instance.
*
* @var \Illuminate\Queue\Listener
*/
protected $listener;

/**
* DaemonCommand constructor.
*
* @param Listener $listener
*/
public function __construct(Listener $listener)
{
parent::__construct();

$this->listener = $listener;
}

/**
* Execute the console command.
*
* @return void
*/
public function fire()
{
$this->setListenerOptions();

$delay = $this->input->getOption('delay');

// The memory limit is the amount of memory we will allow the script to occupy
// before killing it and letting a process manager restart it for us, which
// is to protect us against any memory leaks that will be in the scripts.
$memory = $this->input->getOption('memory');

$connection = $this->input->getArgument('connection');

$timeout = $this->input->getOption('timeout');

// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->getQueue($connection);

$this->listener->listen(
$connection, $queue, $delay, $memory, $timeout
);
}

/**
* Get the name of the queue connection to listen on.
*
* @param string $connection
* @return string
*/
protected function getQueue($connection)
{
if (is_null($connection)) {
$connection = $this->laravel['config']['queue.default'];
}

$queue = $this->laravel['config']->get("queue.connections.{$connection}.queue", 'default');

return $this->input->getOption('queue') ?: $queue;
}

/**
* Set the options on the queue listener.
*
* @return void
*/
protected function setListenerOptions()
{
$this->listener->setEnvironment($this->laravel->environment());

$this->listener->setSleep($this->option('sleep'));

$this->listener->setMaxTries($this->option('tries'));

$this->listener->setOutputHandler(function ($type, $line) {
switch ($type){
case 'warn':
$this->warn($line);
break;
case 'info':
$this->info($line);
break;
case 'out':
default:
$this->output->write($line);
}

});
}

/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['connection', InputArgument::OPTIONAL, 'The name of connection'],
];
}

/**
* Get the console command options.
*
* @return array
*/
protected function getOptions()
{
return [
['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on', null],

['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0],

['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128],

['timeout', null, InputOption::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60],

['sleep', null, InputOption::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3],

['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0],
];
}
}
116 changes: 116 additions & 0 deletions src/Daemon/Listener.php
@@ -0,0 +1,116 @@
<?php namespace STS\Supervisor\Daemon;

use Illuminate\Queue\Listener as QueueListener;
use Symfony\Component\Process\Process;

pcntl_async_signals(true);

class Listener extends QueueListener{

/**
* Set to true if we catch a signal and need to
* stop after we finish what we are doing.
*
* @var bool
*/
protected $stopGracefully = false;


/**
* Create a new queue listener.
*
* @param string $commandPath
*/
public function __construct($commandPath)
{
if (!function_exists('pcntl_signal')){
throw new \RuntimeException('You must have PCNTL functions to use this Listener');
}
parent::__construct($commandPath);

// Install the signal handler
$this->setSignalHandler(SIGHUP); /* Hangup (POSIX). */
$this->setSignalHandler(SIGINT); /* Interrupt (ANSI). */
$this->setSignalHandler(SIGQUIT); /* Quit (POSIX). */
$this->setSignalHandler(SIGABRT); /* Abort (ANSI). */
$this->setSignalHandler(SIGTERM); /* Termination (ANSI). */
$this->setSignalHandler(SIGTSTP); /* Keyboard stop (POSIX). */
}

/**
* Sets the signal to be handled by either the closure or the built in
* signal handler.
*
* @param int $signal
* @param callable|null $closure
*
* @return bool
*/
public function setSignalHandler(int $signal, callable $closure = null){
if (empty($closure)){
return pcntl_signal($signal, array($this, 'sigHandler'));
}
return pcntl_signal($signal, $closure);
}

/**
* Built in Signal Handler.
* @param int $signo
*/
public function sigHandler( int $signo ){
$this->handleWorkerOutput('warn', sprintf("Signal %d Caught, asking the daemon to stop gracefully.", $signo));
$this->stopGracefully = true;
}

/**
* Log that we are done and exiting.
*/
public function gracefulStop(){
$this->handleWorkerOutput('warn', "Work done, exiting now.");
$this->stop();
}
/**
* Run the given process.
*
* @param Process $process
* @param int $memory
* @return void
*/
public function runProcess(Process $process, $memory)
{
try {
$process->run(function ($type, $line) {
$this->handleWorkerOutput($type, $line);
});
}catch (\Exception $e){
dd($e);
}

// If we caught a signal and need to stop gracefully, this is the place to
// do it.
pcntl_signal_dispatch();
if ($this->stopGracefully){
$this->gracefulStop();
}
// Once we have run the job we'll go check if the memory limit has been
// exceeded for the script. If it has, we will kill this script so a
// process manager will restart this with a clean slate of memory.
if ($this->memoryExceeded($memory)) {
$this->stop();
}
}

/**
* Handle output from the worker process.
*
* @param string $type
* @param string $line
* @return void
*/
protected function handleWorkerOutput($type, $line)
{
if (isset($this->outputHandler)) {
call_user_func($this->outputHandler, $type, $line);
}
}
}
65 changes: 65 additions & 0 deletions src/DaemonServiceProvider.php
@@ -0,0 +1,65 @@
<?php namespace STS\Supervisor;

use Illuminate\Support\ServiceProvider;
use STS\Supervisor\Console\DaemonCommand;
use STS\Supervisor\Daemon\Listener;

class DaemonServiceProvider extends ServiceProvider {
/**
* Indicates if loading of the provider is deferred.
*
* @var bool
*/
protected $defer = true;

/**
* Register the service provider.
*
* @return void
*/
public function register()
{

$this->registerListener();
}

/**
* Register the queue listener.
*
* @return void
*/
protected function registerListener()
{
$this->registerListenCommand();

$this->app->singleton('queue.daemon.listener', function ($app) {
return new Listener($app->basePath());
});
}

/**
* Register the queue listener console command.
*
* @return void
*/
protected function registerListenCommand()
{
$this->app->singleton('command.queue.daemon', function ($app) {
return new DaemonCommand($app['queue.daemon.listener']);
});

$this->commands('command.queue.daemon');
}

/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return [
'queue.supervisor.listener', 'command.queue.daemon'
];
}
}

0 comments on commit c041ddd

Please sign in to comment.