Skip to content

Commit

Permalink
revert command map
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Jan 22, 2017
1 parent 5e70698 commit 2074b3c
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 66 deletions.
22 changes: 13 additions & 9 deletions examples/register_and_change_username.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,30 @@
//We could also use a container here, if dependencies grow
$factories = include 'Infrastructure/factories.php';

$handlerMap = [
RegisterUser::class => function (array $state, Message $message) use (&$factories): AggregateResult {
return User\registerUser($state, $message, $factories['emailGuard']());
},
ChangeUserName::class => User\changeUserName,
$commandMap = [
RegisterUser::class => [
'handler' => function (array $state, Message $message) use (&$factories): AggregateResult {
return User\registerUser($state, $message, $factories['emailGuard']());
},
'definition' => UserAggregateDefinition::class,
],
ChangeUserName::class => [
'handler' => User\changeUserName,
'definition' => UserAggregateDefinition::class,
],
];

$dispatch = Kernel\buildCommandDispatcher(
$factories['eventStore'],
$factories['snapshotStore'],
new UserAggregateDefinition(),
$handlerMap,
$commandMap,
$factories['dummyProducer']
);

// uncomment to enable amqp publisher
//$dispatch = Kernel\buildCommandDispatcher(
// $factories['eventStore'],
// new UserAggregateDefinition(),
// $handlerMap,
// $commandMap,
// $factories['amqpProducer'],
// $factories['startAmqpTransaction'],
// $factories['commitAmqpTransaction']
Expand Down
99 changes: 66 additions & 33 deletions src/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,62 +30,75 @@
* builds a dispatcher to return a function that receives a messages and return the state
*
* usage:
* $dispatch = buildDispatcher($eventStoreFactory, $snapshotStoreFactory, $aggregateDefinition, $handlerMap, $producerFactory);
* $dispatch = buildDispatcher($eventStoreFactory, $snapshotStoreFactory, $commandMap, $producerFactory);
* $state = $dispatch($message);
*
* $producerFactory is expected to be a callback that returns an instance of Prooph\ServiceBus\Async\MessageProducer.
* $commandMap is expected to be an array like this:
* [
* RegisterUser::class => function (array $state, Message $message) use (&$factories): AggregateResult {
* return \Prooph\MicroExample\Model\User\registerUser($state, $message, $factories['emailGuard']());
* },
* ChangeUserName::class => \Prooph\MicroExample\Model\User\changeUserName,
* RegisterUser::class => [
* 'handler' => function (array $state, Message $message) use (&$factories): AggregateResult {
* return \Prooph\MicroExample\Model\User\registerUser($state, $message, $factories['emailGuard']());
* },
* 'definition' => UserAggregateDefinition::class,
* ],
* ChangeUserName::class => [
* 'handler' => '\Prooph\MicroExample\Model\User\changeUserName',
* 'definition' => UserAggregateDefinition::class,
* ],
* ]
* $message is expected to be an instance of Prooph\Common\Messaging\Message
*/
function buildCommandDispatcher(
callable $eventStoreFactory,
callable $snapshotStoreFactory,
AggregateDefiniton $aggregateDefiniton,
array $handlerMap,
array $commandMap,
callable $producerFactory,
callable $startProducerTransaction = null,
callable $commitProducerTransaction = null
): callable {
return function (Message $message) use (
$eventStoreFactory,
$snapshotStoreFactory,
$aggregateDefiniton,
$handlerMap,
$commandMap,
$producerFactory,
$startProducerTransaction,
$commitProducerTransaction
) {
$loadState = function (Message $message) use ($aggregateDefiniton, $snapshotStoreFactory): array {
return loadState($snapshotStoreFactory(), $message, $aggregateDefiniton);
$getDefinition = function (Message $message) use ($commandMap): AggregateDefiniton {
return getAggregateDefinition($message, $commandMap);
};

$loadEvents = function (array $state) use ($message, $aggregateDefiniton, $eventStoreFactory): Iterator {
$aggregateId = $aggregateDefiniton->extractAggregateId($message);
$loadState = function (AggregateDefiniton $definiton) use ($message, $snapshotStoreFactory): array {
return loadState($snapshotStoreFactory(), $message, $definiton);
};

$loadEvents = function (array $state) use ($message, $getDefinition, $eventStoreFactory): Iterator {
$definition = $getDefinition($message);
/* @var AggregateDefiniton $definition */
$aggregateId = $definition->extractAggregateId($message);

if (empty($state)) {
$nextVersion = 1;
} else {
$nextVersion = $aggregateDefiniton->extractAggregateVersion($state) + 1;
$nextVersion = $definition->extractAggregateVersion($state) + 1;
}

return loadEvents(
$aggregateDefiniton->streamName($aggregateId),
$aggregateDefiniton->metadataMatcher($aggregateId, $nextVersion),
$definition->streamName($aggregateId),
$definition->metadataMatcher($aggregateId, $nextVersion),
$eventStoreFactory
);
};

$reconstituteState = function (Iterator $events) use ($message, $aggregateDefiniton): array {
return $aggregateDefiniton->reconstituteState($events);
$reconstituteState = function (Iterator $events) use ($message, $getDefinition): array {
$definition = $getDefinition($message);

return $definition->reconstituteState($events);
};

$handleCommand = function (array $state) use ($message, $handlerMap): AggregateResult {
$handler = getHandler($message, $handlerMap);
$handleCommand = function (array $state) use ($message, $commandMap): AggregateResult {
$handler = getHandler($message, $commandMap);

$aggregateResult = $handler($state, $message);

Expand All @@ -96,28 +109,27 @@ function buildCommandDispatcher(
return $aggregateResult;
};

$persistEvents = function (AggregateResult $aggregateResult) use (
$eventStoreFactory,
$message,
$aggregateDefiniton
): AggregateResult {
return persistEvents(
$aggregateResult,
$eventStoreFactory,
$aggregateDefiniton,
$aggregateDefiniton->extractAggregateId($message)
);
$persistEvents = function (AggregateResult $aggregateResult) use ($eventStoreFactory, $message, $getDefinition): AggregateResult {
$definition = $getDefinition($message);

return persistEvents($aggregateResult, $eventStoreFactory, $definition, $definition->extractAggregateId($message));
};

$publishEvents = function (AggregateResult $aggregateResult) use (
$producerFactory,
$startProducerTransaction,
$commitProducerTransaction
): AggregateResult {
return publishEvents($aggregateResult, $producerFactory, $startProducerTransaction, $commitProducerTransaction);
return publishEvents(
$aggregateResult,
$producerFactory,
$startProducerTransaction,
$commitProducerTransaction
);
};

return pipleline(
$getDefinition,
$loadState,
$loadEvents,
$reconstituteState,
Expand Down Expand Up @@ -261,5 +273,26 @@ function getHandler(Message $message, array $commandMap): callable
));
}

return $commandMap[$message->messageName()];
return $commandMap[$message->messageName()]['handler'];
}

const getAggregateDefinition = 'Prooph\Micro\Kernel\getAggregateDefinition';

function getAggregateDefinition(Message $message, array $commandMap): AggregateDefiniton
{
static $cached = [];

$messageName = $message->messageName();

if (isset($cached[$messageName])) {
return $cached[$messageName];
}

if (! isset($commandMap[$messageName])) {
throw new RuntimeException(sprintf('Unknown message %s. Message name not mapped to an aggregate.', $message->messageName()));
}

$cached[$messageName] = new $commandMap[$messageName]['definition']();

return $cached[$messageName];
}
92 changes: 68 additions & 24 deletions tests/KernelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ class KernelTest extends TestCase
*/
public function it_builds_command_dispatcher_and_dispatches(): void
{
$handlerMap = [
'some_command' => function (array $state, Message $message): AggregateResult {
return new AggregateResult(['some' => 'state']);
},
$commandMap = [
'some_command' => [
'handler' => function (array $state, Message $message): AggregateResult {
return new AggregateResult(['some' => 'state']);
},
'definition' => TestAggregateDefinition::class,
],
];

$eventStoreFactory = function (): EventStore {
Expand Down Expand Up @@ -72,16 +75,14 @@ public function it_builds_command_dispatcher_and_dispatches(): void
$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher(
$eventStoreFactory,
$snapshotStoreFactory,
new TestAggregateDefinition(),
$handlerMap,
$commandMap,
$producerFactory
);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$result = $dispatch($command->reveal());

$this->assertEquals(['some' => 'state'], $result);
}

Expand All @@ -90,10 +91,13 @@ public function it_builds_command_dispatcher_and_dispatches(): void
*/
public function it_builds_command_dispatcher_and_dispatches_with_transactional_publishing(): void
{
$handlerMap = [
'some_command' => function (array $state, Message $message): AggregateResult {
return new AggregateResult(['some' => 'state']);
},
$commandMap = [
'some_command' => [
'handler' => function (array $state, Message $message): AggregateResult {
return new AggregateResult(['some' => 'state']);
},
'definition' => TestAggregateDefinition::class,
],
];

$eventStoreFactory = function (): EventStore {
Expand Down Expand Up @@ -139,8 +143,7 @@ public function it_builds_command_dispatcher_and_dispatches_with_transactional_p
$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher(
$eventStoreFactory,
$snapshotStoreFactory,
new TestAggregateDefinition(),
$handlerMap,
$commandMap,
$producerFactory,
$start,
$commit
Expand All @@ -161,10 +164,13 @@ public function it_builds_command_dispatcher_and_dispatches_with_transactional_p
*/
public function it_builds_command_dispatcher_and_dispatches_but_breaks_when_handler_returns_invalid_result(): void
{
$handlerMap = [
'some_command' => function (array $state, Message $message): string {
return 'invalid';
},
$commandMap = [
'some_command' => [
'handler' => function (array $state, Message $message): string {
return 'invalid';
},
'definition' => TestAggregateDefinition::class,
],
];

$eventStoreFactory = function (): EventStore {
Expand Down Expand Up @@ -198,8 +204,7 @@ public function it_builds_command_dispatcher_and_dispatches_but_breaks_when_hand
$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher(
$eventStoreFactory,
$snapshotStoreFactory,
new TestAggregateDefinition(),
$handlerMap,
$commandMap,
$producerFactory
);

Expand Down Expand Up @@ -404,10 +409,15 @@ public function it_gets_handler(): void
$message = $this->prophesize(Message::class);
$message->messageName()->willReturn('foo')->shouldBeCalled();

$handlerMap = ['foo' => function (): void {
}];
$commandMap = [
'foo' => [
'handler' => function (): void {
},
'definition' => TestAggregateDefinition::class,
],
];

$result = f\getHandler($message->reveal(), $handlerMap);
$result = f\getHandler($message->reveal(), $commandMap);

$this->assertInstanceOf(\Closure::class, $result);
}
Expand All @@ -422,10 +432,44 @@ public function it_throws_exception_when_no_handler_found(): void
$message = $this->prophesize(Message::class);
$message->messageName()->willReturn('unknown')->shouldBeCalled();

$handlerMap = ['foo' => ['handler' => function (): void {
$commandMap = ['foo' => ['handler' => function (): void {
}]];

f\getHandler($message->reveal(), $handlerMap);
f\getHandler($message->reveal(), $commandMap);
}

/**
* @test
*/
public function it_gets_aggregate_definition_from_cache(): void
{
$message = $this->prophesize(Message::class);
$message->messageName()->willReturn('foo')->shouldBeCalled();

$commandMap = ['foo' => ['definition' => TestAggregateDefinition::class]];

$result = f\getAggregateDefinition($message->reveal(), $commandMap);

$this->assertInstanceOf(TestAggregateDefinition::class, $result);

$result2 = f\getAggregateDefinition($message->reveal(), $commandMap);

$this->assertSame($result, $result2);
}

/**
* @test
*/
public function it_throws_exception_when_no_definition_found(): void
{
$this->expectException(\RuntimeException::class);

$message = $this->prophesize(Message::class);
$message->messageName()->willReturn('bar')->shouldBeCalled();

$commandMap = [];

f\getAggregateDefinition($message->reveal(), $commandMap);
}

/**
Expand Down

0 comments on commit 2074b3c

Please sign in to comment.