Skip to content
This repository has been archived by the owner on Jun 2, 2023. It is now read-only.

Commit

Permalink
Merge pull request #7 from lamoda/feature/SCENTRE-5744
Browse files Browse the repository at this point in the history
SCENTRE-5744 Handle queue in "in progress" status, some optimization
  • Loading branch information
Tekill committed Nov 7, 2019
2 parents 202a596 + 817d4cd commit 0c22cfc
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 83 deletions.
12 changes: 6 additions & 6 deletions src/ConstantMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class ConstantMessage

public const PUBLISHER_NOT_FOUND = 'Publisher `%s` not found';

public const QUEUE_ENTITY_NOT_FOUND = 'The queue with id "%d" was not found';
public const QUEUE_ENTITY_NOT_FOUND_IN_STATUS_NEW = 'The queue "%s" with job "%s" was not found in status "new". Actual status is "%s"';
public const QUEUE_ATTEMPTS_REACHED = 'The queue "%s" has reached it\'s attempts count maximum';
public const QUEUE_CAN_NOT_REQUEUE = 'Can not requeue messages';
public const QUEUE_CAN_NOT_REPUBLISH = 'Can not republish messages';
public const QUEUE_SUCCESS_REPUBLISH = 'Queue republish successfully completed';
public const QUEUE_ENTITY_NOT_FOUND = 'The queue with id "%d" was not found';
public const QUEUE_ENTITY_NOT_FOUND_IN_SUITABLE_STATUS = 'The queue "%s" with job "%s" was not found in suitable status. Actual status is "%s"';
public const QUEUE_ATTEMPTS_REACHED = 'The queue "%s" has reached it\'s attempts count maximum';
public const QUEUE_CAN_NOT_REQUEUE = 'Can not requeue messages';
public const QUEUE_CAN_NOT_REPUBLISH = 'Can not republish messages';
public const QUEUE_SUCCESS_REPUBLISH = 'Queue republish successfully completed';
}
6 changes: 1 addition & 5 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ public function execute(AMQPMessage $message): int
return $this->doExecute(
$this->queueService->getToProcess($data['id'])
);
} catch (UnexpectedValueException $exception) {
$this->logger->alert($exception->getMessage(), $this->getMessageLogParams($message));

return self::MSG_REJECT;
} catch (AttemptsReachedException $exception) {
} catch (UnexpectedValueException | AttemptsReachedException $exception) {
$this->logger->alert($exception->getMessage(), $this->getMessageLogParams($message));

return self::MSG_REJECT;
Expand Down
2 changes: 0 additions & 2 deletions src/Publisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ private function releaseQueues(array $queues): void
'trace' => $e->getTraceAsString(),
]
);

continue;
}
}
}
Expand Down
64 changes: 29 additions & 35 deletions src/Service/QueueService.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class QueueService
/** @var int */
protected $maxAttempts;

/** @var array */
protected $queueSuitableStatuses = [
QueueEntityMappedSuperclass::STATUS_NEW,
QueueEntityMappedSuperclass::STATUS_IN_PROGRESS
];

public function __construct(
QueueRepository $repository,
EntityFactoryInterface $entityFactory,
Expand Down Expand Up @@ -64,45 +70,33 @@ public function getToRestore(int $limit, ?int $offset = null): array
*/
public function getToProcess(int $id): QueueEntityInterface
{
$this->repository->beginTransaction();
$queueEntity = $this->repository->find($id);

try {
$queueEntity = $this->repository->findOneBy(
[
'id' => $id,
]
);

if (!($queueEntity instanceof QueueEntityInterface)) {
throw new UnexpectedValueException(sprintf(ConstantMessage::QUEUE_ENTITY_NOT_FOUND, $id));
}

if (QueueEntityMappedSuperclass::STATUS_NEW !== $queueEntity->getStatus()) {
throw new UnexpectedValueException(sprintf(
ConstantMessage::QUEUE_ENTITY_NOT_FOUND_IN_STATUS_NEW,
$queueEntity->getName(),
$queueEntity->getJobName(),
$queueEntity->getStatusAsString()
));
}

$attemptsReached = $queueEntity->isMaxAttemptsReached($this->maxAttempts);
if ($attemptsReached) {
$queueEntity->setAttemptsReached();
} else {
$queueEntity->setInProgress();
}

$this->repository->save($queueEntity);
$this->repository->commit();
} catch (Exception $exception) {
$this->repository->rollback();

throw $exception;
if (!($queueEntity instanceof QueueEntityInterface)) {
throw new UnexpectedValueException(sprintf(ConstantMessage::QUEUE_ENTITY_NOT_FOUND, $id));
}

if (!in_array($queueEntity->getStatus(), $this->queueSuitableStatuses, true)) {
throw new UnexpectedValueException(sprintf(
ConstantMessage::QUEUE_ENTITY_NOT_FOUND_IN_SUITABLE_STATUS,
$queueEntity->getName(),
$queueEntity->getJobName(),
$queueEntity->getStatusAsString()
));
}

$attemptsReached = $queueEntity->isMaxAttemptsReached($this->maxAttempts);
if ($attemptsReached) {
$queueEntity->setAttemptsReached();
} else {
$queueEntity->setInProgress();
}

$this->repository->save($queueEntity);

if ($attemptsReached) {
$this->eventDispatcher->dispatch(QueueAttemptsReachedEvent::NAME, new QueueAttemptsReachedEvent($queueEntity));

throw new AttemptsReachedException(sprintf(ConstantMessage::QUEUE_ATTEMPTS_REACHED, $queueEntity->getName()));
}

Expand Down Expand Up @@ -131,7 +125,7 @@ public function getToRepublish(int $limit, ?int $offset = null): array
*/
public function createQueue(QueueInterface $queueable): QueueEntityInterface
{
return $this->save($this->entityFactory->createQueue($queueable));
return $this->entityFactory->createQueue($queueable);
}

public function flush(QueueEntityInterface $entity = null): void
Expand Down
54 changes: 19 additions & 35 deletions tests/unit/Service/QueueServiceTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,36 @@ public function testGetToRestore(): void
}

/**
* @param QueueEntity $queue
*
* @throws \Exception
*
* @dataProvider dataGetToProcess()
*/
public function testGetToProcess(): void
public function testGetToProcess(QueueEntity $queue): void
{
$queue = new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]);
$queue->setNew();

$queueRepository = $this->getQueueRepository();
$queueRepository
->expects($this->once())
->method('beginTransaction');
$queueRepository
->expects($this->once())
->method('findOneBy')
->method('find')
->willReturn($queue);
$queueRepository
->expects($this->once())
->method('save')
->with($queue);
$queueRepository
->expects($this->once())
->method('commit');

$this->assertEquals($queue, $this->createService($queueRepository)->getToProcess(1));
$this->assertEquals(QueueEntity::STATUS_IN_PROGRESS_TITLE, $queue->getStatusAsString());
}

public function dataGetToProcess(): array
{
return [
'new status' => [(new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]))->setNew()],
'in progress status' => [(new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]))->setInProgress()],
];
}

/**
* @param null | QueueEntity $queueEntity
* @param string $expectedExceptionMessage
Expand All @@ -83,16 +86,11 @@ public function testGetToProcessQueueNotFound(?QueueEntity $queueEntity, string
$this->expectExceptionMessage($expectedExceptionMessage);

$queueRepository = $this->getQueueRepository();

$queueRepository
->expects($this->once())
->method('beginTransaction');
$queueRepository
->expects($this->once())
->method('findOneBy')
->method('find')
->willReturn($queueEntity);
$queueRepository
->expects($this->once())
->method('rollback');

$this->createService($queueRepository)->getToProcess(1);
}
Expand All @@ -106,7 +104,7 @@ public function dataGetToProcessQueueNotFound(): array
],
'Status not NEW' => [
new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]),
'The queue "queue" with job "ClassJob" was not found in status "new". Actual status is "initial"',
'The queue "queue" with job "ClassJob" was not found in suitable status. Actual status is "initial"',
],
];
}
Expand All @@ -126,18 +124,12 @@ public function testGetToProcessAttemptsReached(): void
$queueRepository = $this->getQueueRepository();
$queueRepository
->expects($this->once())
->method('beginTransaction');
$queueRepository
->expects($this->once())
->method('findOneBy')
->method('find')
->willReturn($queue);
$queueRepository
->expects($this->once())
->method('save')
->with($queue);
$queueRepository
->expects($this->once())
->method('commit');

$attemptsReachedEvent = new QueueAttemptsReachedEvent($queue);

Expand Down Expand Up @@ -184,11 +176,6 @@ public function testCreateQueue(): void
->willReturn($queue);

$queueRepository = $this->getQueueRepository();
$queueRepository
->expects($this->once())
->method('save')
->with($queue)
->willReturnArgument(0);

$this->assertEquals(
$queue,
Expand Down Expand Up @@ -232,10 +219,7 @@ private function getQueueRepository()
{
return $this->getMockQueueRepository(
[
'beginTransaction',
'rollback',
'commit',
'findOneBy',
'find',
'getToRestore',
'save',
'isTransactionActive',
Expand Down

0 comments on commit 0c22cfc

Please sign in to comment.