Skip to content

Commit

Permalink
Merge pull request #255 from prooph/upcasting
Browse files Browse the repository at this point in the history
Upcasting
  • Loading branch information
codeliner committed Feb 13, 2017
2 parents 83ec738 + a578736 commit 43f644d
Show file tree
Hide file tree
Showing 12 changed files with 744 additions and 1 deletion.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"require-dev": {
"container-interop/container-interop": "^1.1",
"sandrokeil/interop-config": "^2.0.1",
"phpunit/phpunit": "^5.7",
"phpunit/phpunit": "^6.0",
"phpspec/prophecy": "dev-patch-1 as 1.6.2",
"prooph/php-cs-fixer-config": "^0.1.1",
"satooshi/php-coveralls": "^1.0",
Expand Down
59 changes: 59 additions & 0 deletions src/Plugin/UpcastingPlugin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php
/**
* This file is part of the prooph/event-store.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Plugin;

use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\Upcasting\Upcaster;
use Prooph\EventStore\Upcasting\UpcastingIterator;

final class UpcastingPlugin extends AbstractPlugin
{
public const ACTION_EVENT_PRIORITY = -1000;

/**
* @var Upcaster
*/
private $upcaster;

public function __construct(Upcaster $upcaster)
{
$this->upcaster = $upcaster;
}

public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
{
$upcaster = function (ActionEvent $actionEvent): void {
$streamEvents = $actionEvent->getParam('streamEvents');

if (! $streamEvents instanceof Iterator) {
return;
}

$actionEvent->setParam('streamEvents', new UpcastingIterator($this->upcaster, $streamEvents));
};

$eventStore->attach(
ActionEventEmitterEventStore::EVENT_LOAD,
$upcaster,
self::ACTION_EVENT_PRIORITY
);

$eventStore->attach(
ActionEventEmitterEventStore::EVENT_LOAD_REVERSE,
$upcaster,
self::ACTION_EVENT_PRIORITY
);
}
}
23 changes: 23 additions & 0 deletions src/Upcasting/NoOpEventUpcaster.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php
/**
* This file is part of the prooph/event-store.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Upcasting;

use Prooph\Common\Messaging\Message;

final class NoOpEventUpcaster implements Upcaster
{
public function upcast(Message $message): array
{
return [$message];
}
}
31 changes: 31 additions & 0 deletions src/Upcasting/SingleEventUpcaster.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php
/**
* This file is part of the prooph/event-store.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Upcasting;

use Prooph\Common\Messaging\Message;

abstract class SingleEventUpcaster implements Upcaster
{
public function upcast(Message $message): array
{
if (! $this->canUpcast($message)) {
return [$message];
}

return $this->doUpcast($message);
}

abstract protected function canUpcast(Message $message): bool;

abstract protected function doUpcast(Message $message): array;
}
24 changes: 24 additions & 0 deletions src/Upcasting/Upcaster.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php
/**
* This file is part of the prooph/event-store.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Upcasting;

use Prooph\Common\Messaging\Message;

interface Upcaster
{
/**
* @param Message $message
* @return array of messages
*/
public function upcast(Message $message): array;
}
46 changes: 46 additions & 0 deletions src/Upcasting/UpcasterChain.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php
/**
* This file is part of the prooph/event-store.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Upcasting;

use Prooph\Common\Messaging\Message;

final class UpcasterChain implements Upcaster
{
/**
* @var Upcaster[]
*/
private $upcasters;

public function __construct(Upcaster ...$upcasters)
{
$this->upcasters = $upcasters;
}

public function upcast(Message $message): array
{
$result = [];
$messages = [$message];

foreach ($this->upcasters as $upcaster) {
$result = [];

foreach ($messages as $message) {
$result += $upcaster->upcast($message);
}

$messages = $result;
}

return $result;
}
}
112 changes: 112 additions & 0 deletions src/Upcasting/UpcastingIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
<?php
/**
* This file is part of the prooph/event-store.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Upcasting;

use Iterator;
use Prooph\Common\Messaging\Message;

final class UpcastingIterator implements Iterator
{
/**
* @var Upcaster
*/
private $upcaster;

/**
* @var Iterator
*/
private $innerIterator;

/**
* @var array
*/
private $storedMessages = [];

private $position = 0;

public function __construct(Upcaster $upcaster, Iterator $iterator)
{
$this->upcaster = $upcaster;
$this->innerIterator = $iterator;
}

public function current(): ?Message
{
if (! empty($this->storedMessages)) {
return reset($this->storedMessages);
}

$current = $this->innerIterator->current();

if (null === $current) {
return null;
}

$this->storedMessages = $this->upcaster->upcast($current);

while (empty($this->storedMessages)) {
$this->innerIterator->next();

if (! $this->innerIterator->valid()) {
return null;
}

$this->storedMessages = $this->upcaster->upcast($this->innerIterator->current());
}

return reset($this->storedMessages);
}

public function next(): void
{
++$this->position;

if (! empty($this->storedMessages)) {
array_shift($this->storedMessages);
}

if (! empty($this->storedMessages)) {
return;
}

while (empty($this->storedMessages)) {
$this->innerIterator->next();

if (! $this->innerIterator->valid()) {
return;
}

$this->storedMessages = $this->upcaster->upcast($this->innerIterator->current());
}
}

/**
* @return bool|int
*/
public function key()
{
return $this->position;
}

public function valid(): bool
{
return null !== $this->current();
}

public function rewind(): void
{
$this->storedMessages = [];
$this->innerIterator->rewind();
$this->position = 0;
}
}
Loading

0 comments on commit 43f644d

Please sign in to comment.