From c0fd716973f32d47361b84ac07111f558aded886 Mon Sep 17 00:00:00 2001 From: Jose Diaz-Gonzalez Date: Sat, 17 Sep 2016 15:06:46 -0600 Subject: [PATCH] Fix attempts handling for PDOEngine-based Engines Previously: - we would incorrectly set attempts, leaving old jobs in the database. - We rejected all jobs that were released - Job attributes were updated one by one *after* the lock was released. now, we update the attributes of the job all at once, allowing us to safely handle job releases. --- .../Queuesadilla/Engine/PdoEngine.php | 59 +++++++++++++------ src/josegonzalez/Queuesadilla/Job/Base.php | 6 +- .../Engine/AbstractPdoEngineTest.php | 4 +- 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/src/josegonzalez/Queuesadilla/Engine/PdoEngine.php b/src/josegonzalez/Queuesadilla/Engine/PdoEngine.php index 61d0b55..aa3b24d 100644 --- a/src/josegonzalez/Queuesadilla/Engine/PdoEngine.php +++ b/src/josegonzalez/Queuesadilla/Engine/PdoEngine.php @@ -203,31 +203,52 @@ public function queues() */ public function release($item, $options = []) { - $sql = sprintf('UPDATE %s SET locked = 0 WHERE id = ?', $this->config('table')); - $sth = $this->connection()->prepare($sql); - $sth->bindParam(1, $item['id'], PDO::PARAM_INT); - $sth->execute(); + if (isset($item['attempts']) && $item['attempts'] === 0) { + return $this->reject($item); + } + + $fields = [ + [ + 'type' => PDO::PARAM_INT, + 'key' => 'locked', + 'value' => 0, + ], + ]; if (isset($item['delay'])) { + $dateInterval = new DateInterval(sprintf('PT%sS', $item['delay'])); $datetime = new DateTime; - $delayUntil = $datetime->add(new DateInterval(sprintf('PT%sS', $item['delay'])))->format('Y-m-d H:i:s'); - - $sql = sprintf('UPDATE %s SET delay_until = ? WHERE id = ?', $this->config('table')); - $sth = $this->connection()->prepare($sql); - $sth->bindParam(1, $delayUntil, PDO::PARAM_STR); - $sth->bindParam(2, $item['id'], PDO::PARAM_INT); - $sth->execute(); + $delayUntil = $datetime->add($dateInterval) + ->format('Y-m-d H:i:s'); + $fields[] = [ + 'type' => PDO::PARAM_STR, + 'key' => 'delay_until', + 'value' => $delayUntil, + ]; } - if (isset($item['attempts']) && $item['attempts'] > 0) { - $sql = sprintf('UPDATE %s SET attempts = ? WHERE id = ?', $this->config('table')); - $sth = $this->connection()->prepare($sql); - $sth->bindParam(1, $item['attempts'], PDO::PARAM_INT); - $sth->bindParam(2, $item['id'], PDO::PARAM_INT); - $sth->execute(); - return $sth->rowCount() == 1; + $fields[] = [ + 'type' => PDO::PARAM_INT, + 'key' => 'attempts', + 'value' => (int)$item['attempts'], + ]; + } + $updateSql = []; + foreach ($fields as $config) { + $updateSql[] = sprintf('%1$s = :%1$s', $config['key']); + } + $sql = sprintf( + 'UPDATE %s SET %s WHERE id = :id', + $this->config('table'), + implode(', ', $updateSql) + ); + $sth = $this->connection()->prepare($sql); + foreach ($fields as $config) { + $sth->bindValue(sprintf(':%s', $config['key']), $config['value'], $config['type']); } - $this->reject($item); + $sth->bindValue(':id', (int)$item['id'], PDO::PARAM_INT); + $sth->execute(); + return $sth->rowCount() == 1; } diff --git a/src/josegonzalez/Queuesadilla/Job/Base.php b/src/josegonzalez/Queuesadilla/Job/Base.php index 447a11c..931d505 100644 --- a/src/josegonzalez/Queuesadilla/Job/Base.php +++ b/src/josegonzalez/Queuesadilla/Job/Base.php @@ -62,8 +62,10 @@ public function item() public function release($delay = 0) { - $this->item['attempts'] = 0; - if (isset($this->item['attempts']) && $this->item['attempts'] > 0) { + if (!isset($this->item['attempts'])) { + $this->item['attempts'] = 0; + } + if ($this->item['attempts'] > 0) { $this->item['attempts'] -= 1; } diff --git a/tests/josegonzalez/Queuesadilla/Engine/AbstractPdoEngineTest.php b/tests/josegonzalez/Queuesadilla/Engine/AbstractPdoEngineTest.php index 9cbb58e..b2d7abb 100644 --- a/tests/josegonzalez/Queuesadilla/Engine/AbstractPdoEngineTest.php +++ b/tests/josegonzalez/Queuesadilla/Engine/AbstractPdoEngineTest.php @@ -220,7 +220,7 @@ public function testRelease() $item = $this->Engine->pop(); $this->assertTrue($this->Engine->release($item)); $sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $this->Fixtures->default['first']['id']); - $this->assertFalse($sth->rowCount() == 1); + $this->assertTrue($sth->rowCount() == 0); $this->assertTrue($this->Engine->push($this->Fixtures->default['second'], [ 'attempts' => 10 @@ -233,7 +233,7 @@ public function testRelease() $date = new \DateTime(); $date->modify('+10 minutes'); - $sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $this->Fixtures->default['second']['id']); + $sth = $this->execute($this->Engine->connection(), 'SELECT * FROM jobs WHERE id = ' . $item2['id']); $results = $sth->fetch(PDO::FETCH_ASSOC); $inTenMinutes = $date->format('Y-m-d H:i:s');