Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184).
- Fix in `Client.Start` where a River client that only partially started before erroring could fail to attempt a start again on subsequent `Start` invocations. [PR #1187](https://github.com/riverqueue/river/pull/1187).

## [0.32.0] - 2026-03-23
Expand Down
125 changes: 103 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/dbutil"
"github.com/riverqueue/river/rivershared/util/maputil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivershared/util/valutil"
Expand Down Expand Up @@ -619,7 +620,9 @@ type Client[TTx any] struct {

// Test-only signals.
type clientTestSignals struct {
electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader
electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader
queueMaintainerStartError testsignal.TestSignal[error] // notifies on each failed queue maintainer start attempt
queueMaintainerStartRetriesExhausted testsignal.TestSignal[struct{}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted

jobCleaner *maintenance.JobCleanerTestSignals
jobRescuer *maintenance.JobRescuerTestSignals
Expand All @@ -631,6 +634,8 @@ type clientTestSignals struct {

func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
ts.electedLeader.Init(tb)
ts.queueMaintainerStartError.Init(tb)
ts.queueMaintainerStartRetriesExhausted.Init(tb)

if ts.jobCleaner != nil {
ts.jobCleaner.Init(tb)
Expand Down Expand Up @@ -1279,26 +1284,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte
}

func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error {
handleLeadershipChange := func(ctx context.Context, notification *leadership.Notification) {
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))

switch {
case notification.IsLeader:
// Starting the queue maintainer can take a little time so send to
// this test signal _first_ so tests waiting on it can finish,
// cancel the queue maintainer start, and overall run much faster.
c.testSignals.electedLeader.Signal(struct{}{})

if err := c.queueMaintainer.Start(ctx); err != nil {
c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error()))
}

default:
c.queueMaintainer.Stop()
}
}

if !shouldStart {
return nil
}
Expand All @@ -1310,20 +1295,116 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
sub := c.elector.Listen()
defer sub.Unlisten()

// Cancel function for an in-progress tryStartQueueMaintainer. If
// leadership is lost while the start process is still retrying, this
// is used to abort it promptly instead of waiting for retries to
// finish.
var cancelQueueMaintainerStart context.CancelCauseFunc = func(_ error) {}

for {
select {
case <-ctx.Done():
cancelQueueMaintainerStart(context.Cause(ctx))
return

case notification := <-sub.C():
handleLeadershipChange(ctx, notification)
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))

switch {
case notification.IsLeader:
// Starting the queue maintainer can take a little time so
// send to this test signal first so tests waiting on it
// can finish, cancel the queue maintainer start, and
// overall run much faster.
c.testSignals.electedLeader.Signal(struct{}{})

// Start the queue maintainer with a few retries and
// exponential backoff in a separate goroutine so the
// leadership change loop remains responsive to new
// notifications. startCtx is used for cancellation in case
// leadership is lost while retries are in progress.
var startCtx context.Context
startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx)
go c.tryStartQueueMaintainer(startCtx)

default:
// Cancel any in-progress start attempts before stopping. We
// sent a startstop.ErrStop to make sure services like
// Reindexer run any specific cleanup code for stops.
cancelQueueMaintainerStart(startstop.ErrStop)
cancelQueueMaintainerStart = func(_ error) {}

c.queueMaintainer.Stop()
}
}
}
}()

return nil
}

// Tries to start the queue maintainer after gaining leadership. We allow some
// retries with exponential backoff in case of failure, and in case the queue
// maintainer can't be started, we request resignation to allow another client
// to try and take over.
func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) {
const maxStartAttempts = 3

ctxCancelled := func() bool {
if ctx.Err() != nil {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Queue maintainer start cancelled")
return true
}
return false
}

var lastErr error
for attempt := 1; attempt <= maxStartAttempts; attempt++ {
if ctxCancelled() {
return
}

if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil {
return
}

c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer",
slog.String("err", lastErr.Error()), slog.Int("attempt", attempt))

c.testSignals.queueMaintainerStartError.Signal(lastErr)

// If Start blocked long enough for this context to be cancelled
// (e.g. leadership was lost), bail out immediately. A newer
// leadership term may already have started the maintainer, and
// calling Stop here would tear it down.
if ctxCancelled() {
return
}

// Stop the queue maintainer to fully reset its state (and any
// sub-services) before retrying.
c.queueMaintainer.Stop()
Comment on lines +1385 to +1387

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Skip stale retry before stopping queue maintainer

tryStartQueueMaintainer stops the shared maintainer on every start error, even when this retry context has already been canceled due to a leadership loss. If leadership flips (lose then regain) while an earlier Start call is still blocked (for example in a hook that does not return promptly on cancellation), that older goroutine can return later and execute Stop against the newer leader term’s successfully started maintainer, taking maintenance back offline unexpectedly. Add a cancellation/epoch check immediately after Start returns and before calling Stop so stale retries cannot affect a newer leadership term.

Useful? React with 👍 / 👎.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex re-review

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Prevent stale start retries from stopping new leadership term

This Stop call can run from an older tryStartQueueMaintainer goroutine after leadership has changed, because cancellation is only checked just before this line and not synchronized with the stop itself. If leadership is lost between that check and this call, then quickly regained, the stale goroutine can stop a queue maintainer that was started by the newer term, leaving maintenance offline while this client is still leader. Add a term/epoch guard or a cancellation check that is atomic with the stop decision to avoid cross-term teardown.

Useful? React with 👍 / 👎.


if attempt < maxStartAttempts {
serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault))
}
}

if ctxCancelled() {
return
}

c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation",
slog.String("err", lastErr.Error()))

c.testSignals.queueMaintainerStartRetriesExhausted.Signal(struct{}{})

if err := c.clientNotifyBundle.RequestResign(ctx); err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Abort stale resign requests after leadership loss

tryStartQueueMaintainer runs inside the same goroutine that consumes elector.Listen() notifications, so demotion events are queued until retries/backoff finish. If leadership is lost during those retries, this unconditional RequestResign call can run after another client has already become leader and force that healthy leader to resign, creating unnecessary leadership churn and maintenance interruptions. Please gate this path on current leadership (or cancel retries when a non-leader notification arrives) before sending resign notifications.

Useful? React with 👍 / 👎.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dang that is a subtle one @brandur 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh man, yes that's a good one.

Should be addressed now I think.

@codex re-review

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex re-review

c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error()))
}
}

// Driver exposes the underlying driver used by the client.
//
// API is not stable. DO NOT USE.
Expand Down
25 changes: 25 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5145,6 +5145,31 @@ func Test_Client_Maintenance(t *testing.T) {
require.True(t, svc.RemoveByID("new_periodic_job"))
})

t.Run("QueueMaintainerStartRetriesAndResigns", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, "")
config.Hooks = []rivertype.Hook{
HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error {
return errors.New("hook start error")
}),
}

client, _ := setup(t, config)

startClient(ctx, t, client)
client.testSignals.electedLeader.WaitOrTimeout()

// Wait for all 3 retry attempts to fail.
for range 3 {
err := client.testSignals.queueMaintainerStartError.WaitOrTimeout()
require.EqualError(t, err, "hook start error")
}

// After all retries exhausted, the client should request resignation.
client.testSignals.queueMaintainerStartRetriesExhausted.WaitOrTimeout()
})

t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) {
t.Parallel()

Expand Down
53 changes: 32 additions & 21 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,31 +318,42 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {

s.StaggerStart(ctx)

initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{
Schema: s.Config.Schema,
})
if err != nil {
return err
}
var (
initialPeriodicJobs []*riverpilot.PeriodicJob
subServices []startstop.Service
)
if err := func() error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ended up indenting all of this so that there's a single `if err != nil` check in which we can call `stopped()` at the end. Previously, `stopped` wasn't being invoked in some error branches, which could lead to this service erroring on start and never really stopping :/

var err error
initialPeriodicJobs, err = s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{
Schema: s.Config.Schema,
})
if err != nil {
return err
}

for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) {
if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert
DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob {
return (*rivertype.DurablePeriodicJob)(job)
}),
}); err != nil {
for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) {
if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert
DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob {
return (*rivertype.DurablePeriodicJob)(job)
}),
}); err != nil {
return err
}
}

subServices = []startstop.Service{
startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically),
}
stopServicesOnError := func() {
startstop.StopAllParallel(subServices...)
}
if err := startstop.StartAll(ctx, subServices...); err != nil {
stopServicesOnError()
return err
}
}

subServices := []startstop.Service{
startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically),
}
stopServicesOnError := func() {
startstop.StopAllParallel(subServices...)
}
if err := startstop.StartAll(ctx, subServices...); err != nil {
stopServicesOnError()
return nil
}(); err != nil {
stopped()
return err
}
Expand Down
2 changes: 2 additions & 0 deletions internal/maintenance/queue_maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (m *QueueMaintainer) Start(ctx context.Context) error {

for _, service := range m.servicesByName {
if err := service.Start(ctx); err != nil {
startstop.StopAllParallel(maputil.Values(m.servicesByName)...)
stopped()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, make sure stopped() is invoked in the error condition. I really need to go look at this startstop API again to see if we can make this safer.

return err
}
}
Expand Down
Loading