Skip to content

Commit

Permalink
workload: add unique key counting webhook sink
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed May 1, 2024
1 parent e572349 commit 308f118
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ go_library(
"//pkg/workload/bank",
"//pkg/workload/bulkingest",
"//pkg/workload/cli",
"//pkg/workload/debug",
"//pkg/workload/examples",
"//pkg/workload/insights",
"//pkg/workload/kv",
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/workload/bank" // registers workloads
_ "github.com/cockroachdb/cockroach/pkg/workload/bulkingest" // registers workloads
workloadcli "github.com/cockroachdb/cockroach/pkg/workload/cli"
_ "github.com/cockroachdb/cockroach/pkg/workload/debug" // registers workloads
_ "github.com/cockroachdb/cockroach/pkg/workload/examples" // registers workloads
_ "github.com/cockroachdb/cockroach/pkg/workload/insights" // registers workloads
_ "github.com/cockroachdb/cockroach/pkg/workload/kv" // registers workloads
Expand Down
4 changes: 4 additions & 0 deletions pkg/workload/debug/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ go_library(
srcs = [
"debug.go",
"tpcc_results.go",
"webhook_server.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/workload/debug",
visibility = ["//visibility:public"],
deps = [
"//pkg/cli/exit",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/workload/cli",
"//pkg/workload/histogram",
"//pkg/workload/tpcc",
Expand Down
1 change: 1 addition & 0 deletions pkg/workload/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var debugCmd = &cobra.Command{

func init() {
debugCmd.AddCommand(tpccMergeResultsCmd)
debugCmd.AddCommand(webhookServerCmd)
cli.AddSubCmd(func(userFacing bool) *cobra.Command {
return debugCmd
})
Expand Down
157 changes: 157 additions & 0 deletions pkg/workload/debug/webhook_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package debug

import (
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"fmt"
"log"
"math/big"
"net/http"
"time"

"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/spf13/cobra"
)

var webhookServerCmd = &cobra.Command{
Use: "webhook-server",
Short: "webhook-server opens an http server on 3000 to which cdc's webhook can emit a table with a numeric unique 'id' column",
RunE: webhookServer,
Args: cobra.NoArgs,
}

const (
port = 9707
)

func webhookServer(cmd *cobra.Command, args []string) error {
var (
mu syncutil.Mutex
seen = map[int]struct{}{}
size int64
dupes int
)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var req struct {
Length int `json:"length"`
Payload []struct {
After struct {
ID int `json:"id"`
} `json:"after"`
} `json:"payload"`
}

err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
log.Printf("decoding body: %v", err)
return
}
var before, after, d int
func() {
mu.Lock()
defer mu.Unlock()
before = len(seen)
after = before
for _, i := range req.Payload {
if _, ok := seen[i.After.ID]; !ok {
seen[i.After.ID] = struct{}{}
after++
} else {
dupes++
}
}
if r.ContentLength > 0 {
size += r.ContentLength
}
d = dupes
}()
const printEvery = 10000
if before/printEvery != after/printEvery {
log.Printf("keys seen: %d (%d dupes); %.1f MB", after, d, float64(size)/float64(1<<20))
}
})
mux.HandleFunc("/reset", func(w http.ResponseWriter, r *http.Request) {
func() {
mu.Lock()
defer mu.Unlock()
seen = make(map[int]struct{}, len(seen))
dupes = 0
size = 0
}()
log.Printf("reset")
})
mux.HandleFunc("/unique", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
l := len(seen)
log.Printf("keys seen: %d", l)
fmt.Fprintf(w, "%d", l)
})
mux.HandleFunc("/dupes", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "%d", dupes)
})

mux.HandleFunc("/exit", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
go func() {
time.Sleep(time.Millisecond * 5)
exit.WithCode(exit.Success())
}()
})

cert, err := genKeyPair()
if err != nil {
return err
}
log.Printf("starting server on port %d", port)
return (&http.Server{
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
Handler: mux,
Addr: fmt.Sprintf(":%d", port),
}).ListenAndServeTLS("", "")
}

func genKeyPair() (tls.Certificate, error) {
now := timeutil.Now()
tpl := &x509.Certificate{
Subject: pkix.Name{CommonName: "localhost"},
SerialNumber: big.NewInt(now.Unix()),
NotBefore: now,
NotAfter: now.AddDate(0, 0, 30), // Valid for one day
BasicConstraintsValid: true,
IsCA: true,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
}

k, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return tls.Certificate{}, err
}

cert, err := x509.CreateCertificate(rand.Reader, tpl, tpl, k.Public(), k)
if err != nil {
return tls.Certificate{}, err
}
return tls.Certificate{PrivateKey: k, Certificate: [][]byte{cert}}, nil
}

0 comments on commit 308f118

Please sign in to comment.