Skip to content

Commit

Permalink
Merge pull request #68 from prooph/projectors
Browse files Browse the repository at this point in the history
rename projection => projector where applicable
  • Loading branch information
prolic committed Mar 13, 2017
2 parents 0062b58 + 0e86d78 commit 6495758
Show file tree
Hide file tree
Showing 15 changed files with 104 additions and 126 deletions.
4 changes: 4 additions & 0 deletions src/HasQueryHint.php
Expand Up @@ -12,6 +12,10 @@

namespace Prooph\EventStore\Pdo;

/**
* Additional interface to be implemented for persistence strategies
* to specify a query hint being used when loading events
*/
interface HasQueryHint
{
public function indexName(): string;
Expand Down
27 changes: 16 additions & 11 deletions src/MySqlEventStore.php
Expand Up @@ -22,7 +22,6 @@
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Metadata\MetadataMatcher;
use Prooph\EventStore\Pdo\Exception\ExtensionNotLoaded;
use Prooph\EventStore\Pdo\Exception\InvalidArgumentException;
use Prooph\EventStore\Pdo\Exception\RuntimeException;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;
Expand Down Expand Up @@ -74,12 +73,6 @@ public function __construct(
throw ExtensionNotLoaded::with('pdo_mysql');
}

if (! $persistenceStrategy instanceof HasQueryHint) {
throw new InvalidArgumentException(
MySqlEventStore::class . ' requires the persistence strategy to implement ' . HasQueryHint::class
);
}

Assertion::min($loadBatchSize, 1);

$this->messageFactory = $messageFactory;
Expand Down Expand Up @@ -238,10 +231,16 @@ public function load(
}

$tableName = $this->persistenceStrategy->generateTableName($streamName);
$indexName = $this->persistenceStrategy->indexName();

if ($this->persistenceStrategy instanceof HasQueryHint) {
$indexName = $this->persistenceStrategy->indexName();
$queryHint = "USE INDEX($indexName)";
} else {
$queryHint = '';
}

$query = <<<EOT
SELECT * FROM $tableName USE INDEX($indexName)
SELECT * FROM $tableName $queryHint
$whereCondition
ORDER BY `no` ASC
LIMIT :limit;
Expand Down Expand Up @@ -299,10 +298,16 @@ public function loadReverse(
}

$tableName = $this->persistenceStrategy->generateTableName($streamName);
$indexName = $this->persistenceStrategy->indexName();

if ($this->persistenceStrategy instanceof HasQueryHint) {
$indexName = $this->persistenceStrategy->indexName();
$queryHint = "USE INDEX($indexName)";
} else {
$queryHint = '';
}

$query = <<<EOT
SELECT * FROM $tableName USE INDEX($indexName)
SELECT * FROM $tableName $queryHint
$whereCondition
ORDER BY `no` DESC
LIMIT :limit;
Expand Down
8 changes: 1 addition & 7 deletions src/PersistenceStrategy/MySqlAggregateStreamStrategy.php
Expand Up @@ -14,11 +14,10 @@

use Iterator;
use Prooph\EventStore\Pdo\Exception;
use Prooph\EventStore\Pdo\HasQueryHint;
use Prooph\EventStore\Pdo\PersistenceStrategy;
use Prooph\EventStore\StreamName;

final class MySqlAggregateStreamStrategy implements PersistenceStrategy, HasQueryHint
final class MySqlAggregateStreamStrategy implements PersistenceStrategy
{
/**
* @param string $tableName
Expand Down Expand Up @@ -87,9 +86,4 @@ public function generateTableName(StreamName $streamName): string
{
return '_' . sha1($streamName->toString());
}

public function indexName(): string
{
return 'PRIMARY';
}
}
8 changes: 1 addition & 7 deletions src/PersistenceStrategy/MySqlSimpleStreamStrategy.php
Expand Up @@ -13,11 +13,10 @@
namespace Prooph\EventStore\Pdo\PersistenceStrategy;

use Iterator;
use Prooph\EventStore\Pdo\HasQueryHint;
use Prooph\EventStore\Pdo\PersistenceStrategy;
use Prooph\EventStore\StreamName;

final class MySqlSimpleStreamStrategy implements PersistenceStrategy, HasQueryHint
final class MySqlSimpleStreamStrategy implements PersistenceStrategy
{
/**
* @param string $tableName
Expand Down Expand Up @@ -79,9 +78,4 @@ public function generateTableName(StreamName $streamName): string
{
return '_' . sha1($streamName->toString());
}

public function indexName(): string
{
return 'PRIMARY';
}
}
30 changes: 13 additions & 17 deletions src/Projection/MySqlProjectionManager.php
Expand Up @@ -18,19 +18,15 @@
use Prooph\EventStore\Exception\OutOfRangeException;
use Prooph\EventStore\Pdo\Exception;
use Prooph\EventStore\Pdo\MySqlEventStore;
use Prooph\EventStore\Projection\Projection;
use Prooph\EventStore\Projection\ProjectionManager;
use Prooph\EventStore\Projection\ProjectionStatus;
use Prooph\EventStore\Projection\Projector;
use Prooph\EventStore\Projection\Query;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\Projection\ReadModelProjector;

final class MySqlProjectionManager implements ProjectionManager
{
public const OPTION_LOCK_TIMEOUT_MS = 'lock_timeout_ms';

public const DEFAULT_LOCK_TIMEOUT_MS = 1000;

/**
* @var EventStore
*/
Expand Down Expand Up @@ -79,35 +75,35 @@ public function createQuery(): Query
public function createProjection(
string $name,
array $options = []
): Projection {
return new PdoEventStoreProjection(
): Projector {
return new PdoEventStoreProjector(
$this->eventStore,
$this->connection,
$name,
$this->eventStreamsTable,
$this->projectionsTable,
$options[self::OPTION_LOCK_TIMEOUT_MS] ?? self::DEFAULT_LOCK_TIMEOUT_MS,
$options[self::OPTION_CACHE_SIZE] ?? self::DEFAULT_CACHE_SIZE,
$options[self::OPTION_PERSIST_BLOCK_SIZE] ?? self::DEFAULT_PERSIST_BLOCK_SIZE,
$options[self::OPTION_SLEEP] ?? self::DEFAULT_SLEEP
$options[PdoEventStoreProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreProjector::DEFAULT_LOCK_TIMEOUT_MS,
$options[PdoEventStoreProjector::OPTION_CACHE_SIZE] ?? PdoEventStoreProjector::DEFAULT_CACHE_SIZE,
$options[PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreProjector::OPTION_SLEEP] ?? PdoEventStoreProjector::DEFAULT_SLEEP
);
}

public function createReadModelProjection(
string $name,
ReadModel $readModel,
array $options = []
): ReadModelProjection {
return new PdoEventStoreReadModelProjection(
): ReadModelProjector {
return new PdoEventStoreReadModelProjector(
$this->eventStore,
$this->connection,
$name,
$readModel,
$this->eventStreamsTable,
$this->projectionsTable,
$options[self::OPTION_LOCK_TIMEOUT_MS] ?? self::DEFAULT_LOCK_TIMEOUT_MS,
$options[self::OPTION_PERSIST_BLOCK_SIZE] ?? self::DEFAULT_PERSIST_BLOCK_SIZE,
$options[self::OPTION_SLEEP] ?? self::DEFAULT_SLEEP
$options[PdoEventStoreReadModelProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreReadModelProjector::DEFAULT_LOCK_TIMEOUT_MS,
$options[PdoEventStoreReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreReadModelProjector::OPTION_SLEEP] ?? PdoEventStoreReadModelProjector::DEFAULT_SLEEP
);
}

Expand Down
Expand Up @@ -26,14 +26,17 @@
use Prooph\EventStore\Exception;
use Prooph\EventStore\Pdo\MySqlEventStore;
use Prooph\EventStore\Pdo\PostgresEventStore;
use Prooph\EventStore\Projection\Projection;
use Prooph\EventStore\Projection\ProjectionStatus;
use Prooph\EventStore\Projection\Projector;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;
use Prooph\EventStore\Util\ArrayCache;

final class PdoEventStoreProjection implements Projection
final class PdoEventStoreProjector implements Projector
{
public const OPTION_LOCK_TIMEOUT_MS = 'lock_timeout_ms';
public const DEFAULT_LOCK_TIMEOUT_MS = 1000;

/**
* @var EventStore
*/
Expand Down Expand Up @@ -167,7 +170,7 @@ public function __construct(
}
}

public function init(Closure $callback): Projection
public function init(Closure $callback): Projector
{
if (null !== $this->initCallback) {
throw new Exception\RuntimeException('Projection already initialized');
Expand All @@ -186,7 +189,7 @@ public function init(Closure $callback): Projection
return $this;
}

public function fromStream(string $streamName): Projection
public function fromStream(string $streamName): Projector
{
if (null !== $this->query) {
throw new Exception\RuntimeException('From was already called');
Expand All @@ -197,7 +200,7 @@ public function fromStream(string $streamName): Projection
return $this;
}

public function fromStreams(string ...$streamNames): Projection
public function fromStreams(string ...$streamNames): Projector
{
if (null !== $this->query) {
throw new Exception\RuntimeException('From was already called');
Expand All @@ -210,7 +213,7 @@ public function fromStreams(string ...$streamNames): Projection
return $this;
}

public function fromCategory(string $name): Projection
public function fromCategory(string $name): Projector
{
if (null !== $this->query) {
throw new Exception\RuntimeException('From was already called');
Expand All @@ -221,7 +224,7 @@ public function fromCategory(string $name): Projection
return $this;
}

public function fromCategories(string ...$names): Projection
public function fromCategories(string ...$names): Projector
{
if (null !== $this->query) {
throw new Exception\RuntimeException('From was already called');
Expand All @@ -234,7 +237,7 @@ public function fromCategories(string ...$names): Projection
return $this;
}

public function fromAll(): Projection
public function fromAll(): Projector
{
if (null !== $this->query) {
throw new Exception\RuntimeException('From was already called');
Expand All @@ -245,7 +248,7 @@ public function fromAll(): Projection
return $this;
}

public function when(array $handlers): Projection
public function when(array $handlers): Projector
{
if (null !== $this->handler || ! empty($this->handlers)) {
throw new Exception\RuntimeException('When was already called');
Expand All @@ -266,7 +269,7 @@ public function when(array $handlers): Projection
return $this;
}

public function whenAny(Closure $handler): Projection
public function whenAny(Closure $handler): Projector
{
if (null !== $this->handler || ! empty($this->handlers)) {
throw new Exception\RuntimeException('When was already called');
Expand Down Expand Up @@ -569,34 +572,34 @@ private function createHandlerContext(?string &$streamName)
{
return new class($this, $streamName) {
/**
* @var Projection
* @var Projector
*/
private $projection;
private $projector;

/**
* @var ?string
*/
private $streamName;

public function __construct(Projection $projection, ?string &$streamName)
public function __construct(Projector $projector, ?string &$streamName)
{
$this->projection = $projection;
$this->projector = $projector;
$this->streamName = &$streamName;
}

public function stop(): void
{
$this->projection->stop();
$this->projector->stop();
}

public function linkTo(string $streamName, Message $event): void
{
$this->projection->linkTo($streamName, $event);
$this->projector->linkTo($streamName, $event);
}

public function emit(Message $event): void
{
$this->projection->emit($event);
$this->projector->emit($event);
}

public function streamName(): ?string
Expand Down

0 comments on commit 6495758

Please sign in to comment.