Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: php
php:
- 5.6
- 7.0
- 7.1
- 7.2

script:
- composer install
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Queue Client Changelog

## v2.0.0

- Use new Symfony/Lock component in Filesystem handler

## v1.0.3

- Use Priority objects instead of string
Expand Down
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
}
],
"require": {
"symfony/filesystem": ">=2.7",
"symfony/finder": ">=2.7",
"symfony/filesystem": ">=3.4",
"symfony/finder": ">=3.4",
"symfony/lock": ">=3.4",
"aws/aws-sdk-php": ">=2.7"
},
"require-dev": {
"atoum/atoum": "~2",
"atoum/atoum": "~3.1",
"satooshi/php-coveralls": "dev-master"
},
"autoload": {
Expand Down
4 changes: 2 additions & 2 deletions docker/test/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM php:7
FROM php:7.1

RUN pecl install xdebug
RUN docker-php-ext-enable xdebug
Expand All @@ -9,4 +9,4 @@ RUN apt-get update && apt-get install -y \

COPY ./ssh/ssh_config /etc/ssh/ssh_config

WORKDIR /data
WORKDIR /data
65 changes: 33 additions & 32 deletions src/Adapter/FileAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
use ReputationVIP\QueueClient\PriorityHandler\Priority\Priority;
use ReputationVIP\QueueClient\PriorityHandler\PriorityHandlerInterface;
use ReputationVIP\QueueClient\PriorityHandler\StandardPriorityHandler;
use ReputationVIP\QueueClient\Utils\LockHandlerFactory;
use ReputationVIP\QueueClient\Utils\LockHandlerFactoryInterface;
use Symfony\Component\Filesystem\Exception\IOExceptionInterface;
use Symfony\Component\Filesystem\Filesystem;
use Symfony\Component\Finder\Finder;
use Symfony\Component\Finder\SplFileInfo;
use Symfony\Component\Lock\Factory;
use Symfony\Component\Lock\Store\FlockStore;

class FileAdapter extends AbstractAdapter implements AdapterInterface
{
Expand All @@ -31,7 +31,7 @@ class FileAdapter extends AbstractAdapter implements AdapterInterface
/** @var Filesystem $fs */
private $fs;

/** @var LockHandlerFactoryInterface $fs */
/** @var Factory $lockHandlerFactory */
private $lockHandlerFactory;

/** @var PriorityHandlerInterface $priorityHandler */
Expand All @@ -42,12 +42,12 @@ class FileAdapter extends AbstractAdapter implements AdapterInterface
* @param PriorityHandlerInterface $priorityHandler
* @param Filesystem $fs
* @param Finder $finder
* @param LockHandlerFactoryInterface $lockHandlerFactory
* @param Factory $lockHandlerFactory
*
* @throws \InvalidArgumentException
* @throws QueueAccessException
*/
public function __construct($repository, PriorityHandlerInterface $priorityHandler = null, Filesystem $fs = null, Finder $finder = null, LockHandlerFactoryInterface $lockHandlerFactory = null)
public function __construct($repository, PriorityHandlerInterface $priorityHandler = null, Filesystem $fs = null, Finder $finder = null, Factory $lockHandlerFactory = null)
{
if (empty($repository)) {
throw new \InvalidArgumentException('Argument repository empty or not defined.');
Expand All @@ -61,15 +61,12 @@ public function __construct($repository, PriorityHandlerInterface $priorityHandl
$finder = new Finder();
}

if (null === $lockHandlerFactory) {
$lockHandlerFactory = new LockHandlerFactory();
}

if (null === $priorityHandler) {
$priorityHandler = new StandardPriorityHandler();
}

$this->fs = $fs;

if (!$this->fs->exists($repository)) {
try {
$this->fs->mkdir($repository);
Expand All @@ -78,6 +75,10 @@ public function __construct($repository, PriorityHandlerInterface $priorityHandl
}
}

if (null === $lockHandlerFactory) {
$lockHandlerFactory = new Factory(new FlockStore($repository));
}

$this->priorityHandler = $priorityHandler;
$this->repository = $repository;
$this->finder = $finder;
Expand Down Expand Up @@ -126,8 +127,8 @@ private function getQueuePath($queueName, Priority $priority)
private function readQueueFromFile($queueName, Priority $priority, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand All @@ -148,10 +149,10 @@ private function readQueueFromFile($queueName, Priority $priority, $nbTries = 0)
}
$queue = json_decode($content, true);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();

return $queue;
}
Expand All @@ -170,8 +171,8 @@ private function readQueueFromFile($queueName, Priority $priority, $nbTries = 0)
private function writeQueueInFile($queueName, Priority $priority, $queue, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand All @@ -182,10 +183,10 @@ private function writeQueueInFile($queueName, Priority $priority, $queue, $nbTri
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();
return $this;
}

Expand All @@ -205,8 +206,8 @@ private function writeQueueInFile($queueName, Priority $priority, $queue, $nbTri
private function addMessageLock($queueName, $message, Priority $priority, $nbTries = 0, $delaySeconds = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand Down Expand Up @@ -239,10 +240,10 @@ private function addMessageLock($queueName, $message, Priority $priority, $nbTri
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();
return $this;
}

Expand Down Expand Up @@ -291,8 +292,8 @@ public function addMessage($queueName, $message, Priority $priority = null, $del
private function getMessagesLock($queueName, $nbMsg, Priority $priority, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand Down Expand Up @@ -335,10 +336,10 @@ private function getMessagesLock($queueName, $nbMsg, Priority $priority, $nbTrie
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();

return $messages;
}
Expand Down Expand Up @@ -399,8 +400,8 @@ public function getMessages($queueName, $nbMsg = 1, Priority $priority = null)
private function deleteMessageLock($queueName, $message, Priority $priority, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand Down Expand Up @@ -431,10 +432,10 @@ private function deleteMessageLock($queueName, $message, Priority $priority, $nb
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();

return $this;
}
Expand Down Expand Up @@ -570,16 +571,16 @@ private function deleteQueueLock($queueName, Priority $priority, $nbTries = 0)
}

$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
usleep(10);
return $this->deleteQueueLock($queueName, $priority, ($nbTries + 1));
}
$this->fs->remove($queueFilePath);
$lockHandler->release();
$lock->release();
return $this;
}

Expand Down
16 changes: 0 additions & 16 deletions src/Utils/LockHandlerFactory.php

This file was deleted.

17 changes: 0 additions & 17 deletions src/Utils/LockHandlerFactoryInterface.php

This file was deleted.

Loading