Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)

### Changed
Expand All @@ -18,7 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255)
- Add a 30-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255) [PR #1263](https://github.com/riverqueue/river/pull/1263)
- Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262)

## [0.38.0] - 2026-05-22
Expand Down
8 changes: 8 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ type ExecutorTx interface {
//
// API is not stable. DO NOT USE.
Rollback(ctx context.Context) error

// SetLocalStatementTimeout sets a statement timeout local to the current
// transaction if supported by the underlying database. Some databases don't
// support this behavior, so this should be used in addition to context
// timeouts, not instead of them.
//
// API is not stable. DO NOT USE.
SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error
}

type GetListenenerParams struct {
Expand Down
11 changes: 11 additions & 0 deletions riverdriver/river_driver_interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ func TestJobSetStateCancelled(t *testing.T) {
})
}

func TestPostgresStatementTimeoutValue(t *testing.T) {
t.Parallel()

require.Equal(t, "0ms", PostgresStatementTimeoutValue(0))
require.Equal(t, "1ms", PostgresStatementTimeoutValue(time.Nanosecond))
require.Equal(t, "1ms", PostgresStatementTimeoutValue(999*time.Microsecond))
require.Equal(t, "1ms", PostgresStatementTimeoutValue(time.Millisecond))
require.Equal(t, "2ms", PostgresStatementTimeoutValue(time.Millisecond+time.Nanosecond))
require.Equal(t, "1234ms", PostgresStatementTimeoutValue(1234*time.Millisecond))
}

func TestJobSetStateCompleted(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 8 additions & 0 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
return t.tx.Rollback()
}

func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", riverdriver.PostgresStatementTimeoutValue(timeout))
}

type ExecutorSubTx struct {
Executor

Expand Down Expand Up @@ -1103,6 +1107,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
return nil
}

func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", riverdriver.PostgresStatementTimeoutValue(timeout))
}

func interpretError(err error) error {
if errors.Is(err, sql.ErrNoRows) {
return rivertype.ErrNotFound
Expand Down
22 changes: 22 additions & 0 deletions riverdriver/riverdrivertest/executor_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package riverdrivertest
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -161,6 +162,27 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,
})
})

t.Run("SetLocalStatementTimeout", func(t *testing.T) {
t.Parallel()

exec, driver := executorWithTx(ctx, t)

tx, err := exec.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback(ctx) })

require.NoError(t, tx.SetLocalStatementTimeout(ctx, 999*time.Microsecond))
require.NoError(t, tx.SetLocalStatementTimeout(ctx, 1234*time.Millisecond))

if driver.DatabaseName() == databaseNameSQLite {
return
}

var timeoutMilliseconds int64
require.NoError(t, tx.QueryRow(ctx, "SELECT setting::bigint FROM pg_settings WHERE name = 'statement_timeout'").Scan(&timeoutMilliseconds))
require.Equal(t, int64(1234), timeoutMilliseconds)
})

t.Run("PGAdvisoryXactLock", func(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
return t.tx.Rollback(ctx)
}

func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", riverdriver.PostgresStatementTimeoutValue(timeout))
}

type Listener struct {
afterConnectExec string // should only ever be used in testing
conn *pgx.Conn
Expand Down
8 changes: 8 additions & 0 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
return t.tx.Rollback()
}

func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return nil
}

type ExecutorSubTx struct {
Executor

Expand Down Expand Up @@ -1560,6 +1564,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
return nil
}

func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return nil
}

func interpretError(err error) error {
if errors.Is(err, sql.ErrNoRows) {
return rivertype.ErrNotFound
Expand Down
21 changes: 21 additions & 0 deletions riverdriver/statement_timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package riverdriver

import (
"strconv"
"time"
)

// PostgresStatementTimeoutValue formats a duration for Postgres'
// statement_timeout setting.
//
// Postgres accepts statement_timeout values as whole milliseconds. Round
// positive sub-millisecond values up so they don't truncate to 0ms, which would
// disable the timeout.
func PostgresStatementTimeoutValue(timeout time.Duration) string {
milliseconds := timeout / time.Millisecond
if timeout > 0 && timeout%time.Millisecond != 0 {
milliseconds++
}

return strconv.FormatInt(int64(milliseconds), 10) + "ms"
}
22 changes: 18 additions & 4 deletions rivershared/riverpilot/standard_pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package riverpilot

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/dbutil"
"github.com/riverqueue/river/rivertype"
)

const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second

type StandardPilot struct {
seq atomic.Int64
}
Expand All @@ -23,10 +23,24 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
return nil, nil
}

ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded)
// Set an outer context timeout on locking jobs, and where possible (i.e. in
// Postgres, but not SQLite), set an inner `statement_timeout` inside a
// transaction so the configuration isn't durable. The error from the
// Postgres version will be better, so try to have that trigger first. It
// also minimizes the chances of a successful operation that locks jobs but
// then accidentally errors because it's run time was so close to the Go
// timeout.
const timeout = 30 * time.Second

ctx, cancel := context.WithTimeoutCause(ctx, timeout, context.DeadlineExceeded)
defer cancel()

return exec.JobGetAvailable(ctx, params)
return dbutil.WithTxV(ctx, exec, func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*rivertype.JobRow, error) {
if err := execTx.SetLocalStatementTimeout(ctx, timeout-1*time.Second); err != nil {
return nil, fmt.Errorf("error setting statement timeout: %w", err)
}
return execTx.JobGetAvailable(ctx, params)
})
}

func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
Expand Down
15 changes: 15 additions & 0 deletions rivershared/riverpilot/standard_pilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -17,10 +18,24 @@ type standardPilotExecutorMock struct {
jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)
}

func (m *standardPilotExecutorMock) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) {
return &standardPilotExecutorTxMock{standardPilotExecutorMock: m}, nil
}

func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
return m.jobGetAvailableFunc(ctx, params)
}

type standardPilotExecutorTxMock struct {
*standardPilotExecutorMock
}

func (m *standardPilotExecutorTxMock) Commit(ctx context.Context) error { return nil }
func (m *standardPilotExecutorTxMock) Rollback(ctx context.Context) error { return nil }
func (m *standardPilotExecutorTxMock) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error {
return nil
}

func TestStandardPilot_JobGetAvailable(t *testing.T) {
t.Parallel()

Expand Down
Loading