Skip to content

Commit

Permalink
demonstrate alternative constructor w/ listener support
Browse files Browse the repository at this point in the history
Add a `NewWithListener` constructor to `riverdatabasesql` that allows
the `database/sql` driver to be used with a functioning listener
implementation. Also add a `NewListener` constructor to the `riverpgxv5`
driver to allow creating a listener with a raw pgx pool.

These can be combined to allow full listener support as long as the
underlying database driver supports it, even when it's used within an
abstraction like `database/sql` or Bun.
  • Loading branch information
bgentry committed May 23, 2024
1 parent 755295f commit 4518593
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
26 changes: 21 additions & 5 deletions riverdriver/riverdatabasesql/river_database_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (

// Driver is an implementation of riverdriver.Driver for database/sql.
type Driver struct {
dbPool *sql.DB
queries *dbsqlc.Queries
dbPool *sql.DB
listener riverdriver.Listener
queries *dbsqlc.Queries
}

// New returns a new database/sql River driver for use with River.
Expand All @@ -41,13 +42,28 @@ func New(dbPool *sql.DB) *Driver {
return &Driver{dbPool: dbPool, queries: dbsqlc.New()}
}

// NewWithListener returns a new database/sql River driver for use with River
// just like New, except it also takes a riverdriver.Listener to use for
// listening to notifications.
func NewWithListener(dbPool *sql.DB, listener riverdriver.Listener) *Driver {
driver := New(dbPool)
driver.listener = listener
return driver
}

func (d *Driver) GetExecutor() riverdriver.Executor {
return &Executor{d.dbPool, d.dbPool, dbsqlc.New()}
}

func (d *Driver) GetListener() riverdriver.Listener { panic(riverdriver.ErrNotImplemented) }
func (d *Driver) HasPool() bool { return d.dbPool != nil }
func (d *Driver) SupportsListener() bool { return false }
func (d *Driver) GetListener() riverdriver.Listener {
if d.listener == nil {
panic(riverdriver.ErrNotImplemented)
}
return d.listener
}

func (d *Driver) HasPool() bool { return d.dbPool != nil }
func (d *Driver) SupportsListener() bool { return d.listener != nil }

func (d *Driver) UnwrapExecutor(tx *sql.Tx) riverdriver.ExecutorTx {
return &ExecutorTx{Executor: Executor{nil, tx, dbsqlc.New()}, tx: tx}
Expand Down
21 changes: 21 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,27 @@ type Listener struct {
mu sync.Mutex
}

// NewListener returns a Listener that can be used to enable other drivers
// (notably `riverdatabasesql`) to listen for notifications when their
// abstraction doesn't allow direct access to pgx connections, even though they
// are using pgx under the hood. This constructor is not applicable to
// applications which only use pgx directly.
//
// Users of `database/sql` or Bun can use this with the
// `riverdatabasesql.NewWithListener` constructor to enable listener support in
// that driver.
//
// The dbPool will solely be used for acquiring new connections for `LISTEN`
// commands. As such, a pool must be provided that supports that command. Users
// of pgbouncer should ensure that this specific pool is configured with session
// pooling, even if their main application does not use session pooling.
//
// A single Client will never use more than one listener connection at a time,
// no matter how many topics are being listened to.
func NewListener(dbPool *pgxpool.Pool) riverdriver.Listener {
return &Listener{dbPool: dbPool}
}

func (l *Listener) Close(ctx context.Context) error {
l.mu.Lock()
defer l.mu.Unlock()
Expand Down

0 comments on commit 4518593

Please sign in to comment.