Skip to content

Commit

Permalink
feature #51805 [Scheduler] pre_run and post_run events (alli83)
Browse files Browse the repository at this point in the history
This PR was merged into the 6.4 branch.

Discussion
----------

[Scheduler] pre_run and post_run events

| Q             | A
| ------------- | ---
| Branch?       | 6.4
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | #49803 (comment)
| License       | MIT

Based on #49803 `@kbond`  and taking into account #51553

The aim of this PR is to be able to act on the accumulated messages 'if something happens' and to be able to recalculate the heap via events (currently pre_run and post_run).
The aim is to have access to
- the  the schedule and therefore add/cancel a certain type of message.
- MessageContexte (e.g. access the id)
- The message itself

This PR would complement `@Jeroeny` #51553 PR by enabling action via events.

Commits
-------

20fd21a [Scheduler] add PRE_RUN and POST_RUN events
  • Loading branch information
fabpot committed Oct 16, 2023
2 parents c6a9dde + 20fd21a commit 50662d0
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
use Symfony\Component\Scheduler\Messenger\ServiceCallMessageHandler;

Expand All @@ -27,5 +28,11 @@
service('clock'),
])
->tag('messenger.transport_factory')
->set('scheduler.event_listener', DispatchSchedulerEventListener::class)
->args([
tagged_locator('scheduler.schedule_provider', 'name'),
service('event_dispatcher'),
])
->tag('kernel.event_subscriber')
;
};
2 changes: 2 additions & 0 deletions src/Symfony/Component/Scheduler/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ CHANGELOG
* Add `ScheduledStamp` to `RedispatchMessage`
* Allow modifying Schedule instances at runtime
* Add `MessageProviderInterface` to trigger unique messages at runtime
* Add `PreRunEvent` and `PostRunEvent` events
* Add `DispatchSchedulerEventListener`

6.3
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class AddScheduleMessengerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (!$container->has('event_dispatcher')) {
$container->removeDefinition('scheduler.event_listener');
}

$receivers = [];
foreach ($container->findTaggedServiceIds('messenger.receiver') as $tags) {
$receivers[$tags[0]['alias']] = true;
Expand Down
40 changes: 40 additions & 0 deletions src/Symfony/Component/Scheduler/Event/PostRunEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Scheduler\Event;

use Symfony\Component\Scheduler\Generator\MessageContext;
use Symfony\Component\Scheduler\ScheduleProviderInterface;

class PostRunEvent
{
public function __construct(
private readonly ScheduleProviderInterface $schedule,
private readonly MessageContext $messageContext,
private readonly object $message,
) {
}

public function getMessageContext(): MessageContext
{
return $this->messageContext;
}

public function getSchedule(): ScheduleProviderInterface
{
return $this->schedule;
}

public function getMessage(): object
{
return $this->message;
}
}
51 changes: 51 additions & 0 deletions src/Symfony/Component/Scheduler/Event/PreRunEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Scheduler\Event;

use Symfony\Component\Scheduler\Generator\MessageContext;
use Symfony\Component\Scheduler\ScheduleProviderInterface;

class PreRunEvent
{
private bool $shouldCancel = false;

public function __construct(
private readonly ScheduleProviderInterface $schedule,
private readonly MessageContext $messageContext,
private readonly object $message,
) {
}

public function getMessageContext(): MessageContext
{
return $this->messageContext;
}

public function getSchedule(): ScheduleProviderInterface
{
return $this->schedule;
}

public function getMessage(): object
{
return $this->message;
}

public function shouldCancel(bool $shouldCancel = null): bool
{
if (null !== $shouldCancel) {
$this->shouldCancel = $shouldCancel;
}

return $this->shouldCancel;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Scheduler\EventListener;

use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Scheduler\Event\PostRunEvent;
use Symfony\Component\Scheduler\Event\PreRunEvent;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;

class DispatchSchedulerEventListener implements EventSubscriberInterface
{
public function __construct(
private readonly ContainerInterface $scheduleProviderLocator,
private readonly EventDispatcherInterface $eventDispatcher,
) {
}

public function onMessageHandled(WorkerMessageHandledEvent $event): void
{
$envelope = $event->getEnvelope();
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
return;
}

if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
return;
}

$this->eventDispatcher->dispatch(new PostRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage()));
}

public function onMessageReceived(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
return;
}

if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
return;
}

$preRunEvent = new PreRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage());

$this->eventDispatcher->dispatch($preRunEvent);

if ($preRunEvent->shouldCancel()) {
$event->shouldHandle(false);
}
}

public static function getSubscribedEvents(): array
{
return [
WorkerMessageReceivedEvent::class => ['onMessageReceived'],
WorkerMessageHandledEvent::class => ['onMessageHandled'],
];
}
}
14 changes: 7 additions & 7 deletions src/Symfony/Component/Scheduler/Generator/MessageGenerator.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public function getMessages(): \Generator
$checkpoint->release($now, $this->waitUntil);
}

public function getSchedule(): Schedule
{
return $this->schedule ??= $this->scheduleProvider->getSchedule();
}

private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
{
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
Expand All @@ -101,7 +106,7 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):

$heap = new TriggerHeap($time);

foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) {
foreach ($this->getSchedule()->getRecurringMessages() as $index => $recurringMessage) {
$trigger = $recurringMessage->getTrigger();

if ($trigger instanceof StatefulTriggerInterface) {
Expand All @@ -118,13 +123,8 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
return $this->triggerHeap = $heap;
}

private function schedule(): Schedule
{
return $this->schedule ??= $this->scheduleProvider->getSchedule();
}

private function checkpoint(): Checkpoint
{
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->schedule()->getLock(), $this->schedule()->getState());
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->getSchedule()->getLock(), $this->getSchedule()->getState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public function send(Envelope $envelope): Envelope
{
throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__));
}

public function getMessageGenerator(): MessageGeneratorInterface
{
return $this->messageGenerator;
}
}
37 changes: 35 additions & 2 deletions src/Symfony/Component/Scheduler/Schedule.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,29 @@

namespace Symfony\Component\Scheduler;

use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Scheduler\Event\PostRunEvent;
use Symfony\Component\Scheduler\Event\PreRunEvent;
use Symfony\Component\Scheduler\Exception\LogicException;
use Symfony\Contracts\Cache\CacheInterface;

final class Schedule implements ScheduleProviderInterface
{
public function __construct(
private readonly ?EventDispatcherInterface $dispatcher = null,
) {
}

/** @var array<string,RecurringMessage> */
private array $messages = [];
private ?LockInterface $lock = null;
private ?CacheInterface $state = null;
private bool $shouldRestart = false;

public static function with(RecurringMessage $message, RecurringMessage ...$messages): static
public function with(RecurringMessage $message, RecurringMessage ...$messages): static
{
return static::doAdd(new self(), $message, ...$messages);
return static::doAdd(new self($this->dispatcher), $message, ...$messages);
}

/**
Expand Down Expand Up @@ -62,6 +70,17 @@ public function remove(RecurringMessage $message): static
return $this;
}

/**
* @return $this
*/
public function removeById(string $id): static
{
unset($this->messages[$id]);
$this->setRestart(true);

return $this;
}

/**
* @return $this
*/
Expand Down Expand Up @@ -119,6 +138,20 @@ public function getSchedule(): static
return $this;
}

public function before(callable $listener, int $priority = 0): static
{
$this->dispatcher->addListener(PreRunEvent::class, $listener, $priority);

return $this;
}

public function after(callable $listener, int $priority = 0): static
{
$this->dispatcher->addListener(PostRunEvent::class, $listener, $priority);

return $this;
}

public function shouldRestart(): bool
{
return $this->shouldRestart;
Expand Down
22 changes: 21 additions & 1 deletion src/Symfony/Component/Scheduler/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@

namespace Symfony\Component\Scheduler;

use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Clock\Clock;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Scheduler\Event\PostRunEvent;
use Symfony\Component\Scheduler\Event\PreRunEvent;
use Symfony\Component\Scheduler\Generator\MessageGenerator;

final class Scheduler
Expand All @@ -31,6 +34,7 @@ public function __construct(
private readonly array $handlers,
array $schedules,
private readonly ClockInterface $clock = new Clock(),
private readonly ?EventDispatcherInterface $dispatcher = null,
) {
foreach ($schedules as $schedule) {
$this->addSchedule($schedule);
Expand Down Expand Up @@ -62,9 +66,25 @@ public function run(array $options = []): void

$ran = false;
foreach ($this->generators as $generator) {
foreach ($generator->getMessages() as $message) {
foreach ($generator->getMessages() as $context => $message) {
if (!$this->dispatcher) {
$this->handlers[$message::class]($message);
$ran = true;

continue;
}

$preRunEvent = new PreRunEvent($generator->getSchedule(), $context, $message);
$this->dispatcher->dispatch($preRunEvent);

if ($preRunEvent->shouldCancel()) {
continue;
}

$this->handlers[$message::class]($message);
$ran = true;

$this->dispatcher->dispatch(new PostRunEvent($generator->getSchedule(), $context, $message));
}
}

Expand Down

0 comments on commit 50662d0

Please sign in to comment.