Skip to content

Commit

Permalink
Merge pull request #481 from wzshiming/feat/general-stage
Browse files Browse the repository at this point in the history
[kwok] Support Stage API for other resource
  • Loading branch information
wzshiming committed Dec 8, 2023
2 parents d6869d3 + 29c760a commit 5b4e807
Show file tree
Hide file tree
Showing 48 changed files with 1,191 additions and 60 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
k8s.io/client-go v0.28.0
k8s.io/code-generator v0.28.0
k8s.io/cri-api v0.28.0
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9
k8s.io/kubelet v0.28.0
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
sigs.k8s.io/controller-runtime v0.16.0
Expand Down Expand Up @@ -121,7 +122,6 @@ require (
k8s.io/apiextensions-apiserver v0.28.0 // indirect
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kustomize/cmd/config v0.11.3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
Expand Down
3 changes: 0 additions & 3 deletions kustomize/crd/bases/kwok.x-k8s.io_stages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ spec:
type: string
kind:
description: Kind of the referent.
enum:
- Pod
- Node
type: string
required:
- kind
Expand Down
2 changes: 2 additions & 0 deletions kustomize/kwok/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ spec:
- --enable-crds=ClusterLogs
- --enable-crds=PortForward
- --enable-crds=ClusterPortForward
- --enable-stage-for-refs=node
- --enable-stage-for-refs=pod
env:
- name: POD_IP
valueFrom:
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/config/v1alpha1/kwok_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type KwokConfigurationOptions struct {
// Once listed in this field, it will no longer be supported by the --config flag.
EnableCRDs []string `json:"enableCRDs,omitempty"`

// EnableStageForRefs is a list of refs to enable stage for.
// +default=["node", "pod"]
EnableStageForRefs []string `json:"enableStageForRefs,omitempty"`

// The default IP assigned to the Pod on maintained Nodes.
// is the default value for flag --cidr
// +default="10.0.0.1/24"
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/config/v1alpha1/kwokctl_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type KwokctlConfigurationOptions struct {
// Once listed in this field, it will no longer be supported by the --config flag.
EnableCRDs []string `json:"enableCRDs,omitempty"`

// EnableStageForRefs is a list of refs to enable stage for.
// +default=["node", "pod"]
EnableStageForRefs []string `json:"enableStageForRefs,omitempty"`

// KubeApiserverPort is the port to expose apiserver.
// is the default value for flag --kube-apiserver-port and env KWOK_KUBE_APISERVER_PORT
KubeApiserverPort uint32 `json:"kubeApiserverPort,omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions pkg/apis/config/v1alpha1/zz_generated.defaults.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/apis/internalversion/kwok_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type KwokConfigurationOptions struct {
// EnableCRDs is a list of CRDs to enable.
EnableCRDs []string

// EnableStageForRefs is a list of refs to enable stage for.
EnableStageForRefs []string

// The default IP assigned to the Pod on maintained Nodes.
CIDR string

Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/internalversion/kwokctl_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type KwokctlConfigurationOptions struct {
// EnableCRDs is a list of CRDs to enable.
EnableCRDs []string

// EnableStageForRefs is a list of refs to enable stage for.
EnableStageForRefs []string

// KubeApiserverPort is the port to expose apiserver.
KubeApiserverPort uint32

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/internalversion/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/apis/internalversion/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/apis/v1alpha1/stage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type StageResourceRef struct {
// +kubebuilder:default="v1"
APIGroup string `json:"apiGroup,omitempty"`
// Kind of the referent.
// +kubebuilder:validation:Enum=Pod;Node
Kind string `json:"kind"`
}

Expand Down
74 changes: 55 additions & 19 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"time"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/clock"

nodefast "sigs.k8s.io/kwok/kustomize/stage/node/fast"
Expand Down Expand Up @@ -92,6 +94,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringVar(&flags.Options.ServerAddress, "server-address", flags.Options.ServerAddress, "Address to expose the server on")
cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of node lease seconds")
cmd.Flags().StringSliceVar(&flags.Options.EnableCRDs, "enable-crds", flags.Options.EnableCRDs, "List of CRDs to enable")
cmd.Flags().StringSliceVar(&flags.Options.EnableStageForRefs, "enable-stage-for-refs", flags.Options.EnableStageForRefs, "List of refs to enable stage for")

cmd.Flags().BoolVar(&flags.Options.EnableCNI, "experimental-enable-cni", flags.Options.EnableCNI, "Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux")
if config.GOOS != "linux" {
Expand Down Expand Up @@ -141,19 +144,26 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

nodeStages := filterStages(stagesData, "v1", "Node")
podStages := filterStages(stagesData, "v1", "Pod")
var groupStages map[internalversion.StageResourceRef][]*internalversion.Stage

if !slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) {
if len(nodeStages) == 0 {
groupStages = slices.GroupBy(stagesData, func(stage *internalversion.Stage) internalversion.StageResourceRef {
return stage.Spec.ResourceRef
})

nodeRef := internalversion.StageResourceRef{APIGroup: "v1", Kind: "Node"}
podRef := internalversion.StageResourceRef{APIGroup: "v1", Kind: "Pod"}

if len(groupStages[nodeRef]) == 0 {
logger.Warn("No node stages found, using default node stages")
nodeStages, err = getDefaultNodeStages(flags.Options.NodeLeaseDurationSeconds == 0)
groupStages[nodeRef], err = getDefaultNodeStages(flags.Options.NodeLeaseDurationSeconds == 0)
if err != nil {
return err
}
}

if len(podStages) == 0 {
podStages, err = getDefaultPodStages()
if len(groupStages[podRef]) == 0 {
groupStages[podRef], err = getDefaultPodStages()
if err != nil {
return err
}
Expand All @@ -174,6 +184,21 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

dynamicClient, err := clientset.ToDynamicClient()
if err != nil {
return err
}

restMapper, err := clientset.ToRESTMapper()
if err != nil {
return err
}

restClient, err := rest.RESTClientFor(restConfig)
if err != nil {
return err
}

typedClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
Expand All @@ -183,6 +208,11 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

err = waitForReady(ctx, typedClient)
if err != nil {
return err
}

switch {
case flags.Options.ManageSingleNode != "":
logger.Info("Watch single node",
Expand All @@ -203,10 +233,27 @@ func runE(ctx context.Context, flags *flagpole) error {
}
ctx = log.NewContext(ctx, logger.With("id", id))

restMappings, err := slices.MapWithError(flags.Options.EnableStageForRefs, func(ref string) (*meta.RESTMapping, error) {
return client.MappingFor(restMapper, ref)
})
if err != nil {
return err
}

stageWithRefs := slices.Map(restMappings, func(m *meta.RESTMapping) internalversion.StageResourceRef {
return internalversion.StageResourceRef{
APIGroup: m.GroupVersionKind.GroupVersion().String(),
Kind: m.GroupVersionKind.Kind,
}
})

metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
enableMetrics := len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind)
ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
DynamicClient: dynamicClient,
RESTClient: restClient,
RESTMapper: restMapper,
TypedClient: typedClient,
TypedKwokClient: typedKwokClient,
EnableCNI: flags.Options.EnableCNI,
Expand All @@ -224,8 +271,8 @@ func runE(ctx context.Context, flags *flagpole) error {
NodePort: flags.Options.NodePort,
PodPlayStageParallelism: flags.Options.PodPlayStageParallelism,
NodePlayStageParallelism: flags.Options.NodePlayStageParallelism,
NodeStages: nodeStages,
PodStages: podStages,
StageWithRefs: stageWithRefs,
LocalStages: groupStages,
NodeLeaseParallelism: flags.Options.NodeLeaseParallelism,
NodeLeaseDurationSeconds: flags.Options.NodeLeaseDurationSeconds,
ID: id,
Expand All @@ -234,11 +281,6 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

err = waitForReady(ctx, typedClient)
if err != nil {
return err
}

err = ctr.Start(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -385,12 +427,6 @@ func checkConfigOrCRD[T metav1.Object](crds []string, kind string, crs []T) erro
return nil
}

func filterStages(stages []*internalversion.Stage, apiGroup, kind string) []*internalversion.Stage {
return slices.Filter(stages, func(stage *internalversion.Stage) bool {
return stage.Spec.ResourceRef.APIGroup == apiGroup && stage.Spec.ResourceRef.Kind == kind
})
}

func waitForReady(ctx context.Context, clientset kubernetes.Interface) error {
logger := log.FromContext(ctx)
backoff := wait.Backoff{
Expand Down
Loading

0 comments on commit 5b4e807

Please sign in to comment.