Skip to content

Commit

Permalink
Merge pull request #44 from scherersoftware/master
Browse files Browse the repository at this point in the history
Implement attempts functionality for PDO-backend engines
  • Loading branch information
josegonzalez committed Jun 30, 2016
2 parents d132dc3 + 2f30ab1 commit d40e08a
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 66 deletions.
1 change: 1 addition & 0 deletions config/schema-mysql.sql
Expand Up @@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS `jobs` (
`expires_at` datetime DEFAULT NULL,
`delay_until` datetime DEFAULT NULL,
`locked` tinyint(1) NOT NULL DEFAULT '0',
`attempts`int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `queue` (`queue`,`locked`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3 changes: 2 additions & 1 deletion config/schema-pgsql.sql
Expand Up @@ -9,6 +9,7 @@ CREATE TABLE jobs (
priority smallint NOT NULL DEFAULT '0',
expires_at timestamp DEFAULT NULL,
delay_until timestamp DEFAULT NULL,
locked smallint NOT NULL DEFAULT '0'
locked smallint NOT NULL DEFAULT '0',
attempts smallint DEFAULT '0'
);
CREATE INDEX queue ON JOBS (queue, locked);
2 changes: 1 addition & 1 deletion docs/defining-jobs.md
Expand Up @@ -4,7 +4,7 @@ Jobs are simply PHP callables. Job callables should be available to the worker p

Job callables receive a `Job` instance (not to be confused with your own own jobs), which is a wrapper around the metadata for a given job. The two useful methods of this `Job` instance are:

- `attempts()`: Contains the number of times a job has been attempted. This value may be incorrect, depending upon the semantics of your chosen backend.
- `attempts()`: Contains the number of attempts a job has has left before being discarded. This value may be incorrect, depending upon the semantics of your chosen backend
- `data($key = null, $default = null)`: Returns the job payload. If the first argument is passed, then the method will return only that key in the job payload if it exists. You can also fallback to a `$default` value if said key does not exist in the payload

### Bare Functions
Expand Down
40 changes: 0 additions & 40 deletions schema-pgsql.sql

This file was deleted.

2 changes: 2 additions & 0 deletions src/josegonzalez/Queuesadilla/Engine/MysqlEngine.php
Expand Up @@ -34,6 +34,8 @@ class MysqlEngine extends PdoEngine
'port' => 3306,
'priority' => 0,
'queue' => 'default',
'attempts' => 0,
'attempts_delay' => 600,
'host' => '127.0.0.1',
'table' => 'jobs',
];
Expand Down
34 changes: 31 additions & 3 deletions src/josegonzalez/Queuesadilla/Engine/PdoEngine.php
Expand Up @@ -63,7 +63,7 @@ public function pop($options = [])
$queue = $this->setting($options, 'queue');
$selectSql = implode(" ", [
sprintf(
'SELECT id, %s FROM %s',
'SELECT id, %s, attempts FROM %s',
$this->quoteIdentifier('data'),
$this->quoteIdentifier($this->config('table'))
),
Expand Down Expand Up @@ -100,6 +100,7 @@ public function pop($options = [])
'args' => $data['args'],
'queue' => $queue,
'options' => $data['options'],
'attempts' => (int)$result['attempts']
];
}
}
Expand All @@ -125,6 +126,8 @@ public function push($item, $options = [])
$expiresIn = $this->setting($options, 'expires_in');
$queue = $this->setting($options, 'queue');
$priority = $this->setting($options, 'priority');
$attempts = $this->setting($options, 'attempts');
$attemptsDelay = $this->setting($options, 'attempts_delay');

$delayUntil = null;
if ($delay !== null) {
Expand All @@ -139,25 +142,29 @@ public function push($item, $options = [])
}

unset($options['queue']);
unset($options['attempts']);
$item['options'] = $options;
$item['options']['attempts_delay'] = $attemptsDelay;
$data = json_encode($item);

$sql = 'INSERT INTO %s (%s, %s, %s, %s, %s) VALUES (?, ?, ?, ?, ?)';
$sql = 'INSERT INTO %s (%s, %s, %s, %s, %s, %s) VALUES (?, ?, ?, ?, ?, ?)';
$sql = sprintf(
$sql,
$this->quoteIdentifier($this->config('table')),
$this->quoteIdentifier('data'),
$this->quoteIdentifier('queue'),
$this->quoteIdentifier('priority'),
$this->quoteIdentifier('expires_at'),
$this->quoteIdentifier('delay_until')
$this->quoteIdentifier('delay_until'),
$this->quoteIdentifier('attempts')
);
$sth = $this->connection()->prepare($sql);
$sth->bindParam(1, $data, PDO::PARAM_STR);
$sth->bindParam(2, $queue, PDO::PARAM_STR);
$sth->bindParam(3, $priority, PDO::PARAM_INT);
$sth->bindParam(4, $expiresAt, PDO::PARAM_STR);
$sth->bindParam(5, $delayUntil, PDO::PARAM_STR);
$sth->bindParam(6, $attempts, PDO::PARAM_INT);
$sth->execute();

if ($sth->rowCount() == 1) {
Expand Down Expand Up @@ -200,6 +207,27 @@ public function release($item, $options = [])
$sth = $this->connection()->prepare($sql);
$sth->bindParam(1, $item['id'], PDO::PARAM_INT);
$sth->execute();

if (isset($item['delay'])) {
$datetime = new DateTime;
$delayUntil = $datetime->add(new DateInterval(sprintf('PT%sS', $item['delay'])))->format('Y-m-d H:i:s');

$sql = sprintf('UPDATE %s SET delay_until = ? WHERE id = ?', $this->config('table'));
$sth = $this->connection()->prepare($sql);
$sth->bindParam(1, $delayUntil, PDO::PARAM_STR);
$sth->bindParam(2, $item['id'], PDO::PARAM_INT);
$sth->execute();
}

if (isset($item['attempts']) && $item['attempts'] > 0) {
$sql = sprintf('UPDATE %s SET attempts = ? WHERE id = ?', $this->config('table'));
$sth = $this->connection()->prepare($sql);
$sth->bindParam(1, $item['attempts'], PDO::PARAM_INT);
$sth->bindParam(2, $item['id'], PDO::PARAM_INT);
$sth->execute();
return $sth->rowCount() == 1;
}
$this->reject($item);
return $sth->rowCount() == 1;
}

Expand Down
2 changes: 2 additions & 0 deletions src/josegonzalez/Queuesadilla/Engine/PostgresEngine.php
Expand Up @@ -34,6 +34,8 @@ class PostgresEngine extends PdoEngine
'port' => 5432,
'priority' => 0,
'queue' => 'default',
'attempts' => 0,
'attempts_delay' => 600,
'host' => '127.0.0.1',
'table' => 'jobs',
];
Expand Down
9 changes: 6 additions & 3 deletions src/josegonzalez/Queuesadilla/Job/Base.php
Expand Up @@ -62,12 +62,15 @@ public function item()

public function release($delay = 0)
{
if (!isset($this->item['attempts'])) {
$this->item['attempts'] = 0;
$this->item['attempts'] = 0;
if (isset($this->item['attempts']) && $this->item['attempts'] > 0) {
$this->item['attempts'] -= 1;
}

$this->item['attempts'] += 1;
$this->item['delay'] = $delay;
if (isset($this->item['options']['attempts_delay'])) {
$this->item['delay'] = $this->item['options']['attempts_delay'];
}
return $this->engine->release($this->item);
}

Expand Down
40 changes: 39 additions & 1 deletion tests/josegonzalez/Queuesadilla/Engine/AbstractPdoEngineTest.php
Expand Up @@ -23,6 +23,7 @@ public function setUp()
$this->Engine = $this->mockEngine();
$this->Fixtures = new FixtureData;
$this->clearEngine();
$this->expandFixtureData();
}

/**
Expand Down Expand Up @@ -214,6 +215,30 @@ public function testRelease()
$this->markTestSkipped('No connection to database available');
}
$this->assertFalse($this->Engine->release(null, 'default'));

$this->Engine->push($this->Fixtures->default['first'], 'default');
$item = $this->Engine->pop();
$this->assertTrue($this->Engine->release($item));
$sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $this->Fixtures->default['first']['id']);
$this->assertFalse($sth->rowCount() == 1);

$this->assertTrue($this->Engine->push($this->Fixtures->default['second'], [
'attempts' => 10
]));

$item2 = $this->Engine->pop();
$item2['attempts'] = 9;
$item2['delay'] = $item2['options']['attempts_delay'];
$this->assertTrue($this->Engine->release($item2));

$date = new \DateTime();
$date->modify('+10 minutes');
$sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $this->Fixtures->default['second']['id']);
$results = $sth->fetch(PDO::FETCH_ASSOC);
$inTenMinutes = $date->format('Y-m-d H:i:s');

$this->assertEquals($inTenMinutes, $results['delay_until']);
$this->assertEquals(9, $results['attempts']);
}

/**
Expand Down Expand Up @@ -272,11 +297,24 @@ protected function execute($connection, $sql)
}
}

protected function expandFixtureData() {
foreach ($this->Fixtures->default as &$default) {
$default['options']['attempts_delay'] = 600;
}
foreach ($this->Fixtures->other as &$other) {
$other['options']['attempts_delay'] = 600;
}
}

protected function mockEngine($methods = null, $config = null)
{
if ($config === null) {
$config = $this->config;
}
return $this->getMock($this->engineClass, $methods, [$this->Logger, $config]);

return $this->getMockBuilder($this->engineClass)
->setMethods($methods)
->setConstructorArgs([$this->Logger, $config])
->getMock();
}
}
Expand Up @@ -213,6 +213,10 @@ protected function mockEngine($methods = null, $config = null)
if ($config === null) {
$config = $this->config;
}
return $this->getMock($this->engineClass, $methods, [$this->Logger, $config]);

return $this->getMockBuilder($this->engineClass)
->setMethods($methods)
->setConstructorArgs([$this->Logger, $config])
->getMock();
}
}
6 changes: 5 additions & 1 deletion tests/josegonzalez/Queuesadilla/Engine/MemoryEngineTest.php
Expand Up @@ -165,6 +165,10 @@ protected function mockEngine($methods = null, $config = null)
if ($config === null) {
$config = $this->config;
}
return $this->getMock($this->engineClass, $methods, [$this->Logger, $config]);

return $this->getMockBuilder($this->engineClass)
->setMethods($methods)
->setConstructorArgs([$this->Logger, $config])
->getMock();
}
}
6 changes: 5 additions & 1 deletion tests/josegonzalez/Queuesadilla/Engine/NullEngineTest.php
Expand Up @@ -193,6 +193,10 @@ protected function mockEngine($methods = null, $config = null)
if ($config === null) {
$config = $this->config;
}
return $this->getMock($this->engineClass, $methods, [$this->Logger, $config]);

return $this->getMockBuilder($this->engineClass)
->setMethods($methods)
->setConstructorArgs([$this->Logger, $config])
->getMock();
}
}
6 changes: 5 additions & 1 deletion tests/josegonzalez/Queuesadilla/Engine/RedisEngineTest.php
Expand Up @@ -216,6 +216,10 @@ protected function mockEngine($methods = null, $config = null)
if ($config === null) {
$config = $this->config;
}
return $this->getMock($this->engineClass, $methods, [$this->Logger, $config]);

return $this->getMockBuilder($this->engineClass)
->setMethods($methods)
->setConstructorArgs([$this->Logger, $config])
->getMock();
}
}
Expand Up @@ -146,6 +146,10 @@ protected function mockEngine($methods = null, $config = null)
if ($config === null) {
$config = $this->config;
}
return $this->getMock($this->engineClass, $methods, [$this->Logger, $config]);

return $this->getMockBuilder($this->engineClass)
->setMethods($methods)
->setConstructorArgs([$this->Logger, $config])
->getMock();
}
}
16 changes: 8 additions & 8 deletions tests/josegonzalez/Queuesadilla/FixtureData.php
Expand Up @@ -5,16 +5,16 @@
class FixtureData
{
public $default = [
'first' => ['id' => '1', 'class' => null, 'args' => [], 'options' => [], 'queue' => 'default'],
'second' => ['id' => '2', 'class' => 'some_function', 'args' => [], 'options' => [], 'queue' => 'default'],
'third' => ['id' => '3', 'class' => 'another_function', 'args' => [], 'options' => [], 'queue' => 'default'],
'fourth' => ['id' => '4', 'class' => 'yet_another_function', 'args' => [], 'options' => [], 'queue' => 'default'],
'first' => ['id' => '1', 'class' => null, 'args' => [], 'options' => [], 'queue' => 'default', 'attempts' => 0],
'second' => ['id' => '2', 'class' => 'some_function', 'args' => [], 'options' => [], 'queue' => 'default', 'attempts' => 0],
'third' => ['id' => '3', 'class' => 'another_function', 'args' => [], 'options' => [], 'queue' => 'default', 'attempts' => 0],
'fourth' => ['id' => '4', 'class' => 'yet_another_function', 'args' => [], 'options' => [], 'queue' => 'default', 'attempts' => 0],
];

public $other = [
'first' => ['id' => '1', 'class' => null, 'args' => [], 'options' => [], 'queue' => 'other'],
'second' => ['id' => '2', 'class' => 'some_function', 'args' => [], 'options' => [], 'queue' => 'other'],
'third' => ['id' => '3', 'class' => 'another_function', 'args' => [], 'options' => [], 'queue' => 'other'],
'fourth' => ['id' => '4', 'class' => 'yet_another_function', 'args' => [], 'options' => [], 'queue' => 'other'],
'first' => ['id' => '1', 'class' => null, 'args' => [], 'options' => [], 'queue' => 'other', 'attempts' => 0],
'second' => ['id' => '2', 'class' => 'some_function', 'args' => [], 'options' => [], 'queue' => 'other', 'attempts' => 0],
'third' => ['id' => '3', 'class' => 'another_function', 'args' => [], 'options' => [], 'queue' => 'other', 'attempts' => 0],
'fourth' => ['id' => '4', 'class' => 'yet_another_function', 'args' => [], 'options' => [], 'queue' => 'other', 'attempts' => 0],
];
}
6 changes: 3 additions & 3 deletions tests/josegonzalez/Queuesadilla/Job/BaseTest.php
Expand Up @@ -193,7 +193,7 @@ public function testRelease()
$this->assertTrue($this->Jobs[0]->release(10));
$this->assertEquals([
'id' => 1,
'attempts' => 1,
'attempts' => 0,
'delay' => 10,
'class' => 'Foo',
'queue' => 'default',
Expand All @@ -209,7 +209,7 @@ public function testRelease()
$this->assertFalse($this->Jobs[1]->release());
$this->assertEquals([
'id' => 2,
'attempts' => 1,
'attempts' => 0,
'delay' => 0,
'class' => 'Foo',
'queue' => 'default',
Expand All @@ -224,7 +224,7 @@ public function testRelease()
$this->assertFalse($this->Jobs[2]->release());
$this->assertEquals([
'id' => 3,
'attempts' => 2,
'attempts' => 0,
'delay' => 0,
'class' => 'Foo',
'queue' => 'default',
Expand Down
Expand Up @@ -99,7 +99,9 @@ public function testWork()
$Worker = new SequentialWorker($Engine);
$this->assertFalse($Worker->work());

$Engine = $this->getMock('josegonzalez\Queuesadilla\Engine\NullEngine', ['pop']);
$Engine = $this->getMockBuilder('josegonzalez\Queuesadilla\Engine\NullEngine')
->setMethods(['pop'])
->getMock();
$Engine->expects($this->at(0))
->method('pop')
->will($this->returnValue(true));
Expand Down

0 comments on commit d40e08a

Please sign in to comment.