From d7c6bd85ef82d76da5c6b618a17ecb04ca9c5e32 Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 1 May 2026 10:59:43 -0400 Subject: [PATCH] Patch unsafe producer map access While doing a little work on #1235, my LLM flagged that the way that we're accessing `producersByQueueName` today is already a little unsafe as it may be read in multiple places concurrently. There's a mutex to protect it somewhat in the `QueueBundle`, but it may still race between a change there a "notify producer" send. Here, add one additional `RWMutex` that makes sure to synchronize read access on the map. --- CHANGELOG.md | 4 ++++ client.go | 22 ++++++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e98f9555..048c1aa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236). + ## [0.35.1] - 2026-04-26 ### Fixed diff --git a/client.go b/client.go index b60addcd..d4e0ec40 100644 --- a/client.go +++ b/client.go @@ -636,6 +636,7 @@ type Client[TTx any] struct { periodicJobs *PeriodicJobBundle pilot riverpilot.Pilot producersByQueueName map[string]*producer + producersMu sync.RWMutex queueMaintainer *maintenance.QueueMaintainer queueMaintainerLeader *maintenance.QueueMaintainerLeader queues *QueueBundle @@ -1959,7 +1960,14 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*rivertype. // transaction, the producer wouldn't yet be able to access the new jobs that // triggered the notification because they're not committed yet. func (c *Client[TTx]) notifyProducerWithoutListenerJobFetch(_ context.Context, res []*rivertype.JobInsertResult) { - if c.driver.SupportsListener() || len(c.producersByQueueName) < 1 { + if c.driver.SupportsListener() { + return + } + + c.producersMu.RLock() + defer c.producersMu.RUnlock() + + if len(c.producersByQueueName) < 1 { return } @@ -2169,6 +2177,9 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error { } func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*producer, error) { + c.producersMu.Lock() + defer c.producersMu.Unlock() + if _, alreadyExists := c.producersByQueueName[queueName]; alreadyExists { return nil, &QueueAlreadyAddedError{Name: queueName} } @@ -2722,7 +2733,14 @@ func (c *Client[TTx]) QueueUpdateTx(ctx context.Context, tx TTx, name string, pa // transaction, the producer wouldn't yet be able to access the state that // triggered the notification because it's not committed yet. func (c *Client[TTx]) notifyProducerWithoutListenerQueueControlEvent(queue string, controlEvent *controlEventPayload) { - if c.driver.SupportsListener() || len(c.producersByQueueName) < 1 { + if c.driver.SupportsListener() { + return + } + + c.producersMu.RLock() + defer c.producersMu.RUnlock() + + if len(c.producersByQueueName) < 1 { return }