Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions internal/maintenance/reindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -24,13 +25,15 @@ type reindexerExecutorMock struct {
indexesExistCalls atomic.Int32
indexesExistFunc func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error)
indexesExistSignal chan struct{}
indexReindexFunc func(ctx context.Context, params *riverdriver.IndexReindexParams) error
}

func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock {
return &reindexerExecutorMock{
Executor: exec,
indexesExistFunc: exec.IndexesExist,
indexesExistSignal: make(chan struct{}, 10),
indexReindexFunc: exec.IndexReindex,
}
}

Expand All @@ -45,6 +48,10 @@ func (m *reindexerExecutorMock) IndexesExist(ctx context.Context, params *riverd
return m.indexesExistFunc(ctx, params)
}

func (m *reindexerExecutorMock) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error {
return m.indexReindexFunc(ctx, params)
}

func TestReindexer(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -182,6 +189,23 @@ func TestReindexer(t *testing.T) {

svc, bundle := setup(t)

var (
// Mock IndexReindex so the test doesn't depend on the speed of real
// REINDEX CONCURRENTLY operations on the shared CI database. Track
// which indexes got reindexed so we can verify the expected set.
mockExec = newReindexerExecutorMock(bundle.exec)

reindexedNames []string
reindexedMu sync.Mutex
)
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
reindexedMu.Lock()
defer reindexedMu.Unlock()
reindexedNames = append(reindexedNames, params.Index)
return nil
}
svc.exec = mockExec

svc.Config.IndexNames = []string{
"river_job_kind",
"river_job_prioritized_fetching_index",
Expand All @@ -198,25 +222,23 @@ func TestReindexer(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}

// Make sure that no `CONCURRENTLY` artifacts exist after reindexing is
// supposed to be done. Postgres creates a new index suffixed with
// `_ccnew` before swapping it in as the new index. The existing index
// is renamed `_ccold` before being dropped concurrently.
//
// https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY
for _, indexName := range svc.Config.IndexNames {
for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} {
indexExists, err := bundle.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: bundle.schema})
require.NoError(t, err)
require.False(t, indexExists)
}
}
reindexedMu.Lock()
require.ElementsMatch(t, svc.Config.IndexNames, reindexedNames)
reindexedMu.Unlock()
})

t.Run("ReindexesConfiguredIndexes", func(t *testing.T) {
t.Parallel()

svc, _ := setup(t)
svc, bundle := setup(t)

// Mock IndexReindex so the test doesn't depend on the speed of real
// REINDEX CONCURRENTLY operations on the shared CI database.
mockExec := newReindexerExecutorMock(bundle.exec)
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
return nil
}
svc.exec = mockExec

svc.Config.ScheduleFunc = runImmediatelyThenOnceAnHour()

Expand Down
Loading