Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ type Config struct {

// WithDefaults returns a copy of the Config with all default values applied.
func (c *Config) WithDefaults() *Config {
if c == nil {
c = &Config{}
}

// Use the existing logger if set, otherwise create a default one.
logger := c.Logger
if logger == nil {
Expand Down
50 changes: 34 additions & 16 deletions rivertest/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -79,14 +80,14 @@ func NewWorker[T river.JobArgs, TTx any](tb testing.TB, driver riverdriver.Drive
}
}

func (w *Worker[T, TTx]) insertJob(ctx context.Context, tb testing.TB, tx TTx, args T, opts *river.InsertOpts) *rivertype.JobRow {
func (w *Worker[T, TTx]) insertJob(ctx context.Context, tb testing.TB, tx TTx, args T, opts *river.InsertOpts) (*rivertype.JobRow, error) {
tb.Helper()

result, err := w.client.InsertTx(ctx, tx, args, opts)
if err != nil {
tb.Fatalf("failed to insert job: %s", err)
return nil, err
}
return result.Job
return result.Job, nil
}

// Work inserts a job with the provided arguments and optional insert options,
Expand All @@ -99,10 +100,13 @@ func (w *Worker[T, TTx]) insertJob(ctx context.Context, tb testing.TB, tx TTx, a
//
// The returned error only reflects _real_ errors and does not include
// explicitly returned snooze or cancel errors from the worker.
func (w *Worker[T, TTx]) Work(ctx context.Context, tb testing.TB, tx TTx, args T, opts *river.InsertOpts) (WorkResult, error) {
func (w *Worker[T, TTx]) Work(ctx context.Context, tb testing.TB, tx TTx, args T, opts *river.InsertOpts) (*WorkResult, error) {
tb.Helper()

job := w.insertJob(ctx, tb, tx, args, opts)
job, err := w.insertJob(ctx, tb, tx, args, opts)
if err != nil {
return nil, fmt.Errorf("failed to insert job: %w", err)
}
return w.WorkJob(ctx, tb, tx, job)
}

Expand All @@ -116,12 +120,12 @@ func (w *Worker[T, TTx]) Work(ctx context.Context, tb testing.TB, tx TTx, args T
//
// The returned error only reflects _real_ errors and does not include
// explicitly returned snooze or cancel errors from the worker.
func (w *Worker[T, TTx]) WorkJob(ctx context.Context, tb testing.TB, tx TTx, job *rivertype.JobRow) (WorkResult, error) {
func (w *Worker[T, TTx]) WorkJob(ctx context.Context, tb testing.TB, tx TTx, job *rivertype.JobRow) (*WorkResult, error) {
tb.Helper()
return w.workJob(ctx, tb, tx, job)
}

func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job *rivertype.JobRow) (WorkResult, error) {
func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job *rivertype.JobRow) (*WorkResult, error) {
tb.Helper()

timeGen := w.config.Test.Time
Expand Down Expand Up @@ -151,7 +155,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job
State: rivertype.JobStateRunning,
})
if err != nil && !errors.Is(err, rivertype.ErrNotFound) {
tb.Fatalf("test worker internal error: failed to update job to running state: %s", err)
return nil, fmt.Errorf("test worker internal error: failed to update job to running state: %w", err)
}
job = updatedJobRow

Expand Down Expand Up @@ -181,7 +185,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job
return nil
},
HandlePanicFunc: func(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *jobexecutor.ErrorHandlerResult {
tb.Fatalf("panic: %v", panicVal)
resultErr = &PanicError{Cause: panicVal, Trace: trace}
return nil
},
},
Expand Down Expand Up @@ -212,14 +216,14 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job

// WorkResult is the result of working a job in the test Worker.
type WorkResult struct {
// EventKind is the kind of event that occurred following execution.
EventKind river.EventKind

// Job is the updated job row from the database _after_ it has been worked.
Job *rivertype.JobRow

// Kind is the kind of event that occurred following execution.
Kind river.EventKind
}

func completerResultToWorkResult(tb testing.TB, completerResult jobcompleter.CompleterJobUpdated) WorkResult {
func completerResultToWorkResult(tb testing.TB, completerResult jobcompleter.CompleterJobUpdated) *WorkResult {
tb.Helper()

var kind river.EventKind
Expand All @@ -239,12 +243,26 @@ func completerResultToWorkResult(tb testing.TB, completerResult jobcompleter.Com
panic("test worker internal error: unreachable state to distribute, river bug")
}

return WorkResult{
Job: completerResult.Job,
Kind: kind,
return &WorkResult{
EventKind: kind,
Job: completerResult.Job,
}
}

type PanicError struct {
Cause any
Trace string
}

func (e *PanicError) Error() string {
return fmt.Sprintf("rivertest.PanicError: %v\n%s", e.Cause, e.Trace)
}

func (e *PanicError) Is(target error) bool {
_, ok := target.(*PanicError)
return ok
}

type errorHandlerWrapper struct {
HandleErrorFunc func(ctx context.Context, job *rivertype.JobRow, err error) *jobexecutor.ErrorHandlerResult
HandlePanicFunc func(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *jobexecutor.ErrorHandlerResult
Expand Down
137 changes: 127 additions & 10 deletions rivertest/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rivertest

import (
"context"
"errors"
"testing"
"time"

Expand All @@ -13,6 +14,8 @@ import (
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/testfactory"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"
)

Expand All @@ -22,6 +25,47 @@ type testArgs struct {

func (testArgs) Kind() string { return "rivertest_work_test" }

func TestPanicError(t *testing.T) {
t.Parallel()

panicErr := &PanicError{Cause: errors.New("test panic error"), Trace: "test trace"}
require.Equal(t, "rivertest.PanicError: test panic error\ntest trace", panicErr.Error())
}

func TestWorker_NewWorker(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
config *river.Config
driver *riverpgxv5.Driver
tx pgx.Tx
}

setup := func(t *testing.T) *testBundle {
t.Helper()

return &testBundle{
config: &river.Config{ID: "rivertest-worker"},
driver: riverpgxv5.New(nil),
tx: riverinternaltest.TestTx(ctx, t),
}
}

t.Run("HandlesNilRiverConfig", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error {
return nil
})
tw := NewWorker(t, bundle.driver, nil, worker)
require.NotNil(t, tw.config)
})
}

func TestWorker_Work(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -83,7 +127,7 @@ func TestWorker_Work(t *testing.T) {
tw := NewWorker(t, bundle.driver, bundle.config, worker)
res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
})

t.Run("Reusable", func(t *testing.T) {
Expand All @@ -97,10 +141,10 @@ func TestWorker_Work(t *testing.T) {
tw := NewWorker(t, bundle.driver, bundle.config, worker)
res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
res, err = tw.Work(ctx, t, bundle.tx, testArgs{Value: "test2"}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
})

t.Run("SetsCustomInsertOpts", func(t *testing.T) {
Expand Down Expand Up @@ -145,7 +189,7 @@ func TestWorker_Work(t *testing.T) {
Tags: []string{"tag1", "tag2"},
})
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
})

t.Run("UniqueOptsAreIgnored", func(t *testing.T) {
Expand All @@ -171,7 +215,7 @@ func TestWorker_Work(t *testing.T) {
UniqueOpts: river.UniqueOpts{ByPeriod: 1 * time.Hour},
})
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
})

t.Run("ReturnsASnoozeEventKindWhenSnoozed", func(t *testing.T) {
Expand All @@ -186,7 +230,7 @@ func TestWorker_Work(t *testing.T) {

res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobSnoozed, res.Kind)
require.Equal(t, river.EventKindJobSnoozed, res.EventKind)
})

t.Run("ReturnsACancelEventKindWhenCancelled", func(t *testing.T) {
Expand All @@ -201,7 +245,7 @@ func TestWorker_Work(t *testing.T) {

res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCancelled, res.Kind)
require.Equal(t, river.EventKindJobCancelled, res.EventKind)
})

t.Run("UsesACustomClockWhenProvided", func(t *testing.T) {
Expand All @@ -223,9 +267,65 @@ func TestWorker_Work(t *testing.T) {

res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
require.WithinDuration(t, hourFromNow, *res.Job.FinalizedAt, time.Millisecond)
})

t.Run("ErrorFromWorker", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

errToReturn := errors.New("test error")
worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error {
return errToReturn
})
tw := NewWorker(t, bundle.driver, bundle.config, worker)

res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.ErrorIs(t, err, errToReturn)
require.Equal(t, river.EventKindJobFailed, res.EventKind)
})

t.Run("PanicFromWorker", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

errToReturn := errors.New("test panic error")
worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error {
panic(errToReturn)
})
tw := NewWorker(t, bundle.driver, bundle.config, worker)

res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.ErrorIs(t, err, &PanicError{})
require.ErrorContains(t, err, "test panic error")
require.Equal(t, river.EventKindJobFailed, res.EventKind)

var panicErr *PanicError
require.ErrorAs(t, err, &panicErr)
require.Equal(t, errToReturn, panicErr.Cause)
require.Contains(t, panicErr.Trace, "github.com/riverqueue/river/rivertest.TestWorker_Work")
require.Len(t, res.Job.Errors, 1)
require.Contains(t, res.Job.Errors[0].Error, "test panic error")
})

t.Run("ErrorsWithAlreadyClosedTransaction", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

// Immediately roll back the transaction to force an error:
require.NoError(t, bundle.tx.Rollback(ctx))

worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { return nil })
tw := NewWorker(t, bundle.driver, bundle.config, worker)

res, err := tw.Work(ctx, t, bundle.tx, testArgs{Value: "test"}, nil)
require.ErrorContains(t, err, "failed to insert job: tx is closed")
require.Nil(t, res)
})
}

func TestWorker_WorkJob(t *testing.T) {
Expand Down Expand Up @@ -284,7 +384,7 @@ func TestWorker_WorkJob(t *testing.T) {

res, err := testWorker.WorkJob(ctx, t, bundle.tx, insertRes.Job)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)
})

t.Run("JobCompleteTxWithInsertedJobRow", func(t *testing.T) {
Expand All @@ -309,10 +409,27 @@ func TestWorker_WorkJob(t *testing.T) {

res, err := testWorker.WorkJob(ctx, t, bundle.tx, insertRes.Job)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, res.Kind)
require.Equal(t, river.EventKindJobCompleted, res.EventKind)

updatedJob, err := bundle.driver.UnwrapExecutor(bundle.tx).JobGetByID(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCompleted, updatedJob.State)
})

t.Run("ErrorsWhenGivenAlreadyCompletedJob", func(t *testing.T) {
t.Parallel()

ctx := context.Background()
testWorker, bundle := setup(t)

job := testfactory.Job(ctx, t, bundle.driver.UnwrapExecutor(bundle.tx), &testfactory.JobOpts{
EncodedArgs: []byte(`{"value": "test"}`),
Kind: ptrutil.Ptr("rivertest_work_test"),
State: ptrutil.Ptr(rivertype.JobStateCompleted),
})

res, err := testWorker.WorkJob(ctx, t, bundle.tx, job)
require.ErrorContains(t, err, "failed to update job to running state")
require.Nil(t, res)
})
}