When, where and how to create Summary Events #49

Open
webdevilopers opened this issue Sep 24, 2020 · 3 comments

webdevilopers commented Sep 24, 2020

Currently we have a Service A that publishes messages to the outside world (e.g. Service B) whenever one of about 20 state changes happens in our context.

final class StaffMember extends AggregateRoot
{
    public function assignToJobFunction(JobFunctionId $jobFunctionId, string $jobFunctionName): void
    {
        // Record the domain event; it carries only the data of this single state change.
        $this->recordThat(AssignedToJobFunction::with($this->staffMemberId, $jobFunctionId, $jobFunctionName));
    }

    // 19 more state changes
}

final class StaffPublisher extends Producer implements MessageSubscriberInterface
{
    /** @var ProducerInterface */
    protected $producer;

    // Forwards the event to the message broker under an event-specific routing key.
    public function whenAssignedToJobFunction(AssignedToJobFunction $event): void
    {
        $this->publish($event->serialize(), 'acme.staff.staff.member_assigned_to_job_function');
    }
}

The consumer does not need to know all the details leading to the actual state change, and we do not want to add a new message broker routing key for every event.

After reading the following article by @mathiasverraes we decided to go for a "Summary Event":

You can use a Projection to generate the Summary Event. A Projection is an event listener that persists state from events it listens to, and either exposes that state by responding to queries, or by emitting new events.

The problem with our current implementation:

When the event reaches our Publisher ("Event Listener"), the event itself carries only part of the state in memory. We could try to query the read model, but there is no guarantee that it has already been updated by then.

One way (1) to solve that is to force a delay. Another is to use a cronjob (2) that sends summaries every 2 minutes. The latter solution would require some kind of publishing log to know which summaries were already published.
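
Approach (2) could be sketched roughly as follows. This is only an illustration under assumptions: `SummaryPublishLog`, `publishPendingSummaries`, the `version` column and the array-shaped read model rows are hypothetical names that do not exist in our code base. The log remembers the last published version per staff member, so a cron run only sends summaries that changed since the previous run.

```php
<?php
declare(strict_types=1);

// Hypothetical publishing log; kept in memory here, a real one would be a table.
final class SummaryPublishLog
{
    /** @var array<string, int> last published read model version per staff member */
    private array $publishedVersions = [];

    public function isPublished(string $staffMemberId, int $version): bool
    {
        return isset($this->publishedVersions[$staffMemberId])
            && $this->publishedVersions[$staffMemberId] >= $version;
    }

    public function markPublished(string $staffMemberId, int $version): void
    {
        $this->publishedVersions[$staffMemberId] = $version;
    }
}

/**
 * Intended to be called by the cronjob, e.g. every 2 minutes.
 *
 * @param array<int, array<string, mixed>> $readModelRows rows with assumed keys "staffMemberId" and "version"
 * @param callable $publish stand-in for the message producer, receives one row
 * @return int number of summaries published in this run
 */
function publishPendingSummaries(array $readModelRows, SummaryPublishLog $log, callable $publish): int
{
    $published = 0;
    foreach ($readModelRows as $row) {
        if ($log->isPublished($row['staffMemberId'], $row['version'])) {
            continue; // this state was already sent in an earlier run
        }
        $publish($row);
        $log->markPublished($row['staffMemberId'], $row['version']);
        $published++;
    }

    return $published;
}
```

The trade-off stays the same as described above: consumers see a summary with up to one cron interval of delay.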

Another hack (3) would be to query the read model, which may not be up to date yet, and then overwrite its data with only the properties that changed, taken from the event, e.g. "jobFunctionId" and "jobFunctionName".

A different approach (4) we are thinking about is adding an event, e.g. "StaffMemberModified" or "StaffMemberDetailsSummarized". It could be recorded in addition to all the other domain events and include the entire state.

final class StaffMember extends AggregateRoot
{
    /** @var StaffMemberId */
    private $staffMemberId;

    /** @var JobFunctionId */
    private $jobFunctionId;

    /** @var string */
    private $jobFunctionName;

    // 19+x more properties holding the state only for the extra event, currently not required for protecting invariants
    public function assignToJobFunction(JobFunctionId $jobFunctionId, string $jobFunctionName): void
    {
        $this->recordThat(AssignedToJobFunction::with($this->staffMemberId, $jobFunctionId, $jobFunctionName));
        $this->recordThat(StaffMemberDetailsSummarized::with(
            $this->staffMemberId,
            $jobFunctionId,
            $jobFunctionName
            /* , entire state... */
        ));
    }

    // 19 more state changes
}

This solves the publisher problem, but it puts the entire state on the event-sourced aggregate root. That has no technical disadvantage, though. It's just more code.

Do you have any suggestions?

Thanks in advance.

Came from:

tPl0ch commented Sep 24, 2020

The state of the projection can be a separate layer from read and write models. In this case it would be a segregated event layer. In our applications we do this a lot when process summaries need to aggregate over multiple steps. We then store the aggregated state in a separate store/table.
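
A minimal sketch of that idea, assuming a projection that keeps its own state per staff member and returns the full summary after each event. `StaffMemberSummaryProjection`, its `apply()` method and the `firstName` field in the usage note are hypothetical; the in-memory array only stands in for the separate store/table.

```php
<?php
declare(strict_types=1);

// Hypothetical projection with its own state, independent of the read model.
final class StaffMemberSummaryProjection
{
    /** @var array<string, array<string, mixed>> stand-in for the separate store/table */
    private array $store = [];

    /**
     * @param array<string, mixed> $changedFields only the fields carried by the domain event
     * @return array<string, mixed> full summary state, ready to be emitted as a summary event
     */
    public function apply(string $staffMemberId, array $changedFields): array
    {
        $current = $this->store[$staffMemberId] ?? ['staffMemberId' => $staffMemberId];
        // Later events overwrite earlier values of the same field.
        $this->store[$staffMemberId] = array_merge($current, $changedFields);

        return $this->store[$staffMemberId];
    }
}
```

For example, applying `['jobFunctionId' => 'jf-1', 'jobFunctionName' => 'Welder']` and then `['firstName' => 'Jane']` for the same staff member returns a summary containing all three fields plus the id. Because the projection processes the event stream itself, it does not depend on whether the regular read model has caught up.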

dgafka commented Sep 24, 2020

If I understand correctly, the problem is that the event handler that is supposed to publish the "summary event" is called before the corresponding read model is regenerated, and both listen to the same event.

If you publish the summary event asynchronously and regenerate the read model synchronously, you should avoid the problem.
This works just like indexes in a database: when indexed columns are modified, the index (read model) is updated synchronously.
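
The ordering described here might look like the following sketch. `EventDispatcher`, `subscribeSync`, `subscribeAsync` and `runQueued` are hypothetical names for illustration, not the API of any particular framework, and the in-memory queue stands in for a real message queue and worker.

```php
<?php
declare(strict_types=1);

// Hypothetical dispatcher: synchronous handlers run during dispatch,
// asynchronous handlers are only queued and run later by a worker.
final class EventDispatcher
{
    /** @var callable[] */
    private array $syncHandlers = [];

    /** @var callable[] */
    private array $asyncHandlers = [];

    /** @var array<int, array{0: callable, 1: object}> pending [handler, event] pairs */
    private array $queue = [];

    public function subscribeSync(callable $handler): void
    {
        $this->syncHandlers[] = $handler;
    }

    public function subscribeAsync(callable $handler): void
    {
        $this->asyncHandlers[] = $handler;
    }

    public function dispatch(object $event): void
    {
        // The read model projector would be registered here and runs first.
        foreach ($this->syncHandlers as $handler) {
            $handler($event);
        }
        // The summary publisher is deferred until the worker picks it up.
        foreach ($this->asyncHandlers as $handler) {
            $this->queue[] = [$handler, $event];
        }
    }

    public function runQueued(): void
    {
        while ($this->queue !== []) {
            [$handler, $event] = array_shift($this->queue);
            $handler($event);
        }
    }
}
```

With this split, the publisher can safely query the read model, because the projector has already handled the same event by the time the queued handler runs.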

@webdevilopers

Thanks for your feedback, @dgafka. We prefer to build our read models asynchronously and run the projections as long-running processes.

I implemented approach (3). We load the read model when a certain event has happened. It does not matter whether the read model already has the current data; we take it as it is. But we take the current data from the event and simply overwrite the corresponding fields of the read model data.

Then we publish a summary event.
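
A rough sketch of that merge step, assuming both the read model row and the event payload are available as associative arrays. The function name `summarize` and the sample values in the usage note are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * @param array<string, mixed> $readModelRow possibly stale row loaded from the read model
 * @param array<string, mixed> $eventPayload fields carried by the event that just happened
 * @return array<string, mixed> payload for the summary event
 */
function summarize(array $readModelRow, array $eventPayload): array
{
    // Event data wins: it is the newest information available for these fields.
    return array_replace($readModelRow, $eventPayload);
}
```

If the read model still holds `jobFunctionName => 'Welder'` while the event carries `jobFunctionName => 'Painter'`, the summary contains `'Painter'`. Note that only the fields present in the current event are corrected this way; fields changed by other very recent events may still be stale in the loaded row.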

A consumer takes the event and acts as a process manager. It sends commands that change the state of an aggregate, which publishes its events to a segregated event layer, as suggested by @tPl0ch.

This seems to work fine so far.
