Skip to content

Commit

Permalink
Merge pull request #8 from koongo-com/feature/batch-notification-proc…
Browse files Browse the repository at this point in the history
…essor

Feature/batch notification processor
  • Loading branch information
pecinaon committed Aug 23, 2023
2 parents 3d24593 + 47c24f2 commit 81b30e5
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 153 deletions.
147 changes: 147 additions & 0 deletions src/Nostress/Koongo/Model/Webhook/Event/AbstractProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
<?php
/**
* Magento Module developed by NoStress Commerce
*
* NOTICE OF LICENSE
*
* This program is licensed under the Koongo software licence (by NoStress Commerce).
* With the purchase, download of the software or the installation of the software
* in your application you accept the licence agreement. The allowed usage is outlined in the
* Koongo software licence which can be found under https://docs.koongo.com/display/koongo/License+Conditions
*
* Any modification or distribution is strictly forbidden. The license
* grants you the installation in one application. For multiuse you will need
* to purchase further licences at https://store.koongo.com/.
*
* See the Koongo software licence agreement for more details.
* @copyright Copyright (c) 2017 NoStress Commerce (http://www.nostresscommerce.cz, http://www.koongo.com/)
*
*/

/**
* Webhook event Processor model for Koongo api
*
* @category Nostress
* @package Nostress_Koongo
*
*/

namespace Nostress\Koongo\Model\Webhook\Event;

use Magento\Framework\Data\Collection\AbstractDb;
use Magento\Framework\Model\Context;
use Magento\Framework\Model\ResourceModel\AbstractResource;
use Magento\Framework\Registry;
use Magento\Sales\Model\OrderFactory;
use Magento\Store\Model\StoreManagerInterface;
use Nostress\Koongo\Helper\Data;
use Nostress\Koongo\Model\AbstractModel;
use Nostress\Koongo\Model\Api\Restclient\Kaas;
use Nostress\Koongo\Model\Translation;
use Nostress\Koongo\Model\Webhook\EventFactory;
use Nostress\Koongo\Model\WebhookFactory;

abstract class AbstractProcessor extends AbstractModel
{
/** Check events in processing state in given period */
const EVENT_PROCESSING_CHECK_PERIOD = "-1 hours";
/** Check and remove events older than diven number of days */
const EVENT_REMOVAL_CHECK_PERIOD = "-14 days";
/** Maximum number of event renews */
const EVENT_RENEWAL_COUNTER_LIMIT = 5;
/** Maximum amount of events processed in one run. */
const EVENT_PROCESSING_MAX_AMOUNT = 5000;

/**
* Webhook event factory
*
* @var EventFactory
*/
protected $_eventFactory;

/**
* Webhook factory
*
* @var WebhookFactory
*/
protected $_webhookFactory;

/**
* Kaas api client
*
* @var Kaas
*/
protected $_apiClient;

/**
* Webhook event manager
*
* @var Manager
*/
protected $_webhookEventManager;

/**
* Sales order factory
*
* @var OrderFactory
*/
protected $_orderFactory;

/**
* @param Context $context
* @param Registry $registry
* @param Data $helper
* @param StoreManagerInterface $storeManager
* @param Translation $translation
* @param EventFactory $eventFactory
* @param WebhookFactory $webhookFactory
* @param Kaas $apiClient
* @param Manager $eventManager
* @param OrderFactory $orderFactory
* @param AbstractResource $resource
* @param AbstractDb $resourceCollection
* @param array $data
*/
public function __construct(
Context $context,
Registry $registry,
Data $helper,
StoreManagerInterface $storeManager,
Translation $translation,
EventFactory $eventFactory,
WebhookFactory $webhookFactory,
Kaas $apiClient,
Manager $webhookEventManager,
OrderFactory $orderFactory,
AbstractResource $resource = null,
AbstractDb $resourceCollection = null,
array $data = []
) {
$this->_eventFactory = $eventFactory;
$this->_webhookFactory = $webhookFactory;
$this->_apiClient = $apiClient;
$this->_webhookEventManager = $webhookEventManager;
$this->_orderFactory = $orderFactory;
parent::__construct($context, $registry, $helper, $storeManager, $translation, $resource, $resourceCollection, $data);
}

/**
* Load API client
*
* @return Kaas
*/
protected function _getApiClient()
{
return $this->_apiClient;
}

/**
* Get Manager for events
*
* @return Manager
*/
protected function _getManager()
{
return $this->_webhookEventManager;
}
}
196 changes: 196 additions & 0 deletions src/Nostress/Koongo/Model/Webhook/Event/BatchProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
<?php
/**
* Magento Module developed by NoStress Commerce
*
* NOTICE OF LICENSE
*
* This program is licensed under the Koongo software licence (by NoStress Commerce).
* With the purchase, download of the software or the installation of the software
* in your application you accept the licence agreement. The allowed usage is outlined in the
* Koongo software licence which can be found under https://docs.koongo.com/display/koongo/License+Conditions
*
* Any modification or distribution is strictly forbidden. The license
* grants you the installation in one application. For multiuse you will need
* to purchase further licences at https://store.koongo.com/.
*
* See the Koongo software licence agreement for more details.
* @copyright Copyright (c) 2017 NoStress Commerce (http://www.nostresscommerce.cz, http://www.koongo.com/)
*
*/

/**
* Webhook event Processor model for Koongo api
*
* @category Nostress
* @package Nostress_Koongo
*
*/

namespace Nostress\Koongo\Model\Webhook\Event;

use Exception;
use Magento\Framework\Model\ResourceModel\Db\Collection\AbstractCollection;
use Nostress\Koongo\Model\Webhook;
use Nostress\Koongo\Model\Webhook\Event;
use Zend_Db_Expr;
use Zend_Db_Select;
use Magento\Cron\Model\Schedule;

/**
* Batch processor for products.batch topic events
*/
class BatchProcessor extends AbstractProcessor
{
private const BATCH_SIZE = 500;

/**
* Process events with batch & bulk it into more rows
*
* @return void
*/
public function process(?Schedule $schedule = null, bool $pendingOnly = true)
{
$batches = $this->prepareBatch($pendingOnly);

/** @var AbstractCollection $eventCollection */
$eventCollection = $this->_eventFactory->create()->getCollection();

foreach ($batches as $cnt => $webhookId) {
var_dump($webhookId);
$webhook = $this->_webhookFactory->create()->load($webhookId);

if (!empty($webhook->getId())) {
$collection = clone $eventCollection;
$select = $collection->getSelect();
$skus = [];
$eventIds = [];

while ($cnt > 0) {
$select->where('topic = ?', Webhook::WEBHOOK_TOPIC_PRODUCTS_BATCH)
->where('status = ?', Event::STATUS_PENDING)
->where('webhook_id = ?', $webhookId)
->limit(self::BATCH_SIZE)
->order('created_at');

$processed = 0;
$collection->load();
/** @var Event $event */
foreach ($collection as $event) {
$params = json_decode($event->getParams(), true);
$eventIds[] = $event->getId();
$processed++;

if (empty($params)) {
continue;
}

if (array_key_exists($event::PARAM_STORE_PRODUCT_SKUS, $params)) {
$skus = array_merge($params[$event::PARAM_STORE_PRODUCT_SKUS], $skus);
$skus = array_unique($skus);
}

if (count($skus) >= self::BATCH_SIZE) {
$hash = md5(json_encode($eventIds));
// send
try {
$result = $this->send($webhook, $skus);
$status = Event::STATUS_FINISHED;
} catch (Exception $e) {
$result = $e->getMessage();
$status = Event::STATUS_ERROR;
}
$this->updateEvents($eventIds, $status, $result, $hash);
$skus = [];
$eventIds = [];
}
}

if (count($skus) > 0) {
$hash = md5(json_encode($eventIds));
// send
try {
$result = $this->send($webhook, $skus);
$status = Event::STATUS_FINISHED;
} catch (Exception $e) {
$result = $e->getMessage();
$status = Event::STATUS_ERROR;
}
$this->updateEvents($eventIds, $status, $result, $hash);
$skus = [];
$eventIds = [];
}

$cnt = $cnt - $processed;
}
}
}
}

/**
* @return string
*/
private function send(Webhook $webhook, array $skus)
{
$client = $this->_getApiClient();
$payload = [
Event::PARAM_STORE_PRODUCT_SKUS => $skus
];

return $client->sendRequestPost($webhook->getUrl(), $payload, $webhook->getTopic(), $webhook->getSecret());
}

/**
* @return array<int, int> [[count => webhook_id]]
*/
private function prepareBatch(bool $pendingOnly = true)
{
/** @var AbstractCollection $eventCollection */
$eventCollection = $this->_eventFactory->create()->getCollection();
$select = $eventCollection->getSelect();

$select->reset(Zend_Db_Select::COLUMNS)
->columns([
'cnt' => new Zend_Db_Expr('COUNT(*)'),
'webhook_id' => 'webhook_id'
]);

if ($pendingOnly) {
$select->where('status = ?', Event::STATUS_PENDING);
}

$select->where('topic = ?', Webhook::WEBHOOK_TOPIC_PRODUCTS_BATCH)
->group('webhook_id')
->order('cnt');

$eventCollection->load();

$hooks = [];
foreach ($eventCollection as $item) {
$hooks[$item->getCnt()] = $item->getWebhookId();
}

krsort($hooks);
return $hooks;
}

private function updateEvents(array $eventIds, string $status, string $result, string $hash)
{
/** @var AbstractCollection $eventCollection */
$eventCollection = $this->_eventFactory->create()->getCollection();
$eventCollection->getConnection()->update(
$eventCollection->getMainTable(),
[
'status' => $status,
'message' => sprintf(
'%s: %s | %s',
__('Batched ID'),
$hash,
$result
)
],
[
'entity_id IN (?)' => $eventIds
]
);
}
}

0 comments on commit 81b30e5

Please sign in to comment.