diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a897257..fad08032 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Exposed `TestConfig` struct on `Config` under the `Test` field for configuration that is specific to test environments. For now, the only field on this type is `Time`, which can be used to set a synthetic `TimeGenerator` for tests. A stubbable time generator was added as `rivertest.TimeStub` to allow time to be easily stubbed in tests. [PR #754](https://github.com/riverqueue/river/pull/754). +- New `rivertest.Worker` type to make it significantly easier to test River workers. Either real or synthetic jobs can be worked using this interface, generally without requiring any database interactions. The `Worker` type provides a realistic execution environment with access to the full range of River features, including `river.ClientFromContext`, middleware (both global and per-worker), and timeouts. [PR #753](https://github.com/riverqueue/river/pull/753). ### Changed @@ -19,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - `riverdatabasesql` driver: properly handle `nil` values in `bytea[]` inputs. This fixes the driver's handling of empty unique keys on insert for non-unique jobs with the newer unique jobs implementation. [PR #739](https://github.com/riverqueue/river/pull/739). +- `JobCompleteTx` now returns `rivertype.ErrNotFound` if the job doesn't exist instead of panicking. [PR #753](https://github.com/riverqueue/river/pull/753). ## [0.16.0] - 2024-01-27 diff --git a/client.go b/client.go index 6537177e..5cbe22eb 100644 --- a/client.go +++ b/client.go @@ -268,6 +268,58 @@ type Config struct { schedulerInterval time.Duration } +// WithDefaults returns a copy of the Config with all default values applied. +func (c *Config) WithDefaults() *Config { + // Use the existing logger if set, otherwise create a default one. + logger := c.Logger + if logger == nil { + logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelWarn, + })) + } + + // Compute the default rescue value. For convenience, if JobTimeout is specified + // but RescueStuckJobsAfter is not set (or less than 1) and JobTimeout is large, use + // JobTimeout + maintenance.JobRescuerRescueAfterDefault as the default. + rescueAfter := maintenance.JobRescuerRescueAfterDefault + if c.JobTimeout > 0 && c.RescueStuckJobsAfter < 1 && c.JobTimeout > c.RescueStuckJobsAfter { + rescueAfter = c.JobTimeout + maintenance.JobRescuerRescueAfterDefault + } + + // Set default retry policy if none is provided. + retryPolicy := c.RetryPolicy + if retryPolicy == nil { + retryPolicy = &DefaultClientRetryPolicy{} + } + + return &Config{ + AdvisoryLockPrefix: c.AdvisoryLockPrefix, + CancelledJobRetentionPeriod: valutil.ValOrDefault(c.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault), + CompletedJobRetentionPeriod: valutil.ValOrDefault(c.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault), + DiscardedJobRetentionPeriod: valutil.ValOrDefault(c.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault), + ErrorHandler: c.ErrorHandler, + FetchCooldown: valutil.ValOrDefault(c.FetchCooldown, FetchCooldownDefault), + FetchPollInterval: valutil.ValOrDefault(c.FetchPollInterval, FetchPollIntervalDefault), + ID: valutil.ValOrDefaultFunc(c.ID, func() string { return defaultClientID(time.Now().UTC()) }), + JobInsertMiddleware: c.JobInsertMiddleware, + JobTimeout: valutil.ValOrDefault(c.JobTimeout, JobTimeoutDefault), + Logger: logger, + MaxAttempts: valutil.ValOrDefault(c.MaxAttempts, MaxAttemptsDefault), + PeriodicJobs: c.PeriodicJobs, + PollOnly: c.PollOnly, + Queues: c.Queues, + ReindexerSchedule: c.ReindexerSchedule, + RescueStuckJobsAfter: valutil.ValOrDefault(c.RescueStuckJobsAfter, rescueAfter), + RetryPolicy: retryPolicy, + SkipUnknownJobCheck: c.SkipUnknownJobCheck, + Test: c.Test, + TestOnly: c.TestOnly, + Workers: c.Workers, + WorkerMiddleware: c.WorkerMiddleware, + schedulerInterval: valutil.ValOrDefault(c.schedulerInterval, maintenance.JobSchedulerIntervalDefault), + } +} + func (c *Config) validate() error { if c.CancelledJobRetentionPeriod < 0 { return errors.New("CancelledJobRetentionPeriod time cannot be less than zero") @@ -460,55 +512,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client return nil, errMissingConfig } - logger := config.Logger - if logger == nil { - logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ - Level: slog.LevelWarn, - })) - } - - retryPolicy := config.RetryPolicy - if retryPolicy == nil { - retryPolicy = &DefaultClientRetryPolicy{} - } - - // For convenience, in case the user's specified a large JobTimeout but no - // RescueStuckJobsAfter, since RescueStuckJobsAfter must be greater than - // JobTimeout, set a reasonable default value that's longer than JobTimeout. - rescueAfter := maintenance.JobRescuerRescueAfterDefault - if config.JobTimeout > 0 && config.RescueStuckJobsAfter < 1 && config.JobTimeout > config.RescueStuckJobsAfter { - rescueAfter = config.JobTimeout + maintenance.JobRescuerRescueAfterDefault - } - - // Create a new version of config with defaults filled in. This replaces the - // original object, so everything that we care about must be initialized - // here, even if it's only carrying over the original value. - config = &Config{ - AdvisoryLockPrefix: config.AdvisoryLockPrefix, - CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault), - CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault), - DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault), - ErrorHandler: config.ErrorHandler, - FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, FetchCooldownDefault), - FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, FetchPollIntervalDefault), - ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }), - JobInsertMiddleware: config.JobInsertMiddleware, - JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault), - Logger: logger, - MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault), - PeriodicJobs: config.PeriodicJobs, - PollOnly: config.PollOnly, - Queues: config.Queues, - ReindexerSchedule: config.ReindexerSchedule, - RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter), - RetryPolicy: retryPolicy, - SkipUnknownJobCheck: config.SkipUnknownJobCheck, - Test: config.Test, - TestOnly: config.TestOnly, - Workers: config.Workers, - WorkerMiddleware: config.WorkerMiddleware, - schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault), - } + config = config.WithDefaults() if err := config.validate(); err != nil { return nil, err @@ -567,7 +571,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.services = append(client.services, client.notifier) } } else { - logger.Info("Driver does not support listener; entering poll only mode") + config.Logger.Info("Driver does not support listener; entering poll only mode") } client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{ @@ -608,7 +612,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{ - ClientRetryPolicy: retryPolicy, + ClientRetryPolicy: config.RetryPolicy, RescueAfter: config.RescueStuckJobsAfter, WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { if workerInfo, ok := config.Workers.workersMap[kind]; ok { diff --git a/internal/execution/execution.go b/internal/execution/execution.go new file mode 100644 index 00000000..bcf9917e --- /dev/null +++ b/internal/execution/execution.go @@ -0,0 +1,49 @@ +package execution + +import ( + "context" + "time" + + "github.com/riverqueue/river/rivertype" +) + +// ContextKeyInsideTestWorker is an internal context key that indicates whether +// the worker is running inside a [rivertest.Worker]. +type ContextKeyInsideTestWorker struct{} + +type Func func(ctx context.Context) error + +// MaybeApplyTimeout returns a context that will be cancelled after the given +// timeout. If the timeout is <= 0, the context will not be timed out, but it +// will still have a cancel function returned. In either case the cancel +// function should be called after execution (in a defer). +func MaybeApplyTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + // No timeout if a -1 was specified. + if timeout > 0 { + return context.WithTimeout(ctx, timeout) + } + + return context.WithCancel(ctx) +} + +// MiddlewareChain chains together the given middleware functions, returning a +// single function that applies them all in reverse order. +func MiddlewareChain(global, worker []rivertype.WorkerMiddleware, doInner Func, jobRow *rivertype.JobRow) Func { + allMiddleware := make([]rivertype.WorkerMiddleware, 0, len(global)+len(worker)) + allMiddleware = append(allMiddleware, global...) + allMiddleware = append(allMiddleware, worker...) + + if len(allMiddleware) > 0 { + // Wrap middlewares in reverse order so the one defined first is wrapped + // as the outermost function and is first to receive the operation. + for i := len(allMiddleware) - 1; i >= 0; i-- { + middlewareItem := allMiddleware[i] // capture the current middleware item + previousDoInner := doInner // capture the current doInner function + doInner = func(ctx context.Context) error { + return middlewareItem.Work(ctx, jobRow, previousDoInner) + } + } + } + + return doInner +} diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 088a4a81..f0c2553a 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1082,6 +1082,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, job, err := exec.JobInsertFull(ctx, &riverdriver.JobInsertFullParams{ Attempt: 3, AttemptedAt: &now, + AttemptedBy: []string{"worker1", "worker2"}, CreatedAt: &now, EncodedArgs: []byte(`{"encoded": "args"}`), Errors: [][]byte{[]byte(`{"error": "message"}`)}, @@ -1099,6 +1100,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) require.Equal(t, 3, job.Attempt) requireEqualTime(t, now, *job.AttemptedAt) + require.Equal(t, []string{"worker1", "worker2"}, job.AttemptedBy) requireEqualTime(t, now, job.CreatedAt) require.JSONEq(t, `{"encoded": "args"}`, string(job.EncodedArgs)) require.Equal(t, "message", job.Errors[0].Error) diff --git a/job_complete_tx.go b/job_complete_tx.go index 0f6b7005..929a1a85 100644 --- a/job_complete_tx.go +++ b/job_complete_tx.go @@ -6,6 +6,7 @@ import ( "errors" "time" + "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivertype" ) @@ -50,6 +51,13 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx if err != nil { return nil, err } + if len(rows) == 0 { + if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker { + panic("to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first") + } + + return nil, rivertype.ErrNotFound + } updatedJob := &Job[TArgs]{JobRow: rows[0]} if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil { diff --git a/job_complete_tx_test.go b/job_complete_tx_test.go index 497688fc..612b5a36 100644 --- a/job_complete_tx_test.go +++ b/job_complete_tx_test.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/riverdriver" @@ -79,4 +80,41 @@ func TestJobCompleteTx(t *testing.T) { _, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}) require.EqualError(t, err, "job must be running") }) + + t.Run("ErrorIfJobDoesntExist", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateAvailable), + }) + + // delete the job + _, err := bundle.exec.JobDelete(ctx, job.ID) + require.NoError(t, err) + + // fake the job's state to work around the check: + job.State = rivertype.JobStateRunning + _, err = JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + + t.Run("PanicsIfCalledInTestWorkerWithoutInsertingJob", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + ctx = context.WithValue(ctx, execution.ContextKeyInsideTestWorker{}, true) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + // delete the job as though it was never inserted: + _, err := bundle.client.JobDeleteTx(ctx, bundle.tx, job.ID) + require.NoError(t, err) + job.State = rivertype.JobStateRunning + + require.PanicsWithValue(t, "to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first", func() { + _, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}) + require.NoError(t, err) + }) + }) } diff --git a/job_executor.go b/job_executor.go index 2f84bd99..21529668 100644 --- a/job_executor.go +++ b/job_executor.go @@ -9,11 +9,13 @@ import ( "runtime/debug" "time" + "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobstats" "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/util/valutil" "github.com/riverqueue/river/rivertype" ) @@ -204,43 +206,10 @@ func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { return &jobExecutorResult{Err: err} } - workerMiddleware := e.WorkUnit.Middleware() - - doInner := func(ctx context.Context) error { - jobTimeout := e.WorkUnit.Timeout() - if jobTimeout == 0 { - jobTimeout = e.ClientJobTimeout - } - - // No timeout if a -1 was specified. - if jobTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, jobTimeout) - defer cancel() - } - - if err := e.WorkUnit.Work(ctx); err != nil { - return err - } - - return nil - } - - allMiddleware := make([]rivertype.WorkerMiddleware, 0, len(e.GlobalMiddleware)+len(workerMiddleware)) - allMiddleware = append(allMiddleware, e.GlobalMiddleware...) - allMiddleware = append(allMiddleware, workerMiddleware...) - - if len(allMiddleware) > 0 { - // Wrap middlewares in reverse order so the one defined first is wrapped - // as the outermost function and is first to receive the operation. - for i := len(allMiddleware) - 1; i >= 0; i-- { - middlewareItem := allMiddleware[i] // capture the current middleware item - previousDoInner := doInner // Capture the current doInner function - doInner = func(ctx context.Context) error { - return middlewareItem.Work(ctx, e.JobRow, previousDoInner) - } - } - } + doInner := execution.MiddlewareChain(e.GlobalMiddleware, e.WorkUnit.Middleware(), e.WorkUnit.Work, e.JobRow) + jobTimeout := valutil.FirstNonZero(e.WorkUnit.Timeout(), e.ClientJobTimeout) + ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout) + defer cancel() return &jobExecutorResult{Err: doInner(ctx)} } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 9287878b..46527b14 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -273,6 +273,7 @@ type JobInsertFastResult struct { type JobInsertFullParams struct { Attempt int AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time EncodedArgs []byte Errors [][]byte diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index b6fe0118..c6abc07f 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -718,6 +718,7 @@ INSERT INTO river_job( args, attempt, attempted_at, + attempted_by, created_at, errors, finalized_at, @@ -735,19 +736,20 @@ INSERT INTO river_job( $1::jsonb, coalesce($2::smallint, 0), $3, - coalesce($4::timestamptz, now()), - $5, + coalesce($4::text[], '{}'), + coalesce($5::timestamptz, now()), $6, $7, - $8::smallint, - coalesce($9::jsonb, '{}'), - $10, + $8, + $9::smallint, + coalesce($10::jsonb, '{}'), $11, - coalesce($12::timestamptz, now()), - $13, - coalesce($14::varchar(255)[], '{}'), - $15, - $16 + $12, + coalesce($13::timestamptz, now()), + $14, + coalesce($15::varchar(255)[], '{}'), + $16, + $17 ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` @@ -755,6 +757,7 @@ type JobInsertFullParams struct { Args string Attempt int16 AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time Errors []string FinalizedAt *time.Time @@ -775,6 +778,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.Args, arg.Attempt, arg.AttemptedAt, + pq.Array(arg.AttemptedBy), arg.CreatedAt, pq.Array(arg.Errors), arg.FinalizedAt, diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index a89c6023..f57962ae 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -241,7 +241,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. insertJobsParams.ScheduledAt[i] = scheduledAt insertJobsParams.State[i] = string(params.State) insertJobsParams.Tags[i] = strings.Join(tags, ",") - insertJobsParams.UniqueKey[i] = sliceutil.DefaultIfEmpty(params.UniqueKey, nil) + insertJobsParams.UniqueKey[i] = sliceutil.FirstNonEmpty(params.UniqueKey) insertJobsParams.UniqueStates[i] = pgtypealias.Bits{Bits: pgtype.Bits{Bytes: []byte{params.UniqueStates}, Len: 8, Valid: params.UniqueStates != 0}} } @@ -315,6 +315,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns job, err := dbsqlc.New().JobInsertFull(ctx, e.dbtx, &dbsqlc.JobInsertFullParams{ Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec AttemptedAt: params.AttemptedAt, + AttemptedBy: params.AttemptedBy, Args: string(params.EncodedArgs), CreatedAt: params.CreatedAt, Errors: sliceutil.Map(params.Errors, func(e []byte) string { return string(e) }), diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index eaee9279..068ddcf8 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -277,6 +277,7 @@ INSERT INTO river_job( args, attempt, attempted_at, + attempted_by, created_at, errors, finalized_at, @@ -294,6 +295,7 @@ INSERT INTO river_job( @args::jsonb, coalesce(@attempt::smallint, 0), @attempted_at, + coalesce(@attempted_by::text[], '{}'), coalesce(sqlc.narg('created_at')::timestamptz, now()), @errors, @finalized_at, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 781c7fe3..d5f04038 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -702,6 +702,7 @@ INSERT INTO river_job( args, attempt, attempted_at, + attempted_by, created_at, errors, finalized_at, @@ -719,19 +720,20 @@ INSERT INTO river_job( $1::jsonb, coalesce($2::smallint, 0), $3, - coalesce($4::timestamptz, now()), - $5, + coalesce($4::text[], '{}'), + coalesce($5::timestamptz, now()), $6, $7, - $8::smallint, - coalesce($9::jsonb, '{}'), - $10, + $8, + $9::smallint, + coalesce($10::jsonb, '{}'), $11, - coalesce($12::timestamptz, now()), - $13, - coalesce($14::varchar(255)[], '{}'), - $15, - $16 + $12, + coalesce($13::timestamptz, now()), + $14, + coalesce($15::varchar(255)[], '{}'), + $16, + $17 ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` @@ -739,6 +741,7 @@ type JobInsertFullParams struct { Args []byte Attempt int16 AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time Errors [][]byte FinalizedAt *time.Time @@ -759,6 +762,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.Args, arg.Attempt, arg.AttemptedAt, + arg.AttemptedBy, arg.CreatedAt, arg.Errors, arg.FinalizedAt, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 504dc4cb..3ba19304 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -226,16 +226,16 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. defaultObject := []byte("{}") - insertJobsParams.Args[i] = sliceutil.DefaultIfEmpty(params.EncodedArgs, defaultObject) + insertJobsParams.Args[i] = sliceutil.FirstNonEmpty(params.EncodedArgs, defaultObject) insertJobsParams.Kind[i] = params.Kind insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec - insertJobsParams.Metadata[i] = sliceutil.DefaultIfEmpty(params.Metadata, defaultObject) + insertJobsParams.Metadata[i] = sliceutil.FirstNonEmpty(params.Metadata, defaultObject) insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec insertJobsParams.Queue[i] = params.Queue insertJobsParams.ScheduledAt[i] = scheduledAt insertJobsParams.State[i] = string(params.State) insertJobsParams.Tags[i] = strings.Join(tags, ",") - insertJobsParams.UniqueKey[i] = sliceutil.DefaultIfEmpty(params.UniqueKey, nil) + insertJobsParams.UniqueKey[i] = sliceutil.FirstNonEmpty(params.UniqueKey) insertJobsParams.UniqueStates[i] = pgtype.Bits{Bytes: []byte{params.UniqueStates}, Len: 8, Valid: params.UniqueStates != 0} } @@ -302,6 +302,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns job, err := dbsqlc.New().JobInsertFull(ctx, e.dbtx, &dbsqlc.JobInsertFullParams{ Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec AttemptedAt: params.AttemptedAt, + AttemptedBy: params.AttemptedBy, Args: params.EncodedArgs, CreatedAt: params.CreatedAt, Errors: params.Errors, diff --git a/rivershared/clientconfig/client_config.go b/rivershared/clientconfig/client_config.go deleted file mode 100644 index c239eda0..00000000 --- a/rivershared/clientconfig/client_config.go +++ /dev/null @@ -1,19 +0,0 @@ -package clientconfig - -import ( - "time" - - "github.com/riverqueue/river/internal/dbunique" -) - -// InsertOpts is a mirror of river.InsertOpts usable by other internal packages. -type InsertOpts struct { - MaxAttempts int - Metadata []byte - Pending bool - Priority int - Queue string - ScheduledAt time.Time - Tags []string - UniqueOpts dbunique.UniqueOpts -} diff --git a/rivershared/go.mod b/rivershared/go.mod index 59db1a2a..22f095cb 100644 --- a/rivershared/go.mod +++ b/rivershared/go.mod @@ -17,11 +17,5 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/tidwall/gjson v1.18.0 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.1 // indirect - github.com/tidwall/sjson v1.2.5 // indirect - golang.org/x/sync v0.11.0 // indirect - golang.org/x/text v0.22.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/rivershared/go.sum b/rivershared/go.sum index 62fefad0..299d37a1 100644 --- a/rivershared/go.sum +++ b/rivershared/go.sum @@ -1,14 +1,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= -github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= -github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= -github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -25,26 +17,10 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= -github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= -github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= -github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= -github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= -github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/rivershared/testfactory/test_factory.go b/rivershared/testfactory/test_factory.go index f6f3dd4c..568e24b6 100644 --- a/rivershared/testfactory/test_factory.go +++ b/rivershared/testfactory/test_factory.go @@ -21,6 +21,7 @@ import ( type JobOpts struct { Attempt *int AttemptedAt *time.Time + AttemptedBy []string CreatedAt *time.Time EncodedArgs []byte Errors [][]byte @@ -73,6 +74,7 @@ func Job_Build(tb testing.TB, opts *JobOpts) *riverdriver.JobInsertFullParams { return &riverdriver.JobInsertFullParams{ Attempt: ptrutil.ValOrDefault(opts.Attempt, 0), AttemptedAt: opts.AttemptedAt, + AttemptedBy: opts.AttemptedBy, CreatedAt: opts.CreatedAt, EncodedArgs: encodedArgs, Errors: opts.Errors, diff --git a/rivershared/util/sliceutil/slice_util.go b/rivershared/util/sliceutil/slice_util.go index 8088e05a..a0b7262a 100644 --- a/rivershared/util/sliceutil/slice_util.go +++ b/rivershared/util/sliceutil/slice_util.go @@ -4,13 +4,15 @@ // therefore omitted from the utilities in `slices`. package sliceutil -// DefaultIfEmpty returns the default slice if the input slice is nil or empty, -// otherwise it returns the input slice. -func DefaultIfEmpty[T any](input []T, defaultSlice []T) []T { - if len(input) == 0 { - return defaultSlice +// FirstNonEmpty returns the first non-empty slice from the input, or nil if +// all input slices are empty. +func FirstNonEmpty[T any](inputs ...[]T) []T { + for _, input := range inputs { + if len(input) > 0 { + return input + } } - return input + return nil } // GroupBy returns an object composed of keys generated from the results of diff --git a/rivershared/util/sliceutil/slice_util_test.go b/rivershared/util/sliceutil/slice_util_test.go index 55e1dbb0..96f240b6 100644 --- a/rivershared/util/sliceutil/slice_util_test.go +++ b/rivershared/util/sliceutil/slice_util_test.go @@ -11,9 +11,9 @@ import ( func TestDefaultIfEmpty(t *testing.T) { t.Parallel() - result1 := DefaultIfEmpty([]int{1, 2, 3}, []int{4, 5, 6}) - result2 := DefaultIfEmpty([]int{}, []int{4, 5, 6}) - result3 := DefaultIfEmpty(nil, []int{4, 5, 6}) + result1 := FirstNonEmpty([]int{1, 2, 3}, []int{4, 5, 6}) + result2 := FirstNonEmpty([]int{}, []int{4, 5, 6}) + result3 := FirstNonEmpty(nil, []int{4, 5, 6}) require.Len(t, result1, 3) require.Len(t, result2, 3) diff --git a/rivertest/time_stub_test.go b/rivertest/time_stub_test.go index 22af2717..8bc7f5b8 100644 --- a/rivertest/time_stub_test.go +++ b/rivertest/time_stub_test.go @@ -4,8 +4,9 @@ import ( "testing" "time" - "github.com/riverqueue/river/rivertype" "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivertype" ) // Ensure that TimeStub implements rivertype.TimeGenerator. diff --git a/rivertest/worker.go b/rivertest/worker.go new file mode 100644 index 00000000..55a623c9 --- /dev/null +++ b/rivertest/worker.go @@ -0,0 +1,223 @@ +package rivertest + +import ( + "context" + "encoding/json" + "sync/atomic" + "testing" + "time" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/execution" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivershared/util/sliceutil" + "github.com/riverqueue/river/rivershared/util/valutil" + "github.com/riverqueue/river/rivertype" +) + +// Worker makes it easier to test river workers. Once built, the worker can be +// used to work any number of synthetic jobs without touching the database: +// +// worker := rivertest.NewWorker(t, driver, config, worker) +// if err := worker.Work(ctx, t, args, nil); err != nil { +// t.Fatalf("failed to work job: %s", err) +// } +// +// if err := worker.Work(ctx, t, args, &river.InsertOpts{Queue: "custom_queue"}); err != nil { +// t.Fatalf("failed to work job: %s", err) +// } +// +// In all cases the underlying [river.Worker] will be called with the synthetic +// job. The execution environment has a realistic River context with access to +// all River features, including [river.ClientFromContext] and worker +// middleware. +// +// When relying on features that require a database record (such as JobCompleteTx), +// the job must be inserted into the database first and then executed with +// WorkJob. +type Worker[T river.JobArgs, TTx any] struct { + client *river.Client[TTx] + config *river.Config + worker river.Worker[T] +} + +// NewWorker creates a new test Worker for testing the provided [river.Worker]. +// The worker uses the provided driver and River config to populate default +// values on test jobs and to configure the execution environment. +// +// The database is not required to use this helper, and a pool-less driver is +// recommended for most usage. This enables individual test cases to run in +// parallel and with full isolation, even using a single shared `Worker` +// instance across many test cases. +func NewWorker[T river.JobArgs, TTx any](tb testing.TB, driver riverdriver.Driver[TTx], config *river.Config, worker river.Worker[T]) *Worker[T, TTx] { + tb.Helper() + + config = config.WithDefaults() + client, err := river.NewClient(driver, config) + if err != nil { + tb.Fatalf("failed to create client: %s", err) + } + + return &Worker[T, TTx]{ + client: client, + config: config, + worker: worker, + } +} + +// Work allocates a synthetic job with the provided arguments and optional +// insert options, then works it. +func (w *Worker[T, TTx]) Work(ctx context.Context, tb testing.TB, args T, opts *river.InsertOpts) error { + tb.Helper() + job := makeJobFromArgs(tb, w.config, args, opts) + return w.WorkJob(ctx, tb, job) +} + +// WorkJob works the provided job. The job must be constructed to be a realistic +// job using external logic prior to calling this method. Unlike the other +// variants, this method offers total control over the job's attributes. +func (w *Worker[T, TTx]) WorkJob(ctx context.Context, tb testing.TB, job *river.Job[T]) error { + tb.Helper() + + // populate river client into context: + ctx = WorkContext(ctx, w.client) + ctx = context.WithValue(ctx, execution.ContextKeyInsideTestWorker{}, true) + + doInner := execution.MiddlewareChain( + w.config.WorkerMiddleware, + w.worker.Middleware(job), + func(ctx context.Context) error { return w.worker.Work(ctx, job) }, + job.JobRow, + ) + + jobTimeout := valutil.FirstNonZero(w.worker.Timeout(job), w.config.JobTimeout) + ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout) + defer cancel() + + return doInner(ctx) +} + +var idSeq int64 = 0 //nolint:gochecknoglobals + +func nextID() int64 { + return atomic.AddInt64(&idSeq, 1) +} + +func makeJobFromArgs[T river.JobArgs](tb testing.TB, config *river.Config, args T, opts *river.InsertOpts) *river.Job[T] { + tb.Helper() + + encodedArgs, err := json.Marshal(args) + if err != nil { + tb.Fatalf("failed to marshal args: %s", err) + } + + // Round trip the args to validate JSON marshalling/unmarshalling on the type. + var decodedArgs T + if err = json.Unmarshal(encodedArgs, &decodedArgs); err != nil { + tb.Fatalf("failed to unmarshal args: %s", err) + } + + if opts == nil { + opts = &river.InsertOpts{} + } + + // Extract any job insert opts from args type if present + var jobInsertOpts river.InsertOpts + if argsWithOpts, ok := any(args).(river.JobArgsWithInsertOpts); ok { + jobInsertOpts = argsWithOpts.InsertOpts() + } + + now := valutil.FirstNonZero(opts.ScheduledAt, jobInsertOpts.ScheduledAt, time.Now()) + + internalUniqueOpts := (*dbunique.UniqueOpts)(&opts.UniqueOpts) + insertParams := &rivertype.JobInsertParams{ + Args: args, + CreatedAt: &now, + EncodedArgs: encodedArgs, + Kind: args.Kind(), + MaxAttempts: valutil.FirstNonZero(opts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts, river.MaxAttemptsDefault), + Metadata: sliceutil.FirstNonEmpty(opts.Metadata, jobInsertOpts.Metadata, []byte(`{}`)), + Priority: valutil.FirstNonZero(opts.Priority, jobInsertOpts.Priority, river.PriorityDefault), + Queue: valutil.FirstNonZero(opts.Queue, jobInsertOpts.Queue, river.QueueDefault), + State: rivertype.JobStateAvailable, + Tags: sliceutil.FirstNonEmpty(opts.Tags, jobInsertOpts.Tags, []string{}), + } + var uniqueKey []byte + timeGen := config.Test.Time + if timeGen == nil { + timeGen = &baseservice.UnStubbableTimeGenerator{} + } + if !internalUniqueOpts.IsEmpty() { + uniqueKey, err = dbunique.UniqueKey(valutil.ValOrDefault(config.Test.Time, timeGen), internalUniqueOpts, insertParams) + if err != nil { + tb.Fatalf("failed to create unique key: %s", err) + } + } + + return makeJobFromFactoryBuild(tb, args, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(1), + AttemptedAt: &now, + AttemptedBy: []string{"worker1"}, + CreatedAt: &now, + EncodedArgs: encodedArgs, + Errors: nil, + FinalizedAt: nil, + Kind: ptrutil.Ptr(args.Kind()), + MaxAttempts: ptrutil.Ptr(valutil.FirstNonZero(opts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts, river.MaxAttemptsDefault)), + Metadata: sliceutil.FirstNonEmpty(opts.Metadata, jobInsertOpts.Metadata, []byte(`{}`)), + Priority: ptrutil.Ptr(valutil.FirstNonZero(opts.Priority, jobInsertOpts.Priority, river.PriorityDefault)), + Queue: ptrutil.Ptr(valutil.FirstNonZero(opts.Queue, jobInsertOpts.Queue, river.QueueDefault)), + ScheduledAt: &now, + State: ptrutil.Ptr(rivertype.JobStateRunning), + Tags: sliceutil.FirstNonEmpty(opts.Tags, jobInsertOpts.Tags, []string{}), + UniqueKey: uniqueKey, + UniqueStates: internalUniqueOpts.StateBitmask(), + }) +} + +func makeJobFromFactoryBuild[T river.JobArgs](tb testing.TB, args T, jobOpts *testfactory.JobOpts) *river.Job[T] { + tb.Helper() + + jobParams := testfactory.Job_Build(tb, jobOpts) + + var errors []rivertype.AttemptError + if jobParams.Errors != nil { + errors = make([]rivertype.AttemptError, len(jobParams.Errors)) + for i, err := range jobParams.Errors { + var attemptError rivertype.AttemptError + if err := json.Unmarshal(err, &attemptError); err != nil { + tb.Fatalf("failed to unmarshal attempt error: %s", err) + } + errors[i] = attemptError + } + } + + now := time.Now() + + return &river.Job[T]{ + Args: args, + JobRow: &rivertype.JobRow{ + ID: nextID(), + Attempt: jobParams.Attempt, + AttemptedAt: jobParams.AttemptedAt, + AttemptedBy: jobParams.AttemptedBy, + CreatedAt: ptrutil.ValOrDefault(jobParams.CreatedAt, now), + EncodedArgs: jobParams.EncodedArgs, + Errors: errors, + FinalizedAt: jobParams.FinalizedAt, + Kind: jobParams.Kind, + MaxAttempts: jobParams.MaxAttempts, + Metadata: jobParams.Metadata, + Priority: jobParams.Priority, + Queue: jobParams.Queue, + ScheduledAt: ptrutil.ValOrDefault(jobParams.ScheduledAt, now), + State: jobParams.State, + Tags: jobParams.Tags, + UniqueKey: jobParams.UniqueKey, + }, + } +} diff --git a/rivertest/worker_test.go b/rivertest/worker_test.go new file mode 100644 index 00000000..66a9f7f3 --- /dev/null +++ b/rivertest/worker_test.go @@ -0,0 +1,186 @@ +package rivertest + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/execution" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivertype" +) + +type testArgs struct { + Value string `json:"value"` +} + +func (testArgs) Kind() string { return "rivertest_work_test" } + +func TestWorker_Work(t *testing.T) { + t.Parallel() + + config := &river.Config{} + driver := riverpgxv5.New(nil) + + t.Run("WorkASimpleJob", func(t *testing.T) { + t.Parallel() + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, testArgs{Value: "test"}, job.Args) + require.Equal(t, 1, job.JobRow.Attempt) + require.NotNil(t, job.JobRow.AttemptedAt) + require.WithinDuration(t, time.Now(), *job.JobRow.AttemptedAt, 5*time.Second) + require.Equal(t, []string{"worker1"}, job.JobRow.AttemptedBy) + require.WithinDuration(t, time.Now(), job.JobRow.CreatedAt, 5*time.Second) + require.JSONEq(t, `{"value": "test"}`, string(job.JobRow.EncodedArgs)) + require.Empty(t, job.JobRow.Errors) + require.Nil(t, job.JobRow.FinalizedAt) + require.Positive(t, job.JobRow.ID) + require.Equal(t, "rivertest_work_test", job.JobRow.Kind) + require.Equal(t, river.MaxAttemptsDefault, job.JobRow.MaxAttempts) + require.Equal(t, []byte(`{}`), job.JobRow.Metadata) + require.Equal(t, river.PriorityDefault, job.JobRow.Priority) + require.Equal(t, river.QueueDefault, job.JobRow.Queue) + require.WithinDuration(t, time.Now(), job.JobRow.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateRunning, job.JobRow.State) + require.Equal(t, []string{}, job.JobRow.Tags) + require.Nil(t, job.JobRow.UniqueKey) + + _, hasContextKeyInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool) + require.True(t, hasContextKeyInsideTestWorker) + + return nil + }) + tw := NewWorker(t, driver, config, worker) + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test"}, nil)) + }) + + t.Run("Reusable", func(t *testing.T) { + t.Parallel() + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + return nil + }) + tw := NewWorker(t, driver, config, worker) + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test"}, nil)) + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test2"}, nil)) + }) + + t.Run("SetsCustomInsertOpts", func(t *testing.T) { + t.Parallel() + + uniqueOpts := river.UniqueOpts{ByQueue: true} + hourFromNow := time.Now().Add(1 * time.Hour) + internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts) + uniqueKey, err := dbunique.UniqueKey(&baseservice.UnStubbableTimeGenerator{}, internalUniqueOpts, &rivertype.JobInsertParams{ + Args: testArgs{Value: "test3"}, + CreatedAt: &hourFromNow, + EncodedArgs: []byte(`{"value": "test3"}`), + Kind: "rivertest_work_test", + MaxAttempts: 420, + Metadata: []byte(`{"key": "value"}`), + Priority: 3, + Queue: "custom_queue", + State: rivertype.JobStateAvailable, + Tags: []string{"tag1", "tag2"}, + }) + require.NoError(t, err) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, testArgs{Value: "test3"}, job.Args) + require.Equal(t, 1, job.JobRow.Attempt) + require.NotNil(t, job.JobRow.AttemptedAt) + require.WithinDuration(t, hourFromNow, *job.JobRow.AttemptedAt, 2*time.Second) + require.Equal(t, []string{"worker1"}, job.JobRow.AttemptedBy) + require.WithinDuration(t, hourFromNow, job.JobRow.CreatedAt, 2*time.Second) + require.JSONEq(t, `{"value": "test3"}`, string(job.JobRow.EncodedArgs)) + require.Empty(t, job.JobRow.Errors) + require.Nil(t, job.JobRow.FinalizedAt) + require.Positive(t, job.JobRow.ID) + require.Equal(t, "rivertest_work_test", job.JobRow.Kind) + require.Equal(t, 420, job.JobRow.MaxAttempts) + require.JSONEq(t, `{"key": "value"}`, string(job.JobRow.Metadata)) + require.Equal(t, 3, job.JobRow.Priority) + require.Equal(t, "custom_queue", job.JobRow.Queue) + require.WithinDuration(t, hourFromNow, job.JobRow.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateRunning, job.JobRow.State) + require.Equal(t, []string{"tag1", "tag2"}, job.JobRow.Tags) + require.Equal(t, uniqueKey, job.JobRow.UniqueKey) + + return nil + }) + tw := NewWorker(t, driver, config, worker) + + // You can also use the WorkOpts method to pass in custom insert options: + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test3"}, &river.InsertOpts{ + MaxAttempts: 420, + Metadata: []byte(`{"key": "value"}`), + Pending: true, // ignored but added to ensure non-default behavior + Priority: 3, + Queue: "custom_queue", + ScheduledAt: hourFromNow, + Tags: []string{"tag1", "tag2"}, + UniqueOpts: uniqueOpts, + })) + }) + + t.Run("UniqueOptsByPeriodRespectsCustomStubbedTime", func(t *testing.T) { + t.Parallel() + + stubTime := &riversharedtest.TimeStub{} + now := time.Now().UTC() + stubTime.StubNowUTC(now) + config := &river.Config{ + Test: river.TestConfig{Time: stubTime}, + } + + uniqueOpts := river.UniqueOpts{ByPeriod: 1 * time.Hour} + internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts) + uniqueKey, err := dbunique.UniqueKey(stubTime, internalUniqueOpts, &rivertype.JobInsertParams{ + Args: testArgs{Value: "test3"}, + CreatedAt: &now, + EncodedArgs: []byte(`{"value": "test3"}`), + Kind: "rivertest_work_test", + }) + require.NoError(t, err) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, uniqueKey, job.JobRow.UniqueKey) + + return nil + }) + tw := NewWorker(t, driver, config, worker) + require.NoError(t, tw.Work(context.Background(), t, testArgs{Value: "test"}, &river.InsertOpts{ + UniqueOpts: uniqueOpts, + })) + }) +} + +func TestWorker_WorkJob(t *testing.T) { + t.Parallel() + + config := &river.Config{} + driver := riverpgxv5.New(nil) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, []string{"worker123"}, job.JobRow.AttemptedBy) + return nil + }) + tw := NewWorker(t, driver, config, worker) + + now := time.Now() + require.NoError(t, tw.WorkJob(context.Background(), t, makeJobFromFactoryBuild(t, testArgs{Value: "test"}, &testfactory.JobOpts{ + AttemptedAt: &now, + AttemptedBy: []string{"worker123"}, + CreatedAt: &now, + EncodedArgs: []byte(`{"value": "test"}`), + Errors: nil, + }))) +}