Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Exposed `TestConfig` struct on `Config` under the `Test` field for configuration that is specific to test environments. For now, the only field on this type is `Time`, which can be used to set a synthetic `TimeGenerator` for tests. A stubbable time generator was added as `rivertest.TimeStub` to allow time to be easily stubbed in tests. [PR #754](https://github.com/riverqueue/river/pull/754).
- New `rivertest.Worker` type to make it significantly easier to test River workers. Either real or synthetic jobs can be worked using this interface, generally without requiring any database interactions. The `Worker` type provides a realistic execution environment with access to the full range of River features, including `river.ClientFromContext`, middleware (both global and per-worker), and timeouts. [PR #753](https://github.com/riverqueue/river/pull/753).

### Changed

Expand All @@ -19,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- `riverdatabasesql` driver: properly handle `nil` values in `bytea[]` inputs. This fixes the driver's handling of empty unique keys on insert for non-unique jobs with the newer unique jobs implementation. [PR #739](https://github.com/riverqueue/river/pull/739).
- `JobCompleteTx` now returns `rivertype.ErrNotFound` if the job doesn't exist instead of panicking. [PR #753](https://github.com/riverqueue/river/pull/753).

## [0.16.0] - 2024-01-27

Expand Down
106 changes: 55 additions & 51 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,58 @@ type Config struct {
schedulerInterval time.Duration
}

// WithDefaults returns a copy of the Config with all default values applied.
func (c *Config) WithDefaults() *Config {
// Use the existing logger if set, otherwise create a default one.
logger := c.Logger
if logger == nil {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
}))
}

// Compute the default rescue value. For convenience, if JobTimeout is specified
// but RescueStuckJobsAfter is not set (or less than 1) and JobTimeout is large, use
// JobTimeout + maintenance.JobRescuerRescueAfterDefault as the default.
rescueAfter := maintenance.JobRescuerRescueAfterDefault
if c.JobTimeout > 0 && c.RescueStuckJobsAfter < 1 && c.JobTimeout > c.RescueStuckJobsAfter {
rescueAfter = c.JobTimeout + maintenance.JobRescuerRescueAfterDefault
}

// Set default retry policy if none is provided.
retryPolicy := c.RetryPolicy
if retryPolicy == nil {
retryPolicy = &DefaultClientRetryPolicy{}
}

return &Config{
AdvisoryLockPrefix: c.AdvisoryLockPrefix,
CancelledJobRetentionPeriod: valutil.ValOrDefault(c.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault),
CompletedJobRetentionPeriod: valutil.ValOrDefault(c.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(c.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault),
ErrorHandler: c.ErrorHandler,
FetchCooldown: valutil.ValOrDefault(c.FetchCooldown, FetchCooldownDefault),
FetchPollInterval: valutil.ValOrDefault(c.FetchPollInterval, FetchPollIntervalDefault),
ID: valutil.ValOrDefaultFunc(c.ID, func() string { return defaultClientID(time.Now().UTC()) }),
JobInsertMiddleware: c.JobInsertMiddleware,
JobTimeout: valutil.ValOrDefault(c.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: valutil.ValOrDefault(c.MaxAttempts, MaxAttemptsDefault),
PeriodicJobs: c.PeriodicJobs,
PollOnly: c.PollOnly,
Queues: c.Queues,
ReindexerSchedule: c.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(c.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
Test: c.Test,
TestOnly: c.TestOnly,
Workers: c.Workers,
WorkerMiddleware: c.WorkerMiddleware,
schedulerInterval: valutil.ValOrDefault(c.schedulerInterval, maintenance.JobSchedulerIntervalDefault),
}
}

func (c *Config) validate() error {
if c.CancelledJobRetentionPeriod < 0 {
return errors.New("CancelledJobRetentionPeriod time cannot be less than zero")
Expand Down Expand Up @@ -460,55 +512,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return nil, errMissingConfig
}

logger := config.Logger
if logger == nil {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
}))
}

retryPolicy := config.RetryPolicy
if retryPolicy == nil {
retryPolicy = &DefaultClientRetryPolicy{}
}

// For convenience, in case the user's specified a large JobTimeout but no
// RescueStuckJobsAfter, since RescueStuckJobsAfter must be greater than
// JobTimeout, set a reasonable default value that's longer than JobTimeout.
rescueAfter := maintenance.JobRescuerRescueAfterDefault
if config.JobTimeout > 0 && config.RescueStuckJobsAfter < 1 && config.JobTimeout > config.RescueStuckJobsAfter {
rescueAfter = config.JobTimeout + maintenance.JobRescuerRescueAfterDefault
}

// Create a new version of config with defaults filled in. This replaces the
// original object, so everything that we care about must be initialized
// here, even if it's only carrying over the original value.
config = &Config{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault),
ErrorHandler: config.ErrorHandler,
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, FetchCooldownDefault),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, FetchPollIntervalDefault),
ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }),
JobInsertMiddleware: config.JobInsertMiddleware,
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault),
PeriodicJobs: config.PeriodicJobs,
PollOnly: config.PollOnly,
Queues: config.Queues,
ReindexerSchedule: config.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
SkipUnknownJobCheck: config.SkipUnknownJobCheck,
Test: config.Test,
TestOnly: config.TestOnly,
Workers: config.Workers,
WorkerMiddleware: config.WorkerMiddleware,
schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault),
}
config = config.WithDefaults()

if err := config.validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -567,7 +571,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.services = append(client.services, client.notifier)
}
} else {
logger.Info("Driver does not support listener; entering poll only mode")
config.Logger.Info("Driver does not support listener; entering poll only mode")
}

client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
Expand Down Expand Up @@ -608,7 +612,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

{
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
ClientRetryPolicy: retryPolicy,
ClientRetryPolicy: config.RetryPolicy,
RescueAfter: config.RescueStuckJobsAfter,
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
if workerInfo, ok := config.Workers.workersMap[kind]; ok {
Expand Down
49 changes: 49 additions & 0 deletions internal/execution/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package execution

import (
"context"
"time"

"github.com/riverqueue/river/rivertype"
)

// ContextKeyInsideTestWorker is an internal context key that indicates whether
// the worker is running inside a [rivertest.Worker].
type ContextKeyInsideTestWorker struct{}

type Func func(ctx context.Context) error

// MaybeApplyTimeout returns a context that will be cancelled after the given
// timeout. If the timeout is <= 0, the context will not be timed out, but it
// will still have a cancel function returned. In either case the cancel
// function should be called after execution (in a defer).
func MaybeApplyTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
// No timeout if a -1 was specified.
if timeout > 0 {
return context.WithTimeout(ctx, timeout)
}

return context.WithCancel(ctx)
}

// MiddlewareChain chains together the given middleware functions, returning a
// single function that applies them all in reverse order.
func MiddlewareChain(global, worker []rivertype.WorkerMiddleware, doInner Func, jobRow *rivertype.JobRow) Func {
allMiddleware := make([]rivertype.WorkerMiddleware, 0, len(global)+len(worker))
allMiddleware = append(allMiddleware, global...)
allMiddleware = append(allMiddleware, worker...)

if len(allMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(allMiddleware) - 1; i >= 0; i-- {
middlewareItem := allMiddleware[i] // capture the current middleware item
previousDoInner := doInner // capture the current doInner function
doInner = func(ctx context.Context) error {
return middlewareItem.Work(ctx, jobRow, previousDoInner)
}
}
}

return doInner
}
2 changes: 2 additions & 0 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
job, err := exec.JobInsertFull(ctx, &riverdriver.JobInsertFullParams{
Attempt: 3,
AttemptedAt: &now,
AttemptedBy: []string{"worker1", "worker2"},
CreatedAt: &now,
EncodedArgs: []byte(`{"encoded": "args"}`),
Errors: [][]byte{[]byte(`{"error": "message"}`)},
Expand All @@ -1099,6 +1100,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
require.NoError(t, err)
require.Equal(t, 3, job.Attempt)
requireEqualTime(t, now, *job.AttemptedAt)
require.Equal(t, []string{"worker1", "worker2"}, job.AttemptedBy)
requireEqualTime(t, now, job.CreatedAt)
require.JSONEq(t, `{"encoded": "args"}`, string(job.EncodedArgs))
require.Equal(t, "message", job.Errors[0].Error)
Expand Down
8 changes: 8 additions & 0 deletions job_complete_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"time"

"github.com/riverqueue/river/internal/execution"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -50,6 +51,13 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx
if err != nil {
return nil, err
}
if len(rows) == 0 {
if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker {
panic("to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first")
}

return nil, rivertype.ErrNotFound
}
updatedJob := &Job[TArgs]{JobRow: rows[0]}

if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil {
Expand Down
38 changes: 38 additions & 0 deletions job_complete_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/execution"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
Expand Down Expand Up @@ -79,4 +80,41 @@ func TestJobCompleteTx(t *testing.T) {
_, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.EqualError(t, err, "job must be running")
})

t.Run("ErrorIfJobDoesntExist", func(t *testing.T) {
t.Parallel()

ctx, bundle := setup(ctx, t)

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
State: ptrutil.Ptr(rivertype.JobStateAvailable),
})

// delete the job
_, err := bundle.exec.JobDelete(ctx, job.ID)
require.NoError(t, err)

// fake the job's state to work around the check:
job.State = rivertype.JobStateRunning
_, err = JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

t.Run("PanicsIfCalledInTestWorkerWithoutInsertingJob", func(t *testing.T) {
t.Parallel()

ctx, bundle := setup(ctx, t)
ctx = context.WithValue(ctx, execution.ContextKeyInsideTestWorker{}, true)

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)})
// delete the job as though it was never inserted:
_, err := bundle.client.JobDeleteTx(ctx, bundle.tx, job.ID)
require.NoError(t, err)
job.State = rivertype.JobStateRunning

require.PanicsWithValue(t, "to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first", func() {
_, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.NoError(t, err)
})
})
}
43 changes: 6 additions & 37 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"runtime/debug"
"time"

"github.com/riverqueue/river/internal/execution"
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/valutil"
"github.com/riverqueue/river/rivertype"
)

Expand Down Expand Up @@ -204,43 +206,10 @@ func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
return &jobExecutorResult{Err: err}
}

workerMiddleware := e.WorkUnit.Middleware()

doInner := func(ctx context.Context) error {
jobTimeout := e.WorkUnit.Timeout()
if jobTimeout == 0 {
jobTimeout = e.ClientJobTimeout
}

// No timeout if a -1 was specified.
if jobTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}

if err := e.WorkUnit.Work(ctx); err != nil {
return err
}

return nil
}

allMiddleware := make([]rivertype.WorkerMiddleware, 0, len(e.GlobalMiddleware)+len(workerMiddleware))
allMiddleware = append(allMiddleware, e.GlobalMiddleware...)
allMiddleware = append(allMiddleware, workerMiddleware...)

if len(allMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(allMiddleware) - 1; i >= 0; i-- {
middlewareItem := allMiddleware[i] // capture the current middleware item
previousDoInner := doInner // Capture the current doInner function
doInner = func(ctx context.Context) error {
return middlewareItem.Work(ctx, e.JobRow, previousDoInner)
}
}
}
doInner := execution.MiddlewareChain(e.GlobalMiddleware, e.WorkUnit.Middleware(), e.WorkUnit.Work, e.JobRow)
jobTimeout := valutil.FirstNonZero(e.WorkUnit.Timeout(), e.ClientJobTimeout)
ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
defer cancel()

return &jobExecutorResult{Err: doInner(ctx)}
}
Expand Down
1 change: 1 addition & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ type JobInsertFastResult struct {
type JobInsertFullParams struct {
Attempt int
AttemptedAt *time.Time
AttemptedBy []string
CreatedAt *time.Time
EncodedArgs []byte
Errors [][]byte
Expand Down
Loading