Skip to content

Commit

Permalink
Fully functional database/sql driver
Browse files Browse the repository at this point in the history
Here, implement the rest of driver functionality on `riverdatabasesql`,
the existing driver for Go's built-in `database/sql` package. Previously
it only supported a minimal interface allowing it to run migrations, but
nothing more sophisticated like inserting jobs.

The benefit of a fully functional driver is that it will allow River to
be integrated with with other Go database packages that aren't built
around Pgx like Bun (requested in #302) and GORM (requested in #58).
I'll need to write up some documentation, but this change should make
both of those integrations possible immediately.

It also lays the groundwork for future non-Postgres drivers. It's going
to be a little more still, but I want to take a stab at SQLite, and this
change will get us a lot of the way there.

There's no way with `database/sql` to support listen/notify, so here we
introduce the idea of a poll only driver. River's client checks whether
a driver can support listen/notify on initialization, and if not, it
enters poll only mode the same way as if configured with `PollOnly`.

An intuitive idiosyncrasy of this set up is that even when using the
`database/sql` driver bundled here, regardless of whether they're
working with Bun, GORM, or whatever,  users will generally still be
using Pgx under the hood since it's the only maintained and fully
functional Postgres driver in the Go ecosystem. With that said, the
driver still has to bundle in `lib/pq` for various constructs like
`pq.Array` because we're using sqlc, and sqlc's `database/sql` driver
always uses `lib/pq`. I tried to find a way around this, but came out
fairly convinced that there is none. To rid ourselves of `lib/pq`
completely we'd need sqlc to ship an alternative Pgx driver that used
Pgx internally, but exposed a `database/sql` interface using `*sql.Tx`
instead of `pgx.Tx`.
  • Loading branch information
brandur committed May 13, 2024
1 parent 793f370 commit a8db540
Show file tree
Hide file tree
Showing 24 changed files with 1,578 additions and 524 deletions.
32 changes: 24 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,12 +475,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
client.services = append(client.services, client.completer)

// In poll only mode, we don't try to initialize a notifier that uses
// listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
if driver.SupportsListener() {
// In poll only mode, we don't try to initialize a notifier that
// uses listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
}
} else {
logger.Info("Driver does not support listener; entering poll only mode")
}

client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
Expand Down Expand Up @@ -1264,6 +1268,18 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
}
if tags == nil {
tags = []string{}
} else {
for _, tag := range tags {
if len(tag) > 255 {
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
}
// Restricted commas because we need those for batch inserts with
// the riverdatabasesql driver. We may want to restrict other
// special characters as well.
if strings.Contains(tag, ",") {
return nil, nil, errors.New("tags should not contain commas")
}
}
}

if priority > 4 {
Expand All @@ -1284,10 +1300,10 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
}

insertParams := &riverdriver.JobInsertFastParams{
EncodedArgs: encodedArgs,
EncodedArgs: json.RawMessage(encodedArgs),
Kind: args.Kind(),
MaxAttempts: maxAttempts,
Metadata: metadata,
Metadata: json.RawMessage(metadata),
Priority: priority,
Queue: queue,
State: rivertype.JobStateAvailable,
Expand Down
87 changes: 70 additions & 17 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/require"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -158,7 +160,7 @@ func newTestClient(t *testing.T, dbPool *pgxpool.Pool, config *Config) *Client[p
return client
}

func startClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
func startClient[TTx any](ctx context.Context, t *testing.T, client *Client[TTx]) {
t.Helper()

if err := client.Start(ctx); err != nil {
Expand All @@ -181,6 +183,21 @@ func runNewTestClient(ctx context.Context, t *testing.T, config *Config) *Client
return client
}

func subscribe[TTx any](t *testing.T, client *Client[TTx]) <-chan *Event {
t.Helper()

subscribeChan, cancel := client.Subscribe(
EventKindJobCancelled,
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
EventKindQueuePaused,
EventKindQueueResumed,
)
t.Cleanup(cancel)
return subscribeChan
}

func Test_Client(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -211,21 +228,6 @@ func Test_Client(t *testing.T) {
return newTestClient(t, bundle.dbPool, config), bundle
}

subscribe := func(t *testing.T, client *Client[pgx.Tx]) <-chan *Event {
t.Helper()

subscribeChan, cancel := client.Subscribe(
EventKindJobCancelled,
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
EventKindQueuePaused,
EventKindQueueResumed,
)
t.Cleanup(cancel)
return subscribeChan
}

t.Run("StartInsertAndWork", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -604,7 +606,40 @@ func Test_Client(t *testing.T) {
}
})

t.Run("PollOnly", func(t *testing.T) {
t.Run("PollOnlyDriver", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
bundle.config.PollOnly = true

stdPool := stdlib.OpenDBFromPool(bundle.dbPool)
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })

client, err := NewClient(riverdatabasesql.New(stdPool), config)
require.NoError(t, err)

client.testSignals.Init()

// Notifier should not have been initialized at all.
require.Nil(t, client.notifier)

insertRes, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

// Despite no notifier, the client should still be able to elect itself
// leader.
client.testSignals.electedLeader.WaitOrTimeout()

event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)
require.Equal(t, rivertype.JobStateCompleted, event.Job.State)
})

t.Run("PollOnlyOption", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
Expand Down Expand Up @@ -4185,6 +4220,24 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.Equal(t, []string{"tag1", "tag2"}, insertParams.Tags)
})

t.Run("TagFormatValidated", func(t *testing.T) {
t.Parallel()

{
_, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{strings.Repeat("h", 256)},
})
require.EqualError(t, err, "tags should be a maximum of 255 characters long")
}

{
_, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{"tag,with,comma"},
})
require.EqualError(t, err, "tags should not contain commas")
}
})

t.Run("UniqueOpts", func(t *testing.T) {
t.Parallel()

Expand Down
53 changes: 26 additions & 27 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/riverqueue/river/rivertype"
)

func TestDriverDatabaseSQL_Executor(t *testing.T) {
func TestDriverDatabaseSQL(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand All @@ -29,42 +29,41 @@ func TestDriverDatabaseSQL_Executor(t *testing.T) {
stdPool := stdlib.OpenDBFromPool(dbPool)
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })

driver := riverdatabasesql.New(nil)
riverdrivertest.ExerciseExecutorMigrationOnly(ctx, t, driver, func(ctx context.Context, t *testing.T) *sql.Tx {
t.Helper()
riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T) riverdriver.Driver[*sql.Tx] {
t.Helper()

tx, err := stdPool.BeginTx(ctx, nil)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback() })
return riverdatabasesql.New(stdPool)
},
func(ctx context.Context, t *testing.T) riverdriver.Executor {
t.Helper()

return tx
})
}

func TestDriverRiverPgxV5_Executor(t *testing.T) {
t.Parallel()

ctx := context.Background()
tx, err := stdPool.BeginTx(ctx, nil)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback() })

driver := riverpgxv5.New(nil)
riverdrivertest.ExerciseExecutorFull(ctx, t, driver, func(ctx context.Context, t *testing.T) pgx.Tx {
t.Helper()

return riverinternaltest.TestTx(ctx, t)
})
return riverdatabasesql.New(nil).UnwrapExecutor(tx)
})
}

func TestDriverRiverPgxV5_Listener(t *testing.T) {
func TestDriverRiverPgxV5(t *testing.T) {
t.Parallel()

ctx := context.Background()

riverdrivertest.ExerciseListener(ctx, t, func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
t.Helper()
riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)
return riverpgxv5.New(dbPool)
})
dbPool := riverinternaltest.TestDB(ctx, t)
return riverpgxv5.New(dbPool)
},
func(ctx context.Context, t *testing.T) riverdriver.Executor {
t.Helper()

tx := riverinternaltest.TestTx(ctx, t)
return riverpgxv5.New(nil).UnwrapExecutor(tx)
})
}

func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
Expand Down
11 changes: 11 additions & 0 deletions insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@ package river
import (
"errors"
"fmt"
"regexp"
"slices"
"time"

"github.com/riverqueue/river/rivertype"
)

// Regular expression to which the format of tags must comply. Mainly, no
// special characters, and with hyphens in the middle.
//
// A key property here (in case this is relaxed in the future) is that commas
// must never be allowed because they're used as a delimiter during batch job
// insertion for the `riverdatabasesql` driver.
var tagRE = regexp.MustCompile(`\A[\w][\w\-]+[\w]\z`)

// InsertOpts are optional settings for a new job which can be provided at job
// insertion time. These will override any default InsertOpts settings provided
// by JobArgsWithInsertOpts, as well as any global defaults.
Expand Down Expand Up @@ -58,6 +67,8 @@ type InsertOpts struct {
// functional behavior and are meant entirely as a user-specified construct
// to help group and categorize jobs.
//
// Tags should
//
// If tags are specified from both a job args override and from options on
// Insert, the latter takes precedence. Tags are not merged.
Tags []string
Expand Down
20 changes: 20 additions & 0 deletions insert_opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ import (
"github.com/riverqueue/river/rivertype"
)

func TestTagRE(t *testing.T) {

Check failure on line 12 in insert_opts_test.go

View workflow job for this annotation

GitHub Actions / lint

Function TestTagRE missing the call to method parallel (paralleltest)
require.Regexp(t, tagRE, "aaa")
require.Regexp(t, tagRE, "_aaa")
require.Regexp(t, tagRE, "aaa_")
require.Regexp(t, tagRE, "777")
require.Regexp(t, tagRE, "my-tag")
require.Regexp(t, tagRE, "my_tag")
require.Regexp(t, tagRE, "my_longer_tag")
require.Regexp(t, tagRE, "My_Capitalized_Tag")
require.Regexp(t, tagRE, "ALL_CAPS")
require.Regexp(t, tagRE, "1_2_3")

require.NotRegexp(t, tagRE, "a")
require.NotRegexp(t, tagRE, "aa")
require.NotRegexp(t, tagRE, "-aaa")
require.NotRegexp(t, tagRE, "aaa-")
require.NotRegexp(t, tagRE, "special@characters$banned")
require.NotRegexp(t, tagRE, "commas,never,allowed")
}

func TestJobUniqueOpts_validate(t *testing.T) {
t.Parallel()

Expand Down
Loading

0 comments on commit a8db540

Please sign in to comment.