Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

!!! FEATURE: Event Publisher #256

Merged
merged 12 commits into from Jan 28, 2020
27 changes: 4 additions & 23 deletions Classes/Command/ProjectionCommandController.php
Expand Up @@ -32,12 +32,6 @@ class ProjectionCommandController extends CommandController
*/
protected $projectionManager;

/**
* @Flow\InjectConfiguration(package="Neos.Flow")
* @var array
*/
protected $flowSettings;

/**
* @var array in the format ['<shortIdentifier>' => '<fullIdentifier>', ...]
*/
Expand Down Expand Up @@ -78,7 +72,7 @@ public function listCommand(): void
* @see neos.eventsourcing:projection:list
* @throws StopActionException
*/
public function describeCommand($projection): void
public function describeCommand(string $projection): void
{
$projectionDto = $this->resolveProjectionOrQuit($projection);

Expand All @@ -90,12 +84,6 @@ public function describeCommand($projection): void
$this->outputLine();
$this->outputLine('<b>PROJECTOR:</b>');
$this->outputLine(' %s', [$projectionDto->getProjectorClassName()]);
$this->outputLine();

$this->outputLine('<b>HANDLED EVENT TYPES:</b>');
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
foreach ($projectionDto->getEventClassNames() as $eventClassName) {
$this->outputLine(' * %s', [$eventClassName]);
}
}

/**
Expand All @@ -111,7 +99,7 @@ public function describeCommand($projection): void
* @see neos.eventsourcing:projection:list
* @see neos.eventsourcing:projection:replayall
*/
public function replayCommand($projection, $quiet = false): void
public function replayCommand(string $projection, $quiet = false): void
{
$projectionDto = $this->resolveProjectionOrQuit($projection);

Expand All @@ -137,26 +125,19 @@ public function replayCommand($projection, $quiet = false): void
*
* This command allows you to replay all relevant events for all known projections.
*
* @param bool $onlyEmpty If specified, only projections which are currently empty will be considered
* @param bool $quiet If specified, this command won't produce any output apart from errors (useful for automation)
* @return void
* @see neos.eventsourcing:projection:replay
* @see neos.eventsourcing:projection:list
* @throws EventCouldNotBeAppliedException
*/
public function replayAllCommand($onlyEmpty = false, $quiet = false): void
public function replayAllCommand($quiet = false): void
{
if (!$quiet) {
$this->outputLine('Replaying all%s projections', [$onlyEmpty ? ' empty' : '']);
$this->outputLine('Replaying all projections');
}
$eventsCount = 0;
foreach ($this->projectionManager->getProjections() as $projection) {
if ($onlyEmpty && !$this->projectionManager->isProjectionEmpty($projection->getIdentifier())) {
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
if (!$quiet) {
$this->outputLine('Skipping non-empty projection "%s" ...', [$projection->getIdentifier()]);
}
continue;
}
if (!$quiet) {
$this->outputLine('Replaying events for projection "%s" ...', [$projection->getIdentifier()]);
$this->output->progressStart();
Expand Down
2 changes: 1 addition & 1 deletion Classes/Event/EventTypeResolver.php
Expand Up @@ -55,7 +55,7 @@ public function injectObjectManager(ObjectManagerInterface $objectManager): void
*/
public function initializeObject(): void
{
$this->mapping = self::eventTypeMapping($this->objectManager);
$this->mapping = static::eventTypeMapping($this->objectManager);
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
$this->reversedMapping = array_flip($this->mapping);
}

Expand Down
77 changes: 46 additions & 31 deletions Classes/EventListener/EventListenerInvoker.php
Expand Up @@ -18,81 +18,84 @@
use Neos\EventSourcing\EventListener\Exception\EventCouldNotBeAppliedException;
use Neos\EventSourcing\EventStore\EventEnvelope;
use Neos\EventSourcing\EventStore\EventStore;
use Neos\EventSourcing\EventStore\EventStoreFactory;
use Neos\EventSourcing\EventStore\StreamAwareEventListenerInterface;
use Neos\EventSourcing\EventStore\StreamName;
use Neos\Flow\Annotations as Flow;

/**
* @Flow\Scope("singleton")
*/
final class EventListenerInvoker
{

/**
* @Flow\Inject
* @var EventStoreFactory
* @var EventStore
*/
protected $eventStoreFactory;
private $eventStore;

/**
* @Flow\Inject
* @var EntityManagerInterface
*/
protected $entityManager;

/**
* @Flow\InjectConfiguration(path="EventListener.listeners")
* @var array
*/
protected $eventListenersConfiguration;
public function __construct(EventStore $eventStore)
{
$this->eventStore = $eventStore;
}

/**
* @param EventListenerInterface $listener
* @param \Closure $progressCallback
* @throws EventCouldNotBeAppliedException
*/
public function catchUp(EventListenerInterface $listener, \Closure $progressCallback = null): void
public function replay(EventListenerInterface $listener, \Closure $progressCallback = null): void
{
if ($listener instanceof ProvidesAppliedEventsStorageInterface) {
$appliedEventsStorage = $listener->getAppliedEventsStorage();
} elseif ($listener instanceof AppliedEventsStorageInterface) {
$appliedEventsStorage = $listener;
} else {
$appliedEventsStorage = new DoctrineAppliedEventsStorage($this->entityManager->getConnection(), \get_class($listener));
}
$appliedEventsStorage = $this->getAppliedEventsStorageForListener($listener);
$highestAppliedSequenceNumber = -1;
$appliedEventsStorage->saveHighestAppliedSequenceNumber($highestAppliedSequenceNumber);
$highestAppliedSequenceNumber = $appliedEventsStorage->reserveHighestAppliedEventSequenceNumber();

$streamName = $listener instanceof StreamAwareEventListenerInterface ? $listener::listensToStream() : StreamName::all();
$eventStore = $this->getEventStoreForEventListener($listener);
$eventStream = $eventStore->load($streamName, $highestAppliedSequenceNumber + 1);
$eventStream = $this->eventStore->load($streamName);
foreach ($eventStream as $eventEnvelope) {
try {
$this->applyEvent($listener, $eventEnvelope);
$highestAppliedSequenceNumber = $eventEnvelope->getRawEvent()->getSequenceNumber();
} catch (EventCouldNotBeAppliedException $exception) {
$appliedEventsStorage->saveHighestAppliedSequenceNumber($highestAppliedSequenceNumber);
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();
throw $exception;
}
$appliedEventsStorage->saveHighestAppliedSequenceNumber($eventEnvelope->getRawEvent()->getSequenceNumber());
if ($progressCallback !== null) {
$progressCallback($eventEnvelope);
}
}
$appliedEventsStorage->saveHighestAppliedSequenceNumber($highestAppliedSequenceNumber);
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();
}

/**
* @param EventListenerInterface $listener
* @return EventStore
* @param \Closure $progressCallback
* @throws EventCouldNotBeAppliedException
*/
public function getEventStoreForEventListener(EventListenerInterface $listener): EventStore
public function catchUp(EventListenerInterface $listener, \Closure $progressCallback = null): void
{
$listenerClassName = \get_class($listener);
$eventStoreIdentifier = $this->eventListenersConfiguration[$listenerClassName]['eventStore'] ?? 'default';
try {
return $this->eventStoreFactory->create($eventStoreIdentifier);
} catch (\InvalidArgumentException $exception) {
throw new \RuntimeException(sprintf('Failed to build Event Store for listener "%s": %s', $listenerClassName, $exception->getMessage()), 1570191582, $exception);
$appliedEventsStorage = $this->getAppliedEventsStorageForListener($listener);
$highestAppliedSequenceNumber = $appliedEventsStorage->reserveHighestAppliedEventSequenceNumber();
$streamName = $listener instanceof StreamAwareEventListenerInterface ? $listener::listensToStream() : StreamName::all();
$eventStream = $this->eventStore->load($streamName, $highestAppliedSequenceNumber + 1);
foreach ($eventStream as $eventEnvelope) {
try {
$this->applyEvent($listener, $eventEnvelope);
} catch (EventCouldNotBeAppliedException $exception) {
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();
throw $exception;
}
$appliedEventsStorage->saveHighestAppliedSequenceNumber($eventEnvelope->getRawEvent()->getSequenceNumber());
if ($progressCallback !== null) {
$progressCallback($eventEnvelope);
}
}
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -124,4 +127,16 @@ private function applyEvent(EventListenerInterface $listener, EventEnvelope $eve
$listener->afterInvoke($eventEnvelope);
}
}

private function getAppliedEventsStorageForListener(EventListenerInterface $listener): AppliedEventsStorageInterface
{
if ($listener instanceof ProvidesAppliedEventsStorageInterface) {
$appliedEventsStorage = $listener->getAppliedEventsStorage();
} elseif ($listener instanceof AppliedEventsStorageInterface) {
$appliedEventsStorage = $listener;
} else {
$appliedEventsStorage = new DoctrineAppliedEventsStorage($this->entityManager->getConnection(), \get_class($listener));
}
return $appliedEventsStorage;
}
}
168 changes: 0 additions & 168 deletions Classes/EventListener/EventListenerLocator.php

This file was deleted.