diff --git a/CHANGELOG.md b/CHANGELOG.md index 233aa084..45df88d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Set minimum Go version to Go 1.23. [PR #811](https://github.com/riverqueue/river/pull/811). - Deprecate `river.JobInsertMiddlewareDefaults` and `river.WorkerMiddlewareDefaults` in favor of the more general `river.MiddlewareDefaults` embeddable struct. The two former structs will be removed in a future version. [PR #815](https://github.com/riverqueue/river/pull/815). +### Fixed + +- Cleanly error when attempting to add a queue at runtime to a `Client` which was not configured to run jobs (no `Workers`). [PR #826](https://github.com/riverqueue/river/pull/826). + ## [0.19.0] - 2025-03-16 ⚠️ Version 0.19.0 has minor breaking changes for the `Worker.Middleware`, introduced fairly recently in 0.17.0 that has a worker's `Middleware` function now taking a non-generic `JobRow` parameter instead of a generic `Job[T]`. We tried not to make this change, but found the existing middleware interface insufficient to provide the necessary range of functionality we wanted, and this is a secondary middleware facility that won't be in use for many users, so it seemed worthwhile. diff --git a/client.go b/client.go index 6367df31..eb93292d 100644 --- a/client.go +++ b/client.go @@ -606,7 +606,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client testSignals: clientTestSignals{}, workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up } - client.queues = &QueueBundle{addProducer: client.addProducer} + client.queues = &QueueBundle{addProducer: client.addProducer, clientWillExecuteJobs: config.willExecuteJobs()} baseservice.Init(archetype, &client.baseService) client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is @@ -2160,6 +2160,8 @@ type QueueBundle struct { // Function that adds a producer to the associated client. addProducer func(queueName string, queueConfig QueueConfig) *producer + clientWillExecuteJobs bool + fetchCtx context.Context //nolint:containedctx // Mutex that's acquired when client is starting and stopping and when a @@ -2174,6 +2176,10 @@ type QueueBundle struct { // producer for the queue is started. Context is inherited from the one given to // Client.Start. func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { + if !b.clientWillExecuteJobs { + return errors.New("client is not configured to execute jobs, cannot add queue") + } + if err := queueConfig.validate(queueName); err != nil { return err } diff --git a/client_test.go b/client_test.go index 167d2248..ad774632 100644 --- a/client_test.go +++ b/client_test.go @@ -246,6 +246,19 @@ func Test_Client(t *testing.T) { riversharedtest.WaitOrTimeout(t, workedChan) }) + t.Run("Queues_Add_WhenClientWontExecuteJobs", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = nil + config.Workers = nil + client := newTestClient(t, bundle.dbPool, config) + + err := client.Queues().Add("new_queue", QueueConfig{MaxWorkers: 2}) + require.Error(t, err) + require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot add queue") + }) + t.Run("Queues_Add_BeforeStart", func(t *testing.T) { t.Parallel()