From 521905e20eed510d5032b1eebd9551c72d46e7ce Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 12 Feb 2025 16:30:33 -0600 Subject: [PATCH 1/6] sliceutil.DefaultIfEmpty -> FirstNonEmpty Make this util more useful with variadic args, functioning more similarly to `valutil.FirstNonZero` by returning the first non-empty value or else `nil`. --- .../riverdatabasesql/river_database_sql_driver.go | 2 +- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 6 +++--- rivershared/util/sliceutil/slice_util.go | 14 ++++++++------ rivershared/util/sliceutil/slice_util_test.go | 6 +++--- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index a89c6023..932f84de 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -241,7 +241,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. insertJobsParams.ScheduledAt[i] = scheduledAt insertJobsParams.State[i] = string(params.State) insertJobsParams.Tags[i] = strings.Join(tags, ",") - insertJobsParams.UniqueKey[i] = sliceutil.DefaultIfEmpty(params.UniqueKey, nil) + insertJobsParams.UniqueKey[i] = sliceutil.FirstNonEmpty(params.UniqueKey) insertJobsParams.UniqueStates[i] = pgtypealias.Bits{Bits: pgtype.Bits{Bytes: []byte{params.UniqueStates}, Len: 8, Valid: params.UniqueStates != 0}} } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 504dc4cb..0eaf3356 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -226,16 +226,16 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. 
defaultObject := []byte("{}") - insertJobsParams.Args[i] = sliceutil.DefaultIfEmpty(params.EncodedArgs, defaultObject) + insertJobsParams.Args[i] = sliceutil.FirstNonEmpty(params.EncodedArgs, defaultObject) insertJobsParams.Kind[i] = params.Kind insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec - insertJobsParams.Metadata[i] = sliceutil.DefaultIfEmpty(params.Metadata, defaultObject) + insertJobsParams.Metadata[i] = sliceutil.FirstNonEmpty(params.Metadata, defaultObject) insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec insertJobsParams.Queue[i] = params.Queue insertJobsParams.ScheduledAt[i] = scheduledAt insertJobsParams.State[i] = string(params.State) insertJobsParams.Tags[i] = strings.Join(tags, ",") - insertJobsParams.UniqueKey[i] = sliceutil.DefaultIfEmpty(params.UniqueKey, nil) + insertJobsParams.UniqueKey[i] = sliceutil.FirstNonEmpty(params.UniqueKey) insertJobsParams.UniqueStates[i] = pgtype.Bits{Bytes: []byte{params.UniqueStates}, Len: 8, Valid: params.UniqueStates != 0} } diff --git a/rivershared/util/sliceutil/slice_util.go b/rivershared/util/sliceutil/slice_util.go index 8088e05a..a0b7262a 100644 --- a/rivershared/util/sliceutil/slice_util.go +++ b/rivershared/util/sliceutil/slice_util.go @@ -4,13 +4,15 @@ // therefore omitted from the utilities in `slices`. package sliceutil -// DefaultIfEmpty returns the default slice if the input slice is nil or empty, -// otherwise it returns the input slice. -func DefaultIfEmpty[T any](input []T, defaultSlice []T) []T { - if len(input) == 0 { - return defaultSlice +// FirstNonEmpty returns the first non-empty slice from the input, or nil if +// all input slices are empty. 
+func FirstNonEmpty[T any](inputs ...[]T) []T { + for _, input := range inputs { + if len(input) > 0 { + return input + } } - return input + return nil } // GroupBy returns an object composed of keys generated from the results of diff --git a/rivershared/util/sliceutil/slice_util_test.go b/rivershared/util/sliceutil/slice_util_test.go index 55e1dbb0..96f240b6 100644 --- a/rivershared/util/sliceutil/slice_util_test.go +++ b/rivershared/util/sliceutil/slice_util_test.go @@ -11,9 +11,9 @@ import ( func TestDefaultIfEmpty(t *testing.T) { t.Parallel() - result1 := DefaultIfEmpty([]int{1, 2, 3}, []int{4, 5, 6}) - result2 := DefaultIfEmpty([]int{}, []int{4, 5, 6}) - result3 := DefaultIfEmpty(nil, []int{4, 5, 6}) + result1 := FirstNonEmpty([]int{1, 2, 3}, []int{4, 5, 6}) + result2 := FirstNonEmpty([]int{}, []int{4, 5, 6}) + result3 := FirstNonEmpty(nil, []int{4, 5, 6}) require.Len(t, result1, 3) require.Len(t, result2, 3) From c743885b68083b22e7151f35c27767c752526c1b Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 12 Feb 2025 16:27:24 -0600 Subject: [PATCH 2/6] remove unused internal/clientconfig pkg --- rivershared/clientconfig/client_config.go | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 rivershared/clientconfig/client_config.go diff --git a/rivershared/clientconfig/client_config.go b/rivershared/clientconfig/client_config.go deleted file mode 100644 index c239eda0..00000000 --- a/rivershared/clientconfig/client_config.go +++ /dev/null @@ -1,19 +0,0 @@ -package clientconfig - -import ( - "time" - - "github.com/riverqueue/river/internal/dbunique" -) - -// InsertOpts is a mirror of river.InsertOpts usable by other internal packages. 
-type InsertOpts struct { - MaxAttempts int - Metadata []byte - Pending bool - Priority int - Queue string - ScheduledAt time.Time - Tags []string - UniqueOpts dbunique.UniqueOpts -} From 37bf61d83ff2f6a73ba16ee4dfdfb54816544f12 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 12 Feb 2025 22:23:25 -0600 Subject: [PATCH 3/6] testfactory / insert full: allow setting AttemptedBy --- .../riverdrivertest/riverdrivertest.go | 2 ++ riverdriver/river_driver_interface.go | 1 + .../internal/dbsqlc/river_job.sql.go | 24 +++++++++++-------- .../river_database_sql_driver.go | 1 + .../riverpgxv5/internal/dbsqlc/river_job.sql | 2 ++ .../internal/dbsqlc/river_job.sql.go | 24 +++++++++++-------- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 1 + rivershared/testfactory/test_factory.go | 2 ++ 8 files changed, 37 insertions(+), 20 deletions(-) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 088a4a81..f0c2553a 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1082,6 +1082,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, job, err := exec.JobInsertFull(ctx, &riverdriver.JobInsertFullParams{ Attempt: 3, AttemptedAt: &now, + AttemptedBy: []string{"worker1", "worker2"}, CreatedAt: &now, EncodedArgs: []byte(`{"encoded": "args"}`), Errors: [][]byte{[]byte(`{"error": "message"}`)}, @@ -1099,6 +1100,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) require.Equal(t, 3, job.Attempt) requireEqualTime(t, now, *job.AttemptedAt) + require.Equal(t, []string{"worker1", "worker2"}, job.AttemptedBy) requireEqualTime(t, now, job.CreatedAt) require.JSONEq(t, `{"encoded": "args"}`, string(job.EncodedArgs)) require.Equal(t, "message", job.Errors[0].Error) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 
9287878b..46527b14 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -273,6 +273,7 @@ type JobInsertFastResult struct { type JobInsertFullParams struct { Attempt int AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time EncodedArgs []byte Errors [][]byte diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index b6fe0118..c6abc07f 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -718,6 +718,7 @@ INSERT INTO river_job( args, attempt, attempted_at, + attempted_by, created_at, errors, finalized_at, @@ -735,19 +736,20 @@ INSERT INTO river_job( $1::jsonb, coalesce($2::smallint, 0), $3, - coalesce($4::timestamptz, now()), - $5, + coalesce($4::text[], '{}'), + coalesce($5::timestamptz, now()), $6, $7, - $8::smallint, - coalesce($9::jsonb, '{}'), - $10, + $8, + $9::smallint, + coalesce($10::jsonb, '{}'), $11, - coalesce($12::timestamptz, now()), - $13, - coalesce($14::varchar(255)[], '{}'), - $15, - $16 + $12, + coalesce($13::timestamptz, now()), + $14, + coalesce($15::varchar(255)[], '{}'), + $16, + $17 ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` @@ -755,6 +757,7 @@ type JobInsertFullParams struct { Args string Attempt int16 AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time Errors []string FinalizedAt *time.Time @@ -775,6 +778,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.Args, arg.Attempt, arg.AttemptedAt, + pq.Array(arg.AttemptedBy), arg.CreatedAt, pq.Array(arg.Errors), arg.FinalizedAt, diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 
932f84de..f57962ae 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -315,6 +315,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns job, err := dbsqlc.New().JobInsertFull(ctx, e.dbtx, &dbsqlc.JobInsertFullParams{ Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec AttemptedAt: params.AttemptedAt, + AttemptedBy: params.AttemptedBy, Args: string(params.EncodedArgs), CreatedAt: params.CreatedAt, Errors: sliceutil.Map(params.Errors, func(e []byte) string { return string(e) }), diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index eaee9279..068ddcf8 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -277,6 +277,7 @@ INSERT INTO river_job( args, attempt, attempted_at, + attempted_by, created_at, errors, finalized_at, @@ -294,6 +295,7 @@ INSERT INTO river_job( @args::jsonb, coalesce(@attempt::smallint, 0), @attempted_at, + coalesce(@attempted_by::text[], '{}'), coalesce(sqlc.narg('created_at')::timestamptz, now()), @errors, @finalized_at, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 781c7fe3..d5f04038 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -702,6 +702,7 @@ INSERT INTO river_job( args, attempt, attempted_at, + attempted_by, created_at, errors, finalized_at, @@ -719,19 +720,20 @@ INSERT INTO river_job( $1::jsonb, coalesce($2::smallint, 0), $3, - coalesce($4::timestamptz, now()), - $5, + coalesce($4::text[], '{}'), + coalesce($5::timestamptz, now()), $6, $7, - $8::smallint, - coalesce($9::jsonb, '{}'), - $10, + $8, + $9::smallint, + coalesce($10::jsonb, '{}'), $11, - coalesce($12::timestamptz, now()), - $13, - 
coalesce($14::varchar(255)[], '{}'), - $15, - $16 + $12, + coalesce($13::timestamptz, now()), + $14, + coalesce($15::varchar(255)[], '{}'), + $16, + $17 ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` @@ -739,6 +741,7 @@ type JobInsertFullParams struct { Args []byte Attempt int16 AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time Errors [][]byte FinalizedAt *time.Time @@ -759,6 +762,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.Args, arg.Attempt, arg.AttemptedAt, + arg.AttemptedBy, arg.CreatedAt, arg.Errors, arg.FinalizedAt, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 0eaf3356..3ba19304 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -302,6 +302,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns job, err := dbsqlc.New().JobInsertFull(ctx, e.dbtx, &dbsqlc.JobInsertFullParams{ Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec AttemptedAt: params.AttemptedAt, + AttemptedBy: params.AttemptedBy, Args: params.EncodedArgs, CreatedAt: params.CreatedAt, Errors: params.Errors, diff --git a/rivershared/testfactory/test_factory.go b/rivershared/testfactory/test_factory.go index f6f3dd4c..568e24b6 100644 --- a/rivershared/testfactory/test_factory.go +++ b/rivershared/testfactory/test_factory.go @@ -21,6 +21,7 @@ import ( type JobOpts struct { Attempt *int AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time EncodedArgs []byte Errors [][]byte @@ -73,6 +74,7 @@ func Job_Build(tb testing.TB, opts *JobOpts) *riverdriver.JobInsertFullParams { return &riverdriver.JobInsertFullParams{ Attempt: ptrutil.ValOrDefault(opts.Attempt, 0), AttemptedAt: opts.AttemptedAt, + AttemptedBy: 
opts.AttemptedBy, CreatedAt: opts.CreatedAt, EncodedArgs: encodedArgs, Errors: opts.Errors, From a8569d1a6bc7569cc5aa55cd21c96a387ad224f4 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 13 Feb 2025 13:27:22 -0600 Subject: [PATCH 4/6] fix panic on non-existent job in JobCompleteTx --- job_complete_tx.go | 3 +++ job_complete_tx_test.go | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/job_complete_tx.go b/job_complete_tx.go index 0f6b7005..bf4a8e8e 100644 --- a/job_complete_tx.go +++ b/job_complete_tx.go @@ -50,6 +50,9 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx if err != nil { return nil, err } + if len(rows) == 0 { + return nil, rivertype.ErrNotFound + } updatedJob := &Job[TArgs]{JobRow: rows[0]} if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil { diff --git a/job_complete_tx_test.go b/job_complete_tx_test.go index 497688fc..8b5f6f84 100644 --- a/job_complete_tx_test.go +++ b/job_complete_tx_test.go @@ -79,4 +79,23 @@ func TestJobCompleteTx(t *testing.T) { _, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}) require.EqualError(t, err, "job must be running") }) + + t.Run("ErrorIfJobDoesntExist", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateAvailable), + }) + + // delete the job + _, err := bundle.exec.JobDelete(ctx, job.ID) + require.NoError(t, err) + + // fake the job's state to work around the check: + job.State = rivertype.JobStateRunning + _, err = JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) } From aa8813e81aa18e37e94a50ec6916e85b568794a0 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 10 Feb 2025 20:32:33 -0600 Subject: [PATCH 5/6] rivertest: implement a Worker type to simplify testing The `rivertest.Worker` 
type makes it much simpler to test River workers with a realistic execution environment without needing to touch the database at all. The type leverages a `river.Config` to allocate an inner `Client` and to enable jobs to be easily created with the normal default values, along with full access to features like middleware. The execution context also contains all the normal stuff that a real context in River might, such as the client and a timeout if configured. --- CHANGELOG.md | 2 + client.go | 106 ++++++++-------- internal/execution/execution.go | 45 +++++++ job_executor.go | 43 +------ rivershared/go.mod | 6 - rivershared/go.sum | 24 ---- rivertest/worker.go | 218 ++++++++++++++++++++++++++++++++ rivertest/worker_test.go | 182 ++++++++++++++++++++++++++ 8 files changed, 508 insertions(+), 118 deletions(-) create mode 100644 internal/execution/execution.go create mode 100644 rivertest/worker.go create mode 100644 rivertest/worker_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a897257..fad08032 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Exposed `TestConfig` struct on `Config` under the `Test` field for configuration that is specific to test environments. For now, the only field on this type is `Time`, which can be used to set a synthetic `TimeGenerator` for tests. A stubbable time generator was added as `rivertest.TimeStub` to allow time to be easily stubbed in tests. [PR #754](https://github.com/riverqueue/river/pull/754). +- New `rivertest.Worker` type to make it significantly easier to test River workers. Either real or synthetic jobs can be worked using this interface, generally without requiring any database interactions. The `Worker` type provides a realistic execution environment with access to the full range of River features, including `river.ClientFromContext`, middleware (both global and per-worker), and timeouts.
[PR #753](https://github.com/riverqueue/river/pull/753). ### Changed @@ -19,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - `riverdatabasesql` driver: properly handle `nil` values in `bytea[]` inputs. This fixes the driver's handling of empty unique keys on insert for non-unique jobs with the newer unique jobs implementation. [PR #739](https://github.com/riverqueue/river/pull/739). +- `JobCompleteTx` now returns `rivertype.ErrNotFound` if the job doesn't exist instead of panicking. [PR #753](https://github.com/riverqueue/river/pull/753). ## [0.16.0] - 2024-01-27 diff --git a/client.go b/client.go index 6537177e..5cbe22eb 100644 --- a/client.go +++ b/client.go @@ -268,6 +268,58 @@ type Config struct { schedulerInterval time.Duration } +// WithDefaults returns a copy of the Config with all default values applied. +func (c *Config) WithDefaults() *Config { + // Use the existing logger if set, otherwise create a default one. + logger := c.Logger + if logger == nil { + logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelWarn, + })) + } + + // Compute the default rescue value. For convenience, if JobTimeout is specified + // but RescueStuckJobsAfter is not set (or less than 1) and JobTimeout is large, use + // JobTimeout + maintenance.JobRescuerRescueAfterDefault as the default. + rescueAfter := maintenance.JobRescuerRescueAfterDefault + if c.JobTimeout > 0 && c.RescueStuckJobsAfter < 1 && c.JobTimeout > c.RescueStuckJobsAfter { + rescueAfter = c.JobTimeout + maintenance.JobRescuerRescueAfterDefault + } + + // Set default retry policy if none is provided. 
+ retryPolicy := c.RetryPolicy + if retryPolicy == nil { + retryPolicy = &DefaultClientRetryPolicy{} + } + + return &Config{ + AdvisoryLockPrefix: c.AdvisoryLockPrefix, + CancelledJobRetentionPeriod: valutil.ValOrDefault(c.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault), + CompletedJobRetentionPeriod: valutil.ValOrDefault(c.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault), + DiscardedJobRetentionPeriod: valutil.ValOrDefault(c.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault), + ErrorHandler: c.ErrorHandler, + FetchCooldown: valutil.ValOrDefault(c.FetchCooldown, FetchCooldownDefault), + FetchPollInterval: valutil.ValOrDefault(c.FetchPollInterval, FetchPollIntervalDefault), + ID: valutil.ValOrDefaultFunc(c.ID, func() string { return defaultClientID(time.Now().UTC()) }), + JobInsertMiddleware: c.JobInsertMiddleware, + JobTimeout: valutil.ValOrDefault(c.JobTimeout, JobTimeoutDefault), + Logger: logger, + MaxAttempts: valutil.ValOrDefault(c.MaxAttempts, MaxAttemptsDefault), + PeriodicJobs: c.PeriodicJobs, + PollOnly: c.PollOnly, + Queues: c.Queues, + ReindexerSchedule: c.ReindexerSchedule, + RescueStuckJobsAfter: valutil.ValOrDefault(c.RescueStuckJobsAfter, rescueAfter), + RetryPolicy: retryPolicy, + SkipUnknownJobCheck: c.SkipUnknownJobCheck, + Test: c.Test, + TestOnly: c.TestOnly, + Workers: c.Workers, + WorkerMiddleware: c.WorkerMiddleware, + schedulerInterval: valutil.ValOrDefault(c.schedulerInterval, maintenance.JobSchedulerIntervalDefault), + } +} + func (c *Config) validate() error { if c.CancelledJobRetentionPeriod < 0 { return errors.New("CancelledJobRetentionPeriod time cannot be less than zero") @@ -460,55 +512,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client return nil, errMissingConfig } - logger := config.Logger - if logger == nil { - logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ - Level: slog.LevelWarn, - })) - 
} - - retryPolicy := config.RetryPolicy - if retryPolicy == nil { - retryPolicy = &DefaultClientRetryPolicy{} - } - - // For convenience, in case the user's specified a large JobTimeout but no - // RescueStuckJobsAfter, since RescueStuckJobsAfter must be greater than - // JobTimeout, set a reasonable default value that's longer than JobTimeout. - rescueAfter := maintenance.JobRescuerRescueAfterDefault - if config.JobTimeout > 0 && config.RescueStuckJobsAfter < 1 && config.JobTimeout > config.RescueStuckJobsAfter { - rescueAfter = config.JobTimeout + maintenance.JobRescuerRescueAfterDefault - } - - // Create a new version of config with defaults filled in. This replaces the - // original object, so everything that we care about must be initialized - // here, even if it's only carrying over the original value. - config = &Config{ - AdvisoryLockPrefix: config.AdvisoryLockPrefix, - CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault), - CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault), - DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault), - ErrorHandler: config.ErrorHandler, - FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, FetchCooldownDefault), - FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, FetchPollIntervalDefault), - ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }), - JobInsertMiddleware: config.JobInsertMiddleware, - JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault), - Logger: logger, - MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault), - PeriodicJobs: config.PeriodicJobs, - PollOnly: config.PollOnly, - Queues: config.Queues, - ReindexerSchedule: config.ReindexerSchedule, - RescueStuckJobsAfter: 
valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter), - RetryPolicy: retryPolicy, - SkipUnknownJobCheck: config.SkipUnknownJobCheck, - Test: config.Test, - TestOnly: config.TestOnly, - Workers: config.Workers, - WorkerMiddleware: config.WorkerMiddleware, - schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault), - } + config = config.WithDefaults() if err := config.validate(); err != nil { return nil, err @@ -567,7 +571,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.services = append(client.services, client.notifier) } } else { - logger.Info("Driver does not support listener; entering poll only mode") + config.Logger.Info("Driver does not support listener; entering poll only mode") } client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{ @@ -608,7 +612,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{ - ClientRetryPolicy: retryPolicy, + ClientRetryPolicy: config.RetryPolicy, RescueAfter: config.RescueStuckJobsAfter, WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { if workerInfo, ok := config.Workers.workersMap[kind]; ok { diff --git a/internal/execution/execution.go b/internal/execution/execution.go new file mode 100644 index 00000000..5d6ba90f --- /dev/null +++ b/internal/execution/execution.go @@ -0,0 +1,45 @@ +package execution + +import ( + "context" + "time" + + "github.com/riverqueue/river/rivertype" +) + +type Func func(ctx context.Context) error + +// MaybeApplyTimeout returns a context that will be cancelled after the given +// timeout. If the timeout is <= 0, the context will not be timed out, but it +// will still have a cancel function returned. In either case the cancel +// function should be called after execution (in a defer). 
+func MaybeApplyTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + // No timeout if a -1 was specified. + if timeout > 0 { + return context.WithTimeout(ctx, timeout) + } + + return context.WithCancel(ctx) +} + +// MiddlewareChain chains together the given middleware functions, returning a +// single function that applies them all in reverse order. +func MiddlewareChain(global, worker []rivertype.WorkerMiddleware, doInner Func, jobRow *rivertype.JobRow) Func { + allMiddleware := make([]rivertype.WorkerMiddleware, 0, len(global)+len(worker)) + allMiddleware = append(allMiddleware, global...) + allMiddleware = append(allMiddleware, worker...) + + if len(allMiddleware) > 0 { + // Wrap middlewares in reverse order so the one defined first is wrapped + // as the outermost function and is first to receive the operation. + for i := len(allMiddleware) - 1; i >= 0; i-- { + middlewareItem := allMiddleware[i] // capture the current middleware item + previousDoInner := doInner // capture the current doInner function + doInner = func(ctx context.Context) error { + return middlewareItem.Work(ctx, jobRow, previousDoInner) + } + } + } + + return doInner +} diff --git a/job_executor.go b/job_executor.go index 2f84bd99..21529668 100644 --- a/job_executor.go +++ b/job_executor.go @@ -9,11 +9,13 @@ import ( "runtime/debug" "time" + "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobstats" "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/util/valutil" "github.com/riverqueue/river/rivertype" ) @@ -204,43 +206,10 @@ func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { return &jobExecutorResult{Err: err} } - workerMiddleware := e.WorkUnit.Middleware() - - doInner := func(ctx context.Context) error 
{ - jobTimeout := e.WorkUnit.Timeout() - if jobTimeout == 0 { - jobTimeout = e.ClientJobTimeout - } - - // No timeout if a -1 was specified. - if jobTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, jobTimeout) - defer cancel() - } - - if err := e.WorkUnit.Work(ctx); err != nil { - return err - } - - return nil - } - - allMiddleware := make([]rivertype.WorkerMiddleware, 0, len(e.GlobalMiddleware)+len(workerMiddleware)) - allMiddleware = append(allMiddleware, e.GlobalMiddleware...) - allMiddleware = append(allMiddleware, workerMiddleware...) - - if len(allMiddleware) > 0 { - // Wrap middlewares in reverse order so the one defined first is wrapped - // as the outermost function and is first to receive the operation. - for i := len(allMiddleware) - 1; i >= 0; i-- { - middlewareItem := allMiddleware[i] // capture the current middleware item - previousDoInner := doInner // Capture the current doInner function - doInner = func(ctx context.Context) error { - return middlewareItem.Work(ctx, e.JobRow, previousDoInner) - } - } - } + doInner := execution.MiddlewareChain(e.GlobalMiddleware, e.WorkUnit.Middleware(), e.WorkUnit.Work, e.JobRow) + jobTimeout := valutil.FirstNonZero(e.WorkUnit.Timeout(), e.ClientJobTimeout) + ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout) + defer cancel() return &jobExecutorResult{Err: doInner(ctx)} } diff --git a/rivershared/go.mod b/rivershared/go.mod index 59db1a2a..22f095cb 100644 --- a/rivershared/go.mod +++ b/rivershared/go.mod @@ -17,11 +17,5 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/tidwall/gjson v1.18.0 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.1 // indirect - github.com/tidwall/sjson v1.2.5 // indirect - golang.org/x/sync v0.11.0 // indirect - golang.org/x/text v0.22.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff 
--git a/rivershared/go.sum b/rivershared/go.sum index 62fefad0..299d37a1 100644 --- a/rivershared/go.sum +++ b/rivershared/go.sum @@ -1,14 +1,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= -github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= -github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= -github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -25,26 +17,10 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= -github.com/tidwall/gjson v1.18.0/go.mod 
h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= -github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= -github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= -github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= -github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/rivertest/worker.go b/rivertest/worker.go new file mode 100644 index 00000000..43969f75 --- /dev/null +++ b/rivertest/worker.go @@ -0,0 +1,218 @@ +package rivertest + +import ( + "context" + "encoding/json" + "sync/atomic" + "testing" + 
"time" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/execution" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivershared/util/sliceutil" + "github.com/riverqueue/river/rivershared/util/valutil" + "github.com/riverqueue/river/rivertype" +) + +// Worker makes it easier to test river workers. Once built, the worker can be +// used to work any number of synthetic jobs without touching the database: +// +// worker := rivertest.NewWorker(t, driver, config, worker) +// if err := worker.Work(ctx, t, args, nil); err != nil { +// t.Fatalf("failed to work job: %s", err) +// } +// +// if err := worker.Work(ctx, t, args, &river.InsertOpts{Queue: "custom_queue"}); err != nil { +// t.Fatalf("failed to work job: %s", err) +// } +// +// In all cases the underlying [river.Worker] will be called with the synthetic +// job. The execution environment has a realistic River context with access to +// all River features, including [river.ClientFromContext] and worker +// middleware. +type Worker[T river.JobArgs, TTx any] struct { + client *river.Client[TTx] + config *river.Config + worker river.Worker[T] +} + +// NewWorker creates a new test Worker for testing the provided [river.Worker]. +// The worker uses the provided driver and River config to populate default +// values on test jobs and to configure the execution environment. +// +// The database is not required to use this helper, and a pool-less driver is +// recommended for most usage. This enables individual test cases to run in +// parallel and with full isolation, even using a single shared `Worker` +// instance across many test cases. 
+func NewWorker[T river.JobArgs, TTx any](tb testing.TB, driver riverdriver.Driver[TTx], config *river.Config, worker river.Worker[T]) *Worker[T, TTx] { + tb.Helper() + + config = config.WithDefaults() + client, err := river.NewClient(driver, config) + if err != nil { + tb.Fatalf("failed to create client: %s", err) + } + + return &Worker[T, TTx]{ + client: client, + config: config, + worker: worker, + } +} + +// Work allocates a synthetic job with the provided arguments and optional +// insert options, then works it. +func (w *Worker[T, TTx]) Work(ctx context.Context, tb testing.TB, args T, opts *river.InsertOpts) error { + tb.Helper() + job := makeJobFromArgs(tb, w.config, args, opts) + return w.WorkJob(ctx, tb, job) +} + +// WorkJob works the provided job. The job must be constructed to be a realistic +// job using external logic prior to calling this method. Unlike the other +// variants, this method offers total control over the job's attributes. +func (w *Worker[T, TTx]) WorkJob(ctx context.Context, tb testing.TB, job *river.Job[T]) error { + tb.Helper() + + // populate river client into context: + ctx = WorkContext(ctx, w.client) + + doInner := execution.MiddlewareChain( + w.config.WorkerMiddleware, + w.worker.Middleware(job), + func(ctx context.Context) error { return w.worker.Work(ctx, job) }, + job.JobRow, + ) + + jobTimeout := valutil.FirstNonZero(w.worker.Timeout(job), w.config.JobTimeout) + ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout) + defer cancel() + + return doInner(ctx) +} + +var idSeq int64 = 0 //nolint:gochecknoglobals + +func nextID() int64 { + return atomic.AddInt64(&idSeq, 1) +} + +func makeJobFromArgs[T river.JobArgs](tb testing.TB, config *river.Config, args T, opts *river.InsertOpts) *river.Job[T] { + tb.Helper() + + encodedArgs, err := json.Marshal(args) + if err != nil { + tb.Fatalf("failed to marshal args: %s", err) + } + + // Round trip the args to validate JSON marshalling/unmarshalling on the type. 
+ var decodedArgs T + if err = json.Unmarshal(encodedArgs, &decodedArgs); err != nil { + tb.Fatalf("failed to unmarshal args: %s", err) + } + + if opts == nil { + opts = &river.InsertOpts{} + } + + // Extract any job insert opts from args type if present + var jobInsertOpts river.InsertOpts + if argsWithOpts, ok := any(args).(river.JobArgsWithInsertOpts); ok { + jobInsertOpts = argsWithOpts.InsertOpts() + } + + now := valutil.FirstNonZero(opts.ScheduledAt, jobInsertOpts.ScheduledAt, time.Now()) + + internalUniqueOpts := (*dbunique.UniqueOpts)(&opts.UniqueOpts) + insertParams := &rivertype.JobInsertParams{ + Args: args, + CreatedAt: &now, + EncodedArgs: encodedArgs, + Kind: args.Kind(), + MaxAttempts: valutil.FirstNonZero(opts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts, river.MaxAttemptsDefault), + Metadata: sliceutil.FirstNonEmpty(opts.Metadata, jobInsertOpts.Metadata, []byte(`{}`)), + Priority: valutil.FirstNonZero(opts.Priority, jobInsertOpts.Priority, river.PriorityDefault), + Queue: valutil.FirstNonZero(opts.Queue, jobInsertOpts.Queue, river.QueueDefault), + State: rivertype.JobStateAvailable, + Tags: sliceutil.FirstNonEmpty(opts.Tags, jobInsertOpts.Tags, []string{}), + } + var uniqueKey []byte + timeGen := config.Test.Time + if timeGen == nil { + timeGen = &baseservice.UnStubbableTimeGenerator{} + } + if !internalUniqueOpts.IsEmpty() { + uniqueKey, err = dbunique.UniqueKey(valutil.ValOrDefault(config.Test.Time, timeGen), internalUniqueOpts, insertParams) + if err != nil { + tb.Fatalf("failed to create unique key: %s", err) + } + } + + return makeJobFromFactoryBuild(tb, args, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(1), + AttemptedAt: &now, + AttemptedBy: []string{"worker1"}, + CreatedAt: &now, + EncodedArgs: encodedArgs, + Errors: nil, + FinalizedAt: nil, + Kind: ptrutil.Ptr(args.Kind()), + MaxAttempts: ptrutil.Ptr(valutil.FirstNonZero(opts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts, river.MaxAttemptsDefault)), + Metadata: 
sliceutil.FirstNonEmpty(opts.Metadata, jobInsertOpts.Metadata, []byte(`{}`)), + Priority: ptrutil.Ptr(valutil.FirstNonZero(opts.Priority, jobInsertOpts.Priority, river.PriorityDefault)), + Queue: ptrutil.Ptr(valutil.FirstNonZero(opts.Queue, jobInsertOpts.Queue, river.QueueDefault)), + ScheduledAt: &now, + State: ptrutil.Ptr(rivertype.JobStateRunning), + Tags: sliceutil.FirstNonEmpty(opts.Tags, jobInsertOpts.Tags, []string{}), + UniqueKey: uniqueKey, + UniqueStates: internalUniqueOpts.StateBitmask(), + }) +} + +func makeJobFromFactoryBuild[T river.JobArgs](tb testing.TB, args T, jobOpts *testfactory.JobOpts) *river.Job[T] { + tb.Helper() + + jobParams := testfactory.Job_Build(tb, jobOpts) + + var errors []rivertype.AttemptError + if jobParams.Errors != nil { + errors = make([]rivertype.AttemptError, len(jobParams.Errors)) + for i, err := range jobParams.Errors { + var attemptError rivertype.AttemptError + if err := json.Unmarshal(err, &attemptError); err != nil { + tb.Fatalf("failed to unmarshal attempt error: %s", err) + } + errors[i] = attemptError + } + } + + now := time.Now() + + return &river.Job[T]{ + Args: args, + JobRow: &rivertype.JobRow{ + ID: nextID(), + Attempt: jobParams.Attempt, + AttemptedAt: jobParams.AttemptedAt, + AttemptedBy: jobParams.AttemptedBy, + CreatedAt: ptrutil.ValOrDefault(jobParams.CreatedAt, now), + EncodedArgs: jobParams.EncodedArgs, + Errors: errors, + FinalizedAt: jobParams.FinalizedAt, + Kind: jobParams.Kind, + MaxAttempts: jobParams.MaxAttempts, + Metadata: jobParams.Metadata, + Priority: jobParams.Priority, + Queue: jobParams.Queue, + ScheduledAt: ptrutil.ValOrDefault(jobParams.ScheduledAt, now), + State: jobParams.State, + Tags: jobParams.Tags, + UniqueKey: jobParams.UniqueKey, + }, + } +} diff --git a/rivertest/worker_test.go b/rivertest/worker_test.go new file mode 100644 index 00000000..75d15d0f --- /dev/null +++ b/rivertest/worker_test.go @@ -0,0 +1,182 @@ +package rivertest + +import ( + "context" + "testing" + "time" + + 
"github.com/stretchr/testify/require" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivertype" +) + +type testArgs struct { + Value string `json:"value"` +} + +func (testArgs) Kind() string { return "rivertest_work_test" } + +func TestWorker_Work(t *testing.T) { + t.Parallel() + + config := &river.Config{} + driver := riverpgxv5.New(nil) + + t.Run("WorkASimpleJob", func(t *testing.T) { + t.Parallel() + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, testArgs{Value: "test"}, job.Args) + require.Equal(t, 1, job.JobRow.Attempt) + require.NotNil(t, job.JobRow.AttemptedAt) + require.WithinDuration(t, time.Now(), *job.JobRow.AttemptedAt, 5*time.Second) + require.Equal(t, []string{"worker1"}, job.JobRow.AttemptedBy) + require.WithinDuration(t, time.Now(), job.JobRow.CreatedAt, 5*time.Second) + require.JSONEq(t, `{"value": "test"}`, string(job.JobRow.EncodedArgs)) + require.Empty(t, job.JobRow.Errors) + require.Nil(t, job.JobRow.FinalizedAt) + require.Positive(t, job.JobRow.ID) + require.Equal(t, "rivertest_work_test", job.JobRow.Kind) + require.Equal(t, river.MaxAttemptsDefault, job.JobRow.MaxAttempts) + require.Equal(t, []byte(`{}`), job.JobRow.Metadata) + require.Equal(t, river.PriorityDefault, job.JobRow.Priority) + require.Equal(t, river.QueueDefault, job.JobRow.Queue) + require.WithinDuration(t, time.Now(), job.JobRow.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateRunning, job.JobRow.State) + require.Equal(t, []string{}, job.JobRow.Tags) + require.Nil(t, job.JobRow.UniqueKey) + + return nil + }) + tw := NewWorker(t, driver, config, worker) + require.NoError(t, tw.Work(context.Background(), t, 
testArgs{Value: "test"}, nil)) + }) + + t.Run("Reusable", func(t *testing.T) { + t.Parallel() + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + return nil + }) + tw := NewWorker(t, driver, config, worker) + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test"}, nil)) + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test2"}, nil)) + }) + + t.Run("SetsCustomInsertOpts", func(t *testing.T) { + t.Parallel() + + uniqueOpts := river.UniqueOpts{ByQueue: true} + hourFromNow := time.Now().Add(1 * time.Hour) + internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts) + uniqueKey, err := dbunique.UniqueKey(&baseservice.UnStubbableTimeGenerator{}, internalUniqueOpts, &rivertype.JobInsertParams{ + Args: testArgs{Value: "test3"}, + CreatedAt: &hourFromNow, + EncodedArgs: []byte(`{"value": "test3"}`), + Kind: "rivertest_work_test", + MaxAttempts: 420, + Metadata: []byte(`{"key": "value"}`), + Priority: 3, + Queue: "custom_queue", + State: rivertype.JobStateAvailable, + Tags: []string{"tag1", "tag2"}, + }) + require.NoError(t, err) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, testArgs{Value: "test3"}, job.Args) + require.Equal(t, 1, job.JobRow.Attempt) + require.NotNil(t, job.JobRow.AttemptedAt) + require.WithinDuration(t, hourFromNow, *job.JobRow.AttemptedAt, 2*time.Second) + require.Equal(t, []string{"worker1"}, job.JobRow.AttemptedBy) + require.WithinDuration(t, hourFromNow, job.JobRow.CreatedAt, 2*time.Second) + require.JSONEq(t, `{"value": "test3"}`, string(job.JobRow.EncodedArgs)) + require.Empty(t, job.JobRow.Errors) + require.Nil(t, job.JobRow.FinalizedAt) + require.Positive(t, job.JobRow.ID) + require.Equal(t, "rivertest_work_test", job.JobRow.Kind) + require.Equal(t, 420, job.JobRow.MaxAttempts) + require.JSONEq(t, `{"key": "value"}`, string(job.JobRow.Metadata)) + require.Equal(t, 3, job.JobRow.Priority) + require.Equal(t, 
"custom_queue", job.JobRow.Queue) + require.WithinDuration(t, hourFromNow, job.JobRow.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateRunning, job.JobRow.State) + require.Equal(t, []string{"tag1", "tag2"}, job.JobRow.Tags) + require.Equal(t, uniqueKey, job.JobRow.UniqueKey) + + return nil + }) + tw := NewWorker(t, driver, config, worker) + + // You can also use the WorkOpts method to pass in custom insert options: + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test3"}, &river.InsertOpts{ + MaxAttempts: 420, + Metadata: []byte(`{"key": "value"}`), + Pending: true, // ignored but added to ensure non-default behavior + Priority: 3, + Queue: "custom_queue", + ScheduledAt: hourFromNow, + Tags: []string{"tag1", "tag2"}, + UniqueOpts: uniqueOpts, + })) + }) + + t.Run("UniqueOptsByPeriodRespectsCustomStubbedTime", func(t *testing.T) { + t.Parallel() + + stubTime := &riversharedtest.TimeStub{} + now := time.Now().UTC() + stubTime.StubNowUTC(now) + config := &river.Config{ + Test: river.TestConfig{Time: stubTime}, + } + + uniqueOpts := river.UniqueOpts{ByPeriod: 1 * time.Hour} + internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts) + uniqueKey, err := dbunique.UniqueKey(stubTime, internalUniqueOpts, &rivertype.JobInsertParams{ + Args: testArgs{Value: "test3"}, + CreatedAt: &now, + EncodedArgs: []byte(`{"value": "test3"}`), + Kind: "rivertest_work_test", + }) + require.NoError(t, err) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, uniqueKey, job.JobRow.UniqueKey) + + return nil + }) + tw := NewWorker(t, driver, config, worker) + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test"}, &river.InsertOpts{ + UniqueOpts: uniqueOpts, + })) + }) +} + +func TestWorker_WorkJob(t *testing.T) { + t.Parallel() + + config := &river.Config{} + driver := riverpgxv5.New(nil) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + 
require.Equal(t, []string{"worker123"}, job.JobRow.AttemptedBy) + return nil + }) + tw := NewWorker(t, driver, config, worker) + + now := time.Now() + require.NoError(t, tw.WorkJob(context.Background(), t, makeJobFromFactoryBuild(t, testArgs{Value: "test"}, &testfactory.JobOpts{ + AttemptedAt: &now, + AttemptedBy: []string{"worker123"}, + CreatedAt: &now, + EncodedArgs: []byte(`{"value": "test"}`), + Errors: nil, + }))) +} From 8b36c267bec56515c25b1a51b3f1f056899172cd Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Fri, 14 Feb 2025 17:37:06 -0600 Subject: [PATCH 6/6] panic if JobCompleteTx called within a test worker This can't succeed if the record isn't in the database, so panic if this situation occurs within a test worker environment. --- internal/execution/execution.go | 4 ++++ job_complete_tx.go | 5 +++++ job_complete_tx_test.go | 19 +++++++++++++++++++ rivertest/time_stub_test.go | 3 ++- rivertest/worker.go | 5 +++++ rivertest/worker_test.go | 4 ++++ 6 files changed, 39 insertions(+), 1 deletion(-) diff --git a/internal/execution/execution.go b/internal/execution/execution.go index 5d6ba90f..bcf9917e 100644 --- a/internal/execution/execution.go +++ b/internal/execution/execution.go @@ -7,6 +7,10 @@ import ( "github.com/riverqueue/river/rivertype" ) +// ContextKeyInsideTestWorker is an internal context key that indicates whether +// the worker is running inside a [rivertest.Worker]. 
+type ContextKeyInsideTestWorker struct{} + type Func func(ctx context.Context) error // MaybeApplyTimeout returns a context that will be cancelled after the given diff --git a/job_complete_tx.go b/job_complete_tx.go index bf4a8e8e..929a1a85 100644 --- a/job_complete_tx.go +++ b/job_complete_tx.go @@ -6,6 +6,7 @@ import ( "errors" "time" + "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivertype" ) @@ -51,6 +52,10 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx return nil, err } if len(rows) == 0 { + if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker { + panic("to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first") + } + return nil, rivertype.ErrNotFound } updatedJob := &Job[TArgs]{JobRow: rows[0]} diff --git a/job_complete_tx_test.go b/job_complete_tx_test.go index 8b5f6f84..612b5a36 100644 --- a/job_complete_tx_test.go +++ b/job_complete_tx_test.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/riverdriver" @@ -98,4 +99,22 @@ func TestJobCompleteTx(t *testing.T) { _, err = JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}) require.ErrorIs(t, err, rivertype.ErrNotFound) }) + + t.Run("PanicsIfCalledInTestWorkerWithoutInsertingJob", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + ctx = context.WithValue(ctx, execution.ContextKeyInsideTestWorker{}, true) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + // delete the job as though it was never inserted: + _, err := bundle.client.JobDeleteTx(ctx, bundle.tx, job.ID) + 
require.NoError(t, err) + job.State = rivertype.JobStateRunning + + require.PanicsWithValue(t, "to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first", func() { + _, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}) + require.NoError(t, err) + }) + }) } diff --git a/rivertest/time_stub_test.go b/rivertest/time_stub_test.go index 22af2717..8bc7f5b8 100644 --- a/rivertest/time_stub_test.go +++ b/rivertest/time_stub_test.go @@ -4,8 +4,9 @@ import ( "testing" "time" - "github.com/riverqueue/river/rivertype" "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivertype" ) // Ensure that TimeStub implements rivertype.TimeGenerator. diff --git a/rivertest/worker.go b/rivertest/worker.go index 43969f75..55a623c9 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -35,6 +35,10 @@ import ( // job. The execution environment has a realistic River context with access to // all River features, including [river.ClientFromContext] and worker // middleware. +// +// When relying on features that require a database record (such as JobCompleteTx), +// the job must be inserted into the database first and then executed with +// WorkJob. type Worker[T river.JobArgs, TTx any] struct { client *river.Client[TTx] config *river.Config @@ -81,6 +85,7 @@ func (w *Worker[T, TTx]) WorkJob(ctx context.Context, tb testing.TB, job *river. 
// populate river client into context: ctx = WorkContext(ctx, w.client) + ctx = context.WithValue(ctx, execution.ContextKeyInsideTestWorker{}, true) doInner := execution.MiddlewareChain( w.config.WorkerMiddleware, diff --git a/rivertest/worker_test.go b/rivertest/worker_test.go index 75d15d0f..66a9f7f3 100644 --- a/rivertest/worker_test.go +++ b/rivertest/worker_test.go @@ -9,6 +9,7 @@ import ( "github.com/riverqueue/river" "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/riversharedtest" @@ -52,6 +53,9 @@ func TestWorker_Work(t *testing.T) { require.Equal(t, []string{}, job.JobRow.Tags) require.Nil(t, job.JobRow.UniqueKey) + _, hasContextKeyInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool) + require.True(t, hasContextKeyInsideTestWorker) + return nil }) tw := NewWorker(t, driver, config, worker)