Skip to content

Commit

Permalink
we have only one aggregate defintion per microservice
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Jan 22, 2017
1 parent 38f3bc3 commit 3c334b0
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 106 deletions.
2 changes: 1 addition & 1 deletion examples/Infrastructure/factories.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

return $snapshotStore;
},
'producer' => function (): callable {
'dummyProducer' => function (): callable {
return function (Message $message): void {
};
},
Expand Down
24 changes: 10 additions & 14 deletions examples/register_and_change_username.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,26 @@
//We could also use a container here, if dependencies grow
$factories = include 'Infrastructure/factories.php';

$commandMap = [
RegisterUser::class => [
'handler' => function (array $state, Message $message) use (&$factories): AggregateResult {
return User\registerUser($state, $message, $factories['emailGuard']());
},
'definition' => UserAggregateDefinition::class,
],
ChangeUserName::class => [
'handler' => User\changeUserName,
'definition' => UserAggregateDefinition::class,
],
$handlerMap = [
RegisterUser::class => function (array $state, Message $message) use (&$factories): AggregateResult {
return User\registerUser($state, $message, $factories['emailGuard']());
},
ChangeUserName::class => User\changeUserName,
];

$dispatch = Kernel\buildCommandDispatcher(
$factories['eventStore'],
$factories['snapshotStore'],
$commandMap,
$factories['producer']
new UserAggregateDefinition(),
$handlerMap,
$factories['dummyProducer']
);

// uncomment to enable amqp publisher
//$dispatch = Kernel\buildCommandDispatcher(
// $factories['eventStore'],
// $commandMap,
// new UserAggregateDefinition(),
// $handlerMap,
// $factories['amqpProducer'],
// $factories['startAmqpTransaction'],
// $factories['commitAmqpTransaction']
Expand Down
20 changes: 10 additions & 10 deletions src/AbstractAggregateDefiniton.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,35 @@ public function versionName(): string

public function extractAggregateId(Message $message): string
{
$idProperty = $this->identifierName();
$idKey = $this->identifierName();

$payload = $message->payload();

if (! array_key_exists($idProperty, $payload)) {
if (! array_key_exists($idKey, $payload)) {
throw new RuntimeException(sprintf(
'Missing aggregate id %s in payload of message %s. Payload was %s',
$idProperty,
'Missing aggregate id key "%s" in payload of message %s. Payload was %s',
$idKey,
$message->messageName(),
json_encode($payload)
));
}

return $payload[$idProperty];
return $payload[$idKey];
}

public function extractAggregateVersion(array $state): int
{
$versionProperty = $this->versionName();
$versionKey = $this->versionName();

if (! array_key_exists($versionProperty, $state)) {
if (! array_key_exists($versionKey, $state)) {
throw new RuntimeException(sprintf(
'Missing aggregate version property "%s" in state. State was %s',
$versionProperty,
'Missing aggregate version key "%s" in state. State was %s',
$versionKey,
json_encode($state)
));
}

return $state[$versionProperty];
return $state[$versionKey];
}

public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher
Expand Down
94 changes: 48 additions & 46 deletions src/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,74 +30,62 @@
* builds a dispatcher to return a function that receives a messages and return the state
*
* usage:
* $dispatch = buildDispatcher($eventStoreFactory, $snapshotStoreFactory, $commandMap, $producerFactory);
* $dispatch = buildDispatcher($eventStoreFactory, $snapshotStoreFactory, $aggregateDefinition, $handlerMap, $producerFactory);
* $state = $dispatch($message);
*
* $producerFactory is expected to be a callback that returns an instance of Prooph\ServiceBus\Async\MessageProducer.
* $commandMap is expected to be an array like this:
* [
* RegisterUser::class => [
* 'handler' => function (array $state, Message $message) use (&$factories): AggregateResult {
* return \Prooph\MicroExample\Model\User\registerUser($state, $message, $factories['emailGuard']());
* },
* 'definition' => UserAggregateDefinition::class,
* ],
* ChangeUserName::class => [
* 'handler' => '\Prooph\MicroExample\Model\User\changeUserName',
* 'definition' => UserAggregateDefinition::class,
* ],
* RegisterUser::class => function (array $state, Message $message) use (&$factories): AggregateResult {
* return \Prooph\MicroExample\Model\User\registerUser($state, $message, $factories['emailGuard']());
* },
* ChangeUserName::class => \Prooph\MicroExample\Model\User\changeUserName,
* ]
* $message is expected to be an instance of Prooph\Common\Messaging\Message
*/
function buildCommandDispatcher(
callable $eventStoreFactory,
callable $snapshotStoreFactory,
array $commandMap,
AggregateDefiniton $aggregateDefiniton,
array $handlerMap,
callable $producerFactory,
callable $startProducerTransaction = null,
callable $commitProducerTransaction = null
): callable {
return function (Message $message) use (
$eventStoreFactory,
$snapshotStoreFactory,
$commandMap,
$aggregateDefiniton,
$handlerMap,
$producerFactory,
$startProducerTransaction,
$commitProducerTransaction
) {
$getDefinition = function (Message $message) use ($commandMap): AggregateDefiniton {
return getAggregateDefinition($message, $commandMap);
$loadState = function (Message $message) use ($aggregateDefiniton, $snapshotStoreFactory): array {
return loadState($snapshotStoreFactory(), $message, $aggregateDefiniton);
};

$loadState = function (AggregateDefiniton $definiton) use ($message, $snapshotStoreFactory): array {
return loadState($snapshotStoreFactory(), $message, $definiton);
};

$loadEvents = function (array $state) use ($message, $getDefinition, $eventStoreFactory): Iterator {
$definition = $getDefinition($message);
/* @var AggregateDefiniton $definition */
$aggregateId = $definition->extractAggregateId($message);
$loadEvents = function (array $state) use ($message, $aggregateDefiniton, $eventStoreFactory): Iterator {
$aggregateId = $aggregateDefiniton->extractAggregateId($message);
if (empty($state)) {
$nextVersion = 1;
} else {
$nextVersion = $definition->extractAggregateVersion($state) + 1;
$nextVersion = $aggregateDefiniton->extractAggregateVersion($state) + 1;
}

return loadEvents(
$definition->streamName($aggregateId),
$definition->metadataMatcher($aggregateId, $nextVersion),
$aggregateDefiniton->streamName($aggregateId),
$aggregateDefiniton->metadataMatcher($aggregateId, $nextVersion),
$eventStoreFactory
);
};

$reconstituteState = function (Iterator $events) use ($message, $getDefinition): array {
$definition = $getDefinition($message);

return $definition->reconstituteState($events);
$reconstituteState = function (Iterator $events) use ($message, $aggregateDefiniton): array {
return $aggregateDefiniton->reconstituteState($events);
};

$handleCommand = function (array $state) use ($message, $commandMap): AggregateResult {
$handler = getHandler($message, $commandMap);
$handleCommand = function (array $state) use ($message, $handlerMap): AggregateResult {
$handler = getHandler($message, $handlerMap);

$aggregateResult = $handler($state, $message);

Expand All @@ -108,10 +96,17 @@ function buildCommandDispatcher(
return $aggregateResult;
};

$persistEvents = function (AggregateResult $aggregateResult) use ($eventStoreFactory, $message, $getDefinition): AggregateResult {
$definition = $getDefinition($message);

return persistEvents($aggregateResult, $eventStoreFactory, $definition, $definition->extractAggregateId($message));
$persistEvents = function (AggregateResult $aggregateResult) use (
$eventStoreFactory,
$message,
$aggregateDefiniton
): AggregateResult {
return persistEvents(
$aggregateResult,
$eventStoreFactory,
$aggregateDefiniton,
$aggregateDefiniton->extractAggregateId($message)
);
};

$publishEvents = function (AggregateResult $aggregateResult) use (
Expand All @@ -123,7 +118,6 @@ function buildCommandDispatcher(
};

return pipleline(
$getDefinition,
$loadState,
$loadEvents,
$reconstituteState,
Expand Down Expand Up @@ -172,23 +166,25 @@ function loadState(SnapshotStore $snapshotStore, Message $message, AggregateDefi
function loadEvents(
StreamName $streamName,
?MetadataMatcher $metadataMatcher,
callable $eventStoreFactory,
int $fromVersion = 1,
array $state = []
callable $eventStoreFactory
): Iterator {
$eventStore = $eventStoreFactory();

if ($eventStore->hasStream($streamName)) {
return $eventStore->load($streamName, $fromVersion, null, $metadataMatcher)->streamEvents();
return $eventStore->load($streamName, 1, null, $metadataMatcher)->streamEvents();
}

return new ArrayIterator();
}

const persistEvents = 'Prooph\Micro\Kernel\persistEvents';

function persistEvents(AggregateResult $aggregateResult, callable $eventStoreFactory, AggregateDefiniton $definition, string $aggregateId): AggregateResult
{
function persistEvents(
AggregateResult $aggregateResult,
callable $eventStoreFactory,
AggregateDefiniton $definition,
string $aggregateId
): AggregateResult {
$events = $aggregateResult->raisedEvents();

$metadataEnricher = function (Message $event) use ($aggregateResult, $definition, $aggregateId) {
Expand Down Expand Up @@ -259,10 +255,13 @@ function aggregateState(AggregateResult $aggregateResult): array
function getHandler(Message $message, array $commandMap): callable
{
if (! array_key_exists($message->messageName(), $commandMap)) {
throw new RuntimeException(sprintf('Unknown message "%s". Message name not mapped to an aggregate.', $message->messageName()));
throw new RuntimeException(sprintf(
'Unknown message "%s". Message name not mapped to an aggregate.',
$message->messageName()
));
}

return $commandMap[$message->messageName()]['handler'];
return $commandMap[$message->messageName()];
}

const getAggregateDefinition = 'Prooph\Micro\Kernel\getAggregateDefinition';
Expand All @@ -278,7 +277,10 @@ function getAggregateDefinition(Message $message, array $commandMap): AggregateD
}

if (! isset($commandMap[$messageName])) {
throw new RuntimeException(sprintf('Unknown message "%s". Message name not mapped to an aggregate.', $message->messageName()));
throw new RuntimeException(sprintf(
'Unknown message "%s". Message name not mapped to an aggregate.',
$message->messageName()
));
}

$cached[$messageName] = new $commandMap[$messageName]['definition']();
Expand Down
Loading

0 comments on commit 3c334b0

Please sign in to comment.