From 8929dfcc39e4d1da518be10710110c75f756e527 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Mon, 1 Jun 2026 15:56:36 +0800 Subject: [PATCH 01/11] Clarify worker cancellation requirements for stuck jobs --- doc.go | 9 +++++++++ docs/README.md | 9 +++++++++ example_graceful_shutdown_test.go | 3 +++ worker.go | 11 ++++++++--- 4 files changed, 29 insertions(+), 3 deletions(-) diff --git a/doc.go b/doc.go index ac64df1f..17b7afdd 100644 --- a/doc.go +++ b/doc.go @@ -41,6 +41,11 @@ Workers expose a `Work` function that dictates how jobs run. return nil } +Workers should respect context cancellation. In particular, if work may block +on a channel, timer, or network operation, prefer a `select` that also watches +`ctx.Done()`. A job that ignores cancellation may continue running even after +River has timed it out or marked it rescued for retry. + # Registering workers Jobs are uniquely identified by their "kind" string. Workers are registered on @@ -73,6 +78,10 @@ goroutines at a time: panic(err) } +If jobs appear to be stuck in `running`, turn on info-level logging and look +for `num_jobs_stuck` in the producer job counts log line. A common cause is a +worker blocking without also checking `ctx.Done()`. + ## Insert-only clients It's often desirable to have a client that'll be used for inserting jobs, but diff --git a/docs/README.md b/docs/README.md index 75230831..05f9dc09 100644 --- a/docs/README.md +++ b/docs/README.md @@ -46,6 +46,11 @@ func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { } ``` +Workers should respect context cancellation. In particular, if work may block +on a channel, timer, or network operation, prefer a `select` that also watches +`ctx.Done()`. A job that ignores cancellation may continue running even after +River has timed it out or marked it rescued for retry. + ## Registering workers Jobs are uniquely identified by their "kind" string. Workers are registered on @@ -82,6 +87,10 @@ if err := riverClient.Start(ctx); err != nil { } ``` +If jobs appear to be stuck in `running`, turn on info-level logging and look +for `num_jobs_stuck` in the producer job counts log line. A common cause is a +worker blocking without also checking `ctx.Done()`. + `Workers` can also be omitted, but it's better to include it so River can check that inserted job kinds have a worker that can run them. diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go index bf99b8ac..8b61b723 100644 --- a/example_graceful_shutdown_test.go +++ b/example_graceful_shutdown_test.go @@ -33,6 +33,9 @@ func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[Wait fmt.Printf("Working job that doesn't finish until cancelled\n") close(w.jobStarted) + // Real workers often block on channels, timers, or network calls. Make sure + // those waits also observe ctx.Done(), or the job may keep running after the + // client has timed it out or marked it rescued for retry. <-ctx.Done() fmt.Printf("Job cancelled\n") diff --git a/worker.go b/worker.go index 15493157..4ae5f631 100644 --- a/worker.go +++ b/worker.go @@ -64,9 +64,14 @@ type Worker[T JobArgs] interface { // marked completed. // // It is important for any worker to respect context cancellation to enable - // the client to respond to shutdown requests; there is no way to cancel a - // running job that does not respect context cancellation, other than - // terminating the process. + // the client to respond to shutdown requests. In particular, workers that + // wait on channels, timers, or network operations should prefer a `select` + // that also watches `ctx.Done()`. There is no way to cancel a running job + // that does not respect context cancellation, other than terminating the + // process. + // + // A worker that ignores cancellation may continue running even after the + // client has timed out the job or marked it rescued for retry. Work(ctx context.Context, job *Job[T]) error } From 3c16aabc3ee73509312229166ed1d21021f3d7f6 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Mon, 1 Jun 2026 16:23:39 +0800 Subject: [PATCH 02/11] Tighten stuck-job cancellation docs wording --- doc.go | 2 +- docs/README.md | 2 +- example_graceful_shutdown_test.go | 2 +- worker.go | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/doc.go b/doc.go index 17b7afdd..a0f80c43 100644 --- a/doc.go +++ b/doc.go @@ -44,7 +44,7 @@ Workers expose a `Work` function that dictates how jobs run. Workers should respect context cancellation. In particular, if work may block on a channel, timer, or network operation, prefer a `select` that also watches `ctx.Done()`. A job that ignores cancellation may continue running even after -River has timed it out or marked it rescued for retry. +River has timed it out or the job rescuer has moved it out of `running`. # Registering workers diff --git a/docs/README.md b/docs/README.md index 05f9dc09..cc57e17e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -49,7 +49,7 @@ func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { Workers should respect context cancellation. In particular, if work may block on a channel, timer, or network operation, prefer a `select` that also watches `ctx.Done()`. A job that ignores cancellation may continue running even after -River has timed it out or marked it rescued for retry. +River has timed it out or the job rescuer has moved it out of `running`. ## Registering workers diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go index 8b61b723..1c17ad1d 100644 --- a/example_graceful_shutdown_test.go +++ b/example_graceful_shutdown_test.go @@ -35,7 +35,7 @@ func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[Wait // Real workers often block on channels, timers, or network calls. Make sure // those waits also observe ctx.Done(), or the job may keep running after the - // client has timed it out or marked it rescued for retry. + // client has timed it out or the job rescuer has moved it out of running. <-ctx.Done() fmt.Printf("Job cancelled\n") diff --git a/worker.go b/worker.go index 4ae5f631..f833b760 100644 --- a/worker.go +++ b/worker.go @@ -71,7 +71,8 @@ type Worker[T JobArgs] interface { // process. // // A worker that ignores cancellation may continue running even after the - // client has timed out the job or marked it rescued for retry. + // client has timed out the job or the job rescuer has moved it out of + // running. Work(ctx context.Context, job *Job[T]) error } From b2019c82b38ca3fc518077805798018061170aab Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Mon, 1 Jun 2026 19:51:29 +0800 Subject: [PATCH 03/11] Trim broad stuck-job doc additions --- doc.go | 9 --------- docs/README.md | 9 --------- example_graceful_shutdown_test.go | 3 --- 3 files changed, 21 deletions(-) diff --git a/doc.go b/doc.go index a0f80c43..ac64df1f 100644 --- a/doc.go +++ b/doc.go @@ -41,11 +41,6 @@ Workers expose a `Work` function that dictates how jobs run. return nil } -Workers should respect context cancellation. In particular, if work may block -on a channel, timer, or network operation, prefer a `select` that also watches -`ctx.Done()`. A job that ignores cancellation may continue running even after -River has timed it out or the job rescuer has moved it out of `running`. - # Registering workers Jobs are uniquely identified by their "kind" string. Workers are registered on @@ -78,10 +73,6 @@ goroutines at a time: panic(err) } -If jobs appear to be stuck in `running`, turn on info-level logging and look -for `num_jobs_stuck` in the producer job counts log line. A common cause is a -worker blocking without also checking `ctx.Done()`. - ## Insert-only clients It's often desirable to have a client that'll be used for inserting jobs, but diff --git a/docs/README.md b/docs/README.md index cc57e17e..75230831 100644 --- a/docs/README.md +++ b/docs/README.md @@ -46,11 +46,6 @@ func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { } ``` -Workers should respect context cancellation. In particular, if work may block -on a channel, timer, or network operation, prefer a `select` that also watches -`ctx.Done()`. A job that ignores cancellation may continue running even after -River has timed it out or the job rescuer has moved it out of `running`. - ## Registering workers Jobs are uniquely identified by their "kind" string. Workers are registered on @@ -87,10 +82,6 @@ if err := riverClient.Start(ctx); err != nil { } ``` -If jobs appear to be stuck in `running`, turn on info-level logging and look -for `num_jobs_stuck` in the producer job counts log line. A common cause is a -worker blocking without also checking `ctx.Done()`. - `Workers` can also be omitted, but it's better to include it so River can check that inserted job kinds have a worker that can run them. diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go index 1c17ad1d..bf99b8ac 100644 --- a/example_graceful_shutdown_test.go +++ b/example_graceful_shutdown_test.go @@ -33,9 +33,6 @@ func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[Wait fmt.Printf("Working job that doesn't finish until cancelled\n") close(w.jobStarted) - // Real workers often block on channels, timers, or network calls. Make sure - // those waits also observe ctx.Done(), or the job may keep running after the - // client has timed it out or the job rescuer has moved it out of running. <-ctx.Done() fmt.Printf("Job cancelled\n") From 1f45624190690c5c9a51f3e22855737c28620cc0 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 11:48:49 +0800 Subject: [PATCH 04/11] Add helper for job metadata updates --- client_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++ metadata.go | 31 ++++++++++++++++++++++++ metadata_test.go | 40 +++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+) create mode 100644 metadata.go create mode 100644 metadata_test.go diff --git a/client_test.go b/client_test.go index 8b54954a..e3330598 100644 --- a/client_test.go +++ b/client_test.go @@ -70,6 +70,10 @@ type noOpWorker struct { func (w *noOpWorker) Work(ctx context.Context, job *Job[noOpArgs]) error { return nil } +type metadataWorkerArgs struct{} + +func (metadataWorkerArgs) Kind() string { return "metadata_worker" } + type periodicJobArgs struct{} func (periodicJobArgs) Kind() string { return "periodic_job" } @@ -1293,6 +1297,33 @@ func Test_Client_Common(t *testing.T) { require.True(t, workEndHookCalled) }) + t.Run("WithWorkerSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + AddWorkerArgs(bundle.config.Workers, metadataWorkerArgs{}, WorkFunc(func(ctx context.Context, job *Job[metadataWorkerArgs]) error { + return SetMetadata(ctx, "worker_key", "worker_value") + })) + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, metadataWorkerArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "worker_value", metadata["worker_key"]) + }) + t.Run("WithInsertBeginHookOnJobArgs", func(t *testing.T) { t.Parallel() @@ -1394,6 +1425,36 @@ func Test_Client_Common(t *testing.T) { require.True(t, hookWorkEndCalled.Load()) }) + t.Run("WithWorkEndHookSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + bundle.config.Hooks = []rivertype.Hook{ + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { + require.NoError(t, SetMetadata(ctx, "hook_key", "hook_value")) + return err + }), + } + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, noOpArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "hook_value", metadata["hook_key"]) + }) + t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) { t.Parallel() diff --git a/metadata.go b/metadata.go new file mode 100644 index 00000000..baad5aaa --- /dev/null +++ b/metadata.go @@ -0,0 +1,31 @@ +package river + +import ( + "context" + "errors" + "strings" + + "github.com/riverqueue/river/internal/jobexecutor" +) + +var errMetadataNotSettable = errors.New("SetMetadata must be called within a Worker, worker middleware, or work hook") + +// SetMetadata records a metadata value to be merged into the job's metadata +// when the current work attempt finishes. +// +// This function is only valid from a worker, worker middleware, or work hook +// like rivertype.HookWorkEnd. Keys prefixed with `river:` are reserved for +// internal use and may not be set by user code. +func SetMetadata(ctx context.Context, key string, value any) error { + if strings.HasPrefix(key, "river:") { + return errors.New("SetMetadata cannot be used with keys prefixed with `river:`") + } + + metadataUpdates, ok := jobexecutor.MetadataUpdatesFromWorkContext(ctx) + if !ok { + return errMetadataNotSettable + } + + metadataUpdates[key] = value + return nil +} diff --git a/metadata_test.go b/metadata_test.go new file mode 100644 index 00000000..f14129a6 --- /dev/null +++ b/metadata_test.go @@ -0,0 +1,40 @@ +package river + +import ( + "context" + "testing" + + "github.com/riverqueue/river/internal/jobexecutor" + "github.com/stretchr/testify/require" +) + +func TestSetMetadata(t *testing.T) { + t.Parallel() + + t.Run("RejectsReservedPrefix", func(t *testing.T) { + t.Parallel() + + ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, map[string]any{}) + + err := SetMetadata(ctx, "river:reserved", "value") + require.EqualError(t, err, "SetMetadata cannot be used with keys prefixed with `river:`") + }) + + t.Run("RequiresWorkContext", func(t *testing.T) { + t.Parallel() + + err := SetMetadata(context.Background(), "key", "value") + require.EqualError(t, err, "SetMetadata must be called within a Worker, worker middleware, or work hook") + }) + + t.Run("SetsValueOnWorkContext", func(t *testing.T) { + t.Parallel() + + metadataUpdates := map[string]any{} + ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, metadataUpdates) + + err := SetMetadata(ctx, "key", "value") + require.NoError(t, err) + require.Equal(t, "value", metadataUpdates["key"]) + }) +} From 4df831367d0955ca03a712afc0cdef6a1880bb4d Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 11:59:11 +0800 Subject: [PATCH 05/11] Add middleware coverage for SetMetadata --- client_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/client_test.go b/client_test.go index e3330598..b33a9c44 100644 --- a/client_test.go +++ b/client_test.go @@ -1455,6 +1455,51 @@ func Test_Client_Common(t *testing.T) { require.Equal(t, "hook_value", metadata["hook_key"]) }) + t.Run("WithWorkerMiddlewareSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + worker := &workerWithMiddleware[JobArgs]{ + workFunc: func(ctx context.Context, job *Job[JobArgs]) error { + return nil + }, + middlewareFunc: func(job *rivertype.JobRow) []rivertype.WorkerMiddleware { + require.Equal(t, (JobArgs{}).Kind(), job.Kind) + + return []rivertype.WorkerMiddleware{ + WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + require.NoError(t, SetMetadata(ctx, "middleware_key", "middleware_value")) + return doInner(ctx) + }), + } + }, + } + + AddWorker(bundle.config.Workers, worker) + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "middleware_value", metadata["middleware_key"]) + }) + t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) { t.Parallel() From c2506704dc1f7f88116d3fcf8e215348fcb5e509 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 12:11:23 +0800 Subject: [PATCH 06/11] Relax stop test timeout under race --- client_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client_test.go b/client_test.go index b33a9c44..7e31702b 100644 --- a/client_test.go +++ b/client_test.go @@ -2092,8 +2092,10 @@ func Test_Client_Stop_Common(t *testing.T) { client := runNewTestClient(ctx, t, newTestConfig(t, "")) - // Should shut down quickly: - ctx, cancel := context.WithTimeout(ctx, time.Second) + // Shutdown should still complete promptly, but the client may need a full + // leadership resign attempt before stopping, which can take about a second + // under race instrumentation. + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() require.NoError(t, client.Stop(ctx)) From 8d180bad84f35b00d1b286ff9002ac87319ca75d Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 12:19:42 +0800 Subject: [PATCH 07/11] Add CLI custom schema migration coverage --- cmd/river/rivercli/river_cli_test.go | 32 ++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index 95ee4fe5..bf5e8057 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -290,6 +290,38 @@ func TestBaseCommandSetNonParallel(t *testing.T) { cmd.SetArgs([]string{"migrate-up", "--schema", schema}) require.NoError(t, cmd.Execute()) }) + + t.Run("PostgresCustomSchemaMigrateUpValidateDown", func(t *testing.T) { + t.Parallel() + + cmd, _ := setup(t) + + testDatabaseURL := riversharedtest.TestDatabaseURL() + + config, err := pgxpool.ParseConfig(testDatabaseURL) + require.NoError(t, err) + + dbPool, err := pgxpool.NewWithConfig(ctx, config) + require.NoError(t, err) + + driver := riverpgxv5.New(dbPool) + schema := riverdbtest.TestSchema(ctx, t, driver, nil) + + cmd.SetArgs([]string{"migrate-up", "--database-url", testDatabaseURL, "--schema", schema}) + require.NoError(t, cmd.Execute()) + + cmd, _ = setup(t) + cmd.SetArgs([]string{"validate", "--database-url", testDatabaseURL, "--schema", schema}) + require.NoError(t, cmd.Execute()) + + cmd, _ = setup(t) + cmd.SetArgs([]string{"migrate-down", "--database-url", testDatabaseURL, "--max-steps", "100", "--schema", schema}) + require.NoError(t, cmd.Execute()) + + cmd, _ = setup(t) + cmd.SetArgs([]string{"validate", "--database-url", testDatabaseURL, "--schema", schema}) + require.Error(t, cmd.Execute()) + }) } func TestBaseCommandSetPostgresTimeoutPrecedence(t *testing.T) { From d2ca1425b58214172ec341e8e1451ac50b3aa277 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 12:26:53 +0800 Subject: [PATCH 08/11] Align CLI custom schema test with repo rules --- cmd/river/rivercli/river_cli_test.go | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index bf5e8057..f990694f 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -237,13 +237,11 @@ Built with %s // out into its own test block so that we don't have to mark the entire block // above as non-parallel because a few tests can't be made parallel. func TestBaseCommandSetNonParallel(t *testing.T) { - ctx := context.Background() + t.Parallel() - type testBundle struct { - out *bytes.Buffer - } + ctx := context.Background() - setup := func(t *testing.T) (*cobra.Command, *testBundle) { + setup := func(t *testing.T) *cobra.Command { t.Helper() cli := NewCLI(&Config{ @@ -253,13 +251,11 @@ func TestBaseCommandSetNonParallel(t *testing.T) { var out bytes.Buffer cli.SetOut(&out) - return cli.BaseCommandSet(), &testBundle{ - out: &out, - } + return cli.BaseCommandSet() } t.Run("PGEnvWithoutDatabaseURL", func(t *testing.T) { - cmd, _ := setup(t) + cmd := setup(t) testDatabaseURL := riversharedtest.TestDatabaseURL() @@ -294,7 +290,7 @@ func TestBaseCommandSetNonParallel(t *testing.T) { t.Run("PostgresCustomSchemaMigrateUpValidateDown", func(t *testing.T) { t.Parallel() - cmd, _ := setup(t) + cmd := setup(t) testDatabaseURL := riversharedtest.TestDatabaseURL() @@ -310,15 +306,15 @@ func TestBaseCommandSetNonParallel(t *testing.T) { cmd.SetArgs([]string{"migrate-up", "--database-url", testDatabaseURL, "--schema", schema}) require.NoError(t, cmd.Execute()) - cmd, _ = setup(t) + cmd = setup(t) cmd.SetArgs([]string{"validate", "--database-url", testDatabaseURL, "--schema", schema}) require.NoError(t, cmd.Execute()) - cmd, _ = setup(t) + cmd = setup(t) cmd.SetArgs([]string{"migrate-down", "--database-url", testDatabaseURL, "--max-steps", "100", "--schema", schema}) require.NoError(t, cmd.Execute()) - cmd, _ = setup(t) + cmd = setup(t) cmd.SetArgs([]string{"validate", "--database-url", testDatabaseURL, "--schema", schema}) require.Error(t, cmd.Execute()) }) From afae714ca3c369b789e05774ad3e1cf6b9cc10c0 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 14:52:08 +0800 Subject: [PATCH 09/11] Narrow SetMetadata PR scope --- client_test.go | 12 ++++---- cmd/river/rivercli/river_cli_test.go | 46 ++++++---------------------- metadata.go | 2 +- metadata_test.go | 2 +- 4 files changed, 17 insertions(+), 45 deletions(-) diff --git a/client_test.go b/client_test.go index 7e31702b..45ff844d 100644 --- a/client_test.go +++ b/client_test.go @@ -70,10 +70,6 @@ type noOpWorker struct { func (w *noOpWorker) Work(ctx context.Context, job *Job[noOpArgs]) error { return nil } -type metadataWorkerArgs struct{} - -func (metadataWorkerArgs) Kind() string { return "metadata_worker" } - type periodicJobArgs struct{} func (periodicJobArgs) Kind() string { return "periodic_job" } @@ -1302,7 +1298,11 @@ func Test_Client_Common(t *testing.T) { _, bundle := setup(t) - AddWorkerArgs(bundle.config.Workers, metadataWorkerArgs{}, WorkFunc(func(ctx context.Context, job *Job[metadataWorkerArgs]) error { + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + AddWorkerArgs(bundle.config.Workers, JobArgs{}, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { return SetMetadata(ctx, "worker_key", "worker_value") })) @@ -1312,7 +1312,7 @@ func Test_Client_Common(t *testing.T) { subscribeChan := subscribe(t, client) startClient(ctx, t, client) - insertRes, err := client.Insert(ctx, metadataWorkerArgs{}, nil) + insertRes, err := client.Insert(ctx, JobArgs{}, nil) require.NoError(t, err) event := riversharedtest.WaitOrTimeout(t, subscribeChan) diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index f990694f..95ee4fe5 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -237,11 +237,13 @@ Built with %s // out into its own test block so that we don't have to mark the entire block // above as non-parallel because a few tests can't be made parallel. func TestBaseCommandSetNonParallel(t *testing.T) { - t.Parallel() - ctx := context.Background() - setup := func(t *testing.T) *cobra.Command { + type testBundle struct { + out *bytes.Buffer + } + + setup := func(t *testing.T) (*cobra.Command, *testBundle) { t.Helper() cli := NewCLI(&Config{ @@ -251,11 +253,13 @@ func TestBaseCommandSetNonParallel(t *testing.T) { var out bytes.Buffer cli.SetOut(&out) - return cli.BaseCommandSet() + return cli.BaseCommandSet(), &testBundle{ + out: &out, + } } t.Run("PGEnvWithoutDatabaseURL", func(t *testing.T) { - cmd := setup(t) + cmd, _ := setup(t) testDatabaseURL := riversharedtest.TestDatabaseURL() @@ -286,38 +290,6 @@ func TestBaseCommandSetNonParallel(t *testing.T) { cmd.SetArgs([]string{"migrate-up", "--schema", schema}) require.NoError(t, cmd.Execute()) }) - - t.Run("PostgresCustomSchemaMigrateUpValidateDown", func(t *testing.T) { - t.Parallel() - - cmd := setup(t) - - testDatabaseURL := riversharedtest.TestDatabaseURL() - - config, err := pgxpool.ParseConfig(testDatabaseURL) - require.NoError(t, err) - - dbPool, err := pgxpool.NewWithConfig(ctx, config) - require.NoError(t, err) - - driver := riverpgxv5.New(dbPool) - schema := riverdbtest.TestSchema(ctx, t, driver, nil) - - cmd.SetArgs([]string{"migrate-up", "--database-url", testDatabaseURL, "--schema", schema}) - require.NoError(t, cmd.Execute()) - - cmd = setup(t) - cmd.SetArgs([]string{"validate", "--database-url", testDatabaseURL, "--schema", schema}) - require.NoError(t, cmd.Execute()) - - cmd = setup(t) - cmd.SetArgs([]string{"migrate-down", "--database-url", testDatabaseURL, "--max-steps", "100", "--schema", schema}) - require.NoError(t, cmd.Execute()) - - cmd = setup(t) - cmd.SetArgs([]string{"validate", "--database-url", testDatabaseURL, "--schema", schema}) - require.Error(t, cmd.Execute()) - }) } func TestBaseCommandSetPostgresTimeoutPrecedence(t *testing.T) { diff --git a/metadata.go b/metadata.go index baad5aaa..cfc85277 100644 --- a/metadata.go +++ b/metadata.go @@ -8,7 +8,7 @@ import ( "github.com/riverqueue/river/internal/jobexecutor" ) -var errMetadataNotSettable = errors.New("SetMetadata must be called within a Worker, worker middleware, or work hook") +var errMetadataNotSettable = errors.New("SetMetadata must be called within a worker, worker middleware, or work hook") // SetMetadata records a metadata value to be merged into the job's metadata // when the current work attempt finishes. diff --git a/metadata_test.go b/metadata_test.go index f14129a6..cdf7bafa 100644 --- a/metadata_test.go +++ b/metadata_test.go @@ -24,7 +24,7 @@ func TestSetMetadata(t *testing.T) { t.Parallel() err := SetMetadata(context.Background(), "key", "value") - require.EqualError(t, err, "SetMetadata must be called within a Worker, worker middleware, or work hook") + require.EqualError(t, err, "SetMetadata must be called within a worker, worker middleware, or work hook") }) t.Run("SetsValueOnWorkContext", func(t *testing.T) { From 8db022b925cb2fb25c3e6595e10d7f4cd37c5c09 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 15:07:52 +0800 Subject: [PATCH 10/11] Trigger CI rerun From f81b14328345dadbc5355f1ea9355391383c5e48 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Tue, 2 Jun 2026 17:43:32 +0800 Subject: [PATCH 11/11] Cover HookWorkBegin metadata usage --- client_test.go | 29 +++++++++++++++++++++++++++++ metadata.go | 11 +++++++++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/client_test.go b/client_test.go index 45ff844d..d7aed575 100644 --- a/client_test.go +++ b/client_test.go @@ -1455,6 +1455,35 @@ func Test_Client_Common(t *testing.T) { require.Equal(t, "hook_value", metadata["hook_key"]) }) + t.Run("WithWorkBeginHookSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + bundle.config.Hooks = []rivertype.Hook{ + HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { + return SetMetadata(ctx, "hook_begin_key", "hook_begin_value") + }), + } + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, noOpArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "hook_begin_value", metadata["hook_begin_key"]) + }) + t.Run("WithWorkerMiddlewareSettingMetadata", func(t *testing.T) { t.Parallel() diff --git a/metadata.go b/metadata.go index cfc85277..1647f62f 100644 --- a/metadata.go +++ b/metadata.go @@ -14,8 +14,15 @@ var errMetadataNotSettable = errors.New("SetMetadata must be called within a wor // when the current work attempt finishes. // // This function is only valid from a worker, worker middleware, or work hook -// like rivertype.HookWorkEnd. Keys prefixed with `river:` are reserved for -// internal use and may not be set by user code. +// like rivertype.HookWorkBegin or rivertype.HookWorkEnd. +// +// Metadata updates are stored on the work context and merged into the job row +// when the current work attempt finishes, whether the attempt succeeds or +// errors. Values must be JSON marshalable because metadata is stored in a +// jsonb column, and setting a key replaces any existing value at that key. +// +// Keys prefixed with `river:` are reserved for internal use and may not be set +// by user code. func SetMetadata(ctx context.Context, key string, value any) error { if strings.HasPrefix(key, "river:") { return errors.New("SetMetadata cannot be used with keys prefixed with `river:`")