From b6ffe02c6ab7483e34a65984359263013e525ab3 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 26 Apr 2025 11:04:28 -0700 Subject: [PATCH] Fix intermittently failing leader-related tests There's a bunch of clock-based testing happening in the leader tests for drivers, resulting in intermittent failures like this one [1]: --- FAIL: TestDriverRiverPgxV5 (23.18s) driver_test.go:73: Reusing idle schema "river_2025_04_26t17_52_50_schema_07" after cleaning in 15.347655ms [7 generated] [6 reused] --- FAIL: TestDriverRiverPgxV5/LeaderAttemptElect (0.00s) --- FAIL: TestDriverRiverPgxV5/LeaderAttemptElect/ElectsLeader (0.22s) riverdrivertest.go:2244: TestTx using schema: river_2025_04_26t17_52_50_schema_06 riverdrivertest.go:2258: Error Trace: /home/runner/work/river/river/internal/riverinternaltest/riverdrivertest/riverdrivertest.go:2258 Error: Max difference between 2025-04-26 17:53:14.813635844 +0000 UTC m=+24.487676847 and 2025-04-26 17:53:14.59636 +0000 UTC allowed is 100ms, but difference was 217.275844ms Test: TestDriverRiverPgxV5/LeaderAttemptElect/ElectsLeader FAIL FAIL github.com/riverqueue/river 31.602s As with all these other clock-based failures, the problem is that in slower environments like GitHub Actions, goroutines may be paused for extended periods, so by the time they wake up `time.Now()` might be significantly different, and easily off by a few hundred milliseconds. Here, make the clock for these functions injectable so we can inject our own time and get to better test reliability. I'm not necessarily against other approaches either, but I'm still getting reasonable frequent failures on these, so I'm definitely in favor of getting some sort of fix into place. A side benefit is that it makes clock injection from the client-level a little more thorough as well. [1] https://github.com/riverqueue/river/actions/runs/14683687209/job/41209541920 --- internal/leadership/elector.go | 3 + .../riverdrivertest/riverdrivertest.go | 117 ++++++++++++------ riverdriver/river_driver_interface.go | 3 + .../internal/dbsqlc/river_leader.sql.go | 26 ++-- .../river_database_sql_driver.go | 5 +- .../internal/dbsqlc/river_leader.sql | 12 +- .../internal/dbsqlc/river_leader.sql.go | 26 ++-- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 5 +- rivershared/testfactory/test_factory.go | 2 + 9 files changed, 130 insertions(+), 69 deletions(-) diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index 8be55946..de1b9284 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -197,6 +197,7 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, + Now: e.Time.NowUTCOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), }) @@ -333,6 +334,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, + Now: e.Time.NowUTCOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), }) @@ -515,6 +517,7 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea return dbutil.WithTxV(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) (bool, error) { if _, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{ + Now: params.Now, Schema: params.Schema, }); err != nil { return false, err diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 057b6f2f..0f4b0fd2 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -2204,37 +2204,6 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, const leaderTTL = 10 * time.Second - t.Run("LeaderDeleteExpired", func(t *testing.T) { - t.Parallel() - - exec, _ := setup(ctx, t) - - now := time.Now().UTC() - - { - numDeleted, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{ - Schema: "", - }) - require.NoError(t, err) - require.Zero(t, numDeleted) - } - - _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ - ElectedAt: ptrutil.Ptr(now.Add(-2 * time.Hour)), - ExpiresAt: ptrutil.Ptr(now.Add(-1 * time.Hour)), - LeaderID: ptrutil.Ptr(clientID), - Schema: "", - }) - - { - numDeleted, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{ - Schema: "", - }) - require.NoError(t, err) - require.Equal(t, 1, numDeleted) - } - }) - t.Run("LeaderAttemptElect", func(t *testing.T) { t.Parallel() @@ -2243,8 +2212,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, exec, _ := setup(ctx, t) + now := time.Now() + elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Now: &now, Schema: "", TTL: leaderTTL, }) @@ -2255,8 +2227,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, Schema: "", }) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, 100*time.Millisecond) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, 100*time.Millisecond) + require.WithinDuration(t, now, leader.ElectedAt, time.Microsecond) + require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, time.Microsecond) }) t.Run("CannotElectTwiceInARow", func(t *testing.T) { @@ -2296,8 +2268,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, exec, _ := setup(ctx, t) + now := time.Now() + elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Now: &now, Schema: "", TTL: leaderTTL, }) @@ -2308,8 +2283,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, Schema: "", }) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, 100*time.Millisecond) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, 100*time.Millisecond) + require.WithinDuration(t, now, leader.ElectedAt, time.Microsecond) + require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, time.Microsecond) }) t.Run("ReelectsSameLeader", func(t *testing.T) { @@ -2343,19 +2318,80 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) }) + t.Run("LeaderDeleteExpired", func(t *testing.T) { + t.Parallel() + + t.Run("DeletesExpired", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + { + numDeleted, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{ + Schema: "", + }) + require.NoError(t, err) + require.Zero(t, numDeleted) + } + + _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(now.Add(-2 * time.Hour)), + ExpiresAt: ptrutil.Ptr(now.Add(-1 * time.Hour)), + LeaderID: ptrutil.Ptr(clientID), + Schema: "", + }) + + { + numDeleted, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{ + Schema: "", + }) + require.NoError(t, err) + require.Equal(t, 1, numDeleted) + } + }) + + t.Run("WithInjectedNow", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + // Elected in the future. + _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(now.Add(1 * time.Hour)), + ExpiresAt: ptrutil.Ptr(now.Add(2 * time.Hour)), + LeaderID: ptrutil.Ptr(clientID), + Schema: "", + }) + + numDeleted, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{ + Now: ptrutil.Ptr(now.Add(2*time.Hour + 1*time.Second)), + Schema: "", + }) + require.NoError(t, err) + require.Equal(t, 1, numDeleted) + }) + }) + t.Run("LeaderInsert", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) + now := time.Now() + leader, err := exec.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ LeaderID: clientID, + Now: &now, Schema: "", TTL: leaderTTL, }) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, 500*time.Millisecond) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, 500*time.Millisecond) + require.WithinDuration(t, now, leader.ElectedAt, time.Microsecond) + require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, time.Microsecond) require.Equal(t, clientID, leader.LeaderID) }) @@ -2364,8 +2400,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, exec, _ := setup(ctx, t) + now := time.Now() + _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ LeaderID: ptrutil.Ptr(clientID), + Now: &now, Schema: "", }) @@ -2373,8 +2412,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, Schema: "", }) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, 500*time.Millisecond) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, 500*time.Millisecond) + require.WithinDuration(t, now, leader.ElectedAt, time.Microsecond) + require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, time.Microsecond) require.Equal(t, clientID, leader.LeaderID) }) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 5315b911..03a25ee0 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -509,6 +509,7 @@ type Leader struct { } type LeaderDeleteExpiredParams struct { + Now *time.Time Schema string } @@ -520,12 +521,14 @@ type LeaderInsertParams struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID string + Now *time.Time Schema string TTL time.Duration } type LeaderElectParams struct { LeaderID string + Now *time.Time Schema string TTL time.Duration } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go index f7fbc913..c47aa007 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go @@ -13,18 +13,19 @@ import ( const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader (leader_id, elected_at, expires_at) - VALUES ($1, now(), now() + $2::interval) + VALUES ($1, coalesce($2::timestamptz, now()), coalesce($2::timestamptz, now()) + $3::interval) ON CONFLICT (name) DO NOTHING ` type LeaderAttemptElectParams struct { LeaderID string + Now *time.Time TTL time.Duration } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) if err != nil { return 0, err } @@ -33,21 +34,22 @@ func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAt const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader (leader_id, elected_at, expires_at) - VALUES ($1, now(), now() + $2::interval) + VALUES ($1, coalesce($2::timestamptz, now()), coalesce($2::timestamptz, now()) + $3::interval) ON CONFLICT (name) DO UPDATE SET - expires_at = now() + $2 + expires_at = coalesce($2::timestamptz, now()) + $3 WHERE river_leader.leader_id = $1 ` type LeaderAttemptReelectParams struct { LeaderID string + Now *time.Time TTL time.Duration } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) if err != nil { return 0, err } @@ -56,11 +58,11 @@ func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *Leader const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows DELETE FROM /* TEMPLATE: schema */river_leader -WHERE expires_at < now() +WHERE expires_at < coalesce($1::timestamptz, now()) ` -func (q *Queries) LeaderDeleteExpired(ctx context.Context, db DBTX) (int64, error) { - result, err := db.ExecContext(ctx, leaderDeleteExpired) +func (q *Queries) LeaderDeleteExpired(ctx context.Context, db DBTX, now *time.Time) (int64, error) { + result, err := db.ExecContext(ctx, leaderDeleteExpired, now) if err != nil { return 0, err } @@ -90,14 +92,15 @@ INSERT INTO /* TEMPLATE: schema */river_leader( expires_at, leader_id ) VALUES ( - coalesce($1::timestamptz, now()), - coalesce($2::timestamptz, now() + $3::interval), - $4 + coalesce($1::timestamptz, coalesce($2::timestamptz, now())), + coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + $4::interval), + $5 ) RETURNING elected_at, expires_at, leader_id, name ` type LeaderInsertParams struct { ElectedAt *time.Time + Now *time.Time ExpiresAt *time.Time TTL time.Duration LeaderID string @@ -106,6 +109,7 @@ type LeaderInsertParams struct { func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { row := db.QueryRowContext(ctx, leaderInsert, arg.ElectedAt, + arg.Now, arg.ExpiresAt, arg.TTL, arg.LeaderID, diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index a0cbec11..1609d3b5 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -601,6 +601,7 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Now: params.Now, TTL: params.TTL, }) if err != nil { @@ -612,6 +613,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Now: params.Now, TTL: params.TTL, }) if err != nil { @@ -621,7 +623,7 @@ func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { - numDeleted, err := dbsqlc.New().LeaderDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx) + numDeleted, err := dbsqlc.New().LeaderDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.Now) if err != nil { return 0, interpretError(err) } @@ -641,6 +643,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Now: params.Now, TTL: params.TTL, }) if err != nil { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql index a2ae64e1..30197a47 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql @@ -9,22 +9,22 @@ CREATE UNLOGGED TABLE river_leader( -- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader (leader_id, elected_at, expires_at) - VALUES (@leader_id, now(), now() + @ttl::interval) + VALUES (@leader_id, coalesce(sqlc.narg('now')::timestamptz, now()), coalesce(sqlc.narg('now')::timestamptz, now()) + @ttl::interval) ON CONFLICT (name) DO NOTHING; -- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader (leader_id, elected_at, expires_at) - VALUES (@leader_id, now(), now() + @ttl::interval) + VALUES (@leader_id, coalesce(sqlc.narg('now')::timestamptz, now()), coalesce(sqlc.narg('now')::timestamptz, now()) + @ttl::interval) ON CONFLICT (name) DO UPDATE SET - expires_at = now() + @ttl + expires_at = coalesce(sqlc.narg('now')::timestamptz, now()) + @ttl WHERE river_leader.leader_id = @leader_id; -- name: LeaderDeleteExpired :execrows DELETE FROM /* TEMPLATE: schema */river_leader -WHERE expires_at < now(); +WHERE expires_at < coalesce(sqlc.narg('now')::timestamptz, now()); -- name: LeaderGetElectedLeader :one SELECT * @@ -36,8 +36,8 @@ INSERT INTO /* TEMPLATE: schema */river_leader( expires_at, leader_id ) VALUES ( - coalesce(sqlc.narg('elected_at')::timestamptz, now()), - coalesce(sqlc.narg('expires_at')::timestamptz, now() + @ttl::interval), + coalesce(sqlc.narg('elected_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now())), + coalesce(sqlc.narg('expires_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now()) + @ttl::interval), @leader_id ) RETURNING *; diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go index b4b5b251..e0fc6f1d 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go @@ -14,18 +14,19 @@ import ( const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader (leader_id, elected_at, expires_at) - VALUES ($1, now(), now() + $2::interval) + VALUES ($1, coalesce($2::timestamptz, now()), coalesce($2::timestamptz, now()) + $3::interval) ON CONFLICT (name) DO NOTHING ` type LeaderAttemptElectParams struct { LeaderID string + Now *time.Time TTL time.Duration } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptElect, arg.LeaderID, arg.TTL) + result, err := db.Exec(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) if err != nil { return 0, err } @@ -34,21 +35,22 @@ func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAt const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader (leader_id, elected_at, expires_at) - VALUES ($1, now(), now() + $2::interval) + VALUES ($1, coalesce($2::timestamptz, now()), coalesce($2::timestamptz, now()) + $3::interval) ON CONFLICT (name) DO UPDATE SET - expires_at = now() + $2 + expires_at = coalesce($2::timestamptz, now()) + $3 WHERE river_leader.leader_id = $1 ` type LeaderAttemptReelectParams struct { LeaderID string + Now *time.Time TTL time.Duration } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptReelect, arg.LeaderID, arg.TTL) + result, err := db.Exec(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) if err != nil { return 0, err } @@ -57,11 +59,11 @@ func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *Leader const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows DELETE FROM /* TEMPLATE: schema */river_leader -WHERE expires_at < now() +WHERE expires_at < coalesce($1::timestamptz, now()) ` -func (q *Queries) LeaderDeleteExpired(ctx context.Context, db DBTX) (int64, error) { - result, err := db.Exec(ctx, leaderDeleteExpired) +func (q *Queries) LeaderDeleteExpired(ctx context.Context, db DBTX, now *time.Time) (int64, error) { + result, err := db.Exec(ctx, leaderDeleteExpired, now) if err != nil { return 0, err } @@ -91,14 +93,15 @@ INSERT INTO /* TEMPLATE: schema */river_leader( expires_at, leader_id ) VALUES ( - coalesce($1::timestamptz, now()), - coalesce($2::timestamptz, now() + $3::interval), - $4 + coalesce($1::timestamptz, coalesce($2::timestamptz, now())), + coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + $4::interval), + $5 ) RETURNING elected_at, expires_at, leader_id, name ` type LeaderInsertParams struct { ElectedAt *time.Time + Now *time.Time ExpiresAt *time.Time TTL time.Duration LeaderID string @@ -107,6 +110,7 @@ type LeaderInsertParams struct { func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { row := db.QueryRow(ctx, leaderInsert, arg.ElectedAt, + arg.Now, arg.ExpiresAt, arg.TTL, arg.LeaderID, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 2808f54a..10796091 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -494,6 +494,7 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Now: params.Now, TTL: params.TTL, }) if err != nil { @@ -505,6 +506,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Now: params.Now, TTL: params.TTL, }) if err != nil { @@ -514,7 +516,7 @@ func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { - numDeleted, err := dbsqlc.New().LeaderDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx) + numDeleted, err := dbsqlc.New().LeaderDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.Now) if err != nil { return 0, interpretError(err) } @@ -534,6 +536,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Now: params.Now, TTL: params.TTL, }) if err != nil { diff --git a/rivershared/testfactory/test_factory.go b/rivershared/testfactory/test_factory.go index 7dba1005..40abd2d3 100644 --- a/rivershared/testfactory/test_factory.go +++ b/rivershared/testfactory/test_factory.go @@ -98,6 +98,7 @@ type LeaderOpts struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID *string + Now *time.Time Schema string } @@ -108,6 +109,7 @@ func Leader(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts ElectedAt: opts.ElectedAt, ExpiresAt: opts.ExpiresAt, LeaderID: ptrutil.ValOrDefault(opts.LeaderID, "test-client-id"), + Now: opts.Now, Schema: opts.Schema, TTL: 10 * time.Second, })