Skip to content

Commit

Permalink
roachtest: add rolling restart test
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt authored and wenyihu6 committed May 1, 2024
1 parent 308f118 commit d472092
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 0 deletions.
176 changes: 176 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@ import (
"context"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
gosql "database/sql"
"encoding/base64"
"encoding/json"
"encoding/pem"
"fmt"
"io"
"math/big"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -905,6 +908,167 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
m.Wait()
}

// runCDCBackfillRollingRestart verifies that an initial-scan-only changefeed
// delivers every row of a table to a webhook sink while other nodes in the
// cluster are continuously drained and restarted.
//
// The test:
//   - starts the cluster with one rack per node and constrains the default
//     zone away from rack 0;
//   - loads a rowCount-row table and splits/scatters it;
//   - runs `./workload debug webhook-server` on n1 as the sink;
//   - restarts nodes 2-5 in a loop in the background;
//   - runs the changefeed several times, each time asking the sink how many
//     unique and duplicate rows it saw and failing unless the unique count
//     equals rowCount.
func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cluster) {
	const rowCount, splitCount = 1000000, 500
	startOpts := option.DefaultStartOpts()

	// Resolve the sink address. The error must be checked before indexing
	// into ips, otherwise a failed lookup panics instead of failing the test.
	ips, err := c.InternalIP(ctx, t.L(), c.Node(1))
	if err != nil {
		t.Fatal(err)
	}
	if len(ips) == 0 {
		t.Fatal("no internal IP returned for n1")
	}
	sinkURL := fmt.Sprintf("https://%s:9707", ips[0])
	// Certificate verification is disabled for the sink client; the changefeed
	// URI below likewise sets insecure_tls_skip_verify=true.
	sink := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}

	// sinkGet issues a GET against the sink and returns the full response
	// body. The body is always closed so connections are not leaked.
	sinkGet := func(path string) ([]byte, error) {
		resp, err := sink.Get(sinkURL + path)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		return io.ReadAll(resp.Body)
	}

	// sinkCount fetches an endpoint whose body is a decimal integer.
	sinkCount := func(path string) (int, error) {
		body, err := sinkGet(path)
		if err != nil {
			return 0, err
		}
		return strconv.Atoi(string(body))
	}

	// We configure "racks" localities to push replicas off n1 later.
	racks := install.MakeClusterSettings(install.NumRacksOption(c.Spec().NodeCount))
	racks.Env = append(racks.Env, `COCKROACH_CHANGEFEED_TESTING_FAST_RETRY=true`)
	c.Start(ctx, t.L(), option.DefaultStartOpts(), racks)
	m := c.NewMonitor(ctx, c.Range(1, 5))

	// restart drains node n, stops it, and starts it again with the same
	// settings, telling the monitor to expect (and then forget) the death.
	restart := func(n int) error {
		cmd := fmt.Sprintf("./cockroach node drain --certs-dir=%s --port={pgport:%d} --self", install.CockroachNodeCertsDir, n)
		if err := c.RunE(ctx, option.WithNodes(c.Node(n)), cmd); err != nil {
			return err
		}
		m.ExpectDeath()
		c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Node(n))
		opts := startOpts
		opts.RoachprodOpts.IsRestart = true
		c.Start(ctx, t.L(), opts, racks, c.Node(n))
		m.ResetDeaths()
		return nil
	}

	// Restart n1 to shed any leases it still has.
	if err := restart(1); err != nil {
		t.Fatal(err)
	}

	db := c.Conn(ctx, t.L(), 1)
	defer db.Close()

	// Set up a 1M row table that is split into >= 500 scattered ranges.
	// Keep ranges off n1 so that changefeed plans use the other nodes.
	t.L().Printf("setting up test data...")
	for _, s := range []string{
		`ALTER RANGE default CONFIGURE ZONE USING constraints = '[-rack=0]'`,
		fmt.Sprintf(`CREATE TABLE t (id PRIMARY KEY) AS SELECT generate_series(1, %d) id`, rowCount),
		`ALTER TABLE t SCATTER`,
		// Split some bigger chunks up to scatter it a bit more.
		fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount/4),
		`ALTER TABLE t SCATTER`,
		// Finish splitting, so that drained ranges spread out evenly.
		fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount),
		`SET CLUSTER SETTING jobs.registry.retry.initial_delay = '.1s'`,
		`SET CLUSTER SETTING jobs.registry.retry.max_delay = '.4s'`,
	} {
		// Log via an explicit verb: s is data, not a format string.
		t.L().Printf("%s", s)
		if _, err := db.ExecContext(ctx, s); err != nil {
			t.Fatal(err)
		}
	}
	t.L().Printf("test data is setup")

	c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(1))

	// Run the sink server.
	m.Go(func(ctx context.Context) error {
		t.L().Printf("starting up sink server at %s...", sinkURL)
		if err := c.RunE(ctx, option.WithNodes(c.Node(1)), "./workload debug webhook-server"); err != nil {
			return err
		}
		t.L().Printf("sink server exited")
		return nil
	})

	// Restart nodes 2 through 5 in a loop until told to stop.
	// NOTE(review): the log line below mentions only 2, 3, 4 while the loop
	// also restarts 5 — confirm which is intended.
	stopRestarts := make(chan struct{})
	m.Go(func(ctx context.Context) error {
		defer func() {
			t.L().Printf("done restarting nodes")
		}()
		t.L().Printf("starting rolling drain+restarts of 2, 3, 4...")
		for {
			for _, n := range []int{2, 3, 4, 5} {
				select {
				case <-stopRestarts:
					return nil
				case <-ctx.Done():
					return nil
				default:
					if err := restart(n); err != nil {
						return err
					}
				}
			}
		}
	})

	// Run the changefeed, then ask the sink how many rows it saw. This is
	// wrapped in a closure so that the deferred cleanup (stopping the restart
	// loop and asking the sink to exit) runs before m.Wait below, including
	// when t.Fatal aborts the closure.
	func() {
		defer close(stopRestarts)
		defer func() {
			_, err := sinkGet("/exit")
			t.L().Printf("exiting webhook sink status: %v", err)
		}()

		for i := 1; i < 5; i++ {
			t.L().Printf("starting changefeed...")
			var job int
			if err := db.QueryRowContext(
				ctx,
				fmt.Sprintf("CREATE CHANGEFEED FOR TABLE t INTO 'webhook-%s/?insecure_tls_skip_verify=true' WITH initial_scan='only'", sinkURL),
			).Scan(&job); err != nil {
				t.Fatal(err)
			}

			t.L().Printf("waiting for changefeed %d...", job)
			if _, err := db.ExecContext(ctx, "SHOW JOB WHEN COMPLETE $1", job); err != nil {
				t.Fatal(err)
			}

			t.L().Printf("changefeed complete, checking sink...")
			unique, err := sinkCount("/unique")
			if err != nil {
				t.Fatal(err)
			}
			dupes, err := sinkCount("/dupes")
			if err != nil {
				t.Fatal(err)
			}
			t.L().Printf("sink got %d unique, %d dupes", unique, dupes)
			if unique != rowCount {
				t.Fatalf("expected %d, got %d", rowCount, unique)
			}
			// Clear the sink's state so the next iteration counts from zero.
			_, err = sinkGet("/reset")
			t.L().Printf("resetting sink %v", err)
		}
	}()

	// TODO(#116314)
	if runtime.GOOS != "darwin" {
		m.Wait()
	}
}

// This test verifies that the changefeed avro + confluent schema registry works
// end-to-end (including the schema registry default of requiring backward
// compatibility within a topic).
Expand Down Expand Up @@ -1121,6 +1285,18 @@ func registerCDC(r registry.Registry) {
exportStatsFile()
},
})
// Register cdc/initial-scan-rolling-restart: runs
// runCDCBackfillRollingRestart on a 5-node cluster, restricted to local
// clusters, as part of the nightly suite with a 15 minute timeout.
r.Add(registry.TestSpec{
Name: "cdc/initial-scan-rolling-restart",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(5),
RequiresLicense: true,
CompatibleClouds: registry.OnlyLocal,
Suites: registry.Suites(registry.Nightly),
Timeout: time.Minute * 15,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBackfillRollingRestart(ctx, t, c)
},
})
r.Add(registry.TestSpec{
Name: "cdc/tpcc-1000/sink=kafka",
Owner: registry.OwnerCDC,
Expand Down
1 change: 1 addition & 0 deletions pkg/workload/debug/webhook_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func webhookServer(cmd *cobra.Command, args []string) error {
defer mu.Unlock()
before = len(seen)
after = before
// TODO(cdc): add check for ordering guarantees using resolved timestamps and event timestamps
for _, i := range req.Payload {
if _, ok := seen[i.After.ID]; !ok {
seen[i.After.ID] = struct{}{}
Expand Down

0 comments on commit d472092

Please sign in to comment.