A driver for Queue Manager Bundle that runs jobs periodically.
This driver doesn't run jobs itself; it requires another driver to actually process them.
There is no runner daemon for this driver, as it just plugs into other drivers. Use it by putting jobs into this driver with the period option.
composer require mcfedr/periodic-queue-driver-bundle
Include the bundle in your AppKernel:
public function registerBundles()
{
    $bundles = [
        ...
        new Mcfedr\QueueManagerBundle\McfedrQueueManagerBundle(),
        new Mcfedr\PeriodicQueueDriverBundle\McfedrPeriodicQueueDriverBundle(),
    ];
    ...
}
With this bundle installed you can set up your queue manager config similar to this:
mcfedr_queue_manager:
    managers:
        periodic:
            driver: periodic
            options:
                default_manager: delay
                default_manager_options: []
This will create a QueueManager service named "mcfedr_queue_manager.periodic".
Manager options:
default_manager - Default job processor; must support delayed jobs, for example Doctrine Delay
default_manager_options - Default options to pass to the job processor's put method

Options to put:
period - The average number of seconds between job runs
manager - Use a different job processor for this job
manager_options - Options to pass to the processor's put method
There are two commands that can be used to investigate how well spread jobs will be. A simple way to visualize the output is to pipe it into feedgnuplot.
rand-add uses a naive implementation of just adding a random number of seconds to get the time for the next run:
./tests/console test:distribution:rand-add -v | feedgnuplot --histogram 0 --binwidth 60 --hardcopy "rand-add.png" --exit
periodic uses the bundle implementation:
./tests/console test:distribution:periodic -v | feedgnuplot --histogram 0 --binwidth 60 --hardcopy "periodic.png" --exit
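To make the rand-add idea concrete, here is a minimal sketch of a naive scheduler of that kind. It is not the code behind the test command: the function name nextRunRandAdd and the choice of a uniform delay between half and one and a half periods (so that the mean is one period) are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

// Naive scheduling: next run = previous run + a random number of seconds.
// Assumption: the delay is uniform in [period/2, 3*period/2], so it averages
// to one period. The real command may use a different range.
function nextRunRandAdd(int $previousRun, int $period): int
{
    $min = intdiv($period, 2);
    $max = $period + intdiv($period, 2);

    return $previousRun + random_int($min, $max);
}

$period = 3600; // one hour, in seconds
$run = 0;
$runs = [];
for ($i = 0; $i < 5; ++$i) {
    $run = nextRunRandAdd($run, $period);
    $runs[] = $run;
}

// Print the simulated run times, one per line, like the test commands do
// for piping into a histogram tool.
echo implode(PHP_EOL, $runs), PHP_EOL;
```

Because every run time is derived from the previous one, the random offsets accumulate over time, which is the kind of drift the histograms above help to reveal.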
There is a job_tokens argument which can help with job uniqueness.
Usage example
When you put a new periodic job, you get a PeriodicJob object.
$periodicJob = $this->manager->put('some_service', [
    'some_argument' => 'some_value',
], [
    'period' => 'some_seconds',
], 'periodic');
Call $periodicJob->getToken() on it and store the token so it can be checked when the job executes.
During execution:
public function execute(array $arguments)
{
    // Get the stored token
    ...
    // Reject the job if its token does not match the stored one
    if ($storedToken !== $arguments['job_tokens']['token']) {
        throw new InvalidTokenException();
    }
    ...
    // Save the next token for the next execution
    $storedToken = $arguments['job_tokens']['next_token'];
    ...
}
A new token is generated on each job execution, so if a job is duplicated, only one copy will be executed and the others will fail.
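The token check can be tried in isolation with a small, self-contained sketch. Everything here is hypothetical scaffolding: the TokenCheckingWorker class, the in-memory token storage, and this definition of InvalidTokenException are assumptions for illustration only. A real worker would persist the token (for example in a database) and would receive job_tokens from the driver.

```php
<?php
declare(strict_types=1);

// Hypothetical exception class; the README only shows its name.
class InvalidTokenException extends \RuntimeException
{
}

// Hypothetical worker that keeps the expected token in memory.
class TokenCheckingWorker
{
    private string $storedToken;
    public int $runs = 0;

    public function __construct(string $initialToken)
    {
        $this->storedToken = $initialToken;
    }

    public function execute(array $arguments): void
    {
        // Refuse any job whose token is not the one we expect next.
        if ($this->storedToken !== $arguments['job_tokens']['token']) {
            throw new InvalidTokenException('Stale or duplicated job');
        }

        ++$this->runs; // the actual work would happen here

        // Rotate: only a job carrying next_token will be accepted next time.
        $this->storedToken = $arguments['job_tokens']['next_token'];
    }
}

$worker = new TokenCheckingWorker('token-1');
$job = ['job_tokens' => ['token' => 'token-1', 'next_token' => 'token-2']];

$worker->execute($job); // the first copy runs and rotates the token

$duplicateRejected = false;
try {
    $worker->execute($job); // a duplicate still carries the old token
} catch (InvalidTokenException $e) {
    $duplicateRejected = true;
}
```

In this sketch the second call fails because the stored token has already moved on to token-2. With real persistent storage, the read and the update of the token would need to be atomic for the guarantee to hold under concurrency.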