Skip to content

Commit

Permalink
userland code does not require to handle carrying the aggregate version
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Oct 18, 2017
1 parent fea4436 commit ba485e5
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 162 deletions.
6 changes: 3 additions & 3 deletions examples/Model/User.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
function registerUser(callable $stateResolver, RegisterUser $command, UniqueEmailGuard $guard): array
{
if ($guard->isUnique($command->email())) {
return [new UserWasRegistered(array_merge($command->payload(), ['version' => 1]))];
return [new UserWasRegistered($command->payload())];
}

return [new UserWasRegisteredWithDuplicateEmail(array_merge($command->payload(), ['version' => ++$stateResolver()['version']]))];
return [new UserWasRegisteredWithDuplicateEmail($command->payload())];
}

const changeUserName = '\Prooph\MicroExample\Model\User\changeUserName';
Expand All @@ -40,7 +40,7 @@ function changeUserName(callable $stateResolver, ChangeUserName $command): array
throw new InvalidArgumentException('Username too short');
}

return [new UserNameWasChanged(array_merge($command->payload(), ['version' => ++$stateResolver()['version']]))];
return [new UserNameWasChanged($command->payload())];
}

const apply = '\Prooph\MicroExample\Model\User\apply';
Expand Down
10 changes: 5 additions & 5 deletions src/AbstractAggregateDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ public function extractAggregateVersion(Message $message): int
{
$versionKey = $this->versionName();

$payload = $message->payload();
$metadata = $message->metadata();

if (! array_key_exists($versionKey, $payload)) {
if (! array_key_exists($versionKey, $metadata)) {
throw new RuntimeException(sprintf(
'Missing aggregate version key "%s" in payload of message %s. Payload was %s',
'Missing aggregate version key "%s" in metadata of message %s. Metadata was %s',
$versionKey,
$message->messageName(),
json_encode($payload)
json_encode($metadata)
));
}

return $payload[$versionKey];
return $metadata[$versionKey];
}

public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher
Expand Down
3 changes: 1 addition & 2 deletions src/AggregateDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public function aggregateType(): string;
public function identifierName(): string;

/**
* Returns the key in message payload and state (array) to identify version number
* If state is an object, it represents the method name to call in order to receive the version
* Returns the key in message metadata to identify version number
*/
public function versionName(): string;

Expand Down
83 changes: 25 additions & 58 deletions src/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Prooph\EventStore\StreamName;
use Prooph\EventStore\TransactionalEventStore;
use Prooph\Micro\AggregateDefinition;
use Prooph\SnapshotStore\Snapshot;
use Prooph\SnapshotStore\SnapshotStore;
use RuntimeException;
use function Phunkie\Functions\function1\compose;
Expand Down Expand Up @@ -64,48 +65,25 @@ function buildCommandDispatcher(
return Failure($e);
}

$stateResolver = function () use ($message, $definition, $eventStore, $snapshotStore, $aggregateId) {
$state = loadState($message, $definition, $snapshotStore);
$stateType = $definition->stateType();
$lastVersion = 0;

if ('array' === $stateType && ! is_array($state)) {
throw new \UnexpectedValueException('State must be an array according to aggregate definition');
}
$stateResolver = function () use ($message, $definition, $eventStore, $snapshotStore, $aggregateId, &$lastVersion) {
$snapshot = loadSnapshot($message, $definition, $snapshotStore);

if ('array' !== $stateType && null !== $state && ! $state instanceof $stateType) {
throw new \UnexpectedValueException(sprintf(
'State must be an instance of %s or null according to aggregate definition',
$stateType
));
}
$nextVersion = 1;
$state = null;

switch (gettype($state)) {
case 'array':
if (empty($state)) {
$nextVersion = 1;
} else {
$versionKey = $definition->versionName();
if (! array_key_exists($versionKey, $state)) {
throw new RuntimeException(sprintf(
'Missing aggregate version key "%s" in state. State was %s',
$versionKey,
$message->messageName(),
json_encode($state)
));
}
$nextVersion = $state[$versionKey] + 1;
}
break;
case 'object':
$nextVersion = $state->{$definition->versionName()}() + 1;
break;
case 'NULL':
default:
$nextVersion = 1;
break;
if (null !== $snapshot) {
$nextVersion = $snapshot->lastVersion() + 1;
$state = $snapshot->aggregateRoot();
}

$events = loadEvents($eventStore, $definition, $aggregateId, $nextVersion);
$lastEvent = end($events);

if (false !== $lastEvent) {
$lastVersion = $definition->extractAggregateVersion($lastEvent);
}

return $definition->reconstituteState($state, $events);
};
Expand All @@ -122,8 +100,8 @@ function buildCommandDispatcher(
return ImmList(...$events);
};

$enrichEvents = function (ImmList $events) use ($message, $definition, $aggregateId): Kind {
$enricher = getEnricherFor($definition, $aggregateId, $message);
$enrichEvents = function (ImmList $events) use ($message, $definition, $aggregateId, &$lastVersion): Kind {
$enricher = getEnricherFor($definition, $aggregateId, $message, $lastVersion);

return $events->map($enricher);
};
Expand Down Expand Up @@ -153,26 +131,15 @@ function buildCommandDispatcher(
};
}

const loadState = '\Prooph\Micro\Kernel\loadState';
const loadSnapshot = '\Prooph\Micro\Kernel\loadSnapshot';

/**
* @return mixed
*/
function loadState(Message $message, AggregateDefinition $definition, SnapshotStore $snapshotStore = null)
function loadSnapshot(Message $message, AggregateDefinition $definition, SnapshotStore $snapshotStore = null): ?Snapshot
{
$arrayState = 'array' === $definition->stateType();

if (null === $snapshotStore) {
return $arrayState ? [] : null;
return null;
}

$aggregate = $snapshotStore->get($definition->aggregateType(), $definition->extractAggregateId($message));

if (! $aggregate) {
return $arrayState ? [] : null;
}

return $aggregate->aggregateRoot();
return $snapshotStore->get($definition->aggregateType(), $definition->extractAggregateId($message));
}

const loadEvents = '\Prooph\Micro\Kernel\loadEvents';
Expand All @@ -191,19 +158,19 @@ function loadEvents(
}

if ($definition->hasOneStreamPerAggregate()) {
$streamName = new StreamName($streamName->toString() . '-' . $aggregateId); // append aggregate id to stream name
// append aggregate id to stream name
$streamName = new StreamName($streamName->toString() . '-' . $aggregateId);
}

return $eventStore->load($streamName, $nextVersion, null, $metadataMatcher);
}

const getEnricherFor = '\Prooph\Micro\Kernel\getEnricherFor';

function getEnricherFor(AggregateDefinition $definition, string $aggregateId, Message $message): callable
function getEnricherFor(AggregateDefinition $definition, string $aggregateId, Message $message, int &$lastVersion): callable
{
return function (Message $event) use ($definition, $aggregateId, $message): Message {
$aggregateVersion = $definition->extractAggregateVersion($event);
$metadataEnricher = $definition->metadataEnricher($aggregateId, $aggregateVersion, $message);
return function (Message $event) use ($definition, $aggregateId, $message, &$lastVersion): Message {
$metadataEnricher = $definition->metadataEnricher($aggregateId, ++$lastVersion, $message);

if (null !== $metadataEnricher) {
$event = $metadataEnricher->enrich($event);
Expand Down
4 changes: 2 additions & 2 deletions tests/AbstractAggregateDefinitionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public function it_throws_exception_when_no_id_property_found_during_extraction(
public function it_extracts_aggregate_version(): void
{
$message = $this->prophesize(Message::class);
$message->payload()->willReturn(['version' => 5])->shouldBeCalled();
$message->metadata()->willReturn(['version' => 5])->shouldBeCalled();

$this->assertEquals(5, $this->createDefinition()->extractAggregateVersion($message->reveal()));
}
Expand All @@ -75,7 +75,7 @@ public function it_throws_exception_when_no_version_property_found_during_extrac
$this->expectException(\RuntimeException::class);

$message = $this->prophesize(Message::class);
$message->payload()->willReturn([])->shouldBeCalled();
$message->metadata()->willReturn([])->shouldBeCalled();

$this->createDefinition()->extractAggregateVersion($message->reveal());
}
Expand Down
108 changes: 16 additions & 92 deletions tests/KernelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,82 +175,6 @@ public function it_builds_command_dispatcher_and_dispatches_without_object_state
}
}

/**
* @test
*/
public function it_builds_command_dispatcher_and_throws_exception_when_invalid_state_returned(): void
{
$commandMap = [
'some_command' => [
'handler' => function (callable $stateResolver, Message $message): array {
$stateResolver();

return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])];
},
'definition' => SingleStreamTestAggregateDefinition2::class,
],
];

$eventStore = $this->prophesize(EventStore::class);

$snapshotStore = new InMemorySnapshotStore();
$snapshotStore->save(new Snapshot('test', 'some_id', ['version' => 4], 4, new \DateTimeImmutable()));

$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$validation = $dispatch($command->reveal());

if ($validation instanceof Failure) {
$this->assertEquals(
'Failure(UnexpectedValueException(State must be an instance of ProophTest\Micro\TestAsset\TestState or null according to aggregate definition))',
$validation->toString()
);
} else {
$this->fail('No exception thrown');
}
}

/**
* @test
*/
public function it_builds_command_dispatcher_and_throws_exception_when_invalid_state_returned2(): void
{
$commandMap = [
'some_command' => [
'handler' => function (callable $stateResolver, Message $message): array {
$stateResolver();

return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])];
},
'definition' => SingleStreamTestAggregateDefinition::class,
],
];

$eventStore = $this->prophesize(EventStore::class);

$snapshotStore = new InMemorySnapshotStore();
$snapshotStore->save(new Snapshot('test', 'some_id', new TestState(), 4, new \DateTimeImmutable()));

$dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore);

$command = $this->prophesize(Message::class);
$command->messageName()->willReturn('some_command')->shouldBeCalled();

$validation = $dispatch($command->reveal());

if ($validation instanceof Failure) {
$this->assertEquals(
'Failure(UnexpectedValueException(State must be an array according to aggregate definition))',
$validation->toString()
);
} else {
$this->fail('No exception thrown');
}
}

/**
* @test
*/
Expand Down Expand Up @@ -285,7 +209,7 @@ public function it_does_not_load_events_when_state_is_not_resolved(): void
/**
* @test
*/
public function it_loads_state_from_empty_snapshot_store(): void
public function it_loads_snapshot_from_empty_snapshot_store(): void
{
$snapshotStore = $this->prophesize(SnapshotStore::class);

Expand All @@ -295,17 +219,16 @@ public function it_loads_state_from_empty_snapshot_store(): void
$definition = $this->prophesize(AggregateDefinition::class);
$definition->aggregateType()->willReturn('test')->shouldBeCalled();
$definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled();
$definition->stateType()->willReturn('array')->shouldBeCalled();

$result = k\loadState($message, $definition->reveal(), $snapshotStore->reveal());
$result = k\loadSnapshot($message, $definition->reveal(), $snapshotStore->reveal());

$this->assertEquals([], $result);
$this->assertNull($result);
}

/**
* @test
*/
public function it_loads_state_from_snapshot_store(): void
public function it_loads_snapshot_from_snapshot_store(): void
{
$snapshotStore = new InMemorySnapshotStore();
$snapshotStore->save(new Snapshot(
Expand All @@ -322,25 +245,24 @@ public function it_loads_state_from_snapshot_store(): void
$definition = $this->prophesize(AggregateDefinition::class);
$definition->aggregateType()->willReturn('test')->shouldBeCalled();
$definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled();
$definition->stateType()->willReturn('array')->shouldBeCalled();

$result = k\loadState($message, $definition->reveal(), $snapshotStore);
$this->assertEquals(['foo' => 'bar'], $result);
$result = k\loadSnapshot($message, $definition->reveal(), $snapshotStore);
$this->assertInstanceOf(Snapshot::class, $result);
$this->assertEquals(['foo' => 'bar'], $result->aggregateRoot());
}

/**
* @test
*/
public function it_returns_early_when_loading_state_and_no_snapshot_store_given(): void
public function it_returns_early_when_loading_snapshot_and_no_snapshot_store_given(): void
{
$message = $this->prophesize(Message::class);
$message = $message->reveal();

$definition = $this->prophesize(AggregateDefinition::class);
$definition->stateType()->willReturn('array')->shouldBeCalled();

$result = k\loadState($message, $definition->reveal(), null);
$this->assertEquals([], $result);
$result = k\loadSnapshot($message, $definition->reveal(), null);
$this->assertNull($result);
}

/**
Expand Down Expand Up @@ -506,7 +428,6 @@ public function it_appends_to_stream_during_persist_when_stream_found_using_one_

/**
* @test
* @group by
*/
public function it_creates_stream_during_persist_when_no_stream_found_and_enriches_with_metadata(): void
{
Expand All @@ -525,8 +446,7 @@ public function it_creates_stream_during_persist_when_no_stream_found_and_enrich
$aggregateDefinition = $this->prophesize(AggregateDefinition::class);
$aggregateDefinition->streamName()->willReturn($streamName)->shouldBeCalled();
$aggregateDefinition->hasOneStreamPerAggregate()->willReturn(false)->shouldBeCalled();
$aggregateDefinition->extractAggregateVersion($message)->willReturn(5)->shouldBeCalled();
$aggregateDefinition->metadataEnricher('some_id', 5, $message)->willReturn(new class() implements MetadataEnricher {
$aggregateDefinition->metadataEnricher('some_id', 1, $message)->willReturn(new class() implements MetadataEnricher {
public function enrich(Message $message): Message
{
return $message->withAddedMetadata('key', 'value');
Expand All @@ -539,12 +459,16 @@ public function enrich(Message $message): Message

$events = ImmList($message);

$enricher = k\getEnricherFor($aggregateDefinition, 'some_id', $message);
$lastVersion = 0;

$enricher = k\getEnricherFor($aggregateDefinition, 'some_id', $message, $lastVersion);

$events = $events->map($enricher);

$validation = k\persistEvents($events, $eventStore->reveal(), $aggregateDefinition, 'some_id');

$this->assertEquals(1, $lastVersion);

if ($validation instanceof Failure) {
$this->fail($validation->toString());
}
Expand Down

0 comments on commit ba485e5

Please sign in to comment.