Skip to content
Open
141 changes: 139 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,37 @@ func Test_Client_Common(t *testing.T) {
require.True(t, workEndHookCalled)
})

t.Run("WithWorkerSettingMetadata", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

AddWorkerArgs(bundle.config.Workers, JobArgs{}, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return SetMetadata(ctx, "worker_key", "worker_value")
}))
Comment thread
peter941221 marked this conversation as resolved.

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "worker_value", metadata["worker_key"])
})

t.Run("WithInsertBeginHookOnJobArgs", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1394,6 +1425,110 @@ func Test_Client_Common(t *testing.T) {
require.True(t, hookWorkEndCalled.Load())
})

t.Run("WithWorkEndHookSettingMetadata", func(t *testing.T) {
Comment thread
peter941221 marked this conversation as resolved.
t.Parallel()

_, bundle := setup(t)

bundle.config.Hooks = []rivertype.Hook{
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
require.NoError(t, SetMetadata(ctx, "hook_key", "hook_value"))
return err
}),
}

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, noOpArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "hook_value", metadata["hook_key"])
})

t.Run("WithWorkBeginHookSettingMetadata", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

bundle.config.Hooks = []rivertype.Hook{
HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error {
return SetMetadata(ctx, "hook_begin_key", "hook_begin_value")
}),
}

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, noOpArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "hook_begin_value", metadata["hook_begin_key"])
})

t.Run("WithWorkerMiddlewareSettingMetadata", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

worker := &workerWithMiddleware[JobArgs]{
workFunc: func(ctx context.Context, job *Job[JobArgs]) error {
return nil
},
middlewareFunc: func(job *rivertype.JobRow) []rivertype.WorkerMiddleware {
require.Equal(t, (JobArgs{}).Kind(), job.Kind)

return []rivertype.WorkerMiddleware{
WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
require.NoError(t, SetMetadata(ctx, "middleware_key", "middleware_value"))
return doInner(ctx)
}),
}
},
}

AddWorker(bundle.config.Workers, worker)

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "middleware_value", metadata["middleware_key"])
})

t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1986,8 +2121,10 @@ func Test_Client_Stop_Common(t *testing.T) {

client := runNewTestClient(ctx, t, newTestConfig(t, ""))

// Should shut down quickly:
ctx, cancel := context.WithTimeout(ctx, time.Second)
// Shutdown should still complete promptly, but the client may need a full
// leadership resign attempt before stopping, which can take about a second
// under race instrumentation.
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

require.NoError(t, client.Stop(ctx))
Expand Down
38 changes: 38 additions & 0 deletions metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package river

import (
"context"
"errors"
"strings"

"github.com/riverqueue/river/internal/jobexecutor"
)

var errMetadataNotSettable = errors.New("SetMetadata must be called within a worker, worker middleware, or work hook")

// SetMetadata records a metadata value to be merged into the job's metadata
// when the current work attempt finishes.
//
// This function is only valid from a worker, worker middleware, or work hook
// like rivertype.HookWorkBegin or rivertype.HookWorkEnd.
//
// Metadata updates are stored on the work context and merged into the job row
// when the current work attempt finishes, whether the attempt succeeds or
// errors. Values must be JSON marshalable because metadata is stored in a
// jsonb column, and setting a key replaces any existing value at that key.
//
// Keys prefixed with `river:` are reserved for internal use and may not be set
// by user code.
func SetMetadata(ctx context.Context, key string, value any) error {
if strings.HasPrefix(key, "river:") {
return errors.New("SetMetadata cannot be used with keys prefixed with `river:`")
}

metadataUpdates, ok := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
if !ok {
return errMetadataNotSettable
}

metadataUpdates[key] = value
return nil
}
40 changes: 40 additions & 0 deletions metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package river

import (
"context"
"testing"

"github.com/riverqueue/river/internal/jobexecutor"
"github.com/stretchr/testify/require"
)

func TestSetMetadata(t *testing.T) {
t.Parallel()

t.Run("RejectsReservedPrefix", func(t *testing.T) {
t.Parallel()

ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, map[string]any{})

err := SetMetadata(ctx, "river:reserved", "value")
require.EqualError(t, err, "SetMetadata cannot be used with keys prefixed with `river:`")
})

t.Run("RequiresWorkContext", func(t *testing.T) {
t.Parallel()

err := SetMetadata(context.Background(), "key", "value")
require.EqualError(t, err, "SetMetadata must be called within a worker, worker middleware, or work hook")
})

t.Run("SetsValueOnWorkContext", func(t *testing.T) {
t.Parallel()

metadataUpdates := map[string]any{}
ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, metadataUpdates)

err := SetMetadata(ctx, "key", "value")
require.NoError(t, err)
require.Equal(t, "value", metadataUpdates["key"])
})
}
Loading