Skip to content
This repository has been archived by the owner on Aug 16, 2020. It is now read-only.

Commit

Permalink
add lock collection support
Browse files Browse the repository at this point in the history
  • Loading branch information
mmasiukevich committed Dec 30, 2019
1 parent 55c678b commit 2b6dba7
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 47 deletions.
30 changes: 15 additions & 15 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 18 additions & 10 deletions src/SagaModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
namespace ServiceBus\Sagas\Module;

use ServiceBus\AnnotationsReader\Reader;
use ServiceBus\Mutex\InMemoryLockCollection;
use ServiceBus\Mutex\LockCollection;
use function ServiceBus\Common\canonicalizeFilesPath;
use function ServiceBus\Common\extractNamespaceFromFile;
use function ServiceBus\Common\searchFiles;
Expand Down Expand Up @@ -177,11 +179,12 @@ public function boot(ContainerBuilder $containerBuilder): void
{
$containerBuilder->setParameter('service_bus.sagas.list', $this->sagasToRegister);

$this->registerMutexCollection($containerBuilder);
$this->registerSagaStore($containerBuilder);
$this->registerMutexFactory($containerBuilder);
$this->registerSagasProvider($containerBuilder);

if (null === $this->configurationLoaderServiceId)
if ($this->configurationLoaderServiceId === null)
{
$this->registerDefaultConfigurationLoader($containerBuilder);

Expand All @@ -191,27 +194,32 @@ public function boot(ContainerBuilder $containerBuilder): void
$this->registerRoutesConfigurator($containerBuilder);
}

/**
* @param ContainerBuilder $containerBuilder
*/
private function registerMutexCollection(ContainerBuilder $containerBuilder): void
{
if ($containerBuilder->hasDefinition(LockCollection::class) === false)
{
$containerBuilder->setDefinition(LockCollection::class, new Definition(InMemoryLockCollection::class));
}
}

private function registerMutexFactory(ContainerBuilder $containerBuilder): void
{
if (false === $containerBuilder->hasDefinition(MutexFactory::class))
if ($containerBuilder->hasDefinition(MutexFactory::class) === false)
{
$containerBuilder->setDefinition(MutexFactory::class, new Definition(InMemoryMutexFactory::class));
}
}

private function registerRoutesConfigurator(ContainerBuilder $containerBuilder): void
{
if (false === $containerBuilder->hasDefinition(ChainRouterConfigurator::class))
if ($containerBuilder->hasDefinition(ChainRouterConfigurator::class) === false)
{
$containerBuilder->setDefinition(ChainRouterConfigurator::class, new Definition(ChainRouterConfigurator::class));
}

$routerConfiguratorDefinition = $containerBuilder->getDefinition(ChainRouterConfigurator::class);

if (false === $containerBuilder->hasDefinition(Router::class))
if ($containerBuilder->hasDefinition(Router::class) === false)
{
$containerBuilder->setDefinition(Router::class, new Definition(Router::class));
}
Expand Down Expand Up @@ -251,7 +259,7 @@ private function registerSagasProvider(ContainerBuilder $containerBuilder): void

private function registerSagaStore(ContainerBuilder $containerBuilder): void
{
if (true === $containerBuilder->hasDefinition(SagasStore::class))
if ($containerBuilder->hasDefinition(SagasStore::class) === true)
{
return;
}
Expand All @@ -264,12 +272,12 @@ private function registerSagaStore(ContainerBuilder $containerBuilder): void

private function registerDefaultConfigurationLoader(ContainerBuilder $containerBuilder): void
{
if (true === $containerBuilder->hasDefinition(SagaConfigurationLoader::class))
if ($containerBuilder->hasDefinition(SagaConfigurationLoader::class) === true)
{
return;
}

if (true === $containerBuilder->hasDefinition(EventListenerProcessorFactory::class))
if ($containerBuilder->hasDefinition(EventListenerProcessorFactory::class) === true)
{
return;
}
Expand Down
48 changes: 26 additions & 22 deletions src/SagasProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

namespace ServiceBus\Sagas\Module;

use ServiceBus\Mutex\InMemoryLockCollection;
use ServiceBus\Mutex\LockCollection;
use function Amp\call;
use function ServiceBus\Common\datetimeInstantiator;
use function ServiceBus\Common\invokeReflectionMethod;
Expand Down Expand Up @@ -50,25 +52,22 @@ final class SagasProvider
*/
private $sagaMetaDataCollection = [];

/**
* @psalm-var array<string, \ServiceBus\Mutex\Lock>
*
* @var Lock[]
*/
private $lockCollection = [];

public function __construct(SagasStore $sagaStore, ?MutexFactory $mutexFactory = null)
{
$this->sagaStore = $sagaStore;
$this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
/** @var LockCollection */
private $lockCollection;

public function __construct(
SagasStore $sagaStore,
?MutexFactory $mutexFactory = null,
?LockCollection $lockCollection = null
) {
$this->sagaStore = $sagaStore;
$this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
$this->lockCollection = $lockCollection ?? new InMemoryLockCollection();
}

public function __destruct()
{
foreach ($this->lockCollection as $lock)
{
yield $lock->release();
}
unset($this->lockCollection);
}

/**
Expand Down Expand Up @@ -245,7 +244,10 @@ private function doStore(Saga $saga, ServiceBusContext $context, bool $isNew): \
$promises[] = $context->delivery($message);
}

yield $promises;
if (\count($promises) !== 0)
{
yield $promises;
}

unset($promises, $commands, $events);

Expand Down Expand Up @@ -282,20 +284,23 @@ private function appendMetaData(string $sagaClass, SagaMetadata $metadata): void
}

/**
* Setup mutes on saga.
* Setup mutex on saga.
*/
private function setupMutex(SagaId $id): \Generator
{
$mutexKey = createMutexKey($id);

if (\array_key_exists($mutexKey, $this->lockCollection) === false)
/** @var bool $hasLock */
$hasLock = yield $this->lockCollection->has($mutexKey);

if ($hasLock === false)
{
$mutex = $this->mutexFactory->create($mutexKey);

/** @var \ServiceBus\Mutex\Lock $lock */
$lock = yield $mutex->acquire();

$this->lockCollection[$mutexKey] = $lock;
yield $this->lockCollection->place($mutexKey, $lock);
}
}

Expand All @@ -306,12 +311,11 @@ private function releaseMutex(SagaId $id): \Generator
{
$mutexKey = createMutexKey($id);

$lock = $this->lockCollection[$mutexKey] ?? null;
/** @var Lock|null $lock */
$lock = yield $this->lockCollection->extract($mutexKey);

if ($lock !== null)
{
unset($this->lockCollection[$mutexKey]);

yield $lock->release();
}
}
Expand Down

0 comments on commit 2b6dba7

Please sign in to comment.