-
Notifications
You must be signed in to change notification settings - Fork 185
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Some fixed and added documentation about async jobs, projections, bus…
… and use case
- Loading branch information
Showing
10 changed files
with
426 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
# Async Jobs | ||
|
||
All events are published in RabbitMQ through `App\Infrastructure\Share\Event\Publisher\AsyncEventPublisher`. The reason of this is that others can consume this events in background. | ||
|
||
#### How it works? | ||
|
||
The `AsyncEventPublisher` implements 2 important interfaces. | ||
|
||
- `Broadway\EventHandling\EventListener` | ||
- It binds this class to the **EventBus** and invoke method `handle` that collect the events in memory inside the class. | ||
- `Symfony\Component\EventDispatcher\EventSubscriberInterface` | ||
- This binds the class to **{KernelEvents,ConsoleEvents}::TERMINATE** Symfony events and invoke method `publish` | ||
|
||
By that way we're sending the messages to RabbitMQ after respond to the client so we don't lock the client for things not required to wait. | ||
|
||
#### Consume this events | ||
|
||
Create you own consumer: | ||
```php | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace App\Infrastructure\Demo\Event\Consumer; | ||
|
||
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | ||
use PhpAmqpLib\Message\AMQPMessage; | ||
|
||
class DemoEventsConsumer implements ConsumerInterface | ||
{ | ||
public function execute(AMQPMessage $msg): void | ||
{ | ||
var_dump(unserialize($msg->body)); | ||
} | ||
} | ||
``` | ||
|
||
#### Configure you consumer: | ||
|
||
```yaml | ||
old_sound_rabbit_mq: | ||
... | ||
multiple_consumers: | ||
events: | ||
.... | ||
queues: | ||
.... | ||
+ var_dump_all_events: | ||
+ name: var_dump_all_events | ||
+ routing_keys: | ||
+ - 'App.Domain.#' | ||
+ callback: App\Infrastructure\Demo\Event\Consumer\DemoEventsConsumer | ||
``` | ||
|
||
### Running the Consumer | ||
|
||
Buy default all consumers are invoked with container: | ||
|
||
`docker-compose.yml` | ||
```yaml | ||
workers: | ||
image: jorge07/alpine-php:7.2-dev-sf | ||
volumes: | ||
- .:/app | ||
command: ['/app/bin/console', 'rabbitmq:multiple-consumer', 'events'] | ||
``` | ||
|
||
Inside docker container: | ||
`./bin/console rabbitmq:consumer var_dump_all_events` | ||
|
||
Full doc with much better example here: https://github.com/php-amqplib/RabbitMqBundle | ||
|
||
#### Routing keys | ||
|
||
So simple, it replaces namespaces `\` for `.`, example: | ||
|
||
`App\Domain\User\Event` -> `App.Domain.User.Event.UserWasCreated` | ||
|
||
You can bind you consumer to: | ||
|
||
- All events: `#` | ||
- All domain events: `App.Domain.#` | ||
- All domain context boundary events: `App.Domain.User.#` | ||
- A one particular event: `App.Domain.User.Event.UserWasCreated` | ||
- Combination of keys: | ||
- `App.Domain.User.#` | ||
- `App.Domain.Payments.#` | ||
- `App.Domain.Cart.Event.OrderWasCreated` | ||
- `App.Domain.Cart.Event.OrderWasCanceled` | ||
|
||
Much better explained in the official documentation: https://www.rabbitmq.com/tutorials/tutorial-five-python.html | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# Command Bus and Query Bus | ||
|
||
### Why tactician bus and not broadway bus? | ||
|
||
Broadway has a CommandBus implementation but not a QueryBus. The interface does not allow you to return content so you | ||
need to build your own. | ||
|
||
Tactician `CommandHandlerMiddleware::execute` has: | ||
|
||
```php | ||
return $handler->{$methodName}($command); | ||
``` | ||
|
||
It allows you to return content from you `Handlers`, something required for a QueryBus. | ||
|
||
The configuration for a Symfony app will be like that: | ||
|
||
```yaml | ||
tactician: | ||
default_bus: command | ||
method_inflector: tactician.handler.method_name_inflector.invoke | ||
commandbus: | ||
query: | ||
middleware: | ||
- tactician.commandbus.query.middleware.command_handler | ||
command: | ||
middleware: | ||
- tactician.commandbus.command.middleware.command_handler | ||
``` | ||
|
||
So you can create your own middleware for example to generate a backend caching for your read model. | ||
|
||
```yaml | ||
tactician: | ||
... | ||
commandbus: | ||
query: | ||
middleware: | ||
+ - app.bus.query.middleware.cache | ||
- tactician.commandbus.query.middleware.command_handler | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
# Creating a new Projection | ||
|
||
Let's say we want to store the list of emails in a separated index for testing purpose. | ||
|
||
#### Domain definition | ||
|
||
```php | ||
<?php | ||
|
||
namespace App\Domain\User\Query\Projections; | ||
|
||
use Broadway\ReadModel\SerializableReadModel; | ||
|
||
interface UserListProjectionInterface extends SerializableReadModel | ||
{ | ||
} | ||
|
||
``` | ||
|
||
#### Infrastructure implementation | ||
|
||
```php | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace App\Infrastructure\User\Query\Projections; | ||
|
||
use App\Domain\User\Query\Projections\UserListProjectionInterface; | ||
use App\Domain\User\ValueObject\Email; | ||
use Broadway\Serializer\Serializable; | ||
use Ramsey\Uuid\Uuid; | ||
use Ramsey\Uuid\UuidInterface; | ||
|
||
class UserListProjection implements UserListProjectionInterface | ||
{ | ||
/** @var UuidInterface */ | ||
public $uuid; | ||
|
||
/** @var Email */ | ||
public $email; | ||
|
||
public static function fromSerializable(Serializable $event): self | ||
{ | ||
return self::deserialize($event->serialize()); | ||
} | ||
|
||
public static function deserialize(array $data): self | ||
{ | ||
$instance = new self(); | ||
|
||
$instance->uuid = Uuid::fromString($data['uuid']); | ||
$instance->email = Email::fromString($data['email']); | ||
|
||
return $instance; | ||
} | ||
|
||
public function serialize(): array | ||
{ | ||
return [ | ||
'uuid' => $this->getId(), | ||
'email' => (string) $this->email, | ||
]; | ||
} | ||
|
||
public function getId(): string | ||
{ | ||
return $this->uuid->toString(); | ||
} | ||
} | ||
``` | ||
|
||
### Define your Read Model Repository | ||
|
||
```php | ||
<?php | ||
namespace App\Domain\User\Query\Repository\UserEmailListReadModelRepositoryInterface; | ||
|
||
interface UserEmailListReadModelRepositoryInterface { | ||
public function add(UserListProjection $projection): void; | ||
public function replace(string $uuid, Email $email): void; | ||
} | ||
``` | ||
|
||
Then you need to implement the Infrastructure for this. | ||
|
||
#### Create the Projector Listener | ||
|
||
```php | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace App\Infrastructure\User\Query; | ||
|
||
use App\Domain\User\Event\UserWasCreated; | ||
use App\Domain\User\Query\Repository\UserEmailListReadModelRepositoryInterface; | ||
use App\Infrastructure\User\Query\Projections\UserListProjection; | ||
use Broadway\ReadModel\Projector; | ||
|
||
class UserEmailReadProjectionFactory extends Projector | ||
{ | ||
protected function applyUserWasCreated(UserWasCreated $userWasCreated): void | ||
{ | ||
$userReadModel = UserListProjection::deserialize([ | ||
'uuid' => $userWasCreated->uuid, | ||
'email' => $userWasCreated->credentials->email | ||
]); | ||
|
||
$this->repository->add($userReadModel); | ||
} | ||
|
||
protected function applyUserEmailChanged(UserEmailChanged $emailChanged): void | ||
{ | ||
$this->repository->replace($emailChanged->uuid, $emailChanged->email); | ||
} | ||
public function __construct(UserEmailListReadModelRepositoryInterface $repository) | ||
{ | ||
$this->repository = $repository; | ||
} | ||
|
||
/** @var UserEmailListReadModelRepositoryInterface */ | ||
private $repository; | ||
} | ||
``` | ||
|
||
And you're done. | ||
|
||
### Why this works? | ||
|
||
`Broadway\ReadModel\Projector` implements `Broadway\EventHandling\EventListener` so it's automatically added to the service container and tagged as a Broadway event listener. | ||
|
||
`config/services.yaml` | ||
```yaml | ||
|
||
services: | ||
... | ||
_instanceof: | ||
... | ||
Broadway\EventHandling\EventListener: | ||
public: true | ||
tags: | ||
- { name: broadway.domain.event_listener } | ||
``` | ||
The `Broadway/EventSourcing/EventSourcingRepository::save` method will store the events in the EventStore and publish all the events in the event bus: | ||
|
||
```php | ||
<?php | ||
... | ||
public function save(AggregateRoot $aggregate): void | ||
{ | ||
// maybe we can get generics one day.... ;) | ||
Assert::isInstanceOf($aggregate, $this->aggregateClass); | ||
$domainEventStream = $aggregate->getUncommittedEvents(); | ||
$eventStream = $this->decorateForWrite($aggregate, $domainEventStream); | ||
$this->eventStore->append($aggregate->getAggregateRootId(), $eventStream); | ||
$this->eventBus->publish($eventStream); | ||
} | ||
``` | ||
|
||
The projections are automatically added to the EventBus by the Compiler pass of `broadway-bundle`, [see here](https://github.com/broadway/broadway-bundle/blob/master/src/DependencyInjection/RegisterBusSubscribersCompilerPass.php#L66) |
Oops, something went wrong.