Skip to content

Commit

Permalink
merge IndexingStrategy & TableNameGeneratorStrategy into PersistenceS…
Browse files Browse the repository at this point in the history
…trategy
  • Loading branch information
prolic committed Dec 2, 2016
1 parent 7679b50 commit 88ada1e
Show file tree
Hide file tree
Showing 27 changed files with 218 additions and 298 deletions.
5 changes: 2 additions & 3 deletions src/Container/AbstractEventStoreFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ public function __invoke(ContainerInterface $container): EventStore
$container->get($config['message_factory']),
$messageConverter = $container->get($config['message_converter']),
$connection,
$container->get($config['indexing_strategy']),
$container->get($config['table_name_generator_strategy']),
$container->get($config['persistence_strategy']),
$config['load_batch_size'],
$config['event_streams_table']
);
Expand All @@ -126,7 +125,7 @@ public function dimensions(): array
public function mandatoryOptions(): array
{
return [
'indexing_strategy',
'persistence_strategy',
];
}
}
2 changes: 0 additions & 2 deletions src/Container/MySQLEventStoreFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use Prooph\Common\Messaging\NoOpMessageConverter;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\PDO\MySQLEventStore;
use Prooph\EventStore\PDO\TableNameGeneratorStrategy\Sha1;

final class MySQLEventStoreFactory extends AbstractEventStoreFactory
{
Expand Down Expand Up @@ -51,7 +50,6 @@ public function defaultOptions(): array
'dbname' => 'event_store',
'port' => 3306,
],
'table_name_generator_strategy' => Sha1::class,
'load_batch_size' => 1000,
'event_streams_table' => 'event_streams',
'message_converter' => NoOpMessageConverter::class,
Expand Down
2 changes: 0 additions & 2 deletions src/Container/PostgresEventStoreFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Prooph\Common\Messaging\FQCNMessageFactory;
use Prooph\Common\Messaging\NoOpMessageConverter;
use Prooph\EventStore\PDO\PostgresEventStore;
use Prooph\EventStore\PDO\TableNameGeneratorStrategy\Sha1;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;

final class PostgresEventStoreFactory extends AbstractEventStoreFactory
Expand Down Expand Up @@ -54,7 +53,6 @@ public function defaultOptions(): array
'dbname' => 'event_store',
'port' => 5432,
],
'table_name_generator_strategy' => Sha1::class,
'load_batch_size' => 1000,
'event_streams_table' => 'event_streams',
'message_converter' => NoOpMessageConverter::class,
Expand Down
43 changes: 15 additions & 28 deletions src/MySQLEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,9 @@ final class MySQLEventStore extends AbstractActionEventEmitterEventStore
private $connection;

/**
* @var IndexingStrategy
* @var PersistenceStrategy
*/
private $indexingStrategy;

/**
* @var TableNameGeneratorStrategy
*/
private $tableNameGeneratorStrategy;
private $persistenceStrategy;

/**
* @var int
Expand All @@ -69,8 +64,7 @@ public function __construct(
MessageFactory $messageFactory,
MessageConverter $messageConverter,
PDO $connection,
IndexingStrategy $indexingStrategy,
TableNameGeneratorStrategy $tableNameGeneratorStrategy,
PersistenceStrategy $persistenceStrategy,
int $loadBatchSize = 10000,
string $eventStreamsTable = 'event_streams'
) {
Expand All @@ -82,8 +76,7 @@ public function __construct(
$this->messageFactory = $messageFactory;
$this->messageConverter = $messageConverter;
$this->connection = $connection;
$this->indexingStrategy = $indexingStrategy;
$this->tableNameGeneratorStrategy = $tableNameGeneratorStrategy;
$this->persistenceStrategy = $persistenceStrategy;
$this->loadBatchSize = $loadBatchSize;
$this->eventStreamsTable = $eventStreamsTable;

Expand All @@ -95,7 +88,7 @@ function (ActionEvent $event) use ($actionEventEmitter): void {
$streamName = $stream->streamName();

try {
$tableName = $this->tableNameGeneratorStrategy->__invoke($streamName);
$tableName = $this->persistenceStrategy->generateTableName($streamName);
$this->createSchemaFor($tableName);
} catch (RuntimeException $e) {
$this->connection->exec("DROP TABLE $tableName;");
Expand Down Expand Up @@ -133,23 +126,17 @@ function (ActionEvent $event): void {
$streamName = $event->getParam('streamName');
$streamEvents = $event->getParam('streamEvents');

$columnNames = $this->indexingStrategy->columnNames();

$data = [];
$countEntries = 0;

foreach ($streamEvents as $streamEvent) {
$countEntries++;
$data = $this->indexingStrategy->prepareData($streamEvent, $data);
}
$countEntries = iterator_count($streamEvents);
$columnNames = $this->persistenceStrategy->columnNames();
$data = $this->persistenceStrategy->prepareData($streamEvents);

if (empty($data)) {
$event->setParam('result', true);

return;
}

$tableName = $this->tableNameGeneratorStrategy->__invoke($streamName);
$tableName = $this->persistenceStrategy->generateTableName($streamName);

$rowPlaces = '(' . implode(', ', array_fill(0, count($columnNames), '?')) . ')';
$allPlaces = implode(', ', array_fill(0, $countEntries, $rowPlaces));
Expand Down Expand Up @@ -179,7 +166,7 @@ function (ActionEvent $event): void {
$this->connection->commit();
}

if (in_array($statement->errorCode(), $this->indexingStrategy->uniqueViolationErrorCodes(), true)) {
if (in_array($statement->errorCode(), $this->persistenceStrategy->uniqueViolationErrorCodes(), true)) {
throw new ConcurrencyException();
}

Expand All @@ -206,7 +193,7 @@ function (ActionEvent $event): void {
$metadataMatcher = new MetadataMatcher();
}

$tableName = $this->tableNameGeneratorStrategy->__invoke($streamName);
$tableName = $this->persistenceStrategy->generateTableName($streamName);
$sql = [
'from' => "SELECT * FROM $tableName",
'orderBy' => 'ORDER BY no ASC',
Expand Down Expand Up @@ -278,7 +265,7 @@ function (ActionEvent $event): void {
$metadataMatcher = new MetadataMatcher();
}

$tableName = $this->tableNameGeneratorStrategy->__invoke($streamName);
$tableName = $this->persistenceStrategy->generateTableName($streamName);
$sql = [
'from' => "SELECT * FROM $tableName",
'orderBy' => 'ORDER BY no DESC',
Expand Down Expand Up @@ -354,7 +341,7 @@ function (ActionEvent $event): void {
$statement = $this->connection->prepare($deleteEventStreamTableEntrySql);
$statement->execute([$streamName->toString()]);

$encodedStreamName = $this->tableNameGeneratorStrategy->__invoke($streamName);
$encodedStreamName = $this->persistenceStrategy->generateTableName($streamName);
$deleteEventStreamSql = <<<EOT
DROP TABLE IF EXISTS $encodedStreamName;
EOT;
Expand Down Expand Up @@ -416,7 +403,7 @@ function (ActionEvent $event): void {
private function addStreamToStreamsTable(Stream $stream): void
{
$realStreamName = $stream->streamName()->toString();
$streamName = $this->tableNameGeneratorStrategy->__invoke($stream->streamName());
$streamName = $this->persistenceStrategy->generateTableName($stream->streamName());
$metadata = json_encode($stream->metadata());

$sql = <<<EOT
Expand All @@ -438,7 +425,7 @@ private function addStreamToStreamsTable(Stream $stream): void

private function createSchemaFor(string $tableName): void
{
$schema = $this->indexingStrategy->createSchema($tableName);
$schema = $this->persistenceStrategy->createSchema($tableName);

foreach ($schema as $command) {
$statement = $this->connection->prepare($command);
Expand Down
9 changes: 6 additions & 3 deletions src/IndexingStrategy.php → src/PersistenceStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@

namespace Prooph\EventStore\PDO;

use Prooph\Common\Messaging\Message;
use Iterator;
use Prooph\EventStore\StreamName;

interface IndexingStrategy
interface PersistenceStrategy
{
/**
* @param string $tableName
Expand All @@ -24,10 +25,12 @@ public function createSchema(string $tableName): array;

public function columnNames(): array;

public function prepareData(Message $message, array $data): array;
public function prepareData(Iterator $streamEvents): array;

/**
* @return string[]
*/
public function uniqueViolationErrorCodes(): array;

public function generateTableName(StreamName $streamName): string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@

declare(strict_types=1);

namespace Prooph\EventStore\PDO\IndexingStrategy;
namespace Prooph\EventStore\PDO\PersistenceStrategy;

use Prooph\Common\Messaging\Message;
use Iterator;
use Prooph\EventStore\PDO\Exception;
use Prooph\EventStore\PDO\IndexingStrategy;
use Prooph\EventStore\PDO\PersistenceStrategy;
use Prooph\EventStore\StreamName;

final class MySQLAggregateStreamStrategy implements IndexingStrategy
final class MySQLAggregateStreamStrategy implements PersistenceStrategy
{
/**
* @param string $tableName
Expand Down Expand Up @@ -52,18 +53,22 @@ public function columnNames(): array
];
}

public function prepareData(Message $message, array $data): array
public function prepareData(Iterator $streamEvents): array
{
if (! isset($message->metadata()['_aggregate_version'])) {
throw new Exception\RuntimeException('_aggregate_version is missing in metadata');
}
$data = [];

foreach ($streamEvents as $event) {
if (! isset($event->metadata()['_aggregate_version'])) {
throw new Exception\RuntimeException('_aggregate_version is missing in metadata');
}

$data[] = $message->metadata()['_aggregate_version'];
$data[] = $message->uuid()->toString();
$data[] = $message->messageName();
$data[] = json_encode($message->payload());
$data[] = json_encode($message->metadata());
$data[] = $message->createdAt()->format('Y-m-d\TH:i:s.u');
$data[] = $event->metadata()['_aggregate_version'];
$data[] = $event->uuid()->toString();
$data[] = $event->messageName();
$data[] = json_encode($event->payload());
$data[] = json_encode($event->metadata());
$data[] = $event->createdAt()->format('Y-m-d\TH:i:s.u');
}

return $data;
}
Expand All @@ -75,4 +80,9 @@ public function uniqueViolationErrorCodes(): array
{
return ['23000'];
}

public function generateTableName(StreamName $streamName): string
{
return '_' . sha1($streamName->toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@

declare(strict_types=1);

namespace Prooph\EventStore\PDO\IndexingStrategy;
namespace Prooph\EventStore\PDO\PersistenceStrategy;

use Prooph\Common\Messaging\Message;
use Prooph\EventStore\PDO\IndexingStrategy;
use Iterator;
use Prooph\EventStore\PDO\PersistenceStrategy;
use Prooph\EventStore\StreamName;

final class MySQLSimpleStreamStrategy implements IndexingStrategy
final class MySQLSimpleStreamStrategy implements PersistenceStrategy
{
/**
* @param string $tableName
Expand Down Expand Up @@ -50,13 +51,17 @@ public function columnNames(): array
];
}

public function prepareData(Message $message, array $data): array
public function prepareData(Iterator $streamEvents): array
{
$data[] = $message->uuid()->toString();
$data[] = $message->messageName();
$data[] = json_encode($message->payload());
$data[] = json_encode($message->metadata());
$data[] = $message->createdAt()->format('Y-m-d\TH:i:s.u');
$data = [];

foreach ($streamEvents as $event) {
$data[] = $event->uuid()->toString();
$data[] = $event->messageName();
$data[] = json_encode($event->payload());
$data[] = json_encode($event->metadata());
$data[] = $event->createdAt()->format('Y-m-d\TH:i:s.u');
}

return $data;
}
Expand All @@ -68,4 +73,9 @@ public function uniqueViolationErrorCodes(): array
{
return ['23000'];
}

public function generateTableName(StreamName $streamName): string
{
return '_' . sha1($streamName->toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@

declare(strict_types=1);

namespace Prooph\EventStore\PDO\IndexingStrategy;
namespace Prooph\EventStore\PDO\PersistenceStrategy;

use Prooph\Common\Messaging\Message;
use Prooph\EventStore\PDO\IndexingStrategy;
use Iterator;
use Prooph\EventStore\PDO\PersistenceStrategy;
use Prooph\EventStore\StreamName;

final class MySQLSingleStreamStrategy implements IndexingStrategy
final class MySQLSingleStreamStrategy implements PersistenceStrategy
{
/**
* @param string $tableName
Expand Down Expand Up @@ -54,13 +55,17 @@ public function columnNames(): array
];
}

public function prepareData(Message $message, array $data): array
public function prepareData(Iterator $streamEvents): array
{
$data[] = $message->uuid()->toString();
$data[] = $message->messageName();
$data[] = json_encode($message->payload());
$data[] = json_encode($message->metadata());
$data[] = $message->createdAt()->format('Y-m-d\TH:i:s.u');
$data = [];

foreach ($streamEvents as $event) {
$data[] = $event->uuid()->toString();
$data[] = $event->messageName();
$data[] = json_encode($event->payload());
$data[] = json_encode($event->metadata());
$data[] = $event->createdAt()->format('Y-m-d\TH:i:s.u');
}

return $data;
}
Expand All @@ -72,4 +77,9 @@ public function uniqueViolationErrorCodes(): array
{
return ['23000'];
}

public function generateTableName(StreamName $streamName): string
{
return '_' . sha1($streamName->toString());
}
}
Loading

0 comments on commit 88ada1e

Please sign in to comment.