Skip to content

Commit

Permalink
Merge a21a83d into 3807ead
Browse files Browse the repository at this point in the history
  • Loading branch information
codeliner committed Jul 18, 2018
2 parents 3807ead + a21a83d commit 2aa8dd1
Show file tree
Hide file tree
Showing 35 changed files with 899 additions and 47 deletions.
1 change: 1 addition & 0 deletions docs/tutorial/bonus_III.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Bonus III - Decoupled Functional Core
2 changes: 1 addition & 1 deletion docs/tutorial/part_VII.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ Try to check *John* in again, while keeping an eye on the monitoring app `http:/
## The End

Congratulations! You've mastered the Event Machine tutorial. There are two bonus parts available to learn more
about **custom projections** and **testing with Event Machine**.
about **custom projections**, **testing with Event Machine** and how to achieve a "decoupled functional core".
The current implementation is available as a **demo** branch of `proophsoftware/event-machine-skeleton`.
There is a second branch called **demo-oop** available that contains a similar
implementation, but the `Building` aggregate is designed using an object oriented approach rather than
Expand Down
10 changes: 7 additions & 3 deletions src/Aggregate/ClosureAggregateTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ final class ClosureAggregateTranslator implements EventStoreAggregateTranslator

private $eventApplyMap;

public function __construct(string $aggregateId, array $eventApplyMap)
private $eventClassMap;

public function __construct(string $aggregateId, array $eventApplyMap, array $eventClassMap)
{
$this->aggregateId = $aggregateId;
$this->eventApplyMap = $eventApplyMap;
$this->eventClassMap = $eventClassMap;
}

/**
Expand Down Expand Up @@ -80,8 +83,9 @@ public function reconstituteAggregateFromHistory(AggregateType $aggregateType, I
if (null === $this->aggregateReconstructor) {
$arId = $this->aggregateId;
$eventApplyMap = $this->eventApplyMap;
$this->aggregateReconstructor = function ($historyEvents) use ($arId, $aggregateType, $eventApplyMap) {
return static::reconstituteFromHistory($arId, $aggregateType, $eventApplyMap, $historyEvents);
$eventClassMap = $this->eventClassMap;
$this->aggregateReconstructor = function ($historyEvents) use ($arId, $aggregateType, $eventApplyMap, $eventClassMap) {
return static::reconstituteFromHistory($arId, $aggregateType, $eventApplyMap, $eventClassMap, $historyEvents);
};
}

Expand Down
27 changes: 23 additions & 4 deletions src/Aggregate/GenericAggregateRoot.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Prooph\EventMachine\Aggregate;

use Prooph\EventMachine\Eventing\GenericJsonSchemaEvent;
use Prooph\EventMachine\Messaging\GenericJsonSchemaEvent;
use Prooph\EventSourcing\Aggregate\AggregateType;
use Prooph\EventSourcing\Aggregate\AggregateTypeProvider;
use Prooph\EventSourcing\Aggregate\Exception\RuntimeException;
Expand All @@ -36,6 +36,11 @@ final class GenericAggregateRoot implements AggregateTypeProvider
*/
private $eventApplyMap;

/**
* @var string[]
*/
private $eventClassMap;

/**
* @var mixed
*/
Expand All @@ -58,19 +63,20 @@ final class GenericAggregateRoot implements AggregateTypeProvider
/**
* @throws RuntimeException
*/
protected static function reconstituteFromHistory(string $aggregateId, AggregateType $aggregateType, array $eventApplyMap, \Iterator $historyEvents): self
protected static function reconstituteFromHistory(string $aggregateId, AggregateType $aggregateType, array $eventApplyMap, array $eventClassMap, \Iterator $historyEvents): self
{
$instance = new self($aggregateId, $aggregateType, $eventApplyMap);
$instance = new self($aggregateId, $aggregateType, $eventApplyMap, $eventClassMap);
$instance->replay($historyEvents);

return $instance;
}

public function __construct(string $aggregateId, AggregateType $aggregateType, array $eventApplyMap)
public function __construct(string $aggregateId, AggregateType $aggregateType, array $eventApplyMap, array $eventClassMap)
{
$this->aggregateId = $aggregateId;
$this->aggregateType = $aggregateType;
$this->eventApplyMap = $eventApplyMap;
$this->eventClassMap = $eventClassMap;
}

/**
Expand Down Expand Up @@ -134,6 +140,19 @@ private function apply(GenericJsonSchemaEvent $event): void
{
$apply = $this->eventApplyMap[$event->messageName()];

if (array_key_exists($event->messageName(), $this->eventClassMap)) {
$eventClass = $this->eventClassMap[$event->messageName()];

if (! is_callable([$eventClass, 'fromArray'])) {
throw new \RuntimeException(sprintf(
'Custom event class %s should have a static fromArray method',
$eventClass
));
}

$event = ([$eventClass, 'fromArray'])($event->toArray());
}

if ($this->aggregateState === null) {
$newArState = $apply($event);
} else {
Expand Down
65 changes: 56 additions & 9 deletions src/Commanding/CommandProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use Prooph\EventMachine\Aggregate\ContextProvider;
use Prooph\EventMachine\Aggregate\Exception\AggregateNotFound;
use Prooph\EventMachine\Aggregate\GenericAggregateRoot;
use Prooph\EventMachine\Eventing\GenericJsonSchemaEvent;
use Prooph\EventMachine\Messaging\GenericJsonSchemaEvent;
use Prooph\EventSourcing\Aggregate\AggregateRepository;
use Prooph\EventSourcing\Aggregate\AggregateType;
use Prooph\EventStore\EventStore;
Expand All @@ -30,6 +30,11 @@ final class CommandProcessor
*/
private $commandName;

/**
* @var string|null
*/
private $commandClass;

/**
* @var string
*/
Expand Down Expand Up @@ -60,6 +65,11 @@ final class CommandProcessor
*/
private $eventApplyMap;

/**
* @var array
*/
private $eventClassMap;

/**
* @var string
*/
Expand Down Expand Up @@ -141,7 +151,9 @@ public static function fromDescriptionArrayAndDependencies(
$messageFactory,
$eventStore,
$snapshotStore,
$contextProvider
$contextProvider,
$description['commandClass'] ?? null,
$description['eventClassMap'] ?? []
);
}

Expand All @@ -157,7 +169,9 @@ public function __construct(
MessageFactory $messageFactory,
EventStore $eventStore,
SnapshotStore $snapshotStore = null,
ContextProvider $contextProvider = null
ContextProvider $contextProvider = null,
string $commandClass = null,
array $eventClassMap = []
) {
$this->commandName = $commandName;
$this->aggregateType = $aggregateType;
Expand All @@ -171,6 +185,8 @@ public function __construct(
$this->eventStore = $eventStore;
$this->snapshotStore = $snapshotStore;
$this->contextProvider = $contextProvider;
$this->commandClass = $commandClass;
$this->eventClassMap = $eventClassMap;
}

public function __invoke(GenericJsonSchemaCommand $command)
Expand All @@ -194,9 +210,18 @@ public function __invoke(GenericJsonSchemaCommand $command)
$arId = (string) $payload[$this->aggregateIdentifier];
$arRepository = $this->getAggregateRepository($arId);
$arFuncArgs = [];
$commandUuid = $command->uuid()->toString();

if ($this->commandClass) {
if (! is_callable([$this->commandClass, 'fromArray'])) {
throw new \RuntimeException(sprintf('Custom command class %s should have a static fromArray method', $this->commandClass));
}

$command = ([$this->commandClass, 'fromArray'])($command->toArray());
}

if ($this->createAggregate) {
$aggregate = new GenericAggregateRoot($arId, AggregateType::fromString($this->aggregateType), $this->eventApplyMap);
$aggregate = new GenericAggregateRoot($arId, AggregateType::fromString($this->aggregateType), $this->eventApplyMap, $this->eventClassMap);
$arFuncArgs[] = $command;
} else {
/** @var GenericAggregateRoot $aggregate */
Expand Down Expand Up @@ -231,16 +256,38 @@ public function __invoke(GenericJsonSchemaCommand $command)
}

if (! is_array($event) || ! array_key_exists(0, $event) || ! array_key_exists(1, $event)
|| ! is_string($event[0]) || ! is_array($event[1])) {
|| ! is_string($event[0])
|| (! is_array($event[1]) && ! is_object($event[1]))) {
throw new \RuntimeException(sprintf(
'Event returned by aggregate of type %s while handling command %s does not has the format [string eventName, array payload]!',
'Event returned by aggregate of type %s while handling command %s does not have the format [string eventName, array payload | object event]!',
$this->aggregateType,
$this->commandName
));
}

$customEvent = null;

[$eventName, $payload] = $event;

$metadata = [];
if (is_array($payload)) {
$metadata = [];
} else {
//Custom event class used instead of payload array
if (! method_exists($payload, 'toArray')) {
throw new \RuntimeException(sprintf(
'Event %s returned by aggregate of type %s while handling command %s should have a toArray method',
get_class($payload),
$this->aggregateType,
$this->commandName
));
}

$evtArr = $payload->toArray();

$payload = $evtArr['payload'] ?? $evtArr;

$metadata = $evtArr['metadata'] ?? [];
}

if (array_key_exists(2, $event)) {
$metadata = $event[2];
Expand All @@ -258,7 +305,7 @@ public function __invoke(GenericJsonSchemaCommand $command)
$event = $this->messageFactory->createMessageFromArray($eventName, [
'payload' => $payload,
'metadata' => array_merge([
'_causation_id' => $command->uuid()->toString(),
'_causation_id' => $commandUuid,
'_causation_name' => $this->commandName,
], $metadata),
]);
Expand All @@ -275,7 +322,7 @@ private function getAggregateRepository(string $aggregateId): AggregateRepositor
$this->aggregateRepository = new AggregateRepository(
$this->eventStore,
AggregateType::fromString($this->aggregateType),
new ClosureAggregateTranslator($aggregateId, $this->eventApplyMap),
new ClosureAggregateTranslator($aggregateId, $this->eventApplyMap, $this->eventClassMap),
$this->snapshotStore,
new StreamName($this->streamName)
);
Expand Down
11 changes: 9 additions & 2 deletions src/Commanding/CommandProcessorDescription.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

namespace Prooph\EventMachine\Commanding;

use Prooph\EventMachine\Eventing\EventRecorderDescription;
use Prooph\EventMachine\EventMachine;
use Prooph\EventMachine\Messaging\EventRecorderDescription;

final class CommandProcessorDescription
{
Expand All @@ -26,6 +26,11 @@ final class CommandProcessorDescription
*/
private $commandName;

/**
* @var string|null
*/
private $commandClass;

/**
* @var bool
*/
Expand Down Expand Up @@ -53,10 +58,11 @@ final class CommandProcessorDescription
*/
private $contextProvider;

public function __construct(string $commandName, EventMachine $eventMachine)
public function __construct(string $commandName, EventMachine $eventMachine, string $commandClass = null)
{
$this->commandName = $commandName;
$this->eventMachine = $eventMachine;
$this->commandClass = $commandClass;
}

public function withNew(string $aggregateType): self
Expand Down Expand Up @@ -147,6 +153,7 @@ public function __invoke(): array

return [
'commandName' => $this->commandName,
'commandClass' => $this->commandClass,
'createAggregate' => $this->createAggregate,
'aggregateType' => $this->aggregateType,
'aggregateIdentifier' => $this->aggregateIdentifier,
Expand Down
8 changes: 8 additions & 0 deletions src/Commanding/CommandToProcessorRouter.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ final class CommandToProcessorRouter extends AbstractPlugin
*/
private $aggregateDescriptions;

/**
* @var array
*/
private $eventClassMap;

/**
* @var MessageFactory
*/
Expand All @@ -56,13 +61,15 @@ final class CommandToProcessorRouter extends AbstractPlugin
public function __construct(
array $routingMap,
array $aggregateDescriptions,
array $eventClassMap,
MessageFactory $messageFactory,
EventStore $eventStore,
ContextProviderFactory $providerFactory,
SnapshotStore $snapshotStore = null
) {
$this->routingMap = $routingMap;
$this->aggregateDescriptions = $aggregateDescriptions;
$this->eventClassMap = $eventClassMap;
$this->messageFactory = $messageFactory;
$this->eventStore = $eventStore;
$this->contextProviderFactory = $providerFactory;
Expand Down Expand Up @@ -103,6 +110,7 @@ public function onRouteMessage(ActionEvent $actionEvent): void
}

$processorDesc['eventApplyMap'] = $aggregateDesc['eventApplyMap'];
$processorDesc['eventClassMap'] = $this->eventClassMap;

$contextProvider = $processorDesc['contextProvider'] ? $this->contextProviderFactory->build($processorDesc['contextProvider']) : null;

Expand Down

0 comments on commit 2aa8dd1

Please sign in to comment.