Skip to content

Commit

Permalink
Reorganize Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
xemlock committed Jul 4, 2019
1 parent 591359c commit dd176c4
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 14 deletions.
5 changes: 4 additions & 1 deletion composer.json
Expand Up @@ -9,11 +9,14 @@
}
],
"require": {
"xemlock/zend1-composer-installer": "*",
"xemlock/zend1-composer-installer": "^0.1.0",
"doctrine/orm": "^2.4",
"symfony/yaml": "^2.7"
},
"autoload": {
"classmap": [
"Bootstrap.php"
],
"psr-0": {
"ManipleCore_": "library/"
},
Expand Down
10 changes: 8 additions & 2 deletions configs/resources.config.php
Expand Up @@ -36,11 +36,17 @@
),

'Maniple.Queue' => array(
'class' => 'ManipleCore_Queue_Service',
'class' => ManipleCore_Queue_Service::className,
'args' => array(
'resource:Zefram_Db',
'resource:Maniple.QueueAdapter',
'resource:SharedEventManager',
'resource:Log',
),
),
'Maniple.QueueAdapter' => array(
'class' => ManipleCore_Queue_Adapter_DbTable::className,
'args' => array(
'resource:Zefram_Db',
),
),
);
File renamed without changes.
@@ -1,9 +1,25 @@
<?php

class ManipleCore_Queue_Adapter extends Zend_Queue_Adapter_Db
class ManipleCore_Queue_Adapter_DbTable extends Zend_Queue_Adapter_Db
{
const className = __CLASS__;

/**
* @param Zefram_Db|Zend_Config|array $options
* @param Zend_Queue|null $queue
* @throws Zend_Queue_Exception
*/
public function __construct($options, Zend_Queue $queue = null)
{
if ($options instanceof Zefram_Db) {
$db = $options;
$options = array(
'dbAdapter' => $db->getAdapter(),
'tablePrefix' => $db->getTablePrefix(),
// Zend_Db_Select::FOR_UPDATE =>
);
}

if (isset($options['tablePrefix'])) {
$tablePrefix = $options['tablePrefix'];
unset($options['tablePrefix']);
Expand Down Expand Up @@ -47,10 +63,11 @@ public function create($name, $timeout = null)
/**
* Get messages in the queue
*
* @param integer $maxMessages Maximum number of messages to return
* @param integer $timeout Lock timeout in seconds
* @param integer $maxMessages Maximum number of messages to return
* @param integer $timeout Lock timeout in seconds
* @param Zend_Queue $queue
* @return Zend_Queue_Message_Iterator
* @throws
*/
public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null)
{
Expand Down
File renamed without changes.
Expand Up @@ -2,6 +2,8 @@

class ManipleCore_Queue_Service
{
const className = __CLASS__;

/**
* @var Zend_Queue_Adapter_AdapterInterface
*/
Expand All @@ -23,16 +25,13 @@ class ManipleCore_Queue_Service
protected $_logger;

/**
* @param Zefram_Db $db
* @param Zend_Queue_Adapter_AdapterInterface $adapter
* @param Zend_EventManager_SharedEventManager $sharedEvents
* @param Zend_Log $logger
*/
public function __construct(Zefram_Db $db, Zend_EventManager_SharedEventManager $sharedEvents, Zend_Log $logger = null)
public function __construct(Zend_Queue_Adapter_AdapterInterface $adapter, Zend_EventManager_SharedEventManager $sharedEvents, Zend_Log $logger = null)
{
$this->_adapter = new ManipleCore_Queue_Adapter(array(
'dbAdapter' => $db->getAdapter(),
'tablePrefix' => $db->getTablePrefix(),
));
$this->_adapter = $adapter;

$this->_events = new Zend_EventManager_EventManager();
$this->_events->setIdentifiers(array(
Expand Down Expand Up @@ -76,6 +75,7 @@ public function getLogger()
* Process messages from all queues
*
* @param int $maxMessages maximum number of messages to process
* @throws Zend_Queue_Exception
*/
public function process($maxMessages = 1)
{
Expand Down Expand Up @@ -136,6 +136,7 @@ public function process($maxMessages = 1)
/**
* @param string $name
* @return Zend_Queue
* @throws Zend_Queue_Exception
*/
public function openQueue($name)
{
Expand All @@ -151,7 +152,8 @@ public function openQueue($name)
*
* @param string $queue queue name
* @param mixed $message message body
* @return $this provides fluent interface
* @return $this
* @throws Zend_Queue_Exception
*/
public function sendMessage($queue, $message)
{
Expand All @@ -177,7 +179,8 @@ public function sendMessage($queue, $message)
*
* @param string|callable $queue queue name or message listener
* @param callable $listener message listener
* @return $this provides fluent interface
* @return $this
* @throws Zend_EventManager_Exception_InvalidArgumentException
*/
public function addListener($queue, $listener = null)
{
Expand Down

0 comments on commit dd176c4

Please sign in to comment.