Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow using database/sql driver with functioning pgx listener #352

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,12 +475,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
client.services = append(client.services, client.completer)

// In poll only mode, we don't try to initialize a notifier that uses
// listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
if driver.SupportsListener() {
// In poll only mode, we don't try to initialize a notifier that
// uses listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
}
} else {
logger.Info("Driver does not support listener; entering poll only mode")
}

client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
Expand Down Expand Up @@ -1264,6 +1268,15 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
}
if tags == nil {
tags = []string{}
} else {
for _, tag := range tags {
if len(tag) > 255 {
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
}
if !tagRE.MatchString(tag) {
return nil, nil, errors.New("tags should match regex " + tagRE.String())
}
}
}

if priority > 4 {
Expand All @@ -1284,10 +1297,10 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
}

insertParams := &riverdriver.JobInsertFastParams{
EncodedArgs: encodedArgs,
EncodedArgs: json.RawMessage(encodedArgs),
Kind: args.Kind(),
MaxAttempts: maxAttempts,
Metadata: metadata,
Metadata: json.RawMessage(metadata),
Priority: priority,
Queue: queue,
State: rivertype.JobStateAvailable,
Expand Down
87 changes: 70 additions & 17 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/require"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -158,7 +160,7 @@ func newTestClient(t *testing.T, dbPool *pgxpool.Pool, config *Config) *Client[p
return client
}

func startClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
func startClient[TTx any](ctx context.Context, t *testing.T, client *Client[TTx]) {
t.Helper()

if err := client.Start(ctx); err != nil {
Expand All @@ -181,6 +183,21 @@ func runNewTestClient(ctx context.Context, t *testing.T, config *Config) *Client
return client
}

func subscribe[TTx any](t *testing.T, client *Client[TTx]) <-chan *Event {
t.Helper()

subscribeChan, cancel := client.Subscribe(
EventKindJobCancelled,
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
EventKindQueuePaused,
EventKindQueueResumed,
)
t.Cleanup(cancel)
return subscribeChan
}

func Test_Client(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -211,21 +228,6 @@ func Test_Client(t *testing.T) {
return newTestClient(t, bundle.dbPool, config), bundle
}

subscribe := func(t *testing.T, client *Client[pgx.Tx]) <-chan *Event {
t.Helper()

subscribeChan, cancel := client.Subscribe(
EventKindJobCancelled,
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
EventKindQueuePaused,
EventKindQueueResumed,
)
t.Cleanup(cancel)
return subscribeChan
}

t.Run("StartInsertAndWork", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -604,7 +606,40 @@ func Test_Client(t *testing.T) {
}
})

t.Run("PollOnly", func(t *testing.T) {
t.Run("PollOnlyDriver", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
bundle.config.PollOnly = true

stdPool := stdlib.OpenDBFromPool(bundle.dbPool)
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })

client, err := NewClient(riverdatabasesql.New(stdPool), config)
require.NoError(t, err)

client.testSignals.Init()

// Notifier should not have been initialized at all.
require.Nil(t, client.notifier)

insertRes, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

// Despite no notifier, the client should still be able to elect itself
// leader.
client.testSignals.electedLeader.WaitOrTimeout()

event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)
require.Equal(t, rivertype.JobStateCompleted, event.Job.State)
})

t.Run("PollOnlyOption", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
Expand Down Expand Up @@ -4185,6 +4220,24 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.Equal(t, []string{"tag1", "tag2"}, insertParams.Tags)
})

t.Run("TagFormatValidated", func(t *testing.T) {
t.Parallel()

{
_, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{strings.Repeat("h", 256)},
})
require.EqualError(t, err, "tags should be a maximum of 255 characters long")
}

{
_, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{"tag,with,comma"},
})
require.EqualError(t, err, "tags should match regex "+tagRE.String())
}
})

t.Run("UniqueOpts", func(t *testing.T) {
t.Parallel()

Expand Down
53 changes: 26 additions & 27 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/riverqueue/river/rivertype"
)

func TestDriverDatabaseSQL_Executor(t *testing.T) {
func TestDriverDatabaseSQL(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand All @@ -29,42 +29,41 @@ func TestDriverDatabaseSQL_Executor(t *testing.T) {
stdPool := stdlib.OpenDBFromPool(dbPool)
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })

driver := riverdatabasesql.New(nil)
riverdrivertest.ExerciseExecutorMigrationOnly(ctx, t, driver, func(ctx context.Context, t *testing.T) *sql.Tx {
t.Helper()
riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T) riverdriver.Driver[*sql.Tx] {
t.Helper()

tx, err := stdPool.BeginTx(ctx, nil)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback() })
return riverdatabasesql.New(stdPool)
},
func(ctx context.Context, t *testing.T) riverdriver.Executor {
t.Helper()

return tx
})
}

func TestDriverRiverPgxV5_Executor(t *testing.T) {
t.Parallel()

ctx := context.Background()
tx, err := stdPool.BeginTx(ctx, nil)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback() })

driver := riverpgxv5.New(nil)
riverdrivertest.ExerciseExecutorFull(ctx, t, driver, func(ctx context.Context, t *testing.T) pgx.Tx {
t.Helper()

return riverinternaltest.TestTx(ctx, t)
})
return riverdatabasesql.New(nil).UnwrapExecutor(tx)
})
}

func TestDriverRiverPgxV5_Listener(t *testing.T) {
func TestDriverRiverPgxV5(t *testing.T) {
t.Parallel()

ctx := context.Background()

riverdrivertest.ExerciseListener(ctx, t, func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
t.Helper()
riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)
return riverpgxv5.New(dbPool)
})
dbPool := riverinternaltest.TestDB(ctx, t)
return riverpgxv5.New(dbPool)
},
func(ctx context.Context, t *testing.T) riverdriver.Executor {
t.Helper()

tx := riverinternaltest.TestTx(ctx, t)
return riverpgxv5.New(nil).UnwrapExecutor(tx)
})
}

func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
Expand Down
12 changes: 12 additions & 0 deletions insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@ package river
import (
"errors"
"fmt"
"regexp"
"slices"
"time"

"github.com/riverqueue/river/rivertype"
)

// Regular expression to which the format of tags must comply. Mainly, no
// special characters, and with hyphens in the middle.
//
// A key property here (in case this is relaxed in the future) is that commas
// must never be allowed because they're used as a delimiter during batch job
// insertion for the `riverdatabasesql` driver.
var tagRE = regexp.MustCompile(`\A[\w][\w\-]+[\w]\z`)

// InsertOpts are optional settings for a new job which can be provided at job
// insertion time. These will override any default InsertOpts settings provided
// by JobArgsWithInsertOpts, as well as any global defaults.
Expand Down Expand Up @@ -58,6 +67,9 @@ type InsertOpts struct {
// functional behavior and are meant entirely as a user-specified construct
// to help group and categorize jobs.
//
// Tags should conform to the regex `\A[\w][\w\-]+[\w]\z` and be a maximum
// of 255 characters long. No special characters are allowed.
//
// If tags are specified from both a job args override and from options on
// Insert, the latter takes precedence. Tags are not merged.
Tags []string
Expand Down
23 changes: 23 additions & 0 deletions insert_opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,29 @@ import (
"github.com/riverqueue/river/rivertype"
)

func TestTagRE(t *testing.T) {
t.Parallel()

require.Regexp(t, tagRE, "aaa")
require.Regexp(t, tagRE, "_aaa")
require.Regexp(t, tagRE, "aaa_")
require.Regexp(t, tagRE, "777")
require.Regexp(t, tagRE, "my-tag")
require.Regexp(t, tagRE, "my_tag")
require.Regexp(t, tagRE, "my-longer-tag")
require.Regexp(t, tagRE, "my_longer_tag")
require.Regexp(t, tagRE, "My_Capitalized_Tag")
require.Regexp(t, tagRE, "ALL_CAPS")
require.Regexp(t, tagRE, "1_2_3")

require.NotRegexp(t, tagRE, "a")
require.NotRegexp(t, tagRE, "aa")
require.NotRegexp(t, tagRE, "-aaa")
require.NotRegexp(t, tagRE, "aaa-")
require.NotRegexp(t, tagRE, "special@characters$banned")
require.NotRegexp(t, tagRE, "commas,never,allowed")
}

func TestJobUniqueOpts_validate(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading