Skip to content

Commit

Permalink
Allow setting the tube on which a command is to be queued
Browse files Browse the repository at this point in the history
  • Loading branch information
Van Peer Ilias committed Jan 7, 2016
1 parent b9ba4a6 commit 3fbc91c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
22 changes: 14 additions & 8 deletions src/QMan.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,28 @@ public function __construct(

/**
* @param CommandInterface $command
* @param string $tube
* @param int $priority
* @param int $delay
* @param int $timeToRun
* @throws Exception
*/
public function queue(
CommandInterface $command,
$tube = Beanie::DEFAULT_TUBE,
$priority = Beanie::DEFAULT_PRIORITY,
$delay = Beanie::DEFAULT_DELAY,
$timeToRun = Beanie::DEFAULT_TIME_TO_RUN
) {
try {
$this->producer->put(
$this->serializer->serialize($command),
$priority,
$delay,
$timeToRun
);
$this->producer
->useTube($tube)
->put(
$this->serializer->serialize($command),
$priority,
$delay,
$timeToRun
);
} catch (Exception $exception) {
$this->handlePutFailure($exception, $command);
}
Expand All @@ -65,7 +70,7 @@ public function queue(
/**
* @param Exception $exception
* @param CommandInterface $command
* @throws AbstractServerException
* @throws Exception
*/
protected function handlePutFailure(Exception $exception, CommandInterface $command)
{
Expand All @@ -89,14 +94,15 @@ public function enableFallback()

public function queueClosure(
\Closure $closure,
$tube = Beanie::DEFAULT_TUBE,
$priority = Beanie::DEFAULT_PRIORITY,
$delay = Beanie::DEFAULT_DELAY,
$timeToRun = Beanie::DEFAULT_TIME_TO_RUN
) {
$closureCommand = new ClosureCommand();
$closureCommand->setClosure($closure);

$this->queue($closureCommand, $priority, $delay, $timeToRun);
$this->queue($closureCommand, $tube, $priority, $delay, $timeToRun);
}

/**
Expand Down
20 changes: 14 additions & 6 deletions tests/QMan/QManTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
class QManTest extends \PHPUnit_Framework_TestCase
{
const TEST_TUBE = 'test';

/** @var \PHPUnit_Framework_MockObject_MockObject|Producer */
protected $producerMock;

Expand All @@ -31,7 +33,7 @@ public function setUp()
$this->producerMock = $this
->getMockBuilder(Producer::class)
->disableOriginalConstructor()
->setMethods(['put'])
->setMethods(['put', 'useTube'])
->getMock();

$this->serializerMock = $this
Expand All @@ -44,6 +46,12 @@ public function setUp()
->setMethods(['getType', 'getData', 'execute'])
->getMockForAbstractClass();

$this->producerMock
->expects($this->any())
->method('useTube')
->with($this->isType('string'))
->willReturnSelf();

$this->qMan = new QMan($this->producerMock, $this->serializerMock);
}

Expand Down Expand Up @@ -73,7 +81,7 @@ public function testQueue_queues(array $params)
call_user_func_array([$invocationMocker, 'with'], array_merge([$testData], $params));


call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock], $params));
call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock, self::TEST_TUBE], $params));
}

/**
Expand Down Expand Up @@ -104,7 +112,7 @@ public function testQueue_noFallback_exceptionThrown(array $params)
$invocationMocker->willThrowException($serverExceptionMock);


call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock], $params));
call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock, self::TEST_TUBE], $params));
}


Expand Down Expand Up @@ -134,7 +142,7 @@ public function testQueue_fallbackEnabled_exceptionThrown(array $params)
$invocationMocker->willThrowException(new \RuntimeException());


call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock], $params));
call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock, self::TEST_TUBE], $params));
}

/**
Expand Down Expand Up @@ -170,7 +178,7 @@ public function testQueue_fallbackEnabled_serverExceptionMeansTryAgain(array $pa
$invocationMocker->willThrowException($serverExceptionMock);


call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock], $params));
call_user_func_array([$this->qMan, 'queue'], array_merge([$this->commandMock, self::TEST_TUBE], $params));
}

/**
Expand Down Expand Up @@ -198,7 +206,7 @@ public function testQueueClosure_queues(array $params)
call_user_func_array([$invocationMocker, 'with'], array_merge([$testData], $params));


call_user_func_array([$this->qMan, 'queueClosure'], array_merge([$closure], $params));
call_user_func_array([$this->qMan, 'queueClosure'], array_merge([$closure, self::TEST_TUBE], $params));
}

public function testStaticCreate_createsFromListOfServerNames()
Expand Down

0 comments on commit 3fbc91c

Please sign in to comment.