Skip to content

Commit

Permalink
track entities (#5)
Browse files Browse the repository at this point in the history
* Track persisted entities and update their ids when they are created in ES

* Fixed Symfony version to 3.2.* as 3.3.0 is breaking the KNP paginator dependency on the translator service

* Raised phpcs version to 3.*

* code style fixes

* Updated .travis.yml
  • Loading branch information
pmishev committed Jun 1, 2017
1 parent dd5d0c4 commit 8067971
Show file tree
Hide file tree
Showing 18 changed files with 458 additions and 84 deletions.
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ php:
- hhvm

env:
- SYMFONY_VERSION="~3.0" ES_VERSION="2.3.2"
- SYMFONY_VERSION="3.2.*" ES_VERSION="2.3.2"

matrix:
allow_failures:
Expand All @@ -15,13 +15,13 @@ matrix:
- php: 5.6
env: SYMFONY_VERSION="~2.7" ES_VERSION="1.6.2"
- php: 7.0
env: SYMFONY_VERSION="~3.0" ES_VERSION="1.7.4"
env: SYMFONY_VERSION="3.2.*" ES_VERSION="1.7.4"
- php: 7.0
env: SYMFONY_VERSION="~3.0" ES_VERSION="1.6.2"
env: SYMFONY_VERSION="3.2.*" ES_VERSION="1.6.2"
- php: 7.0
env: SYMFONY_VERSION="~3.0" ES_VERSION="2.1.1"
env: SYMFONY_VERSION="3.2.*" ES_VERSION="2.1.1"
- php: 7.0
env: SYMFONY_VERSION="~3.0" ES_VERSION="2.2.2"
env: SYMFONY_VERSION="3.2.*" ES_VERSION="2.2.2"

install:
- composer require --no-update symfony/symfony:${SYMFONY_VERSION}
Expand Down
4 changes: 2 additions & 2 deletions DependencyInjection/Compiler/AddIndexManagersPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function process(ContainerBuilder $container)
$connectionService = sprintf('sfes.connection.%s', $indexSettings['connection']);
if (!$container->hasDefinition($connectionService)) {
throw new InvalidConfigurationException(
'There is no ES connection with name ' . $indexSettings['connection']
'There is no ES connection with name '.$indexSettings['connection']
);
}

Expand All @@ -39,7 +39,7 @@ public function process(ContainerBuilder $container)
$indexManagerClass,
$indexManagerName,
$container->getDefinition($connectionService),
$indexSettings
$indexSettings,
]
);

Expand Down
19 changes: 19 additions & 0 deletions Event/Events.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

namespace Sineflow\ElasticsearchBundle\Event;

/**
* Class Events
*/
final class Events
{
/**
* Dispatched on persisting an entity via the index manager
*/
const PRE_PERSIST = 'sfes.pre_persist';

/**
* Dispatched after a bulk request is submitted to Elasticsearch
*/
const POST_COMMIT = 'sfes.post_commit';
}
32 changes: 32 additions & 0 deletions Event/PostCommitEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Sineflow\ElasticsearchBundle\Event;

use Symfony\Component\EventDispatcher\Event;

/**
* Class PostCommitEvent
*/
class PostCommitEvent extends Event
{
/**
* @var array
*/
private $bulkResponse;

/**
* @param array $bulkResponse
*/
public function __construct(array $bulkResponse)
{
$this->bulkResponse = $bulkResponse;
}

/**
* @return array
*/
public function getBulkResponse()
{
return $this->bulkResponse;
}
}
48 changes: 48 additions & 0 deletions Event/PrePersistEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

namespace Sineflow\ElasticsearchBundle\Event;

use Sineflow\ElasticsearchBundle\Document\DocumentInterface;
use Symfony\Component\EventDispatcher\Event;

/**
* Class PrePersistEvent
*/
class PrePersistEvent extends Event
{
/**
* @var DocumentInterface
*/
private $document;

/**
* @var int
*/
private $bulkOperationIndex;

/**
* @param DocumentInterface $document
* @param int $bulkOperationIndex
*/
public function __construct(DocumentInterface $document, $bulkOperationIndex)
{
$this->document = $document;
$this->bulkOperationIndex = $bulkOperationIndex;
}

/**
* @return DocumentInterface
*/
public function getDocument()
{
return $this->document;
}

/**
* @return int
*/
public function getBulkOperationIndex()
{
return $this->bulkOperationIndex;
}
}
38 changes: 38 additions & 0 deletions Manager/ConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
use Elasticsearch\Common\Exceptions\InvalidArgumentException;
use Psr\Log\LoggerInterface;
use Sineflow\ElasticsearchBundle\DTO\BulkQueryItem;
use Sineflow\ElasticsearchBundle\Event\Events;
use Sineflow\ElasticsearchBundle\Event\PostCommitEvent;
use Sineflow\ElasticsearchBundle\Exception\BulkRequestException;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
* This class interacts with elasticsearch using injected client.
Expand Down Expand Up @@ -48,6 +51,11 @@ class ConnectionManager
*/
private $autocommit;

/**
* @var EventDispatcherInterface
*/
private $eventDispatcher;

/**
* Construct.
*
Expand Down Expand Up @@ -126,6 +134,22 @@ public function setAutocommit($autocommit)
$this->autocommit = $autocommit;
}

/**
* @return EventDispatcherInterface
*/
public function getEventDispatcher()
{
return $this->eventDispatcher;
}

/**
* @param EventDispatcherInterface $eventDispatcher
*/
public function setEventDispatcher($eventDispatcher)
{
$this->eventDispatcher = $eventDispatcher;
}

/**
* Adds query to bulk queries container.
*
Expand All @@ -141,6 +165,16 @@ public function addBulkOperation($operation, $index, $type, array $query)
$this->bulkQueries[] = new BulkQueryItem($operation, $index, $type, $query);
}

/**
* Returns the number of bulk operations currently queued
*
* @return int
*/
public function getBulkOperationsCount()
{
return count($this->bulkQueries);
}

/**
* Optional setter to change bulk query params.
*
Expand Down Expand Up @@ -203,6 +237,10 @@ public function commit($forceRefresh = true)
$e->setBulkResponseItems($response['items']);
throw $e;
}

if ($this->eventDispatcher) {
$this->eventDispatcher->dispatch(Events::POST_COMMIT, new PostCommitEvent($response));
}
}

/**
Expand Down
24 changes: 23 additions & 1 deletion Manager/ConnectionManagerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
use Elasticsearch\ClientBuilder;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
* Elasticsearch connection factory class
*/
class ConnectionManagerFactory
{

/**
* @var LoggerInterface
*/
Expand All @@ -27,6 +27,11 @@ class ConnectionManagerFactory
*/
private $kernelDebug;

/**
* @var EventDispatcherInterface
*/
private $eventDispatcher;

/**
* @param boolean $kernelDebug
* @param LoggerInterface $tracer
Expand All @@ -39,6 +44,22 @@ public function __construct($kernelDebug, LoggerInterface $tracer = null, Logger
$this->logger = $logger;
}

/**
* @return EventDispatcherInterface
*/
public function getEventDispatcher()
{
return $this->eventDispatcher;
}

/**
* @param EventDispatcherInterface $eventDispatcher
*/
public function setEventDispatcher($eventDispatcher)
{
$this->eventDispatcher = $eventDispatcher;
}

/**
* @param string $connectionName
* @param array $connectionSettings
Expand All @@ -65,6 +86,7 @@ public function createConnectionManager($connectionName, $connectionSettings)
);

$connectionManager->setLogger($this->logger ?: new NullLogger());
$connectionManager->setEventDispatcher($this->eventDispatcher);

return $connectionManager;
}
Expand Down
40 changes: 34 additions & 6 deletions Manager/IndexManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
use Sineflow\ElasticsearchBundle\Document\Provider\ProviderInterface;
use Sineflow\ElasticsearchBundle\Document\Provider\ProviderRegistry;
use Sineflow\ElasticsearchBundle\Document\Repository\Repository;
use Sineflow\ElasticsearchBundle\Event\Events;
use Sineflow\ElasticsearchBundle\Event\PrePersistEvent;
use Sineflow\ElasticsearchBundle\Exception\BulkRequestException;
use Sineflow\ElasticsearchBundle\Exception\Exception;
use Sineflow\ElasticsearchBundle\Exception\IndexRebuildingException;
use Sineflow\ElasticsearchBundle\Exception\NoReadAliasException;
use Sineflow\ElasticsearchBundle\Finder\Finder;
use Sineflow\ElasticsearchBundle\Mapping\DocumentMetadataCollector;
use Sineflow\ElasticsearchBundle\Result\DocumentConverter;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
* Manager class.
Expand Down Expand Up @@ -56,7 +59,7 @@ class IndexManager
protected $indexMapping;

/**
* @var RepositoryInterface[]
* @var Repository[]
*/
protected $repositories = [];

Expand All @@ -75,6 +78,11 @@ class IndexManager
*/
protected $writeAlias = null;

/**
* @var EventDispatcherInterface
*/
protected $eventDispatcher;

/**
* @param string $managerName
* @param ConnectionManager $connection
Expand Down Expand Up @@ -110,6 +118,22 @@ public function __construct(
}
}

/**
* @return EventDispatcherInterface
*/
public function getEventDispatcher()
{
return $this->eventDispatcher;
}

/**
* @param EventDispatcherInterface $eventDispatcher
*/
public function setEventDispatcher($eventDispatcher)
{
$this->eventDispatcher = $eventDispatcher;
}

/**
* Returns mapping array for index
*
Expand Down Expand Up @@ -248,12 +272,12 @@ private function getBaseIndexName()
*/
private function getUniqueIndexName()
{
$indexName = $baseName = $this->getBaseIndexName() . '_' . date('YmdHis');
$indexName = $baseName = $this->getBaseIndexName().'_'.date('YmdHis');

$i = 1;
// Keep trying other names until there is no such existing index or alias
while ($this->getConnection()->existsIndexOrAlias(array('index' => $indexName))) {
$indexName = $baseName . '_' . $i;
$indexName = $baseName.'_'.$i;
$i++;
}

Expand Down Expand Up @@ -474,9 +498,9 @@ public function rebuildIndex($deleteOld = false, $cancelExistingRebuild = false)
[
'remove' => [
'index' => $oldIndex,
'alias' => $this->writeAlias
'alias' => $this->writeAlias,
],
]
],
],
],
];
Expand Down Expand Up @@ -655,7 +679,6 @@ public function update($documentClass, $id, array $fields = [], $script = null,
}
}


/**
* Adds document to a bulk request for the next commit.
* Depending on the connection autocommit mode, the update may be committed right away.
Expand All @@ -664,6 +687,11 @@ public function update($documentClass, $id, array $fields = [], $script = null,
*/
public function persist(DocumentInterface $document)
{
if ($this->eventDispatcher) {
$bulkOperationIndex = $this->getConnection()->getBulkOperationsCount();
$this->eventDispatcher->dispatch(Events::PRE_PERSIST, new PrePersistEvent($document, $bulkOperationIndex));
}

$documentArray = $this->documentConverter->convertToArray($document);
$this->persistRaw(get_class($document), $documentArray);
}
Expand Down

0 comments on commit 8067971

Please sign in to comment.