Skip to content

Commit

Permalink
Implement Asynchronous Operation status change based on a lookup key
Browse files Browse the repository at this point in the history
  • Loading branch information
nuzil committed May 28, 2020
1 parent 55af84f commit 92fab51
Show file tree
Hide file tree
Showing 17 changed files with 215 additions and 126 deletions.
14 changes: 9 additions & 5 deletions app/code/Magento/AsynchronousOperations/Model/BulkManagement.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public function scheduleBulk($bulkUuid, array $operations, $description, $userId
public function retryBulk($bulkUuid, array $errorCodes)
{
$metadata = $this->metadataPool->getMetadata(BulkSummaryInterface::class);
$connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());

$connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());
/** @var \Magento\AsynchronousOperations\Model\ResourceModel\Operation[] $retriablyFailedOperations */
$retriablyFailedOperations = $this->operationCollectionFactory->create()
->addFieldToFilter('error_code', ['in' => $errorCodes])
Expand All @@ -157,23 +157,27 @@ public function retryBulk($bulkUuid, array $errorCodes)
/** @var OperationInterface $operation */
foreach ($retriablyFailedOperations as $operation) {
if ($currentBatchSize === $maxBatchSize) {
$whereCondition = $connection->quoteInto('operation_key IN (?)', $operationIds)
. " AND "
. $connection->quoteInto('bulk_uuid = ?', $bulkUuid);
$connection->delete(
$this->resourceConnection->getTableName('magento_operation'),
$connection->quoteInto('id IN (?)', $operationIds)
$whereCondition
);
$operationIds = [];
$currentBatchSize = 0;
}
$currentBatchSize++;
$operationIds[] = $operation->getId();
// Rescheduled operations must be put in queue in 'open' state (i.e. without ID)
$operation->setId(null);
}
// remove operations from the last batch
if (!empty($operationIds)) {
$whereCondition = $connection->quoteInto('operation_key IN (?)', $operationIds)
. " AND "
. $connection->quoteInto('bulk_uuid = ?', $bulkUuid);
$connection->delete(
$this->resourceConnection->getTableName('magento_operation'),
$connection->quoteInto('id IN (?)', $operationIds)
$whereCondition
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Magento\AsynchronousOperations\Api\Data\AsyncResponseInterfaceFactory;
use Magento\AsynchronousOperations\Api\Data\ItemStatusInterface;
use Magento\AsynchronousOperations\Api\Data\ItemStatusInterfaceFactory;
use Magento\AsynchronousOperations\Model\ResourceModel\Operation\OperationRepository;
use Magento\Authorization\Model\UserContextInterface;
use Magento\Framework\Bulk\BulkManagementInterface;
use Magento\Framework\DataObject\IdentityGeneratorInterface;
Expand Down Expand Up @@ -144,7 +143,6 @@ public function publishMass($topicName, array $entitiesArray, $groupId = null, $
foreach ($entitiesArray as $key => $entityParams) {
/** @var \Magento\AsynchronousOperations\Api\Data\ItemStatusInterface $requestItem */
$requestItem = $this->itemStatusInterfaceFactory->create();

try {
$operation = $this->operationRepository->create($topicName, $entityParams, $groupId, $key);
$operations[] = $operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
namespace Magento\AsynchronousOperations\Model;

use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
use Magento\Framework\EntityManager\EntityManager;
use Magento\Framework\App\ResourceConnection;
use Psr\Log\LoggerInterface;
use Magento\Framework\Bulk\OperationManagementInterface;

/**
* Class for managing Bulk Operations
*/
class OperationManagement implements \Magento\Framework\Bulk\OperationManagementInterface
class OperationManagement implements OperationManagementInterface
{
/**
* @var EntityManager
* @var ResourceConnection
*/
private $entityManager;
private $connection;

/**
* @var OperationInterfaceFactory
Expand All @@ -32,40 +34,44 @@ class OperationManagement implements \Magento\Framework\Bulk\OperationManagement
/**
* OperationManagement constructor.
*
* @param EntityManager $entityManager
* @param OperationInterfaceFactory $operationFactory
* @param \Psr\Log\LoggerInterface $logger
* @param LoggerInterface $logger
* @param ResourceConnection $connection
*/
public function __construct(
EntityManager $entityManager,
OperationInterfaceFactory $operationFactory,
\Psr\Log\LoggerInterface $logger
LoggerInterface $logger,
ResourceConnection $connection
) {
$this->entityManager = $entityManager;
$this->operationFactory = $operationFactory;
$this->logger = $logger;
$this->connection = $connection;
}

/**
* @inheritDoc
*/
public function changeOperationStatus(
$operationId,
$bulkUuid,
$operationKey,
$status,
$errorCode = null,
$message = null,
$data = null,
$resultData = null
) {
try {
$operationEntity = $this->operationFactory->create();
$this->entityManager->load($operationEntity, $operationId);
$operationEntity->setErrorCode($errorCode);
$operationEntity->setStatus($status);
$operationEntity->setResultMessage($message);
$operationEntity->setSerializedData($data);
$operationEntity->setResultSerializedData($resultData);
$this->entityManager->save($operationEntity);
$connection = $this->connection->getConnection();
$table = $this->connection->getTableName('magento_operation');
$bind = [
'error_code' => $errorCode,
'status' => $status,
'result_message' => $message,
'serialized_data' => $data,
'result_serialized_data' => $resultData
];
$where = ['bulk_uuid = ?' => $bulkUuid, 'operation_key = ?' => $operationKey];
$connection->update($table, $bind, $where);
} catch (\Exception $exception) {
$this->logger->critical($exception->getMessage());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public function process(string $encodedMessage)

$serializedData = (isset($errorCode)) ? $operation->getSerializedData() : null;
$this->operationManagement->changeOperationStatus(
$operation->getBulkUuid(),
$operation->getId(),
$status,
$errorCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

namespace Magento\AsynchronousOperations\Model\ResourceModel;

use Magento\Framework\Model\ResourceModel\Db\AbstractDb;

/**
* Resource class for Bulk Operations
*/
class Operation extends \Magento\Framework\Model\ResourceModel\Db\AbstractDb
class Operation extends AbstractDb
{

public const TABLE_NAME = "magento_operation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
* Copyright © Magento, Inc. All rights reserved.
* See COPYING.txt for license details.
*/

declare(strict_types=1);

namespace Magento\AsynchronousOperations\Model\ResourceModel\Operation;

use Magento\AsynchronousOperations\Api\Data\OperationInterface;
use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
use Magento\AsynchronousOperations\Model\OperationRepositoryInterface;
use Magento\Framework\Exception\LocalizedException;
use Magento\Framework\MessageQueue\MessageValidator;
use Magento\Framework\MessageQueue\MessageEncoder;
use Magento\Framework\Serialize\Serializer\Json;
Expand Down Expand Up @@ -73,11 +73,13 @@ public function __construct(
* @param string $topicName
* @param array $entityParams
* @param string $groupId
* @param string $operationId
* @return OperationInterface
* @throws LocalizedException
* @deprecated No longer used.
* @see create()
*/
public function createByTopic($topicName, $entityParams, $groupId)
public function createByTopic($topicName, $entityParams, $groupId, $operationId)
{
$this->messageValidator->validate($topicName, $entityParams);
$encodedMessage = $this->messageEncoder->encode($topicName, $entityParams);
Expand All @@ -89,10 +91,11 @@ public function createByTopic($topicName, $entityParams, $groupId)
];
$data = [
'data' => [
OperationInterface::BULK_ID => $groupId,
OperationInterface::TOPIC_NAME => $topicName,
OperationInterface::ID => $operationId,
OperationInterface::BULK_ID => $groupId,
OperationInterface::TOPIC_NAME => $topicName,
OperationInterface::SERIALIZED_DATA => $this->jsonSerializer->serialize($serializedData),
OperationInterface::STATUS => OperationInterface::STATUS_TYPE_OPEN,
OperationInterface::STATUS => OperationInterface::STATUS_TYPE_OPEN,
],
];

Expand All @@ -103,9 +106,11 @@ public function createByTopic($topicName, $entityParams, $groupId)

/**
* @inheritDoc
*
* @throws LocalizedException
*/
public function create($topicName, $entityParams, $groupId, $operationId): OperationInterface
{
return $this->createByTopic($topicName, $entityParams, $groupId);
return $this->createByTopic($topicName, $entityParams, $groupId, $operationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public function testRetryBulk()
$bulkUuid = 'bulk-001';
$errorCodes = ['errorCode'];
$connectionName = 'default';
$operationId = 1;
$operationId = 0;
$operationTable = 'magento_operation';
$topicName = 'topic.name';
$metadata = $this->getMockForAbstractClass(EntityMetadataInterface::class);
Expand All @@ -216,13 +216,20 @@ public function testRetryBulk()
$operationCollection->expects($this->once())->method('getItems')->willReturn([$operation]);
$connection->expects($this->once())->method('beginTransaction')->willReturnSelf();
$operation->expects($this->once())->method('getId')->willReturn($operationId);
$operation->expects($this->once())->method('setId')->with(null)->willReturnSelf();
$this->resourceConnection->expects($this->once())
->method('getTableName')->with($operationTable)->willReturn($operationTable);
$connection->expects($this->at(1))
->method('quoteInto')
->with('operation_key IN (?)', [$operationId])
->willReturn('operation_key IN (' . $operationId . ')');
$connection->expects($this->at(2))
->method('quoteInto')
->with('bulk_uuid = ?', $bulkUuid)
->willReturn("bulk_uuid = '$bulkUuid'");
$connection->expects($this->once())
->method('quoteInto')->with('id IN (?)', [$operationId])->willReturn('id IN (' . $operationId . ')');
$connection->expects($this->once())
->method('delete')->with($operationTable, 'id IN (' . $operationId . ')')->willReturn(1);
->method('delete')
->with($operationTable, 'operation_key IN (' . $operationId . ') AND bulk_uuid = \'' . $bulkUuid . '\'')
->willReturn(1);
$connection->expects($this->once())->method('commit')->willReturnSelf();
$operation->expects($this->once())->method('getTopicName')->willReturn($topicName);
$this->publisher->expects($this->once())->method('publish')->with($topicName, [$operation])->willReturn(null);
Expand All @@ -239,7 +246,7 @@ public function testRetryBulkWithException()
$bulkUuid = 'bulk-001';
$errorCodes = ['errorCode'];
$connectionName = 'default';
$operationId = 1;
$operationId = 0;
$operationTable = 'magento_operation';
$exceptionMessage = 'Exception message';
$metadata = $this->getMockForAbstractClass(EntityMetadataInterface::class);
Expand All @@ -259,13 +266,19 @@ public function testRetryBulkWithException()
$operationCollection->expects($this->once())->method('getItems')->willReturn([$operation]);
$connection->expects($this->once())->method('beginTransaction')->willReturnSelf();
$operation->expects($this->once())->method('getId')->willReturn($operationId);
$operation->expects($this->once())->method('setId')->with(null)->willReturnSelf();
$this->resourceConnection->expects($this->once())
->method('getTableName')->with($operationTable)->willReturn($operationTable);
$connection->expects($this->at(1))
->method('quoteInto')
->with('operation_key IN (?)', [$operationId])
->willReturn('operation_key IN (' . $operationId . ')');
$connection->expects($this->at(2))
->method('quoteInto')
->with('bulk_uuid = ?', $bulkUuid)
->willReturn("bulk_uuid = '$bulkUuid'");
$connection->expects($this->once())
->method('quoteInto')->with('id IN (?)', [$operationId])->willReturn('id IN (' . $operationId . ')');
$connection->expects($this->once())
->method('delete')->with($operationTable, 'id IN (' . $operationId . ')')
->method('delete')
->with($operationTable, 'operation_key IN (' . $operationId . ') AND bulk_uuid = \'' . $bulkUuid . '\'')
->willThrowException(new \Exception($exceptionMessage));
$connection->expects($this->once())->method('rollBack')->willReturnSelf();
$this->logger->expects($this->once())->method('critical')->with($exceptionMessage);
Expand Down
Loading

0 comments on commit 92fab51

Please sign in to comment.