Skip to content

Commit

Permalink
Align adapter according to PES v3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Miertsch committed May 1, 2015
1 parent 7ff2474 commit 999f94e
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 116 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: php

php:
- 5.4
- 5.5
- 5.6

Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
}
],
"require": {
"php": ">=5.4",
"php": ">=5.5",
"zendframework/zend-db" : "~2.3",
"zendframework/zend-serializer" : "~2.3",
"beberlei/assert": "~2.0"
},
"require-dev": {
"prooph/event-store": "~2.0",
"prooph/event-store": "dev-master",
"phpunit/phpunit": "3.7.*",
"satooshi/php-coveralls": "dev-master"
},
Expand Down
9 changes: 0 additions & 9 deletions examples/create-schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,6 @@
array('aggregate_id' => 'string')
);

//When using the \Prooph\EventStore\Stream\MappedSuperclassStreamStrategy
//you need to create a stream table for each aggregate root superclass
//The aggregate_type metadata column is required for this strategy.
//It stores the subclass information.
$eventStore->getAdapter()->createSchemaFor(
new \Prooph\EventStore\Stream\StreamName('My\Model\Superclass'),
array('aggregate_id' => 'string', 'aggregate_type' => 'string')
);


//The \Prooph\EventStore\Stream\AggregateStreamStrategy needs no existing tables. It creates a new table for
//each aggregate instance. This strategy is not the best choice when working with a RDBMS,
Expand Down
9 changes: 5 additions & 4 deletions scripts/mysql-aggregate-type-stream-template.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
-- Replace [shortclassname] with the lowercase short classname of your aggregate roots, e.g. My\Model\User = user = user_stream

CREATE TABLE IF NOT EXISTS `[shortclassname]_stream` (
`eventId` varchar(200) COLLATE utf8_unicode_ci NOT NULL,
`event_id` varchar(36) COLLATE utf8_unicode_ci NOT NULL,
`version` int(11) NOT NULL,
`eventName` text COLLATE utf8_unicode_ci NOT NULL,
`event_name` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
`event_class` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
`payload` text COLLATE utf8_unicode_ci NOT NULL,
`occurredOn` text COLLATE utf8_unicode_ci NOT NULL,
`created_at` varchar(50) COLLATE utf8_unicode_ci NOT NULL,
`aggregate_id` text COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`eventId`)
PRIMARY KEY (`event_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
13 changes: 0 additions & 13 deletions scripts/mysql-mapped-superclass-stream-template.sql

This file was deleted.

13 changes: 7 additions & 6 deletions scripts/mysql-single-stream-default-schema.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
CREATE TABLE IF NOT EXISTS `event_stream` (
`eventId` varchar(200) COLLATE utf8_unicode_ci NOT NULL,
`event_id` varchar(36) COLLATE utf8_unicode_ci NOT NULL,
`version` int(11) NOT NULL,
`eventName` text COLLATE utf8_unicode_ci NOT NULL,
`event_name` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
`event_class` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
`payload` text COLLATE utf8_unicode_ci NOT NULL,
`occurredOn` text COLLATE utf8_unicode_ci NOT NULL,
`aggregate_id` text COLLATE utf8_unicode_ci NOT NULL,
`aggregate_type` text COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`eventId`)
`created_at` varchar(50) COLLATE utf8_unicode_ci NOT NULL,
`aggregate_id` varchar(36) COLLATE utf8_unicode_ci NOT NULL,
`aggregate_type` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`event_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
97 changes: 50 additions & 47 deletions src/Zf2EventStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,17 @@

namespace Prooph\EventStore\Adapter\Zf2;

use Prooph\EventStore\Adapter\AdapterInterface;
use Prooph\Common\Messaging\DomainEvent;
use Prooph\EventStore\Adapter\Exception\ConfigurationException;
use Prooph\EventStore\Adapter\Feature\TransactionFeatureInterface;
use Prooph\EventStore\Adapter\Feature\CanHandleTransaction;
use Prooph\EventStore\Exception\RuntimeException;
use Prooph\EventStore\Stream\EventId;
use Prooph\EventStore\Stream\EventName;
use Prooph\EventStore\Stream\Stream;
use Prooph\EventStore\Stream\StreamEvent;
use Prooph\EventStore\Stream\StreamName;
use Zend\Db\Adapter\Adapter as ZendDbAdapter;
use Zend\Db\Sql\Ddl\Column\Integer;
use Zend\Db\Sql\Ddl\Column\Text;
use Zend\Db\Sql\Ddl\Column\Varchar;
use Zend\Db\Sql\Ddl\Constraint\PrimaryKey;
use Zend\Db\Sql\Ddl\Constraint\UniqueKey;
use Zend\Db\Sql\Ddl\CreateTable;
use Zend\Db\Sql\Ddl\DropTable;
use Zend\Db\TableGateway\TableGateway;
Expand All @@ -36,7 +32,7 @@
*
* @author Alexander Miertsch <contact@prooph.de>
*/
class Zf2EventStoreAdapter implements AdapterInterface, TransactionFeatureInterface
class Zf2EventStoreAdapter implements \Prooph\EventStore\Adapter\Adapter, CanHandleTransaction
{

/**
Expand Down Expand Up @@ -67,7 +63,7 @@ class Zf2EventStoreAdapter implements AdapterInterface, TransactionFeatureInterf
/**
* @var array
*/
protected $standardColumns = ['eventId', 'eventName', 'occurredOn', 'payload', 'version'];
protected $standardColumns = ['event_id', 'event_name', 'event_class', 'created_at', 'payload', 'version'];

/**
* @param array $configuration
Expand All @@ -93,63 +89,63 @@ public function __construct(array $configuration)
}

/**
* @param Stream $aStream
* @param Stream $stream
* @throws \Prooph\EventStore\Exception\RuntimeException
* @return void
*/
public function create(Stream $aStream)
public function create(Stream $stream)
{
if (count($aStream->streamEvents()) === 0) {
if (count($stream->streamEvents()) === 0) {
throw new RuntimeException(
sprintf(
"Cannot create empty stream %s. %s requires at least one event to extract metadata information",
$aStream->streamName()->toString(),
$stream->streamName()->toString(),
__CLASS__
)
);
}

$firstEvent = $aStream->streamEvents()[0];
$firstEvent = $stream->streamEvents()[0];

$this->createSchemaFor($aStream->streamName(), $firstEvent->metadata());
$this->createSchemaFor($stream->streamName(), $firstEvent->metadata());

$this->appendTo($aStream->streamName(), $aStream->streamEvents());
$this->appendTo($stream->streamName(), $stream->streamEvents());
}

/**
* @param StreamName $aStreamName
* @param StreamName $streamName
* @param array $streamEvents
* @throws \Prooph\EventStore\Exception\StreamNotFoundException If stream does not exist
* @return void
*/
public function appendTo(StreamName $aStreamName, array $streamEvents)
public function appendTo(StreamName $streamName, array $streamEvents)
{
foreach ($streamEvents as $event) {
$this->insertEvent($aStreamName, $event);
$this->insertEvent($streamName, $event);
}
}

/**
* @param StreamName $aStreamName
* @param StreamName $streamName
* @param null|int $minVersion
* @return Stream|null
*/
public function load(StreamName $aStreamName, $minVersion = null)
public function load(StreamName $streamName, $minVersion = null)
{
$events = $this->loadEventsByMetadataFrom($aStreamName, array(), $minVersion);
$events = $this->loadEventsByMetadataFrom($streamName, array(), $minVersion);

return new Stream($aStreamName, $events);
return new Stream($streamName, $events);
}

/**
* @param StreamName $aStreamName
* @param StreamName $streamName
* @param array $metadata
* @param null|int $minVersion
* @return StreamEvent[]
* @return DomainEvent[]
*/
public function loadEventsByMetadataFrom(StreamName $aStreamName, array $metadata, $minVersion = null)
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null)
{
$tableGateway = $this->getTablegateway($aStreamName);
$tableGateway = $this->getTablegateway($streamName);

$sql = $tableGateway->getSql();

Expand Down Expand Up @@ -177,11 +173,7 @@ public function loadEventsByMetadataFrom(StreamName $aStreamName, array $metadat
foreach ($eventsData as $eventData) {
$payload = Serializer::unserialize($eventData->payload, $this->serializerAdapter);

$eventId = new EventId($eventData->eventId);

$eventName = new EventName($eventData->eventName);

$occurredOn = new \DateTime($eventData->occurredOn);
$eventClass = $eventData['event_class'];

//Add metadata stored in table
foreach ($eventData as $key => $value) {
Expand All @@ -190,7 +182,16 @@ public function loadEventsByMetadataFrom(StreamName $aStreamName, array $metadat
}
}

$events[] = new StreamEvent($eventId, $eventName, $payload, (int)$eventData->version, $occurredOn, $metadata);
$events[] = $eventClass::fromArray(
[
'uuid' => $eventData['event_id'],
'name' => $eventData['event_name'],
'version' => (int)$eventData['version'],
'created_at' => $eventData['created_at'],
'payload' => $payload,
'metadata' => $metadata
]
);
}

return $events;
Expand All @@ -212,26 +213,27 @@ public function rollback()
}

/**
* @param StreamName $aStreamName
* @param StreamName $streamName
* @param array $metadata
* @param bool $returnSql
* @return string|null Whether $returnSql is true or not function will return generated sql or execute it directly
*/
public function createSchemaFor(StreamName $aStreamName, array $metadata = array(), $returnSql = false)
public function createSchemaFor(StreamName $streamName, array $metadata = array(), $returnSql = false)
{
$createTable = new CreateTable($this->getTable($aStreamName));
$createTable = new CreateTable($this->getTable($streamName));

$createTable->addColumn(new Varchar('eventId', 200))
$createTable->addColumn(new Varchar('event_id', 100))
->addColumn(new Integer('version'))
->addColumn(new Text('eventName'))
->addColumn(new Varchar('event_name', 100))
->addColumn(new Varchar('event_class', 100))
->addColumn(new Text('payload'))
->addColumn(new Text('occurredOn'));
->addColumn(new Varchar('created_at', 50));

foreach ($metadata as $key => $value) {
$createTable->addColumn(new Varchar($key, 100));
}

$createTable->addConstraint(new PrimaryKey('eventId'));
$createTable->addConstraint(new PrimaryKey('event_id'));

if ($returnSql) {
return $createTable->getSqlString($this->dbAdapter->getPlatform());
Expand All @@ -247,13 +249,13 @@ public function createSchemaFor(StreamName $aStreamName, array $metadata = array
*
* Use this function with caution. All your events will be lost! But it can be useful in migration scenarios.
*
* @param StreamName $aStreamName
* @param StreamName $streamName
* @param bool $returnSql
* @return string|null Whether $returnSql is true or not function will return generated sql or execute it directly
*/
public function dropSchemaFor(StreamName $aStreamName, $returnSql = false)
public function dropSchemaFor(StreamName $streamName, $returnSql = false)
{
$dropTable = new DropTable($this->getTable($aStreamName));
$dropTable = new DropTable($this->getTable($streamName));

if ($returnSql) {
return $dropTable->getSqlString($this->dbAdapter->getPlatform());
Expand All @@ -268,17 +270,18 @@ public function dropSchemaFor(StreamName $aStreamName, $returnSql = false)
* Insert an event
*
* @param StreamName $streamName
* @param StreamEvent $e
* @param DomainEvent $e
* @return void
*/
protected function insertEvent(StreamName $streamName, StreamEvent $e)
protected function insertEvent(StreamName $streamName, DomainEvent $e)
{
$eventData = array(
'eventId' => $e->eventId()->toString(),
'event_id' => $e->uuid()->toString(),
'version' => $e->version(),
'eventName' => $e->eventName()->toString(),
'event_name' => $e->messageName(),
'event_class' => get_class($e),
'payload' => Serializer::serialize($e->payload(), $this->serializerAdapter),
'occurredOn' => $e->occurredOn()->format('Y-m-d\TH:i:s.uO')
'created_at' => $e->createdAt()->format(\DateTime::ISO8601)
);

foreach ($e->metadata() as $key => $value) {
Expand Down

0 comments on commit 999f94e

Please sign in to comment.