Skip to content
Permalink
Browse files

bug #31387 [Messenger] Fix Redis Connection::get() after reject() (ch…

…alasr)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Fix Redis Connection::get() after reject()

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  |no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | n/a
| License       | MIT
| Doc PR        | n/a

If a message is rejected, another consumer cannot read from the stream because the first subsequent call to `\Redis::xreadgroup()` returns false for some reason.
Reproducer: https://github.com/chalasr/redis-transport-bug

ping @alexander-schranz

Commits
-------

c05273f [Messenger] Fix Redis Connection::get() after reject()
  • Loading branch information...
fabpot committed May 6, 2019
2 parents bee5216 + c05273f commit 4e61ff535eca3733939b8c5462c28cb3d50f3c44
@@ -112,4 +112,22 @@ public function testUnexpectedRedisError()
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
public function testGetAfterReject()
{
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
try {
$connection->setup();
} catch (TransportException $e) {
}
$connection->add('1', []);
$connection->add('2', []);
$failing = $connection->get();
$connection->reject($failing['id']);
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
$this->assertNotNull($connection->get());
}
}
@@ -89,7 +89,7 @@ public function get(): ?array
} catch (\RedisException $e) {
}
if (false === $messages || $e) {
if ($e || (false === $messages && !$this->couldHavePendingMessages)) {
throw new TransportException(
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
);
@@ -123,7 +123,7 @@ public function ack(string $id): void
} catch (\RedisException $e) {
}
if (!$acknowledged || $e) {
if ($e || !$acknowledged) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
}
}
@@ -136,7 +136,7 @@ public function reject(string $id): void
} catch (\RedisException $e) {
}
if (!$deleted || $e) {
if ($e || !$deleted) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
}
}
@@ -151,7 +151,7 @@ public function add(string $body, array $headers)
} catch (\RedisException $e) {
}
if (!$added || $e) {
if ($e || !$added) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
}
}

0 comments on commit 4e61ff5

Please sign in to comment.
You can’t perform that action at this time.