diff --git a/config/tunneler.php b/config/tunneler.php index f4b9555..77addbf 100644 --- a/config/tunneler.php +++ b/config/tunneler.php @@ -7,14 +7,14 @@ 'bash_path' => env('TUNNELER_BASH_PATH', 'bash'), 'ssh_path' => env('TUNNELER_SSH_PATH', 'ssh'), 'nohup_path' => env('TUNNELER_NOHUP_PATH', 'nohup'), - + 'local_address' => env('TUNNELER_LOCAL_ADDRESS', '127.0.0.1'), 'local_port' => env('TUNNELER_LOCAL_PORT'), 'identity_file' => env('TUNNELER_IDENTITY_FILE'), - + 'bind_address' => env('TUNNELER_BIND_ADDRESS', '127.0.0.1'), 'bind_port' => env('TUNNELER_BIND_PORT'), - + 'user' => env('TUNNELER_USER'), 'hostname' => env('TUNNELER_HOSTNAME'), 'port' => env('TUNNELER_PORT'), diff --git a/src/Console/DaemonCommand.php b/src/Console/DaemonCommand.php new file mode 100644 index 0000000..50a8a8a --- /dev/null +++ b/src/Console/DaemonCommand.php @@ -0,0 +1,152 @@ +listener = $listener; + } + + /** + * Execute the console command. + * + * @return void + */ + public function fire() + { + $this->setListenerOptions(); + + $delay = $this->input->getOption('delay'); + + // The memory limit is the amount of memory we will allow the script to occupy + // before killing it and letting a process manager restart it for us, which + // is to protect us against any memory leaks that will be in the scripts. + $memory = $this->input->getOption('memory'); + + $connection = $this->input->getArgument('connection'); + + $timeout = $this->input->getOption('timeout'); + + // We need to get the right queue for the connection which is set in the queue + // configuration file for the application. We will pull it based on the set + // connection being run for the queue operation currently being executed. + $queue = $this->getQueue($connection); + + $this->listener->listen( + $connection, $queue, $delay, $memory, $timeout + ); + } + + /** + * Get the name of the queue connection to listen on. + * + * @param string $connection + * @return string + */ + protected function getQueue($connection) + { + if (is_null($connection)) { + $connection = $this->laravel['config']['queue.default']; + } + + $queue = $this->laravel['config']->get("queue.connections.{$connection}.queue", 'default'); + + return $this->input->getOption('queue') ?: $queue; + } + + /** + * Set the options on the queue listener. + * + * @return void + */ + protected function setListenerOptions() + { + $this->listener->setEnvironment($this->laravel->environment()); + + $this->listener->setSleep($this->option('sleep')); + + $this->listener->setMaxTries($this->option('tries')); + + $this->listener->setOutputHandler(function ($type, $line) { + switch ($type){ + case 'warn': + $this->warn($line); + break; + case 'info': + $this->info($line); + break; + case 'out': + default: + $this->output->write($line); + } + + }); + } + + /** + * Get the console command arguments. + * + * @return array + */ + protected function getArguments() + { + return [ + ['connection', InputArgument::OPTIONAL, 'The name of connection'], + ]; + } + + /** + * Get the console command options. + * + * @return array + */ + protected function getOptions() + { + return [ + ['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on', null], + + ['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0], + + ['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128], + + ['timeout', null, InputOption::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60], + + ['sleep', null, InputOption::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3], + + ['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0], + ]; + } +} \ No newline at end of file diff --git a/src/Daemon/Listener.php b/src/Daemon/Listener.php new file mode 100644 index 0000000..36e721f --- /dev/null +++ b/src/Daemon/Listener.php @@ -0,0 +1,116 @@ +setSignalHandler(SIGHUP); /* Hangup (POSIX). */ + $this->setSignalHandler(SIGINT); /* Interrupt (ANSI). */ + $this->setSignalHandler(SIGQUIT); /* Quit (POSIX). */ + $this->setSignalHandler(SIGABRT); /* Abort (ANSI). */ + $this->setSignalHandler(SIGTERM); /* Termination (ANSI). */ + $this->setSignalHandler(SIGTSTP); /* Keyboard stop (POSIX). */ + } + + /** + * Sets the signal to be handled by either the closure or the built in + * signal handler. + * + * @param int $signal + * @param callable|null $closure + * + * @return bool + */ + public function setSignalHandler(int $signal, callable $closure = null){ + if (empty($closure)){ + return pcntl_signal($signal, array($this, 'sigHandler')); + } + return pcntl_signal($signal, $closure); + } + + /** + * Built in Signal Handler. + * @param int $signo + */ + public function sigHandler( int $signo ){ + $this->handleWorkerOutput('warn', sprintf("Signal %d Caught, asking the daemon to stop gracefully.", $signo)); + $this->stopGracefully = true; + } + + /** + * Log that we are done and exiting. + */ + public function gracefulStop(){ + $this->handleWorkerOutput('warn', "Work done, exiting now."); + $this->stop(); + } + /** + * Run the given process. + * + * @param Process $process + * @param int $memory + * @return void + */ + public function runProcess(Process $process, $memory) + { + try { + $process->run(function ($type, $line) { + $this->handleWorkerOutput($type, $line); + }); + }catch (\Exception $e){ + dd($e); + } + + // If we caught a signal and need to stop gracefully, this is the place to + // do it. + pcntl_signal_dispatch(); + if ($this->stopGracefully){ + $this->gracefulStop(); + } + // Once we have run the job we'll go check if the memory limit has been + // exceeded for the script. If it has, we will kill this script so a + // process manager will restart this with a clean slate of memory. + if ($this->memoryExceeded($memory)) { + $this->stop(); + } + } + + /** + * Handle output from the worker process. + * + * @param string $type + * @param string $line + * @return void + */ + protected function handleWorkerOutput($type, $line) + { + if (isset($this->outputHandler)) { + call_user_func($this->outputHandler, $type, $line); + } + } +} diff --git a/src/DaemonServiceProvider.php b/src/DaemonServiceProvider.php new file mode 100644 index 0000000..c4c54e9 --- /dev/null +++ b/src/DaemonServiceProvider.php @@ -0,0 +1,65 @@ +registerListener(); + } + + /** + * Register the queue listener. + * + * @return void + */ + protected function registerListener() + { + $this->registerListenCommand(); + + $this->app->singleton('queue.daemon.listener', function ($app) { + return new Listener($app->basePath()); + }); + } + + /** + * Register the queue listener console command. + * + * @return void + */ + protected function registerListenCommand() + { + $this->app->singleton('command.queue.daemon', function ($app) { + return new DaemonCommand($app['queue.daemon.listener']); + }); + + $this->commands('command.queue.daemon'); + } + + /** + * Get the services provided by the provider. + * + * @return array + */ + public function provides() + { + return [ + 'queue.supervisor.listener', 'command.queue.daemon' + ]; + } +}