diff --git a/client_test.go b/client_test.go index 8b54954a..d7aed575 100644 --- a/client_test.go +++ b/client_test.go @@ -1293,6 +1293,37 @@ func Test_Client_Common(t *testing.T) { require.True(t, workEndHookCalled) }) + t.Run("WithWorkerSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + AddWorkerArgs(bundle.config.Workers, JobArgs{}, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return SetMetadata(ctx, "worker_key", "worker_value") + })) + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "worker_value", metadata["worker_key"]) + }) + t.Run("WithInsertBeginHookOnJobArgs", func(t *testing.T) { t.Parallel() @@ -1394,6 +1425,110 @@ func Test_Client_Common(t *testing.T) { require.True(t, hookWorkEndCalled.Load()) }) + t.Run("WithWorkEndHookSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + bundle.config.Hooks = []rivertype.Hook{ + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { + require.NoError(t, SetMetadata(ctx, "hook_key", "hook_value")) + return err + }), + } + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, noOpArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "hook_value", metadata["hook_key"]) + }) + + t.Run("WithWorkBeginHookSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + bundle.config.Hooks = []rivertype.Hook{ + HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { + return SetMetadata(ctx, "hook_begin_key", "hook_begin_value") + }), + } + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, noOpArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "hook_begin_value", metadata["hook_begin_key"]) + }) + + t.Run("WithWorkerMiddlewareSettingMetadata", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + worker := &workerWithMiddleware[JobArgs]{ + workFunc: func(ctx context.Context, job *Job[JobArgs]) error { + return nil + }, + middlewareFunc: func(job *rivertype.JobRow) []rivertype.WorkerMiddleware { + require.Equal(t, (JobArgs{}).Kind(), job.Kind) + + return []rivertype.WorkerMiddleware{ + WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + require.NoError(t, SetMetadata(ctx, "middleware_key", "middleware_value")) + return doInner(ctx) + }), + } + }, + } + + AddWorker(bundle.config.Workers, worker) + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadata map[string]any + require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata)) + require.Equal(t, "middleware_value", metadata["middleware_key"]) + }) + t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) { t.Parallel() @@ -1986,8 +2121,10 @@ func Test_Client_Stop_Common(t *testing.T) { client := runNewTestClient(ctx, t, newTestConfig(t, "")) - // Should shut down quickly: - ctx, cancel := context.WithTimeout(ctx, time.Second) + // Shutdown should still complete promptly, but the client may need a full + // leadership resign attempt before stopping, which can take about a second + // under race instrumentation. + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() require.NoError(t, client.Stop(ctx)) diff --git a/metadata.go b/metadata.go new file mode 100644 index 00000000..1647f62f --- /dev/null +++ b/metadata.go @@ -0,0 +1,38 @@ +package river + +import ( + "context" + "errors" + "strings" + + "github.com/riverqueue/river/internal/jobexecutor" +) + +var errMetadataNotSettable = errors.New("SetMetadata must be called within a worker, worker middleware, or work hook") + +// SetMetadata records a metadata value to be merged into the job's metadata +// when the current work attempt finishes. +// +// This function is only valid from a worker, worker middleware, or work hook +// like rivertype.HookWorkBegin or rivertype.HookWorkEnd. +// +// Metadata updates are stored on the work context and merged into the job row +// when the current work attempt finishes, whether the attempt succeeds or +// errors. Values must be JSON marshalable because metadata is stored in a +// jsonb column, and setting a key replaces any existing value at that key. +// +// Keys prefixed with `river:` are reserved for internal use and may not be set +// by user code. +func SetMetadata(ctx context.Context, key string, value any) error { + if strings.HasPrefix(key, "river:") { + return errors.New("SetMetadata cannot be used with keys prefixed with `river:`") + } + + metadataUpdates, ok := jobexecutor.MetadataUpdatesFromWorkContext(ctx) + if !ok { + return errMetadataNotSettable + } + + metadataUpdates[key] = value + return nil +} diff --git a/metadata_test.go b/metadata_test.go new file mode 100644 index 00000000..cdf7bafa --- /dev/null +++ b/metadata_test.go @@ -0,0 +1,40 @@ +package river + +import ( + "context" + "testing" + + "github.com/riverqueue/river/internal/jobexecutor" + "github.com/stretchr/testify/require" +) + +func TestSetMetadata(t *testing.T) { + t.Parallel() + + t.Run("RejectsReservedPrefix", func(t *testing.T) { + t.Parallel() + + ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, map[string]any{}) + + err := SetMetadata(ctx, "river:reserved", "value") + require.EqualError(t, err, "SetMetadata cannot be used with keys prefixed with `river:`") + }) + + t.Run("RequiresWorkContext", func(t *testing.T) { + t.Parallel() + + err := SetMetadata(context.Background(), "key", "value") + require.EqualError(t, err, "SetMetadata must be called within a worker, worker middleware, or work hook") + }) + + t.Run("SetsValueOnWorkContext", func(t *testing.T) { + t.Parallel() + + metadataUpdates := map[string]any{} + ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, metadataUpdates) + + err := SetMetadata(ctx, "key", "value") + require.NoError(t, err) + require.Equal(t, "value", metadataUpdates["key"]) + }) +}