From 3c334b0141a8f14311fb05e3a395b19f18a77cb7 Mon Sep 17 00:00:00 2001 From: prolic Date: Sun, 22 Jan 2017 17:10:42 +0800 Subject: [PATCH] we have only one aggregate defintion per microservice --- examples/Infrastructure/factories.php | 2 +- examples/register_and_change_username.php | 24 +++--- src/AbstractAggregateDefiniton.php | 20 ++--- src/Kernel.php | 94 ++++++++++++----------- tests/KernelTest.php | 81 ++++++++++--------- 5 files changed, 115 insertions(+), 106 deletions(-) diff --git a/examples/Infrastructure/factories.php b/examples/Infrastructure/factories.php index b0c3187..ef9643d 100644 --- a/examples/Infrastructure/factories.php +++ b/examples/Infrastructure/factories.php @@ -37,7 +37,7 @@ return $snapshotStore; }, - 'producer' => function (): callable { + 'dummyProducer' => function (): callable { return function (Message $message): void { }; }, diff --git a/examples/register_and_change_username.php b/examples/register_and_change_username.php index 7be2dfe..a4262db 100644 --- a/examples/register_and_change_username.php +++ b/examples/register_and_change_username.php @@ -31,30 +31,26 @@ //We could also use a container here, if dependencies grow $factories = include 'Infrastructure/factories.php'; -$commandMap = [ - RegisterUser::class => [ - 'handler' => function (array $state, Message $message) use (&$factories): AggregateResult { - return User\registerUser($state, $message, $factories['emailGuard']()); - }, - 'definition' => UserAggregateDefinition::class, - ], - ChangeUserName::class => [ - 'handler' => User\changeUserName, - 'definition' => UserAggregateDefinition::class, - ], +$handlerMap = [ + RegisterUser::class => function (array $state, Message $message) use (&$factories): AggregateResult { + return User\registerUser($state, $message, $factories['emailGuard']()); + }, + ChangeUserName::class => User\changeUserName, ]; $dispatch = Kernel\buildCommandDispatcher( $factories['eventStore'], $factories['snapshotStore'], - $commandMap, - $factories['producer'] + new UserAggregateDefinition(), + $handlerMap, + $factories['dummyProducer'] ); // uncomment to enable amqp publisher //$dispatch = Kernel\buildCommandDispatcher( // $factories['eventStore'], -// $commandMap, +// new UserAggregateDefinition(), +// $handlerMap, // $factories['amqpProducer'], // $factories['startAmqpTransaction'], // $factories['commitAmqpTransaction'] diff --git a/src/AbstractAggregateDefiniton.php b/src/AbstractAggregateDefiniton.php index 9e33800..1afcf9b 100644 --- a/src/AbstractAggregateDefiniton.php +++ b/src/AbstractAggregateDefiniton.php @@ -33,35 +33,35 @@ public function versionName(): string public function extractAggregateId(Message $message): string { - $idProperty = $this->identifierName(); + $idKey = $this->identifierName(); $payload = $message->payload(); - if (! array_key_exists($idProperty, $payload)) { + if (! array_key_exists($idKey, $payload)) { throw new RuntimeException(sprintf( - 'Missing aggregate id %s in payload of message %s. Payload was %s', - $idProperty, + 'Missing aggregate id key "%s" in payload of message %s. Payload was %s', + $idKey, $message->messageName(), json_encode($payload) )); } - return $payload[$idProperty]; + return $payload[$idKey]; } public function extractAggregateVersion(array $state): int { - $versionProperty = $this->versionName(); + $versionKey = $this->versionName(); - if (! array_key_exists($versionProperty, $state)) { + if (! array_key_exists($versionKey, $state)) { throw new RuntimeException(sprintf( - 'Missing aggregate version property "%s" in state. State was %s', - $versionProperty, + 'Missing aggregate version key "%s" in state. State was %s', + $versionKey, json_encode($state) )); } - return $state[$versionProperty]; + return $state[$versionKey]; } public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher diff --git a/src/Kernel.php b/src/Kernel.php index e72ed23..f233c52 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -30,29 +30,24 @@ * builds a dispatcher to return a function that receives a messages and return the state * * usage: - * $dispatch = buildDispatcher($eventStoreFactory, $snapshotStoreFactory, $commandMap, $producerFactory); + * $dispatch = buildDispatcher($eventStoreFactory, $snapshotStoreFactory, $aggregateDefinition, $handlerMap, $producerFactory); * $state = $dispatch($message); * * $producerFactory is expected to be a callback that returns an instance of Prooph\ServiceBus\Async\MessageProducer. * $commandMap is expected to be an array like this: * [ - * RegisterUser::class => [ - * 'handler' => function (array $state, Message $message) use (&$factories): AggregateResult { - * return \Prooph\MicroExample\Model\User\registerUser($state, $message, $factories['emailGuard']()); - * }, - * 'definition' => UserAggregateDefinition::class, - * ], - * ChangeUserName::class => [ - * 'handler' => '\Prooph\MicroExample\Model\User\changeUserName', - * 'definition' => UserAggregateDefinition::class, - * ], + * RegisterUser::class => function (array $state, Message $message) use (&$factories): AggregateResult { + * return \Prooph\MicroExample\Model\User\registerUser($state, $message, $factories['emailGuard']()); + * }, + * ChangeUserName::class => \Prooph\MicroExample\Model\User\changeUserName, * ] * $message is expected to be an instance of Prooph\Common\Messaging\Message */ function buildCommandDispatcher( callable $eventStoreFactory, callable $snapshotStoreFactory, - array $commandMap, + AggregateDefiniton $aggregateDefiniton, + array $handlerMap, callable $producerFactory, callable $startProducerTransaction = null, callable $commitProducerTransaction = null @@ -60,44 +55,37 @@ function buildCommandDispatcher( return function (Message $message) use ( $eventStoreFactory, $snapshotStoreFactory, - $commandMap, + $aggregateDefiniton, + $handlerMap, $producerFactory, $startProducerTransaction, $commitProducerTransaction ) { - $getDefinition = function (Message $message) use ($commandMap): AggregateDefiniton { - return getAggregateDefinition($message, $commandMap); + $loadState = function (Message $message) use ($aggregateDefiniton, $snapshotStoreFactory): array { + return loadState($snapshotStoreFactory(), $message, $aggregateDefiniton); }; - $loadState = function (AggregateDefiniton $definiton) use ($message, $snapshotStoreFactory): array { - return loadState($snapshotStoreFactory(), $message, $definiton); - }; - - $loadEvents = function (array $state) use ($message, $getDefinition, $eventStoreFactory): Iterator { - $definition = $getDefinition($message); - /* @var AggregateDefiniton $definition */ - $aggregateId = $definition->extractAggregateId($message); + $loadEvents = function (array $state) use ($message, $aggregateDefiniton, $eventStoreFactory): Iterator { + $aggregateId = $aggregateDefiniton->extractAggregateId($message); if (empty($state)) { $nextVersion = 1; } else { - $nextVersion = $definition->extractAggregateVersion($state) + 1; + $nextVersion = $aggregateDefiniton->extractAggregateVersion($state) + 1; } return loadEvents( - $definition->streamName($aggregateId), - $definition->metadataMatcher($aggregateId, $nextVersion), + $aggregateDefiniton->streamName($aggregateId), + $aggregateDefiniton->metadataMatcher($aggregateId, $nextVersion), $eventStoreFactory ); }; - $reconstituteState = function (Iterator $events) use ($message, $getDefinition): array { - $definition = $getDefinition($message); - - return $definition->reconstituteState($events); + $reconstituteState = function (Iterator $events) use ($message, $aggregateDefiniton): array { + return $aggregateDefiniton->reconstituteState($events); }; - $handleCommand = function (array $state) use ($message, $commandMap): AggregateResult { - $handler = getHandler($message, $commandMap); + $handleCommand = function (array $state) use ($message, $handlerMap): AggregateResult { + $handler = getHandler($message, $handlerMap); $aggregateResult = $handler($state, $message); @@ -108,10 +96,17 @@ function buildCommandDispatcher( return $aggregateResult; }; - $persistEvents = function (AggregateResult $aggregateResult) use ($eventStoreFactory, $message, $getDefinition): AggregateResult { - $definition = $getDefinition($message); - - return persistEvents($aggregateResult, $eventStoreFactory, $definition, $definition->extractAggregateId($message)); + $persistEvents = function (AggregateResult $aggregateResult) use ( + $eventStoreFactory, + $message, + $aggregateDefiniton + ): AggregateResult { + return persistEvents( + $aggregateResult, + $eventStoreFactory, + $aggregateDefiniton, + $aggregateDefiniton->extractAggregateId($message) + ); }; $publishEvents = function (AggregateResult $aggregateResult) use ( @@ -123,7 +118,6 @@ function buildCommandDispatcher( }; return pipleline( - $getDefinition, $loadState, $loadEvents, $reconstituteState, @@ -172,14 +166,12 @@ function loadState(SnapshotStore $snapshotStore, Message $message, AggregateDefi function loadEvents( StreamName $streamName, ?MetadataMatcher $metadataMatcher, - callable $eventStoreFactory, - int $fromVersion = 1, - array $state = [] + callable $eventStoreFactory ): Iterator { $eventStore = $eventStoreFactory(); if ($eventStore->hasStream($streamName)) { - return $eventStore->load($streamName, $fromVersion, null, $metadataMatcher)->streamEvents(); + return $eventStore->load($streamName, 1, null, $metadataMatcher)->streamEvents(); } return new ArrayIterator(); @@ -187,8 +179,12 @@ function loadEvents( const persistEvents = 'Prooph\Micro\Kernel\persistEvents'; -function persistEvents(AggregateResult $aggregateResult, callable $eventStoreFactory, AggregateDefiniton $definition, string $aggregateId): AggregateResult -{ +function persistEvents( + AggregateResult $aggregateResult, + callable $eventStoreFactory, + AggregateDefiniton $definition, + string $aggregateId +): AggregateResult { $events = $aggregateResult->raisedEvents(); $metadataEnricher = function (Message $event) use ($aggregateResult, $definition, $aggregateId) { @@ -259,10 +255,13 @@ function aggregateState(AggregateResult $aggregateResult): array function getHandler(Message $message, array $commandMap): callable { if (! array_key_exists($message->messageName(), $commandMap)) { - throw new RuntimeException(sprintf('Unknown message "%s". Message name not mapped to an aggregate.', $message->messageName())); + throw new RuntimeException(sprintf( + 'Unknown message "%s". Message name not mapped to an aggregate.', + $message->messageName() + )); } - return $commandMap[$message->messageName()]['handler']; + return $commandMap[$message->messageName()]; } const getAggregateDefinition = 'Prooph\Micro\Kernel\getAggregateDefinition'; @@ -278,7 +277,10 @@ function getAggregateDefinition(Message $message, array $commandMap): AggregateD } if (! isset($commandMap[$messageName])) { - throw new RuntimeException(sprintf('Unknown message "%s". Message name not mapped to an aggregate.', $message->messageName())); + throw new RuntimeException(sprintf( + 'Unknown message "%s". Message name not mapped to an aggregate.', + $message->messageName() + )); } $cached[$messageName] = new $commandMap[$messageName]['definition'](); diff --git a/tests/KernelTest.php b/tests/KernelTest.php index bb34baa..9734b76 100644 --- a/tests/KernelTest.php +++ b/tests/KernelTest.php @@ -34,13 +34,10 @@ class KernelTest extends TestCase */ public function it_builds_command_dispatcher_and_dispatches(): void { - $commandMap = [ - 'some_command' => [ - 'handler' => function (array $state, Message $message): AggregateResult { - return new AggregateResult(['some' => 'state']); - }, - 'definition' => TestAggregateDefinition::class, - ], + $handlerMap = [ + 'some_command' => function (array $state, Message $message): AggregateResult { + return new AggregateResult(['some' => 'state']); + }, ]; $eventStoreFactory = function (): EventStore { @@ -72,7 +69,13 @@ public function it_builds_command_dispatcher_and_dispatches(): void }; }; - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStoreFactory, $snapshotStoreFactory, $commandMap, $producerFactory); + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher( + $eventStoreFactory, + $snapshotStoreFactory, + new TestAggregateDefinition(), + $handlerMap, + $producerFactory + ); $command = $this->prophesize(Message::class); $command->messageName()->willReturn('some_command')->shouldBeCalled(); @@ -87,13 +90,10 @@ public function it_builds_command_dispatcher_and_dispatches(): void */ public function it_builds_command_dispatcher_and_dispatches_with_transactional_publishing(): void { - $commandMap = [ - 'some_command' => [ - 'handler' => function (array $state, Message $message): AggregateResult { - return new AggregateResult(['some' => 'state']); - }, - 'definition' => TestAggregateDefinition::class, - ], + $handlerMap = [ + 'some_command' => function (array $state, Message $message): AggregateResult { + return new AggregateResult(['some' => 'state']); + }, ]; $eventStoreFactory = function (): EventStore { @@ -136,7 +136,15 @@ public function it_builds_command_dispatcher_and_dispatches_with_transactional_p $commited = true; }; - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStoreFactory, $snapshotStoreFactory, $commandMap, $producerFactory, $start, $commit); + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher( + $eventStoreFactory, + $snapshotStoreFactory, + new TestAggregateDefinition(), + $handlerMap, + $producerFactory, + $start, + $commit + ); $command = $this->prophesize(Message::class); $command->messageName()->willReturn('some_command')->shouldBeCalled(); @@ -153,13 +161,10 @@ public function it_builds_command_dispatcher_and_dispatches_with_transactional_p */ public function it_builds_command_dispatcher_and_dispatches_but_breaks_when_handler_returns_invalid_result(): void { - $commandMap = [ - 'some_command' => [ - 'handler' => function (array $state, Message $message): string { - return 'invalid'; - }, - 'definition' => TestAggregateDefinition::class, - ], + $handlerMap = [ + 'some_command' => function (array $state, Message $message): string { + return 'invalid'; + }, ]; $eventStoreFactory = function (): EventStore { @@ -190,14 +195,20 @@ public function it_builds_command_dispatcher_and_dispatches_but_breaks_when_hand }; }; - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStoreFactory, $snapshotStoreFactory, $commandMap, $producerFactory); + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher( + $eventStoreFactory, + $snapshotStoreFactory, + new TestAggregateDefinition(), + $handlerMap, + $producerFactory + ); $command = $this->prophesize(Message::class); $command->messageName()->willReturn('some_command')->shouldBeCalled(); $result = $dispatch($command->reveal()); - $this->assertInstanceOf(\Exception::class, $result); + $this->assertInstanceOf(\RuntimeException::class, $result); $this->assertEquals('Invalid aggregate result returned', $result->getMessage()); } @@ -393,10 +404,10 @@ public function it_gets_handler(): void $message = $this->prophesize(Message::class); $message->messageName()->willReturn('foo')->shouldBeCalled(); - $commandMap = ['foo' => ['handler' => function (): void { - }]]; + $handlerMap = ['foo' => function (): void { + }]; - $result = f\getHandler($message->reveal(), $commandMap); + $result = f\getHandler($message->reveal(), $handlerMap); $this->assertInstanceOf(\Closure::class, $result); } @@ -411,10 +422,10 @@ public function it_throws_exception_when_no_handler_found(): void $message = $this->prophesize(Message::class); $message->messageName()->willReturn('unknown')->shouldBeCalled(); - $commandMap = ['foo' => ['handler' => function (): void { + $handlerMap = ['foo' => ['handler' => function (): void { }]]; - f\getHandler($message->reveal(), $commandMap); + f\getHandler($message->reveal(), $handlerMap); } /** @@ -425,13 +436,13 @@ public function it_gets_aggregate_definition_from_cache(): void $message = $this->prophesize(Message::class); $message->messageName()->willReturn('foo')->shouldBeCalled(); - $commandMap = ['foo' => ['definition' => TestAggregateDefinition::class]]; + $handlerMap = ['foo' => ['definition' => TestAggregateDefinition::class]]; - $result = f\getAggregateDefinition($message->reveal(), $commandMap); + $result = f\getAggregateDefinition($message->reveal(), $handlerMap); $this->assertInstanceOf(TestAggregateDefinition::class, $result); - $result2 = f\getAggregateDefinition($message->reveal(), $commandMap); + $result2 = f\getAggregateDefinition($message->reveal(), $handlerMap); $this->assertSame($result, $result2); } @@ -446,9 +457,9 @@ public function it_throws_exception_when_no_definition_found(): void $message = $this->prophesize(Message::class); $message->messageName()->willReturn('bar')->shouldBeCalled(); - $commandMap = []; + $handlerMap = []; - f\getAggregateDefinition($message->reveal(), $commandMap); + f\getAggregateDefinition($message->reveal(), $handlerMap); } /**