Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ cache: add a synthetic delay to the cache server #2742

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ test-e2e-sharded: require-kind build-all build-kind-images
kind get kubeconfig > "$(WORK_DIR)/.kcp/kind.kubeconfig"
rm -f "$(WORK_DIR)/.kcp/ready-to-test"
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 \
./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \
./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) --cache-synthetic-delay=500ms 2>&1 & PID=$$!; echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) \
Expand Down
3 changes: 2 additions & 1 deletion cmd/sharded-test-server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/kcp-dev/kcp/test/e2e/framework"
)

func startCacheServer(ctx context.Context, logDirPath, workingDir string) (<-chan error, string, error) {
func startCacheServer(ctx context.Context, logDirPath, workingDir string, syntheticDelay time.Duration) (<-chan error, string, error) {
cyan := color.New(color.BgHiCyan, color.FgHiWhite).SprintFunc()
inverse := color.New(color.BgHiWhite, color.FgHiCyan).SprintFunc()
out := lineprefix.New(
Expand All @@ -59,6 +59,7 @@ func startCacheServer(ctx context.Context, logDirPath, workingDir string) (<-cha
"--embedded-etcd-client-port=8010",
"--embedded-etcd-peer-port=8011",
fmt.Sprintf("--secure-port=%d", cachePort),
fmt.Sprintf("--synthetic-delay=%s", syntheticDelay.String()),
)
fmt.Fprintf(out, "running: %v\n", strings.Join(commandLine, " "))
cmd := exec.CommandContext(ctx, commandLine[0], commandLine[1:]...) //nolint:gosec
Expand Down
8 changes: 5 additions & 3 deletions cmd/sharded-test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -46,6 +47,7 @@ func main() {
logDirPath := flag.String("log-dir-path", "", "Path to the log files. If empty, log files are stored in the dot directories.")
workDirPath := flag.String("work-dir-path", "", "Path to the working directory where the .kcp* dot directories are created. If empty, the working directory is the current directory.")
numberOfShards := flag.Int("number-of-shards", 1, "The number of shards to create. The first created is assumed root.")
cacheSyntheticDelay := flag.Duration("cache-synthetic-delay", 0, "The duration of time the cache server will inject a delay for to all inbound requests.")
quiet := flag.Bool("quiet", false, "Suppress output of the subprocesses")

// split flags into --proxy-*, --shard-* and everything else (generic). The former are
Expand All @@ -62,13 +64,13 @@ func main() {
}
flag.CommandLine.Parse(genericFlags) //nolint:errcheck

if err := start(proxyFlags, shardFlags, *logDirPath, *workDirPath, *numberOfShards, *quiet); err != nil {
if err := start(proxyFlags, shardFlags, *logDirPath, *workDirPath, *numberOfShards, *quiet, *cacheSyntheticDelay); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
}

func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numberOfShards int, quiet bool) error {
func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numberOfShards int, quiet bool, cacheSyntheticDelay time.Duration) error {
// We use a shutdown context to know that it's time to gather metrics, before stopping the shards, proxy, etc.
shutdownCtx, shutdownCancel := context.WithCancel(genericapiserver.SetupSignalContext())
defer shutdownCancel()
Expand Down Expand Up @@ -186,7 +188,7 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb

cacheServerErrCh := make(chan indexErrTuple)
cacheServerConfigPath := ""
cacheServerCh, configPath, err := startCacheServer(ctx, logDirPath, workDirPath)
cacheServerCh, configPath, err := startCacheServer(ctx, logDirPath, workDirPath, cacheSyntheticDelay)
if err != nil {
return fmt.Errorf("error starting the cache server: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func NewConfig(opts *cacheserveroptions.CompletedOptions, optionalLocalShardRest
apiHandler = filters.WithClusterScope(apiHandler)
apiHandler = WithShardScope(apiHandler)
apiHandler = WithServiceScope(apiHandler)
apiHandler = WithSyntheticDelay(apiHandler, opts.SyntheticDelay)
return apiHandler
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/cache/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/url"
"regexp"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -141,3 +142,11 @@ func WithServiceScope(handler http.Handler) http.Handler {
handler.ServeHTTP(w, req)
})
}

// WithSyntheticDelay injects a synthetic delay to calls, to exacerbate timing issues and expose inconsistent client behavior.
func WithSyntheticDelay(handler http.Handler, delay time.Duration) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
time.Sleep(delay)
stevekuznetsov marked this conversation as resolved.
Show resolved Hide resolved
handler.ServeHTTP(w, req)
})
}
5 changes: 5 additions & 0 deletions pkg/cache/server/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package options

import (
"time"

"github.com/spf13/pflag"

genericoptions "k8s.io/apiserver/pkg/server/options"
Expand All @@ -34,6 +36,7 @@ type Options struct {
Authorization *genericoptions.DelegatingAuthorizationOptions
APIEnablement *genericoptions.APIEnablementOptions
EmbeddedEtcd etcdoptions.Options
SyntheticDelay time.Duration
}

type completedOptions struct {
Expand All @@ -44,6 +47,7 @@ type completedOptions struct {
Authorization *genericoptions.DelegatingAuthorizationOptions
APIEnablement *genericoptions.APIEnablementOptions
EmbeddedEtcd etcdoptions.CompletedOptions
SyntheticDelay time.Duration
}

type CompletedOptions struct {
Expand Down Expand Up @@ -113,4 +117,5 @@ func (o *Options) Complete() (*CompletedOptions, error) {
func (o *Options) AddFlags(fs *pflag.FlagSet) {
o.EmbeddedEtcd.AddFlags(fs)
o.SecureServing.AddFlags(fs)
fs.DurationVar(&o.SyntheticDelay, "synthetic-delay", 0, "The duration of time the cache server will inject a delay for to all inbound requests. Useful for testing.")
}