From 3f3fe4aaeae8d89d8811d8735bb458bf4e6e8006 Mon Sep 17 00:00:00 2001 From: Brandur Date: Mon, 13 Apr 2026 00:16:17 -0500 Subject: [PATCH] Reindexer: Mock reindex function to reduce parallel contention This one's aimed at fixing an intermittently failing test case: https://github.com/riverqueue/river/actions/runs/24322049420/job/71009998068?pr=1208 --- FAIL: TestReindexer (0.00s) --- FAIL: TestReindexer/ReindexesConfiguredIndexes (10.07s) reindexer_test.go:219: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_04" [user facing: "maintenance_2026_04_13t01_54_14_schema_04"] after cleaning in 26.436456ms [4 generated] [7 reused] test_signal.go:95: timed out waiting on test signal after 10s logger.go:256: time=2026-04-13T01:54:27.581Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_04"; 1 idle schema(s) [4 generated] [10 reused] --- FAIL: TestReindexer/ReindexesMinimalSubsetofIndexes (10.14s) reindexer_test.go:183: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_01" [user facing: "maintenance_2026_04_13t01_54_14_schema_01"] after cleaning in 28.042877ms [4 generated] [10 reused] test_signal.go:95: timed out waiting on test signal after 10s reindexer_test.go:211: Error Trace: /home/runner/work/river/river/internal/maintenance/reindexer_test.go:211 Error: Should be false Test: TestReindexer/ReindexesMinimalSubsetofIndexes logger.go:256: time=2026-04-13T01:54:28.444Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_01"; 1 idle schema(s) [5 generated] [24 reused] FAIL FAIL github.com/riverqueue/river/internal/maintenance 18.764s I'm diagnosing with Claude's help here, but what appears to be happening is that although a reindex operation in Postgres is often fast, it is still a heavy operation, and can slow down even further when there's a lot of concurrent activity hammering a database. Many reindexer test cases run in parallel, and it appears that was happening here is that we got a reindex that exceeded our maximum timeout of 10x. We have some evidence this during the test run from the runtime of 10.07s and the line: Signaled to stop during index build; attempting to clean up concurrent artifacts Here, I'm putting forward a solution proposed by Claude, which is to mock out the reindex operation, especially where we have a number of reindexes going in parallel. The tests `ReindexOneSuccess` and `ReindexSkippedWithReindexArtifact` still put load on the real reindex operation, so we're not going full mock here, and should see our intermittency considerably reduced while still being confident that everything still works. --- internal/maintenance/reindexer_test.go | 50 ++++++++++++++++++-------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index a78dbd73..ca71d93c 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "sync/atomic" "testing" "time" @@ -24,6 +25,7 @@ type reindexerExecutorMock struct { indexesExistCalls atomic.Int32 indexesExistFunc func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) indexesExistSignal chan struct{} + indexReindexFunc func(ctx context.Context, params *riverdriver.IndexReindexParams) error } func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock { @@ -31,6 +33,7 @@ func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock Executor: exec, indexesExistFunc: exec.IndexesExist, indexesExistSignal: make(chan struct{}, 10), + indexReindexFunc: exec.IndexReindex, } } @@ -45,6 +48,10 @@ func (m *reindexerExecutorMock) IndexesExist(ctx context.Context, params *riverd return m.indexesExistFunc(ctx, params) } +func (m *reindexerExecutorMock) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error { + return m.indexReindexFunc(ctx, params) +} + func TestReindexer(t *testing.T) { t.Parallel() @@ -182,6 +189,23 @@ func TestReindexer(t *testing.T) { svc, bundle := setup(t) + var ( + // Mock IndexReindex so the test doesn't depend on the speed of real + // REINDEX CONCURRENTLY operations on the shared CI database. Track + // which indexes got reindexed so we can verify the expected set. + mockExec = newReindexerExecutorMock(bundle.exec) + + reindexedNames []string + reindexedMu sync.Mutex + ) + mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error { + reindexedMu.Lock() + defer reindexedMu.Unlock() + reindexedNames = append(reindexedNames, params.Index) + return nil + } + svc.exec = mockExec + svc.Config.IndexNames = []string{ "river_job_kind", "river_job_prioritized_fetching_index", @@ -198,25 +222,23 @@ func TestReindexer(t *testing.T) { case <-time.After(100 * time.Millisecond): } - // Make sure that no `CONCURRENTLY` artifacts exist after reindexing is - // supposed to be done. Postgres creates a new index suffixed with - // `_ccnew` before swapping it in as the new index. The existing index - // is renamed `_ccold` before being dropped concurrently. - // - // https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY - for _, indexName := range svc.Config.IndexNames { - for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} { - indexExists, err := bundle.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: bundle.schema}) - require.NoError(t, err) - require.False(t, indexExists) - } - } + reindexedMu.Lock() + require.ElementsMatch(t, svc.Config.IndexNames, reindexedNames) + reindexedMu.Unlock() }) t.Run("ReindexesConfiguredIndexes", func(t *testing.T) { t.Parallel() - svc, _ := setup(t) + svc, bundle := setup(t) + + // Mock IndexReindex so the test doesn't depend on the speed of real + // REINDEX CONCURRENTLY operations on the shared CI database. + mockExec := newReindexerExecutorMock(bundle.exec) + mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error { + return nil + } + svc.exec = mockExec svc.Config.ScheduleFunc = runImmediatelyThenOnceAnHour()