diff --git a/examples/Infrastructure/UserAggregateDefinition.php b/examples/Infrastructure/UserAggregateDefinition.php index 32615e2..fc60915 100644 --- a/examples/Infrastructure/UserAggregateDefinition.php +++ b/examples/Infrastructure/UserAggregateDefinition.php @@ -29,7 +29,7 @@ public function streamName(): StreamName return new StreamName('user_stream'); } - public function apply(array $state, Message ...$events): array + public function apply($state, Message ...$events) { return User\apply($state, ...$events); } diff --git a/examples/Model/User.php b/examples/Model/User.php index 7f22aae..07678bb 100644 --- a/examples/Model/User.php +++ b/examples/Model/User.php @@ -45,8 +45,12 @@ function changeUserName(callable $stateResolver, ChangeUserName $command): array const apply = '\Prooph\MicroExample\Model\User\apply'; -function apply(array $state, Message ...$events): array +function apply($state, Message ...$events): array { + if (null === $state) { + $state = []; + } + foreach ($events as $event) { switch ($event->messageName()) { case UserWasRegistered::class: diff --git a/src/AbstractAggregateDefinition.php b/src/AbstractAggregateDefinition.php index db6d8b0..b642c7a 100644 --- a/src/AbstractAggregateDefinition.php +++ b/src/AbstractAggregateDefinition.php @@ -110,7 +110,7 @@ public function enrich(Message $message): Message }; } - public function reconstituteState(array $state, Iterator $events): array + public function reconstituteState($state, Iterator $events) { return $this->apply($state, ...$events); } @@ -119,4 +119,9 @@ public function hasOneStreamPerAggregate(): bool { return false; } + + public function stateType(): string + { + return 'array'; + } } diff --git a/src/AggregateDefinition.php b/src/AggregateDefinition.php index fa359a1..8927e04 100644 --- a/src/AggregateDefinition.php +++ b/src/AggregateDefinition.php @@ -28,7 +28,8 @@ public function aggregateType(): string; public function identifierName(): string; /** - * Returns the key in message payload and state to identify version number + * Returns the key in message payload and state (array) to identify version number + * If state is an object, it represents the method name to call in order to receive the version */ public function versionName(): string; @@ -44,7 +45,9 @@ public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?Me public function hasOneStreamPerAggregate(): bool; - public function reconstituteState(array $state, Iterator $events): array; + public function reconstituteState($state, Iterator $events); - public function apply(array $state, Message ...$events): array; + public function apply($state, Message ...$events); + + public function stateType(): string; } diff --git a/src/Kernel.php b/src/Kernel.php index c46e58b..573781f 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -64,10 +64,46 @@ function buildCommandDispatcher( return Failure($e); } - $stateResolver = function () use ($message, $definition, $eventStore, $snapshotStore, $aggregateId): array { + $stateResolver = function () use ($message, $definition, $eventStore, $snapshotStore, $aggregateId) { $state = loadState($message, $definition, $snapshotStore); + $stateType = $definition->stateType(); - $nextVersion = empty($state) ? 1 : $state[$definition->versionName()] + 1; + if ('array' === $stateType && ! is_array($state)) { + throw new \UnexpectedValueException('State must be an array according to aggregate definition'); + } + + if ('array' !== $stateType && null !== $state && ! $state instanceof $stateType) { + throw new \UnexpectedValueException(sprintf( + 'State must be an instance of %s or null according to aggregate definition', + $stateType + )); + } + + switch (gettype($state)) { + case 'array': + if (empty($state)) { + $nextVersion = 1; + } else { + $versionKey = $definition->versionName(); + if (! array_key_exists($versionKey, $state)) { + throw new RuntimeException(sprintf( + 'Missing aggregate version key "%s" in state. State was %s', + $versionKey, + $message->messageName(), + json_encode($state) + )); + } + $nextVersion = $state[$versionKey] + 1; + } + break; + case 'object': + $nextVersion = $state->{$definition->versionName()}() + 1; + break; + case 'NULL': + default: + $nextVersion = 1; + break; + } $events = loadEvents($eventStore, $definition, $aggregateId, $nextVersion); @@ -119,16 +155,21 @@ function buildCommandDispatcher( const loadState = '\Prooph\Micro\Kernel\loadState'; -function loadState(Message $message, AggregateDefinition $definition, SnapshotStore $snapshotStore = null): array +/** + * @return mixed + */ +function loadState(Message $message, AggregateDefinition $definition, SnapshotStore $snapshotStore = null) { + $arrayState = 'array' === $definition->stateType(); + if (null === $snapshotStore) { - return []; + return $arrayState ? [] : null; } $aggregate = $snapshotStore->get($definition->aggregateType(), $definition->extractAggregateId($message)); if (! $aggregate) { - return []; + return $arrayState ? [] : null; } return $aggregate->aggregateRoot(); @@ -151,8 +192,6 @@ function loadEvents( if ($definition->hasOneStreamPerAggregate()) { $streamName = new StreamName($streamName->toString() . '-' . $aggregateId); // append aggregate id to stream name - } else { - $nextVersion = 1; // we don't know the event position, the metadata matcher will help, we start at 1 } return $eventStore->load($streamName, $nextVersion, null, $metadataMatcher); diff --git a/tests/AbstractAggregateDefinitionTest.php b/tests/AbstractAggregateDefinitionTest.php index b81e043..42fd9f5 100644 --- a/tests/AbstractAggregateDefinitionTest.php +++ b/tests/AbstractAggregateDefinitionTest.php @@ -170,7 +170,7 @@ public function aggregateType(): string return 'foo'; } - public function apply(array $state, Message ...$events): array + public function apply($state, Message ...$events): array { if (! isset($state['count'])) { $state['count'] = 0; diff --git a/tests/KernelTest.php b/tests/KernelTest.php index 817502e..0abd0c1 100644 --- a/tests/KernelTest.php +++ b/tests/KernelTest.php @@ -28,7 +28,9 @@ use Prooph\SnapshotStore\SnapshotStore; use ProophTest\Micro\TestAsset\OneStreamPerAggregateTestAggregateDefinition; use ProophTest\Micro\TestAsset\SingleStreamTestAggregateDefinition; +use ProophTest\Micro\TestAsset\SingleStreamTestAggregateDefinition2; use ProophTest\Micro\TestAsset\TestDomainEvent; +use ProophTest\Micro\TestAsset\TestState; use Prophecy\Argument; class KernelTest extends TestCase @@ -66,6 +68,189 @@ public function it_builds_command_dispatcher_and_dispatches(): void } } + /** + * @test + */ + public function it_builds_command_dispatcher_and_dispatches_with_state_coming_from_snapshot_store(): void + { + $commandMap = [ + 'some_command' => [ + 'handler' => function (callable $stateResolver, Message $message): array { + $stateResolver(); + + return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; + }, + 'definition' => SingleStreamTestAggregateDefinition::class, + ], + ]; + + $eventStore = $this->prophesize(EventStore::class); + $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); + $eventStore->load(Argument::type(StreamName::class), 5, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); + $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); + + $snapshotStore = new InMemorySnapshotStore(); + $snapshotStore->save(new Snapshot('test', 'some_id', ['version' => 4], 4, new \DateTimeImmutable())); + + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); + + $command = $this->prophesize(Message::class); + $command->messageName()->willReturn('some_command')->shouldBeCalled(); + + $validation = $dispatch($command->reveal()); + + if ($validation instanceof Failure) { + $this->fail($validation->toString()); + } + } + + /** + * @test + */ + public function it_builds_command_dispatcher_and_dispatches_with_object_state_coming_from_snapshot_store(): void + { + $commandMap = [ + 'some_command' => [ + 'handler' => function (callable $stateResolver, Message $message): array { + $stateResolver(); + + return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; + }, + 'definition' => SingleStreamTestAggregateDefinition2::class, + ], + ]; + + $eventStore = $this->prophesize(EventStore::class); + $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); + $eventStore->load(Argument::type(StreamName::class), 5, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); + $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); + + $snapshotStore = new InMemorySnapshotStore(); + $snapshotStore->save(new Snapshot('test', 'some_id', new TestState(), 4, new \DateTimeImmutable())); + + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); + + $command = $this->prophesize(Message::class); + $command->messageName()->willReturn('some_command')->shouldBeCalled(); + + $validation = $dispatch($command->reveal()); + + if ($validation instanceof Failure) { + $this->fail($validation->toString()); + } + } + + /** + * @test + */ + public function it_builds_command_dispatcher_and_dispatches_without_object_state_coming_from_snapshot_store(): void + { + $commandMap = [ + 'some_command' => [ + 'handler' => function (callable $stateResolver, Message $message): array { + $stateResolver(); + + return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; + }, + 'definition' => SingleStreamTestAggregateDefinition2::class, + ], + ]; + + $eventStore = $this->prophesize(EventStore::class); + $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); + $eventStore->load(Argument::type(StreamName::class), 1, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); + $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); + + $snapshotStore = new InMemorySnapshotStore(); + + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); + + $command = $this->prophesize(Message::class); + $command->messageName()->willReturn('some_command')->shouldBeCalled(); + + $validation = $dispatch($command->reveal()); + + if ($validation instanceof Failure) { + $this->fail($validation->toString()); + } + } + + /** + * @test + */ + public function it_builds_command_dispatcher_and_throws_exception_when_invalid_state_returned(): void + { + $commandMap = [ + 'some_command' => [ + 'handler' => function (callable $stateResolver, Message $message): array { + $stateResolver(); + + return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; + }, + 'definition' => SingleStreamTestAggregateDefinition2::class, + ], + ]; + + $eventStore = $this->prophesize(EventStore::class); + + $snapshotStore = new InMemorySnapshotStore(); + $snapshotStore->save(new Snapshot('test', 'some_id', ['version' => 4], 4, new \DateTimeImmutable())); + + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); + + $command = $this->prophesize(Message::class); + $command->messageName()->willReturn('some_command')->shouldBeCalled(); + + $validation = $dispatch($command->reveal()); + + if ($validation instanceof Failure) { + $this->assertEquals( + 'Failure(UnexpectedValueException(State must be an instance of ProophTest\Micro\TestAsset\TestState or null according to aggregate definition))', + $validation->toString() + ); + } else { + $this->fail('No exception thrown'); + } + } + + /** + * @test + */ + public function it_builds_command_dispatcher_and_throws_exception_when_invalid_state_returned2(): void + { + $commandMap = [ + 'some_command' => [ + 'handler' => function (callable $stateResolver, Message $message): array { + $stateResolver(); + + return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; + }, + 'definition' => SingleStreamTestAggregateDefinition::class, + ], + ]; + + $eventStore = $this->prophesize(EventStore::class); + + $snapshotStore = new InMemorySnapshotStore(); + $snapshotStore->save(new Snapshot('test', 'some_id', new TestState(), 4, new \DateTimeImmutable())); + + $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); + + $command = $this->prophesize(Message::class); + $command->messageName()->willReturn('some_command')->shouldBeCalled(); + + $validation = $dispatch($command->reveal()); + + if ($validation instanceof Failure) { + $this->assertEquals( + 'Failure(UnexpectedValueException(State must be an array according to aggregate definition))', + $validation->toString() + ); + } else { + $this->fail('No exception thrown'); + } + } + /** * @test */ @@ -110,11 +295,11 @@ public function it_loads_state_from_empty_snapshot_store(): void $definition = $this->prophesize(AggregateDefinition::class); $definition->aggregateType()->willReturn('test')->shouldBeCalled(); $definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled(); + $definition->stateType()->willReturn('array')->shouldBeCalled(); $result = k\loadState($message, $definition->reveal(), $snapshotStore->reveal()); - $this->assertInternalType('array', $result); - $this->assertEmpty($result); + $this->assertEquals([], $result); } /** @@ -137,6 +322,7 @@ public function it_loads_state_from_snapshot_store(): void $definition = $this->prophesize(AggregateDefinition::class); $definition->aggregateType()->willReturn('test')->shouldBeCalled(); $definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled(); + $definition->stateType()->willReturn('array')->shouldBeCalled(); $result = k\loadState($message, $definition->reveal(), $snapshotStore); $this->assertEquals(['foo' => 'bar'], $result); @@ -151,6 +337,7 @@ public function it_returns_early_when_loading_state_and_no_snapshot_store_given( $message = $message->reveal(); $definition = $this->prophesize(AggregateDefinition::class); + $definition->stateType()->willReturn('array')->shouldBeCalled(); $result = k\loadState($message, $definition->reveal(), null); $this->assertEquals([], $result); diff --git a/tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php b/tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php index 29c0c55..3a43565 100644 --- a/tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php +++ b/tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php @@ -61,12 +61,12 @@ public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?Me return null; } - public function reconstituteState(array $state, Iterator $events): array + public function reconstituteState($state, Iterator $events): array { return $state; } - public function apply(array $state, Message ...$events): array + public function apply($state, Message ...$events): array { return []; } @@ -75,4 +75,9 @@ public function hasOneStreamPerAggregate(): bool { return true; } + + public function stateType(): string + { + return 'array'; + } } diff --git a/tests/TestAsset/SingleStreamTestAggregateDefinition.php b/tests/TestAsset/SingleStreamTestAggregateDefinition.php index 2779b5c..af7d06a 100644 --- a/tests/TestAsset/SingleStreamTestAggregateDefinition.php +++ b/tests/TestAsset/SingleStreamTestAggregateDefinition.php @@ -61,12 +61,12 @@ public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?Me return null; } - public function reconstituteState(array $state, Iterator $events): array + public function reconstituteState($state, Iterator $events) { return $state; } - public function apply(array $state, Message ...$events): array + public function apply($state, Message ...$events) { return []; } @@ -75,4 +75,9 @@ public function hasOneStreamPerAggregate(): bool { return false; } + + public function stateType(): string + { + return 'array'; + } } diff --git a/tests/TestAsset/SingleStreamTestAggregateDefinition2.php b/tests/TestAsset/SingleStreamTestAggregateDefinition2.php new file mode 100644 index 0000000..bd02cd1 --- /dev/null +++ b/tests/TestAsset/SingleStreamTestAggregateDefinition2.php @@ -0,0 +1,83 @@ + + * (c) 2017-2017 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\Micro\TestAsset; + +use Iterator; +use Prooph\Common\Messaging\Message; +use Prooph\EventStore\Metadata\MetadataEnricher; +use Prooph\EventStore\Metadata\MetadataMatcher; +use Prooph\EventStore\StreamName; +use Prooph\Micro\AggregateDefinition; + +final class SingleStreamTestAggregateDefinition2 implements AggregateDefinition +{ + public function identifierName(): string + { + return 'id'; + } + + public function aggregateType(): string + { + return 'test'; + } + + public function versionName(): string + { + return 'version'; + } + + public function extractAggregateId(Message $message): string + { + return 'some_id'; + } + + public function extractAggregateVersion(Message $message): int + { + return 1; + } + + public function streamName(): StreamName + { + return new StreamName('foo'); + } + + public function metadataEnricher(string $aggregateId, int $aggregateVersion, Message $causation = null): ?MetadataEnricher + { + return null; + } + + public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher + { + return null; + } + + public function reconstituteState($state, Iterator $events) + { + return $state; + } + + public function apply($state, Message ...$events) + { + return []; + } + + public function hasOneStreamPerAggregate(): bool + { + return false; + } + + public function stateType(): string + { + return TestState::class; + } +} diff --git a/tests/TestAsset/TestState.php b/tests/TestAsset/TestState.php new file mode 100644 index 0000000..6c65f81 --- /dev/null +++ b/tests/TestAsset/TestState.php @@ -0,0 +1,21 @@ + + * (c) 2017-2017 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\Micro\TestAsset; + +class TestState +{ + public function version(): int + { + return 4; + } +}