Skip to content

Commit

Permalink
Merge pull request #48 from josegonzalez/fix-mysql-attempts
Browse files Browse the repository at this point in the history
Fix attempts handling for PDOEngine-based Engines
  • Loading branch information
josegonzalez committed Sep 17, 2016
2 parents 8ed2038 + c0fd716 commit a969693
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 23 deletions.
59 changes: 40 additions & 19 deletions src/josegonzalez/Queuesadilla/Engine/PdoEngine.php
Expand Up @@ -203,31 +203,52 @@ public function queues()
*/
public function release($item, $options = [])
{
$sql = sprintf('UPDATE %s SET locked = 0 WHERE id = ?', $this->config('table'));
$sth = $this->connection()->prepare($sql);
$sth->bindParam(1, $item['id'], PDO::PARAM_INT);
$sth->execute();
if (isset($item['attempts']) && $item['attempts'] === 0) {
return $this->reject($item);
}

$fields = [
[
'type' => PDO::PARAM_INT,
'key' => 'locked',
'value' => 0,
],
];

if (isset($item['delay'])) {
$dateInterval = new DateInterval(sprintf('PT%sS', $item['delay']));
$datetime = new DateTime;
$delayUntil = $datetime->add(new DateInterval(sprintf('PT%sS', $item['delay'])))->format('Y-m-d H:i:s');

$sql = sprintf('UPDATE %s SET delay_until = ? WHERE id = ?', $this->config('table'));
$sth = $this->connection()->prepare($sql);
$sth->bindParam(1, $delayUntil, PDO::PARAM_STR);
$sth->bindParam(2, $item['id'], PDO::PARAM_INT);
$sth->execute();
$delayUntil = $datetime->add($dateInterval)
->format('Y-m-d H:i:s');
$fields[] = [
'type' => PDO::PARAM_STR,
'key' => 'delay_until',
'value' => $delayUntil,
];
}

if (isset($item['attempts']) && $item['attempts'] > 0) {
$sql = sprintf('UPDATE %s SET attempts = ? WHERE id = ?', $this->config('table'));
$sth = $this->connection()->prepare($sql);
$sth->bindParam(1, $item['attempts'], PDO::PARAM_INT);
$sth->bindParam(2, $item['id'], PDO::PARAM_INT);
$sth->execute();
return $sth->rowCount() == 1;
$fields[] = [
'type' => PDO::PARAM_INT,
'key' => 'attempts',
'value' => (int)$item['attempts'],
];
}
$updateSql = [];
foreach ($fields as $config) {
$updateSql[] = sprintf('%1$s = :%1$s', $config['key']);
}
$sql = sprintf(
'UPDATE %s SET %s WHERE id = :id',
$this->config('table'),
implode(', ', $updateSql)
);
$sth = $this->connection()->prepare($sql);
foreach ($fields as $config) {
$sth->bindValue(sprintf(':%s', $config['key']), $config['value'], $config['type']);
}
$this->reject($item);
$sth->bindValue(':id', (int)$item['id'], PDO::PARAM_INT);
$sth->execute();

return $sth->rowCount() == 1;
}

Expand Down
6 changes: 4 additions & 2 deletions src/josegonzalez/Queuesadilla/Job/Base.php
Expand Up @@ -62,8 +62,10 @@ public function item()

public function release($delay = 0)
{
$this->item['attempts'] = 0;
if (isset($this->item['attempts']) && $this->item['attempts'] > 0) {
if (!isset($this->item['attempts'])) {
$this->item['attempts'] = 0;
}
if ($this->item['attempts'] > 0) {
$this->item['attempts'] -= 1;
}

Expand Down
Expand Up @@ -220,7 +220,7 @@ public function testRelease()
$item = $this->Engine->pop();
$this->assertTrue($this->Engine->release($item));
$sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $this->Fixtures->default['first']['id']);
$this->assertFalse($sth->rowCount() == 1);
$this->assertTrue($sth->rowCount() == 0);

$this->assertTrue($this->Engine->push($this->Fixtures->default['second'], [
'attempts' => 10
Expand All @@ -233,7 +233,7 @@ public function testRelease()

$date = new \DateTime();
$date->modify('+10 minutes');
$sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $this->Fixtures->default['second']['id']);
$sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $item2['id']);
$results = $sth->fetch(PDO::FETCH_ASSOC);
$inTenMinutes = $date->format('Y-m-d H:i:s');

Expand Down

0 comments on commit a969693

Please sign in to comment.