Skip to content

Commit

Permalink
Merge pull request #45 from prooph/aggregate_state
Browse files Browse the repository at this point in the history
allow aggregate state to be an object
  • Loading branch information
codeliner committed Oct 2, 2017
2 parents 6e8b868 + e5887f8 commit db15268
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 20 deletions.
2 changes: 1 addition & 1 deletion examples/Infrastructure/UserAggregateDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function streamName(): StreamName
return new StreamName('user_stream');
}

public function apply(array $state, Message ...$events): array
public function apply($state, Message ...$events)
{
return User\apply($state, ...$events);
}
Expand Down
6 changes: 5 additions & 1 deletion examples/Model/User.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ function changeUserName(callable $stateResolver, ChangeUserName $command): array

const apply = '\Prooph\MicroExample\Model\User\apply';

function apply(array $state, Message ...$events): array
function apply($state, Message ...$events): array
{
if (null === $state) {
$state = [];
}

foreach ($events as $event) {
switch ($event->messageName()) {
case UserWasRegistered::class:
Expand Down
7 changes: 6 additions & 1 deletion src/AbstractAggregateDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public function enrich(Message $message): Message
};
}

public function reconstituteState(array $state, Iterator $events): array
public function reconstituteState($state, Iterator $events)
{
return $this->apply($state, ...$events);
}
Expand All @@ -119,4 +119,9 @@ public function hasOneStreamPerAggregate(): bool
{
return false;
}

public function stateType(): string
{
return 'array';
}
}
9 changes: 6 additions & 3 deletions src/AggregateDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public function aggregateType(): string;
public function identifierName(): string;

/**
* Returns the key in message payload and state to identify version number
* Returns the key in message payload and state (array) to identify version number
* If state is an object, it represents the method name to call in order to receive the version
*/
public function versionName(): string;

Expand All @@ -44,7 +45,9 @@ public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?Me

public function hasOneStreamPerAggregate(): bool;

public function reconstituteState(array $state, Iterator $events): array;
public function reconstituteState($state, Iterator $events);

public function apply(array $state, Message ...$events): array;
public function apply($state, Message ...$events);

public function stateType(): string;
}
53 changes: 46 additions & 7 deletions src/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,46 @@ function buildCommandDispatcher(
return Failure($e);
}

$stateResolver = function () use ($message, $definition, $eventStore, $snapshotStore, $aggregateId): array {
$stateResolver = function () use ($message, $definition, $eventStore, $snapshotStore, $aggregateId) {
$state = loadState($message, $definition, $snapshotStore);
$stateType = $definition->stateType();

$nextVersion = empty($state) ? 1 : $state[$definition->versionName()] + 1;
if ('array' === $stateType && ! is_array($state)) {
throw new \UnexpectedValueException('State must be an array according to aggregate definition');
}

if ('array' !== $stateType && null !== $state && ! $state instanceof $stateType) {
throw new \UnexpectedValueException(sprintf(
'State must be an instance of %s or null according to aggregate definition',
$stateType
));
}

switch (gettype($state)) {
case 'array':
if (empty($state)) {
$nextVersion = 1;
} else {
$versionKey = $definition->versionName();
if (! array_key_exists($versionKey, $state)) {
throw new RuntimeException(sprintf(
'Missing aggregate version key "%s" in state. State was %s',
$versionKey,
$message->messageName(),
json_encode($state)
));
}
$nextVersion = $state[$versionKey] + 1;
}
break;
case 'object':
$nextVersion = $state->{$definition->versionName()}() + 1;
break;
case 'NULL':
default:
$nextVersion = 1;
break;
}

$events = loadEvents($eventStore, $definition, $aggregateId, $nextVersion);

Expand Down Expand Up @@ -119,16 +155,21 @@ function buildCommandDispatcher(

const loadState = '\Prooph\Micro\Kernel\loadState';

function loadState(Message $message, AggregateDefinition $definition, SnapshotStore $snapshotStore = null): array
/**
* @return mixed
*/
function loadState(Message $message, AggregateDefinition $definition, SnapshotStore $snapshotStore = null)
{
$arrayState = 'array' === $definition->stateType();

if (null === $snapshotStore) {
return [];
return $arrayState ? [] : null;
}

$aggregate = $snapshotStore->get($definition->aggregateType(), $definition->extractAggregateId($message));

if (! $aggregate) {
return [];
return $arrayState ? [] : null;
}

return $aggregate->aggregateRoot();
Expand All @@ -151,8 +192,6 @@ function loadEvents(

if ($definition->hasOneStreamPerAggregate()) {
$streamName = new StreamName($streamName->toString() . '-' . $aggregateId); // append aggregate id to stream name
} else {
$nextVersion = 1; // we don't know the event position, the metadata matcher will help, we start at 1
}

return $eventStore->load($streamName, $nextVersion, null, $metadataMatcher);
Expand Down
2 changes: 1 addition & 1 deletion tests/AbstractAggregateDefinitionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public function aggregateType(): string
return 'foo';
}

public function apply(array $state, Message ...$events): array
public function apply($state, Message ...$events): array
{
if (! isset($state['count'])) {
$state['count'] = 0;
Expand Down
191 changes: 189 additions & 2 deletions tests/KernelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
use Prooph\SnapshotStore\SnapshotStore;
use ProophTest\Micro\TestAsset\OneStreamPerAggregateTestAggregateDefinition;
use ProophTest\Micro\TestAsset\SingleStreamTestAggregateDefinition;
use ProophTest\Micro\TestAsset\SingleStreamTestAggregateDefinition2;
use ProophTest\Micro\TestAsset\TestDomainEvent;
use ProophTest\Micro\TestAsset\TestState;
use Prophecy\Argument;

class KernelTest extends TestCase
Expand Down Expand Up @@ -66,6 +68,189 @@ public function it_builds_command_dispatcher_and_dispatches(): void
}
}

/**
* @test
*/
public function it_builds_command_dispatcher_and_dispatches_with_state_coming_from_snapshot_store(): void
{
$commandMap = [
'some_command' => [
'handler' => function (callable $stateResolver, Message $message): array {
$stateResolver();

return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])];
},
'definition' => SingleStreamTestAggregateDefinition::class,
],
];

$eventStore = $this->prophesize(EventStore::class);
$eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled();
$eventStore->load(Argument::type(StreamName::class), 5, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled();
$eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled();

$snapshotStore = new InMemorySnapshotStore();
$snapshotStore->save(new Snapshot('test', 'some_id', ['version' => 4], 4, new \DateTimeImmutable()));

$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$validation = $dispatch($command->reveal());

if ($validation instanceof Failure) {
$this->fail($validation->toString());
}
}

/**
* @test
*/
public function it_builds_command_dispatcher_and_dispatches_with_object_state_coming_from_snapshot_store(): void
{
$commandMap = [
'some_command' => [
'handler' => function (callable $stateResolver, Message $message): array {
$stateResolver();

return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])];
},
'definition' => SingleStreamTestAggregateDefinition2::class,
],
];

$eventStore = $this->prophesize(EventStore::class);
$eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled();
$eventStore->load(Argument::type(StreamName::class), 5, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled();
$eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled();

$snapshotStore = new InMemorySnapshotStore();
$snapshotStore->save(new Snapshot('test', 'some_id', new TestState(), 4, new \DateTimeImmutable()));

$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$validation = $dispatch($command->reveal());

if ($validation instanceof Failure) {
$this->fail($validation->toString());
}
}

/**
* @test
*/
public function it_builds_command_dispatcher_and_dispatches_without_object_state_coming_from_snapshot_store(): void
{
$commandMap = [
'some_command' => [
'handler' => function (callable $stateResolver, Message $message): array {
$stateResolver();

return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])];
},
'definition' => SingleStreamTestAggregateDefinition2::class,
],
];

$eventStore = $this->prophesize(EventStore::class);
$eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled();
$eventStore->load(Argument::type(StreamName::class), 1, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled();
$eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled();

$snapshotStore = new InMemorySnapshotStore();

$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$validation = $dispatch($command->reveal());

if ($validation instanceof Failure) {
$this->fail($validation->toString());
}
}

/**
* @test
*/
public function it_builds_command_dispatcher_and_throws_exception_when_invalid_state_returned(): void
{
$commandMap = [
'some_command' => [
'handler' => function (callable $stateResolver, Message $message): array {
$stateResolver();

return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])];
},
'definition' => SingleStreamTestAggregateDefinition2::class,
],
];

$eventStore = $this->prophesize(EventStore::class);

$snapshotStore = new InMemorySnapshotStore();
$snapshotStore->save(new Snapshot('test', 'some_id', ['version' => 4], 4, new \DateTimeImmutable()));

$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$validation = $dispatch($command->reveal());

if ($validation instanceof Failure) {
$this->assertEquals(
'Failure(UnexpectedValueException(State must be an instance of ProophTest\Micro\TestAsset\TestState or null according to aggregate definition))',
$validation->toString()
);
} else {
$this->fail('No exception thrown');
}
}

/**
* @test
*/
public function it_builds_command_dispatcher_and_throws_exception_when_invalid_state_returned2(): void
{
$commandMap = [
'some_command' => [
'handler' => function (callable $stateResolver, Message $message): array {
$stateResolver();

return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])];
},
'definition' => SingleStreamTestAggregateDefinition::class,
],
];

$eventStore = $this->prophesize(EventStore::class);

$snapshotStore = new InMemorySnapshotStore();
$snapshotStore->save(new Snapshot('test', 'some_id', new TestState(), 4, new \DateTimeImmutable()));

$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$validation = $dispatch($command->reveal());

if ($validation instanceof Failure) {
$this->assertEquals(
'Failure(UnexpectedValueException(State must be an array according to aggregate definition))',
$validation->toString()
);
} else {
$this->fail('No exception thrown');
}
}

/**
* @test
*/
Expand Down Expand Up @@ -110,11 +295,11 @@ public function it_loads_state_from_empty_snapshot_store(): void
$definition = $this->prophesize(AggregateDefinition::class);
$definition->aggregateType()->willReturn('test')->shouldBeCalled();
$definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled();
$definition->stateType()->willReturn('array')->shouldBeCalled();

$result = k\loadState($message, $definition->reveal(), $snapshotStore->reveal());

$this->assertInternalType('array', $result);
$this->assertEmpty($result);
$this->assertEquals([], $result);
}

/**
Expand All @@ -137,6 +322,7 @@ public function it_loads_state_from_snapshot_store(): void
$definition = $this->prophesize(AggregateDefinition::class);
$definition->aggregateType()->willReturn('test')->shouldBeCalled();
$definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled();
$definition->stateType()->willReturn('array')->shouldBeCalled();

$result = k\loadState($message, $definition->reveal(), $snapshotStore);
$this->assertEquals(['foo' => 'bar'], $result);
Expand All @@ -151,6 +337,7 @@ public function it_returns_early_when_loading_state_and_no_snapshot_store_given(
$message = $message->reveal();

$definition = $this->prophesize(AggregateDefinition::class);
$definition->stateType()->willReturn('array')->shouldBeCalled();

$result = k\loadState($message, $definition->reveal(), null);
$this->assertEquals([], $result);
Expand Down
Loading

0 comments on commit db15268

Please sign in to comment.