Skip to content

Commit

Permalink
spanconfig/sqlwatcher: speed up tests
Browse files Browse the repository at this point in the history
Using a more aggressive closed timestamp target duration brings the
runtime for TestSQLWatcherReactsToUpdate down from 45s+ to single
digits. To speed it up, we also just re-use the same sqlwatcher instead
of creating a new one for every test case.

Other tests in the package benefit from a similar treatment. Using the
default target duration in fact masked a buggy test; in a future commit
we end up rewrite that test so it's skipped for now.

Release note: None
  • Loading branch information
irfansharif committed Nov 25, 2021
1 parent 2ac3cf4 commit ee68771
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 73 deletions.
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_test(
"//pkg/sql/catalog/descpb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
Expand Down
160 changes: 87 additions & 73 deletions pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -40,7 +43,6 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
setup string
stmt string
expectedIDs descpb.IDs
}{
Expand All @@ -49,17 +51,22 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
expectedIDs: descpb.IDs{52},
},
{
setup: "CREATE TABLE t2()",
stmt: "ALTER TABLE t2 CONFIGURE ZONE USING num_replicas = 3",
stmt: "CREATE TABLE t2(); ALTER TABLE t2 CONFIGURE ZONE USING num_replicas = 3",
expectedIDs: descpb.IDs{53},
},
{
setup: "CREATE DATABASE d; CREATE TABLE d.t1(); CREATE TABLE d.t2()",
stmt: "CREATE DATABASE d; CREATE TABLE d.t1(); CREATE TABLE d.t2()",
expectedIDs: descpb.IDs{54, 55, 56},
},
{
stmt: "ALTER DATABASE d CONFIGURE ZONE USING num_replicas=5",
expectedIDs: descpb.IDs{54},
},
{
setup: "CREATE TABLE t3(); CREATE TABLE t4()",
stmt: "CREATE TABLE t3(); CREATE TABLE t4()",
expectedIDs: descpb.IDs{57, 58},
},
{
stmt: "ALTER TABLE t3 CONFIGURE ZONE USING num_replicas=5; CREATE TABLE t5(); DROP TABLE t4;",
expectedIDs: descpb.IDs{57, 58, 59},
},
Expand All @@ -86,8 +93,7 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
},
// Test that events on types/schemas are also captured.
{
setup: "CREATE DATABASE db",
stmt: "CREATE SCHEMA db.sc",
stmt: "CREATE DATABASE db; CREATE SCHEMA db.sc",
// One ID each for the parent database and the schema.
expectedIDs: descpb.IDs{60, 61},
},
Expand All @@ -112,82 +118,89 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {

ts := tc.Server(0 /* idx */)

sqlDB := tc.ServerConn(0 /* idx */)
for _, tc := range testCases {
sqlWatcher := spanconfigsqlwatcher.NewFactory(
keys.SystemSQLCodec,
ts.ClusterSettings(),
ts.RangeFeedFactory().(*rangefeed.Factory),
1<<20, /* 1 MB, bufferMemLimit */
ts.Stopper(),
nil, /* knobs */
).New()

_, err := sqlDB.Exec(tc.setup)
require.NoError(t, err)

// Save the startTS here before the test case is executed to ensure the
// watcher can pick up the change when we start it in a separate goroutine.
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)

sqlWatcher := spanconfigsqlwatcher.NewFactory(
keys.SystemSQLCodec,
ts.ClusterSettings(),
ts.RangeFeedFactory().(*rangefeed.Factory),
1<<20, /* 1 MB, bufferMemLimit */
ts.Stopper(),
nil, /* knobs */
).New()

var mu struct {
syncutil.Mutex
receivedIDs map[descpb.ID]struct{}
lastCheckpoint hlc.Timestamp
}
mu.receivedIDs = make(map[descpb.ID]struct{})

var wg sync.WaitGroup
watcherCtx, watcherCancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()

startTS := ts.Clock().Now()
mu.Lock()
mu.lastCheckpoint = startTS
mu.Unlock()

var mu struct {
syncutil.Mutex
receivedIDs map[descpb.ID]struct{}
}
mu.receivedIDs = make(map[descpb.ID]struct{})

watcherCtx, watcherCancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
prevCheckpointTS := startTS
_ = sqlWatcher.WatchForSQLUpdates(watcherCtx,
startTS,
func(ctx context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error {
require.True(t, prevCheckpointTS.Less(checkpointTS))
mu.Lock()
defer mu.Unlock()
for _, update := range updates {
mu.receivedIDs[update.ID] = struct{}{}
}
prevCheckpointTS = checkpointTS
return nil
})
}()

_, err = sqlDB.Exec(tc.stmt)
require.NoError(t, err)
_ = sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS,
func(ctx context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error {
mu.Lock()
defer mu.Unlock()

require.True(t, mu.lastCheckpoint.Less(checkpointTS))
mu.lastCheckpoint = checkpointTS

for _, update := range updates {
mu.receivedIDs[update.ID] = struct{}{}
}
return nil
})
}()

for _, tc := range testCases {
tdb.Exec(t, tc.stmt)
afterStmtTS := ts.Clock().Now()

testutils.SucceedsSoon(t, func() error {
mu.Lock()
defer mu.Unlock()
if len(mu.receivedIDs) == len(tc.expectedIDs) {
return nil

if mu.lastCheckpoint.Less(afterStmtTS) {
return errors.New("checkpoint precedes statement timestamp")
}
return errors.Newf("expected to receive %d IDs, but found %d", len(tc.expectedIDs), len(mu.receivedIDs))
return nil
})

// Rangefeed events aren't guaranteed to be in any particular order for
// different keys.
mu.Lock()
require.Equal(t, len(tc.expectedIDs), len(mu.receivedIDs))
for _, id := range tc.expectedIDs {
_, seen := mu.receivedIDs[id]
require.True(t, seen)
delete(mu.receivedIDs, id)
}
mu.Unlock()

// Stop the watcher and wait for the goroutine to complete.
watcherCancel()
wg.Wait()
}

// Stop the watcher and wait for the goroutine to complete.
watcherCancel()
wg.Wait()
}

// TestSQLWatcherFactory tests that the SQLWatcherFactory can create multiple
// SQLWatchers and that every SQLWatcher is only good for a single
// WatchForSQLUpdates.
func TestSQLWatcherFactory(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.IgnoreLint(t, "buggy test; handler's invoked but test doesn't expect it to be")

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Expand All @@ -202,7 +215,9 @@ func TestSQLWatcherFactory(t *testing.T) {
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
ts := tc.Server(0 /* idx */)
sqlDB := tc.ServerConn(0 /* idx */)
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)

sqlWatcherFactory := spanconfigsqlwatcher.NewFactory(
keys.SystemSQLCodec,
Expand All @@ -214,8 +229,7 @@ func TestSQLWatcherFactory(t *testing.T) {
)

startTS := ts.Clock().Now()
_, err := sqlDB.Exec("CREATE TABLE t()")
require.NoError(t, err)
tdb.Exec(t, "CREATE TABLE t()")

sqlWatcher := sqlWatcherFactory.New()

Expand All @@ -234,7 +248,7 @@ func TestSQLWatcherFactory(t *testing.T) {
watcherCancel()
wg.Wait()

err = sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error {
err := sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error {
t.Fatal("handler should never run")
return nil
})
Expand Down Expand Up @@ -277,7 +291,9 @@ func TestSQLWatcherOnEventError(t *testing.T) {
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
ts := tc.Server(0 /* idx */)
sqlDB := tc.ServerConn(0 /* idx */)
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)

sqlWatcherFactory := spanconfigsqlwatcher.NewFactory(
keys.SystemSQLCodec,
Expand All @@ -293,11 +309,10 @@ func TestSQLWatcherOnEventError(t *testing.T) {
)

startTS := ts.Clock().Now()
_, err := sqlDB.Exec("CREATE TABLE t()")
require.NoError(t, err)
tdb.Exec(t, "CREATE TABLE t()")

sqlWatcher := sqlWatcherFactory.New()
err = sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error {
err := sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error {
t.Fatal("handler should never run")
return nil
})
Expand All @@ -323,7 +338,9 @@ func TestSQLWatcherHandlerError(t *testing.T) {
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
ts := tc.Server(0 /* idx */)
sqlDB := tc.ServerConn(0 /* idx */)
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)

sqlWatcher := spanconfigsqlwatcher.NewFactory(
keys.SystemSQLCodec,
Expand All @@ -334,8 +351,7 @@ func TestSQLWatcherHandlerError(t *testing.T) {
nil, /* knobs */
).New()

_, err := sqlDB.Exec("CREATE TABLE t()")
require.NoError(t, err)
tdb.Exec(t, "CREATE TABLE t()")

stopTraffic := make(chan struct{})
startTS := ts.Clock().Now()
Expand All @@ -352,11 +368,9 @@ func TestSQLWatcherHandlerError(t *testing.T) {
case <-stopTraffic:
return
default:
time.Sleep(100 * time.Millisecond)
}
_, err = sqlDB.Exec("ALTER TABLE t CONFIGURE ZONE USING num_replicas=5")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
tdb.Exec(t, "ALTER TABLE t CONFIGURE ZONE USING num_replicas=5")
}
}()

Expand Down

0 comments on commit ee68771

Please sign in to comment.