Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to merge event-sourced aggregate roots (A+ES) with Prooph PDO event stream #45

Open
webdevilopers opened this issue Jul 31, 2020 · 3 comments

Comments

@webdevilopers
Copy link
Owner

webdevilopers commented Jul 31, 2020

All events of multiple aggregates have to be selected, changed in some domain relevant ways and be merged into a new aggregate.

I decided to select the events from the default repository using the metadata matcher.

I did not create a new aggregate by calling the factory methods with the new data. Since the data passed was already valid at the time to original events took place and were applied.
Instead I decided to use reflection to pass the "recorded" events.

This was inspired by @prooph code:

namespace Prooph\Common\Messaging;

abstract class DomainMessage implements Message
{
    public static function fromArray(array $messageData): DomainMessage
    {
        MessageDataAssertion::assert($messageData);

        $messageRef = new \ReflectionClass(\get_called_class());

        /** @var $message DomainMessage */
        $message = $messageRef->newInstanceWithoutConstructor();

        $message->uuid = Uuid::fromString($messageData['uuid']);
        $message->messageName = $messageData['message_name'];
        $message->metadata = $messageData['metadata'];
        $message->createdAt = $messageData['created_at'];
        $message->setPayload($messageData['payload']);

        return $message;
    }
}

At the bottom line a lot of prooph-inspired code was used. At the bottom line I think there is not too much coupling.
This example will have a unit test with an in-memory solution. Will add it soon.

Just like any other use case the new aggregate is stored to the repository. The event publisher publishes all events and the projector create the read models and process manager eventually publish some messages to the outside world.
Another process manager will catch the final MergedWithStaffMembers event and fire some "removeMergedStaffMember" commands.

Would love to have your feedback on this approach @Ocramius, @prolic.

The factory:

<?php

namespace Acme\Staff\Domain\Service;

use DomainException;
use Prooph\EventSourcing\AggregateChanged;
use Ramsey\Uuid\Uuid;
use ReflectionClass;
use ReflectionProperty;
use Acme\Staff\Domain\Model\StaffMember\ContractId;
use Acme\Staff\Domain\Model\StaffMember\EmploymentPeriod;
use Acme\Staff\Domain\Model\StaffMember\Event\MergedWithStaffMembers;
use Acme\Staff\Domain\Model\StaffMember\Event\StaffMemberAdded;
use Acme\Staff\Domain\Model\StaffMember\Event\StaffMemberContractModified;
use Acme\Staff\Domain\Model\StaffMember\StaffMember;
use Acme\Staff\Domain\Model\StaffMember\StaffMemberId;
use Acme\Staff\Domain\Model\StaffMember\StaffMemberRepository;

final class MergedStaffMember
{
    /** @var StaffMemberRepository */
    private $staffMemberRepository;

    /**
     * Current version
     *
     * @var int
     */
    private $version = 0;

    /**
     * List of events that are not committed to the EventStore
     *
     * @var AggregateChanged[]
     */
    private $recordedEvents = [];

    /** @var StaffMemberId */
    private $newStaffMemberId;

    /** @var StaffMemberId[] */
    private $mergeWithStaffMemberIds = [];

    /** @var ContractId */
    private $newContractId;

    /** @var EmploymentPeriod */
    private $newEmploymentPeriod;

    public function __construct(StaffMemberRepository $staffMemberRepository)
    {
        $this->staffMemberRepository = $staffMemberRepository;
    }

    public function fromMergedHistory(
        StaffMemberId $newStaffMemberId, array $mergeWithStaffMemberIds,
        ContractId $newContractId, EmploymentPeriod $newEmploymentPeriod
    ): StaffMember
    {
        if (0 === count($mergeWithStaffMemberIds)) {
            throw new DomainException('Missing staff members to merge');
        }

        $this->newStaffMemberId = $newStaffMemberId;
        $this->mergeWithStaffMemberIds = $mergeWithStaffMemberIds;
        $this->newContractId = $newContractId;
        $this->newEmploymentPeriod = $newEmploymentPeriod;

        $this->buildHistoryFromMergedStaffMembers();
        $this->finalizeNewStaffMemberHistory();

        $newStaffMemberRef = new ReflectionClass(StaffMember::class);

        /** @var StaffMember $newStaffMember */
        $newStaffMember = $newStaffMemberRef->newInstanceWithoutConstructor();

        $newStaffMemberRecordedEventsRef = new ReflectionProperty($newStaffMember, 'recordedEvents');
        $newStaffMemberRecordedEventsRef->setAccessible(true);
        $newStaffMemberRecordedEventsRef->setValue($newStaffMember, $this->recordedEvents);

        $newStaffMemberStaffMemberIdRef = new ReflectionProperty($newStaffMember, 'staffMemberId');
        $newStaffMemberStaffMemberIdRef->setAccessible(true);
        $newStaffMemberStaffMemberIdRef->setValue($newStaffMember, $newStaffMemberId);

        return $newStaffMember;
    }

    private function buildHistoryFromMergedStaffMembers(): void
    {
        $oldEvents = $this->staffMemberRepository->ofStaffMemberIds($this->mergeWithStaffMemberIds);

        // Ensure chronological order
        uasort($oldEvents, function(AggregateChanged $a, AggregateChanged $b) {
            return $a->createdAt() <=> $b->createdAt();
        });

        $initialStaffMemberId = StaffMemberId::fromString(reset($oldEvents)->aggregateId());

        /** @var AggregateChanged[] $oldEvent */
        foreach ($oldEvents as $oldEvent) {
            $newMessageData = $oldEvent->toArray();
            // The new event needs an own unique ID.
            $newMessageData['uuid'] = Uuid::uuid4()->toString();
            // Set the new staff member ID instead of the merged one.
            $newMessageData['metadata']['_aggregate_id'] = $this->newStaffMemberId->toString();
            // This will be automatically reset correctly.
            unset($newMessageData['metadata']['_position']);

            if ($oldEvent instanceof StaffMemberAdded) {
                /** @var StaffMemberAdded $oldEvent */
                if (!$oldEvent->staffMemberId()->sameValueAs($initialStaffMemberId)) {
                    // Only the initial event can add a staff member.
                    // All other events can only be modifications of the contract.
                    $newMessageData['message_name'] = StaffMemberContractModified::class;
                }
            }

            if ($oldEvent instanceof StaffMemberAdded || $oldEvent instanceof StaffMemberContractModified) {
                $newMessageData['payload']['contractId'] = $this->newContractId->toString();
                // Set new employment period to satisfy all time-period relevant policies.
                $newMessageData['payload']['employmentPeriod'] = $this->newEmploymentPeriod->toArray();
            }

            $eventClassName = $newMessageData['message_name'];

            $newEvent = $eventClassName::fromArray($newMessageData);

            $this->recordThat($newEvent);
        }
    }

    private function finalizeNewStaffMemberHistory(): void
    {
        // Create final event
        $mergedWithStaffMembers = MergedWithStaffMembers::with(
            $this->newStaffMemberId, $this->mergeWithStaffMemberIds,
            $this->newContractId, $this->newEmploymentPeriod
        );
        $mergedWithStaffMembers = $mergedWithStaffMembers
            ->withAddedMetadata('_aggregate_type', StaffMember::class)
        ;

        $this->recordThat($mergedWithStaffMembers);
    }

    /**
     * Record an aggregate changed event
     */
    protected function recordThat(AggregateChanged $event): void
    {
        $this->version += 1;

        $this->recordedEvents[] = $event->withVersion($this->version);
    }
}

The command handler:

<?php

namespace Acme\Staff\Application\Service\StaffMember;

use Acme\Staff\Domain\Model\StaffMember\StaffMemberRepository;
use Acme\Staff\Domain\Service\MergedStaffMember;

final class MergeStaffMembersHandler
{
    /** @var MergedStaffMember */
    private $mergedStaffMember;

    /** @var StaffMemberRepository */
    private $staffMemberRepository;

    public function __construct(MergedStaffMember $mergedStaffMember, StaffMemberRepository $staffMemberRepository)
    {
        $this->mergedStaffMember = $mergedStaffMember;
        $this->staffMemberRepository = $staffMemberRepository;
    }

    public function __invoke(MergeStaffMembers $command): void
    {
        $newStaffMember = $this->mergedStaffMember->fromMergedHistory(
            $command->newStaffMemberId(),
            $command->mergeWithStaffMemberIds(),
            $command->newContractId(),
            $command->newEmploymentPeriod()
        );

        $this->staffMemberRepository->save($newStaffMember);
    }
}

Just some framework - Symfony and YAML for Marco ;) - config:

services:

    Rewotec\Staff\Domain\Service\MergedStaffMember:
      arguments:
        - '@staff_member_collection'

    Rewotec\Staff\Application\Service\StaffMember\MergeStaffMembersHandler:
        public: true
        tags: [messenger.message_handler]
        arguments:
            - '@Rewotec\Staff\Domain\Service\MergedStaffMember'
            - '@staff_member_collection'
@prolic
Copy link

prolic commented Aug 3, 2020

I would prefer not to inject the repository into the aggregate root. Do it from the outside instead.

@webdevilopers
Copy link
Owner Author

Actually MergedEmploymentContract is NOT an aggregate root. It's just a factory using the typical recordThat method of prooph which indeed is include in the AggregateRoot class. But that one is not extended here.
The factory is just not named "Factory" but after the result aggregate to expect. There is no "natural" aggregate that comes from the merging. It's just written to the regular Employment Contract stream.

@prolic
Copy link

prolic commented Aug 3, 2020 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants