Add Locking connection decorator for coroutine-safe access#82
Conversation
Wraps any Connection and serializes every command behind a mutex (utopia-php/lock) so concurrent coroutines sharing one connection cannot interleave requests and responses on the same socket. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
e1c26ea to
51fb3a5
Compare
Greptile SummaryIntroduces
Confidence Score: 5/5The change is purely additive — existing adapters are untouched and the decorator is opt-in — so there is no regression surface on current callers. The decorator correctly delegates all interface methods, the lock is acquired with a consistent wait-forever timeout, and the test suite covers ordering, argument forwarding, exception handling, and interface completeness. No logic gaps or correctness issues were found. No files require special attention. Important Files Changed
Reviews (2): Last reviewed commit: "Add unit tests for Locking connection de..." | Re-trigger Greptile |
| public function rightPop(string $queue, int $timeout): string|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPop($queue, $timeout)); | ||
| } | ||
|
|
||
| public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPopLeftPush($queue, $destination, $timeout)); | ||
| } | ||
|
|
||
| public function leftPush(string $queue, string $payload): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->leftPush($queue, $payload)); | ||
| } | ||
|
|
||
| public function leftPop(string $queue, int $timeout): string|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->leftPop($queue, $timeout)); | ||
| } |
There was a problem hiding this comment.
Blocking pops starve all other operations while waiting
rightPop, leftPop, and rightPopLeftPush (and their Array variants) hold the mutex for their full blocking timeout. In practice this means a worker coroutine sitting in a blocking pop for up to 2 s will prevent any other coroutine sharing this Locking instance from executing ping(), set(), get(), or any other command during that window. This is documented in the PR description as a known limitation, but callers using the same Locking instance for both queue polling and metadata operations (e.g. periodic heartbeats or stats updates) should be aware they will silently stall for the full pop timeout.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Spy Lock + spy Connection verify every Connection method runs its inner call wrapped in a single acquire/release pair, forwards its arguments, and returns the inner result. Also covers wait-forever acquire timeout, lock release on inner exception, the default Mutex, and a reflection guard so new Connection methods cannot ship without synchronization coverage. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What
Adds
Utopia\Queue\Connection\Locking— a decorator that wraps anyConnectionand serializes every command behind a single lock.Why
A single
\Redis/\RedisClustersocket cannot be shared by concurrent coroutines — their commands interleave on the wire and corrupt the request/response pairing. The decorator ensures only one command runs on the connection at a time.How
Utopia\Lock\Mutexfrom the newutopia-php/lockdependency, which gates on actual coroutine context (Coroutine::getCid() > 0) and degrades to a plain in-process flag outside Swoole (tests, Workerman, CLI).-1(wait-forever) timeout so commands queue rather than fail when the connection is momentarily busy.RedisandRedisClusterare untouched. Each wrapper method calls the inner connection directly, so there's no lock reentrancy/self-deadlock even for the composite*Arraymethods.Notes
rightPop/leftPop/rightPopLeftPush) hold the lock for their full timeout (POP_TIMEOUT= 2s), serializing other commands during the wait — inherent to sharing one socket.Verification
pint --test— PSR-12 pass (30 files)phpstan analyse— no errors🤖 Generated with Claude Code