Skip to content

Commit

Permalink
feat(k8s): sidecar containers (#9321)
Browse files Browse the repository at this point in the history
* feat(k8s): sidecar containers
* test(e2e_env): adjust container patch e2e test
* feat(kuma-cp): add experimental config for sidecar containers
* test(injector): add unit test for sidecar feature
* ci: add job for sidecar container feature
* chore: add warning if feature enabled but not supported

Signed-off-by: Mike Beaumont <mjboamail@gmail.com>
  • Loading branch information
michaelbeaumont committed Feb 22, 2024
1 parent c875628 commit 80c150e
Show file tree
Hide file tree
Showing 81 changed files with 5,999 additions and 203 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/build-test-distribute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,24 @@ jobs:
"k8sVersion": ["kindIpv6", "${{ env.K8S_MIN_VERSION }}", "${{ env.K8S_MAX_VERSION }}"],
"arch": ["amd64"],
"parallelism": [4],
"cniNetworkPlugin": ["flannel"]
"cniNetworkPlugin": ["flannel"],
"sidecarContainers": [""]
},
"test_e2e_env": {
"target": ["kubernetes", "universal", "multizone"],
"k8sVersion": ["kind", "kindIpv6", "${{ env.K8S_MIN_VERSION }}", "${{ env.K8S_MAX_VERSION }}"],
"arch": ["amd64"],
"parallelism": [1],
"cniNetworkPlugin": ["flannel"],
"sidecarContainers": [""],
"exclude":[
{"target": "kubernetes", "k8sVersion":"kind"},
{"target": "multizone", "k8sVersion":"kind"},
{"target":"universal", "k8sVersion":"${{ env.K8S_MIN_VERSION }}"},
{"target":"universal", "k8sVersion":"${{ env.K8S_MAX_VERSION }}"}
],
"include":[
{"sidecarContainers": "sidecarContainers", "k8sVersion": "${{ env.K8S_MAX_VERSION }}", "target": "kubernetes", "arch": "amd64"},
{"k8sVersion": "${{ env.K8S_MAX_VERSION }}", "target": "multizone", "arch": "arm64"},
{"k8sVersion": "${{ env.K8S_MAX_VERSION }}", "target": "kubernetes", "arch": "arm64"},
{"k8sVersion": "${{ env.K8S_MAX_VERSION }}", "target": "universal", "arch": "arm64"},
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ env:
E2E_PARAM_K8S_VERSION: ${{ fromJSON(inputs.matrix).k8sVersion }}
E2E_PARAM_CNI_NETWORK_PLUGIN: ${{ fromJSON(inputs.matrix).cniNetworkPlugin }}
E2E_PARAM_ARCH: ${{ fromJSON(inputs.matrix).arch }}
E2E_PARAM_SIDECAR_CONTAINERS: ${{ fromJSON(inputs.matrix).sidecarContainers }}
E2E_PARAM_TARGET: ${{ fromJSON(inputs.matrix).target }}
E2E_PARAM_PARALLELISM: ${{ fromJSON(inputs.matrix).parallelism }}
CI_TOOLS_DIR: /home/runner/work/kuma/kuma/.ci_tools
Expand Down Expand Up @@ -123,6 +124,10 @@ jobs:
export MAKE_PARAMETERS="-j2"
fi
if [[ "${{ env.E2E_PARAM_SIDECAR_CONTAINERS }}" != "" ]]; then
export KUMA_EXPERIMENTAL_SIDECAR_CONTAINERS=true
fi
if [[ "${{ env.E2E_PARAM_TARGET }}" == "" ]]; then
export GINKGO_E2E_LABEL_FILTERS="job-${{ matrix.parallelRunnerId }}"
fi
Expand Down
2 changes: 1 addition & 1 deletion app/cni/pkg/cni/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func cmdAdd(args *skel.CmdArgs) error {
return prepareResult(conf, logger)
}

if containerCount < 2 {
if _, sidecarInInitContainers := initContainersMap[util.KumaSidecarContainerName]; containerCount < 2 && !sidecarInInitContainers {
logger.Info("pod excluded - not enough containers in pod. Kuma-sidecar container required")
return prepareResult(conf, logger)
}
Expand Down
3 changes: 3 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,9 @@ experimental:
# If true then control plane computes reachable services automatically based on MeshTrafficPermission.
# Lack of MeshTrafficPermission is treated as Deny the traffic.
autoReachableServices: false # ENV: KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES
# Enables sidecar containers in Kubernetes if supported by the Kubernetes
# environment.
sidecarContainers: false # ENV: KUMA_EXPERIMENTAL_SIDECAR_CONTAINERS
proxy:
gateway:
# Sets the envoy runtime value to limit maximum number of incoming
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ var DefaultConfig = func() Config {
FullResyncInterval: config_types.Duration{Duration: 1 * time.Minute},
DelayFullResync: false,
},
SidecarContainers: false,
},
Proxy: xds.DefaultProxyConfig(),
InterCp: intercp.DefaultInterCpConfig(),
Expand Down Expand Up @@ -424,6 +425,9 @@ type ExperimentalConfig struct {
// If true then control plane computes reachable services automatically based on MeshTrafficPermission.
// Lack of MeshTrafficPermission is treated as Deny the traffic.
AutoReachableServices bool `json:"autoReachableServices" envconfig:"KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES"`
// Enables sidecar containers in Kubernetes if supported by the Kubernetes
// environment.
SidecarContainers bool `json:"sidecarContainers" envconfig:"KUMA_EXPERIMENTAL_SIDECAR_CONTAINERS"`
}

type ExperimentalKDSEventBasedWatchdog struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,9 @@ experimental:
# If true then control plane computes reachable services automatically based on MeshTrafficPermission.
# Lack of MeshTrafficPermission is treated as Deny the traffic.
autoReachableServices: false # ENV: KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES
# Enables sidecar containers in Kubernetes if supported by the Kubernetes
# environment.
sidecarContainers: false # ENV: KUMA_EXPERIMENTAL_SIDECAR_CONTAINERS
proxy:
gateway:
# Sets the envoy runtime value to limit maximum number of incoming
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Experimental.KDSEventBasedWatchdog.FullResyncInterval.Duration).To(Equal(15 * time.Second))
Expect(cfg.Experimental.KDSEventBasedWatchdog.DelayFullResync).To(BeTrue())
Expect(cfg.Experimental.AutoReachableServices).To(BeTrue())
Expect(cfg.Experimental.SidecarContainers).To(BeTrue())

Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
Expect(cfg.EventBus.BufferSize).To(Equal(uint(30)))
Expand Down Expand Up @@ -709,6 +710,7 @@ experimental:
fullResyncInterval: 15s
delayFullResync: true
autoReachableServices: true
sidecarContainers: true
proxy:
gateway:
globalDownstreamMaxConnections: 1
Expand Down Expand Up @@ -983,6 +985,7 @@ tracing:
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL": "15s",
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_DELAY_FULL_RESYNC": "true",
"KUMA_EXPERIMENTAL_AUTO_REACHABLE_SERVICES": "true",
"KUMA_EXPERIMENTAL_SIDECAR_CONTAINERS": "true",
"KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1",
"KUMA_TRACING_OPENTELEMETRY_ENDPOINT": "otel-collector:4317",
"KUMA_TRACING_OPENTELEMETRY_ENABLED": "true",
Expand Down
16 changes: 12 additions & 4 deletions pkg/plugins/runtime/k8s/controllers/inbound_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,18 @@ func inboundForService(zone string, pod *kube_core.Pod, service *kube_core.Servi
// to figure out which container implements which service. Since we know container we can check its status
// and map it to the Dataplane health
if container != nil {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil && !cs.Ready {
if cs := util_k8s.FindContainerStatus(container.Name, pod.Status.ContainerStatuses); cs != nil && !cs.Ready {
state = mesh_proto.Dataplane_Networking_Inbound_NotReady
health.Ready = false
}
}

// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil && !cs.Ready {
if cs := util_k8s.FindContainerOrInitContainerStatus(
util_k8s.KumaSidecarContainerName,
pod.Status.ContainerStatuses,
pod.Status.InitContainerStatuses,
); cs != nil && !cs.Ready {
state = mesh_proto.Dataplane_Networking_Inbound_NotReady
health.Ready = false
}
Expand Down Expand Up @@ -98,15 +102,19 @@ func inboundForServiceless(zone string, pod *kube_core.Pod, name string) *mesh_p

for _, container := range pod.Spec.Containers {
if container.Name != util_k8s.KumaSidecarContainerName {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil && !cs.Ready {
if cs := util_k8s.FindContainerStatus(container.Name, pod.Status.ContainerStatuses); cs != nil && !cs.Ready {
state = mesh_proto.Dataplane_Networking_Inbound_NotReady
health.Ready = false
}
}
}

// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil && !cs.Ready {
if cs := util_k8s.FindContainerOrInitContainerStatus(
util_k8s.KumaSidecarContainerName,
pod.Status.ContainerStatuses,
pod.Status.InitContainerStatuses,
); cs != nil && !cs.Ready {
state = mesh_proto.Dataplane_Networking_Inbound_NotReady
health.Ready = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

// PodStatusReconciler tracks pods status changes and signals kuma-dp when it has to complete
// but only when Kuma isn't using the SidecarContainer feature
type PodStatusReconciler struct {
kube_client.Client
kube_record.EventRecorder
Expand Down
28 changes: 25 additions & 3 deletions pkg/plugins/runtime/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package k8s
import (
"fmt"

"github.com/Masterminds/semver/v3"
"github.com/pkg/errors"
"k8s.io/client-go/discovery"
kube_ctrl "sigs.k8s.io/controller-runtime"
kube_webhook "sigs.k8s.io/controller-runtime/pkg/webhook"
kube_admission "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand All @@ -29,9 +31,11 @@ import (
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/webhooks/injector"
)

var log = core.Log.WithName("plugin").WithName("runtime").WithName("k8s")

var _ core_plugins.RuntimePlugin = &plugin{}
var (
log = core.Log.WithName("plugin").WithName("runtime").WithName("k8s")
sidecarContainerVersion = semver.New(1, 29, 0, "", "")
_ core_plugins.RuntimePlugin = &plugin{}
)

type plugin struct{}

Expand Down Expand Up @@ -302,10 +306,28 @@ func addValidators(mgr kube_ctrl.Manager, rt core_runtime.Runtime, converter k8s
func addMutators(mgr kube_ctrl.Manager, rt core_runtime.Runtime, converter k8s_common.Converter) error {
if rt.Config().Mode != config_core.Global {
address := fmt.Sprintf("https://%s.%s:%d", rt.Config().Runtime.Kubernetes.ControlPlaneServiceName, rt.Config().Store.Kubernetes.SystemNamespace, rt.Config().DpServer.Port)
kubeConfig := mgr.GetConfig()
discClient, err := discovery.NewDiscoveryClientForConfig(kubeConfig)
if err != nil {
return err
}
k8sVersion, err := discClient.ServerVersion()
if err != nil {
return err
}
var sidecarContainersEnabled bool
if v, err := semver.NewVersion(
fmt.Sprintf("%s.%s.0", k8sVersion.Major, k8sVersion.Minor),
); err == nil && !v.LessThan(sidecarContainerVersion) {
sidecarContainersEnabled = rt.Config().Experimental.SidecarContainers
} else if rt.Config().Experimental.SidecarContainers {
log.Info("WARNING: sidecarContainers feature is enabled but Kubernetes server does not support it")
}
kumaInjector, err := injector.New(
rt.Config().Runtime.Kubernetes.Injector,
address,
mgr.GetClient(),
sidecarContainersEnabled,
converter,
rt.Config().GetEnvoyAdminPort(),
rt.Config().Store.Kubernetes.SystemNamespace,
Expand Down
12 changes: 10 additions & 2 deletions pkg/plugins/runtime/k8s/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,23 @@ func FindPort(pod *kube_core.Pod, svcPort *kube_core.ServicePort) (int, *kube_co
return 0, nil, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}

func FindContainerStatus(pod *kube_core.Pod, containerName string) *kube_core.ContainerStatus {
for _, cs := range pod.Status.ContainerStatuses {
func findContainerStatus(containerName string, status []kube_core.ContainerStatus, initStatus []kube_core.ContainerStatus) *kube_core.ContainerStatus {
for _, cs := range append(status, initStatus...) {
if cs.Name == containerName {
return &cs
}
}
return nil
}

func FindContainerStatus(containerName string, status []kube_core.ContainerStatus) *kube_core.ContainerStatus {
return findContainerStatus(containerName, status, nil)
}

func FindContainerOrInitContainerStatus(containerName string, status []kube_core.ContainerStatus, initStatus []kube_core.ContainerStatus) *kube_core.ContainerStatus {
return findContainerStatus(containerName, status, initStatus)
}

func CopyStringMap(in map[string]string) map[string]string {
if in == nil {
return nil
Expand Down
35 changes: 22 additions & 13 deletions pkg/plugins/runtime/k8s/webhooks/injector/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func New(
cfg runtime_k8s.Injector,
controlPlaneURL string,
client kube_client.Client,
sidecarContainersEnabled bool,
converter k8s_common.Converter,
envoyAdminPort uint32,
systemNamespace string,
Expand All @@ -53,23 +54,25 @@ func New(
caCert = string(bytes)
}
return &KumaInjector{
cfg: cfg,
client: client,
converter: converter,
defaultAdminPort: envoyAdminPort,
cfg: cfg,
client: client,
sidecarContainersEnabled: sidecarContainersEnabled,
converter: converter,
defaultAdminPort: envoyAdminPort,
proxyFactory: containers.NewDataplaneProxyFactory(controlPlaneURL, caCert, envoyAdminPort,
cfg.SidecarContainer.DataplaneContainer, cfg.BuiltinDNS, cfg.SidecarContainer.WaitForDataplaneReady),
systemNamespace: systemNamespace,
}, nil
}

type KumaInjector struct {
cfg runtime_k8s.Injector
client kube_client.Client
converter k8s_common.Converter
proxyFactory *containers.DataplaneProxyFactory
defaultAdminPort uint32
systemNamespace string
cfg runtime_k8s.Injector
client kube_client.Client
sidecarContainersEnabled bool
converter k8s_common.Converter
proxyFactory *containers.DataplaneProxyFactory
defaultAdminPort uint32
systemNamespace string
}

func (i *KumaInjector) InjectKuma(ctx context.Context, pod *kube_core.Pod) error {
Expand Down Expand Up @@ -121,9 +124,6 @@ func (i *KumaInjector) InjectKuma(ctx context.Context, pod *kube_core.Pod) error
pod.Annotations[kube_podcmd.DefaultContainerAnnotationName] = pod.Spec.Containers[0].Name
}

// inject sidecar as first container
pod.Spec.Containers = append([]kube_core.Container{patchedContainer}, pod.Spec.Containers...)

annotations, err := i.NewAnnotations(pod, meshName, logger)
if err != nil {
return errors.Wrap(err, "could not generate annotations for pod")
Expand Down Expand Up @@ -172,6 +172,15 @@ func (i *KumaInjector) InjectKuma(ctx context.Context, pod *kube_core.Pod) error
}
}

if i.sidecarContainersEnabled {
// inject sidecar after init
patchedContainer.RestartPolicy = pointer.To(kube_core.ContainerRestartPolicyAlways)
pod.Spec.InitContainers = append(pod.Spec.InitContainers, patchedContainer)
} else {
// inject sidecar as first container
pod.Spec.Containers = append([]kube_core.Container{patchedContainer}, pod.Spec.Containers...)
}

if err := i.overrideHTTPProbes(pod); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 80c150e

Please sign in to comment.