Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Set minimum Go version to Go 1.23. [PR #811](https://github.com/riverqueue/river/pull/811).
- Deprecate `river.JobInsertMiddlewareDefaults` and `river.WorkerMiddlewareDefaults` in favor of the more general `river.MiddlewareDefaults` embeddable struct. The two former structs will be removed in a future version. [PR #815](https://github.com/riverqueue/river/pull/815).

### Fixed

- Cleanly error when attempting to add a queue at runtime to a `Client` which was not configured to run jobs (no `Workers`). [PR #826](https://github.com/riverqueue/river/pull/826).

## [0.19.0] - 2025-03-16

⚠️ Version 0.19.0 has minor breaking changes for the `Worker.Middleware`, introduced fairly recently in 0.17.0 that has a worker's `Middleware` function now taking a non-generic `JobRow` parameter instead of a generic `Job[T]`. We tried not to make this change, but found the existing middleware interface insufficient to provide the necessary range of functionality we wanted, and this is a secondary middleware facility that won't be in use for many users, so it seemed worthwhile.
Expand Down
8 changes: 7 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
testSignals: clientTestSignals{},
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
}
client.queues = &QueueBundle{addProducer: client.addProducer}
client.queues = &QueueBundle{addProducer: client.addProducer, clientWillExecuteJobs: config.willExecuteJobs()}

baseservice.Init(archetype, &client.baseService)
client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
Expand Down Expand Up @@ -2160,6 +2160,8 @@ type QueueBundle struct {
// Function that adds a producer to the associated client.
addProducer func(queueName string, queueConfig QueueConfig) *producer

clientWillExecuteJobs bool

fetchCtx context.Context //nolint:containedctx

// Mutex that's acquired when client is starting and stopping and when a
Expand All @@ -2174,6 +2176,10 @@ type QueueBundle struct {
// producer for the queue is started. Context is inherited from the one given to
// Client.Start.
func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
if !b.clientWillExecuteJobs {
return errors.New("client is not configured to execute jobs, cannot add queue")
}

if err := queueConfig.validate(queueName); err != nil {
return err
}
Expand Down
13 changes: 13 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,19 @@ func Test_Client(t *testing.T) {
riversharedtest.WaitOrTimeout(t, workedChan)
})

t.Run("Queues_Add_WhenClientWontExecuteJobs", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
config.Queues = nil
config.Workers = nil
client := newTestClient(t, bundle.dbPool, config)

err := client.Queues().Add("new_queue", QueueConfig{MaxWorkers: 2})
require.Error(t, err)
require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot add queue")
})

t.Run("Queues_Add_BeforeStart", func(t *testing.T) {
t.Parallel()

Expand Down
Loading