Skip to content

Commit

Permalink
Merge pull request #2596 from sttts/sttts-random-workspace-scheduling
Browse files Browse the repository at this point in the history
✨ enable shard scheduling and fix e2e tests
  • Loading branch information
openshift-merge-robot committed Jan 30, 2023
2 parents 3799080 + 8f04da8 commit da4b8d7
Show file tree
Hide file tree
Showing 92 changed files with 3,536 additions and 1,188 deletions.
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ endif
test-e2e-sharded: TEST_ARGS ?=
test-e2e-sharded: WHAT ?= ./test/e2e...
test-e2e-sharded: WORK_DIR ?= .
test-e2e-sharded: SHARDS ?= 2
ifdef ARTIFACT_DIR
test-e2e-sharded: LOG_DIR ?= $(ARTIFACT_DIR)/kcp
else
Expand All @@ -316,12 +317,13 @@ test-e2e-sharded: require-kind build-all build-kind-images
kind get kubeconfig > "$(WORK_DIR)/.kcp/kind.kubeconfig"
rm -f "$(WORK_DIR)/.kcp/admin.kubeconfig"
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 \
./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=2 2>&1 & PID=$$!; echo "PID $$PID" && \
./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/admin.kubeconfig" ]; do sleep 1; done && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) \
$(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server --root-shard-kubeconfig=$(PWD)/.kcp-0/admin.kubeconfig $(SUITES_ARG) \
-args --use-default-kcp-server --shard-kubeconfigs=root=$(PWD)/.kcp-0/admin.kubeconfig$(shell if [ $(SHARDS) -gt 1 ]; then seq 1 $$[$(SHARDS) - 1]; fi | while read n; do echo -n ",shard-$$n=$(PWD)/.kcp-$$n/admin.kubeconfig"; done) \
$(SUITES_ARG) \
--syncer-image="$(SYNCER_IMAGE)" --kcp-test-image="$(TEST_IMAGE)" --pcluster-kubeconfig="$(abspath $(WORK_DIR)/.kcp/kind.kubeconfig)" \
$(if $(value WAIT),|| { echo "Terminated with $$?"; wait "$$PID"; },)

Expand All @@ -332,6 +334,7 @@ endif
test-e2e-sharded-minimal: TEST_ARGS ?=
test-e2e-sharded-minimal: WHAT ?= ./test/e2e...
test-e2e-sharded-minimal: WORK_DIR ?= .
test-e2e-sharded-minimal: SHARDS ?= 2
ifdef ARTIFACT_DIR
test-e2e-sharded-minimal: LOG_DIR ?= $(ARTIFACT_DIR)/kcp
else
Expand All @@ -340,11 +343,12 @@ endif
test-e2e-sharded-minimal: build-all
mkdir -p "$(LOG_DIR)" "$(WORK_DIR)/.kcp"
rm -f "$(WORK_DIR)/.kcp/admin.kubeconfig"
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=2 2>&1 & PID=$$!; echo "PID $$PID" && \
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/admin.kubeconfig" ]; do sleep 1; done && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) $(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server --root-shard-kubeconfig=$(PWD)/.kcp-0/admin.kubeconfig $(SUITES_ARGS) \
-args --use-default-kcp-server --shard-kubeconfigs=root=$(PWD)/.kcp-0/admin.kubeconfig$(shell if [ $(SHARDS) -gt 1 ]; then seq 1 $$[$(SHARDS) - 1]; fi | while read n; do echo -n ",shard-$$n=$(PWD)/.kcp-$$n/admin.kubeconfig"; done) \
$(SUITES_ARGS) \
$(if $(value WAIT),|| { echo "Terminated with $$?"; wait "$$PID"; },)

.PHONY: test
Expand Down
3 changes: 2 additions & 1 deletion cmd/apigen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"

admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
Expand Down Expand Up @@ -172,7 +173,7 @@ func loadCustomResourceDefinitions(logger logr.Logger, baseDir string) (map[meta
Group: parts[0],
Resource: parts[1],
}
if gr.Group == apis.GroupName || gr.Group == rbacv1.GroupName {
if gr.Group == apis.GroupName || gr.Group == rbacv1.GroupName || gr.Group == admissionregistrationv1.GroupName {
logger.Info(fmt.Sprintf("Skipping CustomResourceDefinition %s from %s", gr.String(), path))
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/cache-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package main

import (
"flag"
"math/rand"
"os"
"strings"
"time"

"github.com/spf13/cobra"

Expand All @@ -34,6 +36,8 @@ import (
)

func main() {
rand.Seed(time.Now().UTC().UnixNano())

rootDir := flag.String("root-directory", ".kcp-cache", "Path to the root directory where all files required by this server will be stored")

var cacheServerFlags, remainingFlags []string //nolint:prealloc
Expand Down
4 changes: 4 additions & 0 deletions cmd/kcp/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package main

import (
"fmt"
"math/rand"
"os"
"strings"
"time"

"github.com/spf13/cobra"

Expand All @@ -44,6 +46,8 @@ import (
)

func main() {
rand.Seed(time.Now().UTC().UnixNano())

cmd := &cobra.Command{
Use: "kcp",
Short: "Kube for Control Plane (KCP)",
Expand Down
85 changes: 84 additions & 1 deletion cmd/sharded-test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ import (
"path/filepath"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
machineryutilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
kuser "k8s.io/apiserver/pkg/authentication/user"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"

"github.com/kcp-dev/kcp/cmd/sharded-test-server/third_party/library-go/crypto"
shard "github.com/kcp-dev/kcp/cmd/test-server/kcp"
"github.com/kcp-dev/kcp/pkg/apis/core"
"github.com/kcp-dev/kcp/pkg/authorization/bootstrap"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
)

func main() {
Expand Down Expand Up @@ -209,7 +215,7 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
vwPort = "7444"

for i := 0; i < numberOfShards; i++ {
vw, err := newVirtualWorkspace(ctx, i, servingCA, hostIP.String(), logDirPath, workDirPath, clientCA)
vw, err := newVirtualWorkspace(ctx, i, servingCA, hostIP.String(), logDirPath, workDirPath, clientCA, cacheServerConfigPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -262,6 +268,37 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
return err
}

// Label region of shards
clientConfig, err := loadKubeConfig(filepath.Join(workDirPath, ".kcp/admin.kubeconfig"), "base")
if err != nil {
return err
}
config, err := clientConfig.ClientConfig()
if err != nil {
return err
}
client, err := kcpclientset.NewForConfig(config)
if err != nil {
return err
}
for i := range shards {
name := fmt.Sprintf("shard-%d", i)
if i == 0 {
name = "root"
}

if i >= len(regions) {
break
}
patch := fmt.Sprintf(`{"metadata":{"labels":{"region":%q}}}`, regions[i])
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err := client.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().Patch(ctx, name, types.MergePatchType, []byte(patch), metav1.PatchOptions{})
return err
}); err != nil {
return err
}
}

select {
case shardIndexErr := <-shardsErrCh:
return fmt.Errorf("shard %d exited: %w", shardIndexErr.index, shardIndexErr.error)
Expand All @@ -278,3 +315,49 @@ type indexErrTuple struct {
index int
error error
}

func loadKubeConfig(kubeconfigPath, contextName string) (clientcmd.ClientConfig, error) {
fs, err := os.Stat(kubeconfigPath)
if err != nil {
return nil, err
}
if fs.Size() == 0 {
return nil, fmt.Errorf("%s points to an empty file", kubeconfigPath)
}

rawConfig, err := clientcmd.LoadFromFile(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to load admin kubeconfig: %w", err)
}

return clientcmd.NewNonInteractiveClientConfig(*rawConfig, contextName, nil, nil), nil
}

var regions = []string{
"us-east-2",
"us-east-1",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-2",
"ap-southeast-3",
"ap-south-1",
"ap-northeast-3",
"ap-northeast-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-northeast-1",
"ca-central-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"eu-south-1",
"eu-west-3",
"eu-south-2",
"eu-north-1",
"eu-central-2",
"me-south-1",
"me-central-1",
"sa-east-1",
}
2 changes: 1 addition & 1 deletion cmd/sharded-test-server/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newShard(ctx context.Context, n int, args []string, standaloneVW bool, serv
fmt.Sprintf("--shard-virtual-workspace-ca-file=%s", filepath.Join(workDirPath, ".kcp", "serving-ca.crt")),
)
if len(cacheServerConfigPath) > 0 {
args = append(args, fmt.Sprintf("--cache-server-kubeconfig-file=%s", cacheServerConfigPath))
args = append(args, fmt.Sprintf("--cache-kubeconfig=%s", cacheServerConfigPath))
}

if standaloneVW {
Expand Down
3 changes: 2 additions & 1 deletion cmd/sharded-test-server/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type VirtualWorkspace struct {
writer headWriter
}

func newVirtualWorkspace(ctx context.Context, index int, servingCA *crypto.CA, hostIP string, logDirPath, workDirPath string, clientCA *crypto.CA) (*VirtualWorkspace, error) {
func newVirtualWorkspace(ctx context.Context, index int, servingCA *crypto.CA, hostIP string, logDirPath, workDirPath string, clientCA *crypto.CA, cacheServerConfigPath string) (*VirtualWorkspace, error) {
logger := klog.FromContext(ctx)

// create serving cert
Expand Down Expand Up @@ -131,6 +131,7 @@ func newVirtualWorkspace(ctx context.Context, index int, servingCA *crypto.CA, h
args := []string{}
args = append(args,
fmt.Sprintf("--kubeconfig=%s", kubeconfigPath),
fmt.Sprintf("--cache-kubeconfig=%s", cacheServerConfigPath),
fmt.Sprintf("--authentication-kubeconfig=%s", authenticationKubeconfigPath),
fmt.Sprintf("--client-ca-file=%s", clientCAFilePath),
fmt.Sprintf("--tls-private-key-file=%s", servingKeyFile),
Expand Down
4 changes: 4 additions & 0 deletions cmd/syncer/cmd/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package cmd
import (
"context"
"errors"
"math/rand"
"os"
"time"

"github.com/kcp-dev/logicalcluster/v3"
"github.com/spf13/cobra"
Expand All @@ -38,6 +40,8 @@ import (
const numThreads = 2

func NewSyncerCommand() *cobra.Command {
rand.Seed(time.Now().UTC().UnixNano())

options := synceroptions.NewOptions()
syncerCommand := &cobra.Command{
Use: "syncer",
Expand Down
18 changes: 17 additions & 1 deletion cmd/virtual-workspaces/command/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ func Run(ctx context.Context, o *options.Options) error {
return err
}

// parse cache kubeconfig
defaultCacheClientConfig, err := kubeConfig.ClientConfig()
if err != nil {
return err
}
cacheConfig, err := o.Cache.RestConfig(defaultCacheClientConfig)
if err != nil {
return err
}
cacheKcpClusterClient, err := kcpclientset.NewForConfig(cacheConfig)
if err != nil {
return err
}

// Don't throttle
nonIdentityConfig.QPS = -1

Expand Down Expand Up @@ -127,14 +141,15 @@ func Run(ctx context.Context, o *options.Options) error {
return err
}
wildcardKcpInformers := kcpinformers.NewSharedInformerFactory(kcpClusterClient, 10*time.Minute)
cacheKcpInformers := kcpinformers.NewSharedInformerFactory(cacheKcpClusterClient, 10*time.Minute)

if o.ProfilerAddress != "" {
//nolint:errcheck,gosec
go http.ListenAndServe(o.ProfilerAddress, nil)
}

// create apiserver
virtualWorkspaces, err := o.VirtualWorkspaces.NewVirtualWorkspaces(identityConfig, o.RootPathPrefix, wildcardKubeInformers, wildcardKcpInformers)
virtualWorkspaces, err := o.VirtualWorkspaces.NewVirtualWorkspaces(identityConfig, o.RootPathPrefix, wildcardKubeInformers, wildcardKcpInformers, cacheKcpInformers)
if err != nil {
return err
}
Expand All @@ -157,6 +172,7 @@ func Run(ctx context.Context, o *options.Options) error {
rootAPIServerConfig, err := virtualrootapiserver.NewRootAPIConfig(recommendedConfig, []virtualrootapiserver.InformerStart{
wildcardKubeInformers.Start,
wildcardKcpInformers.Start,
cacheKcpInformers.Start,
}, virtualWorkspaces)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions cmd/virtual-workspaces/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/component-base/logs"

cacheoptions "github.com/kcp-dev/kcp/pkg/cache/client/options"
virtualworkspacesoptions "github.com/kcp-dev/kcp/pkg/virtual/options"
)

Expand All @@ -44,6 +45,7 @@ type Options struct {
Context string
RootPathPrefix string

Cache cacheoptions.Cache
SecureServing genericapiserveroptions.SecureServingOptions
Authentication genericapiserveroptions.DelegatingAuthenticationOptions
Authorization virtualworkspacesoptions.Authorization
Expand All @@ -61,6 +63,7 @@ func NewOptions() *Options {

RootPathPrefix: DefaultRootPathPrefix,

Cache: *cacheoptions.NewCache(),
SecureServing: *genericapiserveroptions.NewSecureServingOptions(),
Authentication: *genericapiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: *virtualworkspacesoptions.NewAuthorization(),
Expand All @@ -79,6 +82,7 @@ func NewOptions() *Options {
}

func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.Cache.AddFlags(flags)
o.SecureServing.AddFlags(flags)
o.Authentication.AddFlags(flags)
o.Logs.AddFlags(flags)
Expand All @@ -94,6 +98,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {

func (o *Options) Validate() error {
errs := []error{}
errs = append(errs, o.Cache.Validate()...)
errs = append(errs, o.SecureServing.Validate()...)
errs = append(errs, o.Authentication.Validate()...)
errs = append(errs, o.VirtualWorkspaces.Validate()...)
Expand Down

0 comments on commit da4b8d7

Please sign in to comment.