diff --git a/library/Tracks/EventStore/IRepository.php b/library/Tracks/EventStore/IRepository.php new file mode 100644 index 0000000..a6a6b58 --- /dev/null +++ b/library/Tracks/EventStore/IRepository.php @@ -0,0 +1,12 @@ +_query = $query; + $this->_bufferSize = $bufferSize; + } + + /** + * Get the current element + * @see Iterator::current() + */ + public function current() + { + if (empty($this->_rowBuffer)) { + $this->loadMore(); + } + + return unserialize($this->_rowBuffer[$this->_cursor]['data']); + } + + /** + * Get the key for the current element + * @see Iterator::key() + */ + public function key() + { + return $this->_cursor; + } + + /** + * Move the cursor to the next element + * @see Iterator::next() + */ + public function next() + { + if (++$this->_cursor >= $this->_bufferSize) { + $this->loadMore(); + $this->_cursor = 0; + } + } + + /** + * Rewind the cursor to the first element + * @see Iterator::rewind() + */ + public function rewind() + { + $this->_cursor = 0; + + if (empty($this->_rowBuffer)) { + $this->loadMore(); + } + } + + /** + * Is the current key valid? + * @see Iterator::valid() + */ + public function valid() + { + return $this->_cursor < count($this->_rowBuffer); + } + + /** + * Get the SQL string for the current query + * + * @return string + */ + public function getQueryAsString() + { + return $this->_query->assemble(); + } + + /** + * Load more rows from the database and put them in the buffer + */ + private function loadMore() + { + $eventTable = new Zend_Db_Table('event'); + $this->_rowBuffer = $eventTable + ->fetchAll( + $this->_query + ->limit(self::BUFFER_SIZE, $this->_offset) + )->toArray(); + + $this->_offset += self::BUFFER_SIZE; + } +} diff --git a/library/Tracks/Replay/EventStream/ZendDbBuilder.php b/library/Tracks/Replay/EventStream/ZendDbBuilder.php new file mode 100644 index 0000000..57d8929 --- /dev/null +++ b/library/Tracks/Replay/EventStream/ZendDbBuilder.php @@ -0,0 +1,178 @@ +_eventClasses = array(); + + return $this; + } + + /** + * Stream a specific event class + * + * @param string $eventClass + * @return Replay_EventStreamBuilder + */ + public function withEvent($eventClass) + { + assert('is_string($eventClass)'); + + $this->_eventClasses[] = $eventClass; + + return $this; + } + + /** + * Stream from all entities + * + * @return Replay_EventStreamBuilder + */ + public function fromAllEntities() + { + $this->_entities = array(); + + return $this; + } + + /** + * Get events that belong to an entity + * + * @param string $guid + * @return Replay_EventStreamBuilder + */ + public function fromEntity($guid) + { + assert('is_string($guid)'); + + $this->_entities[] = $guid; + + return $this; + } + + /** + * Stream events for all time + * + * @return Replay_EventStreamBuilder + */ + public function forAllTime() + { + $this->_startDate = null; + $this->_endDate = null; + + return $this; + } + + /** + * Specify a date range from which to select events + * Only one date range may be specified + * + * @param Zend_Date $startDate + * @param Zend_Date $endDate + * @return Replay_EventStreamBuilder + */ + public function inDateRange(Zend_Date $startDate, Zend_Date $endDate) + { + $this->_startDate = $startDate; + $this->_endDate = $endDate; + + return $this; + } + + /** + * Get the Replay_EventStream object + * + * @return Replay_EventStream + */ + public function build() + { + $this->buildBaseQuery(); + $this->buildEventsQuery(); + $this->buildDateRangeQuery(); + $this->buildEntitiesQuery(); + + return new Replay_EventStream($this->_query); + } + + /** + * Build the base query to load events + */ + private function buildBaseQuery() + { + $eventTable = new Table_Event; + $this->_query = $eventTable + ->select() + ->from('event', '*') + ->setIntegrityCheck(false) + ->order('date_created ASC') + ->order('event.id'); + } + + /** + * Build the event class filter part of the query + */ + private function buildEventsQuery() + { + if (count($this->_eventClasses) > 0) { + $this->_query->where(implode('OR', array_map(function($x) { return " data LIKE '%{$x}%' "; }, $this->_eventClasses))); + } + } + + /** + * Build the entity guid filter part of the query + */ + private function buildEntitiesQuery() + { + if (count($this->_entities) > 0) { + $this->_query + ->join('event_provider', 'event.event_provider_id = event_provider.id') + ->where(new Zend_Db_Expr("event_provider.guid IN (". implode(',', array_map(function($x) { return "'{$x}'"; }, $this->_entities)) .")")); + } + } + + /** + * Build the date range filter part of the query + */ + private function buildDateRangeQuery() + { + if ($this->_startDate) { + $this->_query->where('date_created >= ?', $this->_startDate->toString('Y-m-d')); + } + + if ($this->_endDate) { + $this->_query->where('date_created <= ?', $this->_endDate->toString('Y-m-d')); + } + } +} diff --git a/library/Tracks/Replay/Player.php b/library/Tracks/Replay/Player.php new file mode 100644 index 0000000..2f2be71 --- /dev/null +++ b/library/Tracks/Replay/Player.php @@ -0,0 +1,56 @@ +_router = $router; + $this->_stream = $stream; + } + + /** + * Play the event stream and execute event handlers + * + * @param ITransaction $transaction + */ + public function play(ITransaction $transaction = NULL) + { + $transaction and $transaction->begin(); + + try { + foreach ($this->_stream as $event) { + $this->_router->route($event); + } + } catch (Exception $e) { + $transaction and $transaction->rollback(); + throw $e; + } + + $transaction and $transaction->commit(); + } + + /** + * Print debug information about what would happen if current stream was played + */ + public function dryRun() + { + echo $this->_stream->getDryRunInfo() . PHP_EOL . PHP_EOL; + + foreach ($this->_stream as $event) { + $handlers = $this->_router->getHandlersRegisteredFor(get_class($event)); + echo get_class($event) .': '. $event->guid .': ('. implode(',', array_map('get_class', $handlers)) .')'. PHP_EOL; + } + + echo PHP_EOL; + } +} diff --git a/library/Tracks/Replay/Router/DirectRouter.php b/library/Tracks/Replay/Router/DirectRouter.php new file mode 100644 index 0000000..ce49875 --- /dev/null +++ b/library/Tracks/Replay/Router/DirectRouter.php @@ -0,0 +1,50 @@ +_handlers[get_class($event)])) { + foreach ($this->_handlers[get_class($event)] as $handler) { + if (is_string($handler)) { + $handler = new $handler; + } + + $handler->handle($event); + } + } + } + + public function addHandler($eventClass, $handler) + { + assert('is_string($eventClass)'); + assert('is_string($handler) or is_object($handler)'); + + if (is_object($handler) + && !($handler instanceof IEventHandler) + ) { + throw new LogicException('Event handlers must implement \Tracks\EventHandler\IEventHandler'); + } + + if (!isset($this->_handlers[$eventClass])) { + $this->_handlers[$eventClass] = array(); + } + + $this->_handlers[$eventClass][] = $handler; + + return $this; + } + + public function getHandlersRegisteredFor($eventClass) + { + assert('is_string($eventClass)'); + + return (isset($this->_handlers[$eventClass]) ? $this->_handlers[$eventClass] : array()); + } +} diff --git a/library/Tracks/Replay/Router/IRouter.php b/library/Tracks/Replay/Router/IRouter.php new file mode 100644 index 0000000..e820d99 --- /dev/null +++ b/library/Tracks/Replay/Router/IRouter.php @@ -0,0 +1,10 @@ +_repository = $repository; + } + + public function registerSave(AggregateRoot $aggregateRoot) + { + $this->_registeredSaves[] = $aggregateRoot; + } + + public function commit(ITransaction $transaction = NULL) + { + $transaction and $transaction->begin(); + + try { + foreach ($this->_registeredSaves as $aggregateRoot) { + $this->_repository->save($aggregateRoot); + } + } catch (Exception $e) { + $transaction and $transaction->rollback(); + throw $e; + } + + $transaction and $transaction->commit(); + } +}