Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata enricher #157

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/Metadata/MetadataEnricher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

/*
* This file is part of the prooph/event-store package.
* (c) 2014 - 2016 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Prooph\EventStore\Metadata;

use Prooph\Common\Messaging\Message;

interface MetadataEnricher
{
/**
* Return the given message with added metadata.
*
* @param Message $message
*
* @return Message
*/
public function enrich(Message $message);
}
43 changes: 43 additions & 0 deletions src/Metadata/MetadataEnricherAggregate.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

/*
* This file is part of the prooph/event-store package.
* (c) 2014 - 2016 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Prooph\EventStore\Metadata;

use Prooph\Common\Messaging\Message;

final class MetadataEnricherAggregate implements MetadataEnricher
{
/**
* @var array
*/
private $metadataEnrichers;

/**
* @param array $metadataEnrichers
*/
public function __construct(array $metadataEnrichers)
{
$this->metadataEnrichers = $metadataEnrichers;
}

/**
* @param Message $message
*
* @return Message
*/
public function enrich(Message $message)
{
foreach ($this->metadataEnrichers as $metadataEnricher) {
$message = $metadataEnricher->enrich($message);
}

return $message;
}
}
94 changes: 94 additions & 0 deletions src/Metadata/MetadataEnricherPlugin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

/*
* This file is part of the prooph/event-store package.
* (c) 2014 - 2016 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Prooph\EventStore\Metadata;

use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\Plugin;
use Prooph\EventStore\Stream\Stream;

final class MetadataEnricherPlugin implements Plugin
{
/**
* @var MetadataEnricher
*/
private $metadataEnricher;

/**
* @param MetadataEnricher $metadataEnricher
*/
public function __construct(MetadataEnricher $metadataEnricher)
{
$this->metadataEnricher = $metadataEnricher;
}

/**
* @param EventStore $eventStore
*/
public function setUp(EventStore $eventStore)
{
$eventEmitter = $eventStore->getActionEventEmitter();

$eventEmitter->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
$eventEmitter->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
}

/**
* Add event metadata on event store createStream.
*
* @param ActionEvent $createEvent
*/
public function onEventStoreCreateStream(ActionEvent $createEvent)
{
$stream = $createEvent->getParam('stream');

if (!$stream instanceof Stream) {
return;
}

$streamEvents = $stream->streamEvents();
$streamEvents = $this->handleRecordedEvents($streamEvents);

$createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
}

/**
* Add event metadata on event store appendToStream.
*
* @param ActionEvent $appendToStreamEvent
*/
public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
{
$streamEvents = $appendToStreamEvent->getParam('streamEvents');
$streamEvents = $this->handleRecordedEvents($streamEvents);

$appendToStreamEvent->setParam('streamEvents', $streamEvents);
}

/**
* This method takes domain events as argument which are going to be added
* to the event stream and add the metadata via the MetadataEnricher.
*
* @param Iterator $events
*
* @return Iterator
*/
private function handleRecordedEvents(\Iterator $events)
{
$enrichedEvents = [];

foreach ($events as $event) {
$enrichedEvents[] = $this->metadataEnricher->enrich($event);
}

return new \ArrayIterator($enrichedEvents);
}
}
64 changes: 64 additions & 0 deletions tests/Metadata/MetadataEnricherAggregateTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

/*
* This file is part of the prooph/event-store package.
* (c) 2014 - 2016 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace ProophTest\EventStore\Metadata;

use Prooph\Common\Messaging\Message;
use Prooph\EventStore\Metadata\MetadataEnricher;
use Prooph\EventStore\Metadata\MetadataEnricherAggregate;
use ProophTest\EventStore\Mock\TestDomainEvent;
use ProophTest\EventStore\TestCase;
use Prophecy\Argument;

final class MetadataEnricherAggregateTest extends TestCase
{
/**
* @test
*/
public function it_aggregates_metadata_enrichers()
{
// Mocks
$metadataEnricher1 = $this->prophesize(MetadataEnricher::class);
$metadataEnricher2 = $this->prophesize(MetadataEnricher::class);

// Class under test
$metadataEnricherAgg = new MetadataEnricherAggregate([
$metadataEnricher1->reveal(),
$metadataEnricher2->reveal(),
]);

// Initial payload and expected data
$originalEvent = TestDomainEvent::with(['foo' => 'bar'], 1);
$eventAfterEnricher1 = $originalEvent->withAddedMetadata('meta1', 'data1');
$eventAfterEnricher2 = $eventAfterEnricher1->withAddedMetadata('meta2', 'data2');

// Prepare mock
$metadataEnricher1
->enrich(Argument::type(Message::class))
->shouldBeCalledTimes(1)
->willReturn($eventAfterEnricher1);

$metadataEnricher2
->enrich(Argument::type(Message::class))
->shouldBeCalledTimes(1)
->willReturn($eventAfterEnricher2);

// Call method under test
$enrichedEvent = $metadataEnricherAgg->enrich($originalEvent);

// Assertions
$this->assertEquals($originalEvent->payload(), $enrichedEvent->payload());
$this->assertEquals($originalEvent->version(), $enrichedEvent->version());
$this->assertEquals($originalEvent->createdAt(), $enrichedEvent->createdAt());

$expectedMetadata = ['meta1' => 'data1', 'meta2' => 'data2'];
$this->assertEquals($expectedMetadata, $enrichedEvent->metadata());
}
}
112 changes: 112 additions & 0 deletions tests/Metadata/MetadataEnricherPluginTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
<?php

/*
* This file is part of the prooph/event-store package.
* (c) 2014 - 2016 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace ProophTest\EventStore\Metadata;

use ProophTest\EventStore\Mock\UserCreated;
use ProophTest\EventStore\TestCase;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\DefaultActionEvent;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Metadata\MetadataEnricher;
use Prooph\EventStore\Metadata\MetadataEnricherPlugin;
use Prooph\EventStore\Stream\Stream;
use Prooph\EventStore\Stream\StreamName;
use Prophecy\Argument;

final class MetadataEnricherPluginTest extends TestCase
{
/**
* @test
*/
public function it_attaches_itself_to_event_store_events()
{
$metadataEnricher = $this->prophesize(MetadataEnricher::class);
$eventStore = $this->prophesize(EventStore::class);
$eventEmitter = $this->prophesize(ActionEventEmitter::class);

$createStreamListener = null;
$appendToStreamListener = null;

$eventStore->getActionEventEmitter()->willReturn($eventEmitter);
$eventEmitter->attachListener('create.pre', Argument::any(), -1000)->will(
function ($args) use (&$createStreamListener) {
$createStreamListener = $args[1];
}
);
$eventEmitter->attachListener('appendTo.pre', Argument::any(), -1000)->will(
function ($args) use (&$appendToStreamListener) {
$appendToStreamListener = $args[1];
}
);

$plugin = new MetadataEnricherPlugin($metadataEnricher->reveal());

$plugin->setUp($eventStore->reveal());

$this->assertEquals([$plugin, 'onEventStoreCreateStream'], $createStreamListener);
$this->assertEquals([$plugin, 'onEventStoreAppendToStream'], $appendToStreamListener);
}

/**
* @test
*/
public function it_enrich_metadata_on_stream_create()
{
$metadataEnricher = $this->prophesize(MetadataEnricher::class);
$plugin = new MetadataEnricherPlugin($metadataEnricher->reveal());

$messageEvent = UserCreated::with(['name' => 'Test'], 1);
$stream = new Stream(new StreamName('test'), new \ArrayIterator([$messageEvent]));
$actionEvent = new DefaultActionEvent('create.pre');
$actionEvent->setParam('stream', $stream);

$metadataEnricher->enrich($messageEvent)->willReturn(
$messageEvent->withAddedMetadata('foo', 'bar')
);

$plugin->onEventStoreCreateStream($actionEvent);

// Assertion on event in the stream
$streamEvents = $actionEvent->getParam('stream')->streamEvents();
$this->assertCount(1, $streamEvents);
$this->assertEquals($messageEvent->payload(), $streamEvents[0]->payload());
$this->assertEquals($messageEvent->version(), $streamEvents[0]->version());
$this->assertEquals($messageEvent->createdAt(), $streamEvents[0]->createdAt());
$this->assertEquals(['foo' => 'bar'], $streamEvents[0]->metadata());
}

/**
* @test
*/
public function it_enrich_metadata_on_stream_appendTo()
{
$metadataEnricher = $this->prophesize(MetadataEnricher::class);
$plugin = new MetadataEnricherPlugin($metadataEnricher->reveal());

$messageEvent = UserCreated::with(['name' => 'Test'], 1);
$actionEvent = new DefaultActionEvent('appendTo.pre');
$actionEvent->setParam('streamEvents', new \ArrayIterator([$messageEvent]));

$metadataEnricher->enrich($messageEvent)->willReturn(
$messageEvent->withAddedMetadata('foo', 'bar')
);

$plugin->onEventStoreAppendToStream($actionEvent);

// Assertion on event in the stream
$streamEvents = $actionEvent->getParam('streamEvents');
$this->assertCount(1, $streamEvents);
$this->assertEquals($messageEvent->payload(), $streamEvents[0]->payload());
$this->assertEquals($messageEvent->version(), $streamEvents[0]->version());
$this->assertEquals($messageEvent->createdAt(), $streamEvents[0]->createdAt());
$this->assertEquals(['foo' => 'bar'], $streamEvents[0]->metadata());
}
}