Skip to content

Commit

Permalink
Merge pull request #1481 from fasaxc/v3.20-backport-container-id
Browse files Browse the repository at this point in the history
[v3.20] Add round tripping of ContainerID to KDD.
  • Loading branch information
caseydavenport committed Jul 28, 2021
2 parents e38be01 + a41c2a7 commit 2794579
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 34 deletions.
6 changes: 5 additions & 1 deletion lib/backend/k8s/conversion/constants.go
@@ -1,4 +1,4 @@
// Copyright (c) 2017-2020 Tigera, Inc. All rights reserved.
// Copyright (c) 2017-2021 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,10 @@ const (
AnnotationPodIP = "cni.projectcalico.org/podIP"
// AnnotationPodIPs is similar for the plural PodIPs field.
AnnotationPodIPs = "cni.projectcalico.org/podIPs"
// AnnotationContainerID stores the container ID of the pod. This allows us to disambiguate different pods
// that have the same name and namespace. For example, stateful set pod that is restarted. May be missing
// on older Pods.
AnnotationContainerID = "cni.projectcalico.org/containerID"

// NameLabel is a label that can be used to match a serviceaccount or namespace
// name exactly.
Expand Down
5 changes: 5 additions & 0 deletions lib/backend/k8s/conversion/workload_endpoint_default.go
Expand Up @@ -211,6 +211,10 @@ func (wc defaultWorkloadEndpointConverter) podToDefaultWorkloadEndpoint(pod *kap
}
}

// Get the container ID if present. This is used in the CNI plugin to distinguish different pods that have
// the same name. For example, restarted stateful set pods.
containerID := pod.Annotations[AnnotationContainerID]

// Create the workload endpoint.
wep := libapiv3.NewWorkloadEndpoint()
wep.ObjectMeta = metav1.ObjectMeta{
Expand All @@ -225,6 +229,7 @@ func (wc defaultWorkloadEndpointConverter) podToDefaultWorkloadEndpoint(pod *kap
Orchestrator: "k8s",
Node: pod.Spec.NodeName,
Pod: pod.Name,
ContainerID: containerID,
Endpoint: "eth0",
InterfaceName: interfaceName,
Profiles: profiles,
Expand Down
49 changes: 27 additions & 22 deletions lib/backend/k8s/resources/workloadendpoint.go
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2020 Tigera, Inc. All rights reserved.
// Copyright (c) 2016-2021 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,15 +60,15 @@ func (c *WorkloadEndpointClient) Create(ctx context.Context, kvp *model.KVPair)
// Note: it's a bit odd to do this in the Create, but the CNI plugin uses CreateOrUpdate(). Doing it
// here makes sure that, if the update fails: we retry here, and, we don't report success without
// making the patch.
return c.patchInPodIPs(ctx, kvp)
return c.patchInAnnotations(ctx, kvp)
}

func (c *WorkloadEndpointClient) Update(ctx context.Context, kvp *model.KVPair) (*model.KVPair, error) {
log.Debug("Received Update request on WorkloadEndpoint type")
// As a special case for the CNI plugin, try to patch the Pod with the IP that we've calculated.
// This works around a bug in kubelet that causes it to delay writing the Pod IP for a long time:
// https://github.com/kubernetes/kubernetes/issues/39113.
return c.patchInPodIPs(ctx, kvp)
return c.patchInAnnotations(ctx, kvp)
}

func (c *WorkloadEndpointClient) DeleteKVP(ctx context.Context, kvp *model.KVPair) (*model.KVPair, error) {
Expand All @@ -77,16 +77,17 @@ func (c *WorkloadEndpointClient) DeleteKVP(ctx context.Context, kvp *model.KVPai

func (c *WorkloadEndpointClient) Delete(ctx context.Context, key model.Key, revision string, uid *types.UID) (*model.KVPair, error) {
log.Debug("Delete for WorkloadEndpoint, patching out annotations.")
return c.patchOutPodIPs(ctx, key, revision, uid)
return c.patchOutAnnotations(ctx, key, revision, uid)
}

// patchInPodIPs PATCHes the Kubernetes Pod associated with the given KVPair with the IP addresses it contains.
// patchInAnnotations PATCHes the Kubernetes Pod associated with the given KVPair with the IP addresses it contains.
// This is a no-op if there is no IP address.
//
// We store the IP addresses in annotations because patching the PodStatus directly races with changes that
// kubelet makes so kubelet can undo our changes.
func (c *WorkloadEndpointClient) patchInPodIPs(ctx context.Context, kvp *model.KVPair) (*model.KVPair, error) {
ips := kvp.Value.(*libapiv3.WorkloadEndpoint).Spec.IPNetworks
func (c *WorkloadEndpointClient) patchInAnnotations(ctx context.Context, kvp *model.KVPair) (*model.KVPair, error) {
wep := kvp.Value.(*libapiv3.WorkloadEndpoint)
ips := wep.Spec.IPNetworks
if len(ips) == 0 {
return kvp, nil
}
Expand All @@ -96,16 +97,20 @@ func (c *WorkloadEndpointClient) patchInPodIPs(ctx context.Context, kvp *model.K

// Note: we drop the revision here because the CNI plugin can't handle a retry right now (and the kubelet
// ensures that only one CNI ADD for a given UID can be in progress).
return c.patchPodIPAnnotations(ctx, key, "", kvp.UID, ips)
return c.patchPodAnnotations(ctx, key, "", kvp.UID, wep.Spec.ContainerID, ips)
}

// patchOutPodIPs sets our pod IP annotations to empty strings; this is used to signal that the IP has been removed
// patchOutAnnotations sets our pod IP annotations to empty strings; this is used to signal that the IP has been removed
// from the pod at teardown.
func (c *WorkloadEndpointClient) patchOutPodIPs(ctx context.Context, key model.Key, revision string, uid *types.UID) (*model.KVPair, error) {
return c.patchPodIPAnnotations(ctx, key, revision, uid, nil)
func (c *WorkloadEndpointClient) patchOutAnnotations(ctx context.Context, key model.Key, revision string, uid *types.UID) (*model.KVPair, error) {
// Passing "" for containerID means "don't touch". Passing nil for IPs will result in all annotations being
// explicitly set to the empty string. Setting the podIPs to empty string is used to signal that the CNI DEL
// has removed the IP from the Pod. We leave the container ID in place to allow any repeat invocations of the
// CNI DEL to tell which instance of a Pod they are seeing.
return c.patchPodAnnotations(ctx, key, revision, uid, "", nil)
}

func (c *WorkloadEndpointClient) patchPodIPAnnotations(ctx context.Context, key model.Key, revision string, uid *types.UID, ips []string) (*model.KVPair, error) {
func (c *WorkloadEndpointClient) patchPodAnnotations(ctx context.Context, key model.Key, revision string, uid *types.UID, containerID string, ips []string) (*model.KVPair, error) {
wepID, err := c.converter.ParseWorkloadEndpointName(key.(model.ResourceKey).Name)
if err != nil {
return nil, err
Expand All @@ -120,11 +125,15 @@ func (c *WorkloadEndpointClient) patchPodIPAnnotations(ctx context.Context, key
if len(ips) > 0 {
firstIP = ips[0]
}
patch, err := calculateAnnotationPatch(
revision, uid,
conversion.AnnotationPodIP, firstIP,
conversion.AnnotationPodIPs, strings.Join(ips, ","),
)
annotations := map[string]string{
conversion.AnnotationPodIP: firstIP,
conversion.AnnotationPodIPs: strings.Join(ips, ","),
}
if containerID != "" {
log.WithField("containerID", containerID).Debug("Container ID specified, including in patch")
annotations[conversion.AnnotationContainerID] = containerID
}
patch, err := calculateAnnotationPatch(revision, uid, annotations)
if err != nil {
log.WithError(err).Error("failed to calculate Pod patch.")
return nil, err
Expand All @@ -144,16 +153,12 @@ func (c *WorkloadEndpointClient) patchPodIPAnnotations(ctx context.Context, key
return kvps[0], nil
}

func calculateAnnotationPatch(revision string, uid *types.UID, namesAndValues ...string) ([]byte, error) {
func calculateAnnotationPatch(revision string, uid *types.UID, annotations map[string]string) ([]byte, error) {
patch := map[string]interface{}{}
metadata := map[string]interface{}{}
patch["metadata"] = metadata
annotations := map[string]interface{}{}
metadata["annotations"] = annotations

for i := 0; i < len(namesAndValues); i += 2 {
annotations[namesAndValues[i]] = namesAndValues[i+1]
}
if revision != "" {
// We have a revision. Since the revision is immutable, if our patch revision doesn't match then the
// patch will fail.
Expand Down
32 changes: 21 additions & 11 deletions lib/backend/k8s/resources/workloadendpoint_test.go
Expand Up @@ -124,7 +124,8 @@ var _ = Describe("WorkloadEndpointClient", func() {
Namespace: "testNamespace",
},
Spec: libapiv3.WorkloadEndpointSpec{
IPNetworks: []string{"192.168.91.117/32", "192.168.91.118/32"},
ContainerID: "abcde12345",
IPNetworks: []string{"192.168.91.117/32", "192.168.91.118/32"},
},
}

Expand All @@ -143,8 +144,9 @@ var _ = Describe("WorkloadEndpointClient", func() {
pod, err := k8sClient.CoreV1().Pods("testNamespace").Get(ctx, "simplePod", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
Expect(pod.GetAnnotations()).Should(Equal(map[string]string{
conversion.AnnotationPodIP: "192.168.91.117/32",
conversion.AnnotationPodIPs: "192.168.91.117/32,192.168.91.118/32",
conversion.AnnotationPodIP: "192.168.91.117/32",
conversion.AnnotationPodIPs: "192.168.91.117/32,192.168.91.118/32",
conversion.AnnotationContainerID: "abcde12345",
}))
})
})
Expand Down Expand Up @@ -227,7 +229,8 @@ var _ = Describe("WorkloadEndpointClient", func() {
Namespace: "testNamespace",
},
Spec: libapiv3.WorkloadEndpointSpec{
IPNetworks: []string{"192.168.91.117/32", "192.168.91.118/32"},
IPNetworks: []string{"192.168.91.117/32", "192.168.91.118/32"},
ContainerID: "abcd1234",
},
}

Expand All @@ -246,24 +249,26 @@ var _ = Describe("WorkloadEndpointClient", func() {
pod, err := k8sClient.CoreV1().Pods("testNamespace").Get(ctx, "simplePod", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
Expect(pod.GetAnnotations()).Should(Equal(map[string]string{
conversion.AnnotationPodIP: "192.168.91.117/32",
conversion.AnnotationPodIPs: "192.168.91.117/32,192.168.91.118/32",
conversion.AnnotationPodIP: "192.168.91.117/32",
conversion.AnnotationPodIPs: "192.168.91.117/32,192.168.91.118/32",
conversion.AnnotationContainerID: "abcd1234",
}))
})
})
})

Describe("Delete", func() {
Context("WorkloadEndpoint has no IPs set", func() {
It("zeros out the cni.projectcalico.org/podIP and cni.projectcalico.org/podIPs annotations", func() {
It("zeros out the annotations", func() {
podUID := types.UID(uuid.NewString())
k8sClient := fake.NewSimpleClientset(&k8sapi.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "simplePod",
Namespace: "testNamespace",
Annotations: map[string]string{
conversion.AnnotationPodIP: "192.168.91.117/32",
conversion.AnnotationPodIPs: "192.168.91.117/32,192.168.91.118/32",
conversion.AnnotationPodIP: "192.168.91.117/32",
conversion.AnnotationPodIPs: "192.168.91.117/32,192.168.91.118/32",
conversion.AnnotationContainerID: "abcde12345",
},
UID: podUID,
},
Expand Down Expand Up @@ -310,8 +315,9 @@ var _ = Describe("WorkloadEndpointClient", func() {
pod, err := k8sClient.CoreV1().Pods("testNamespace").Get(ctx, "simplePod", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
Expect(pod.GetAnnotations()).Should(Equal(map[string]string{
conversion.AnnotationPodIP: "",
conversion.AnnotationPodIPs: "",
conversion.AnnotationPodIP: "",
conversion.AnnotationPodIPs: "",
conversion.AnnotationContainerID: "abcde12345",
}))
})
})
Expand All @@ -323,6 +329,9 @@ var _ = Describe("WorkloadEndpointClient", func() {
ObjectMeta: metav1.ObjectMeta{
Name: "simplePod",
Namespace: "testNamespace",
Annotations: map[string]string{
conversion.AnnotationContainerID: "abcde12345",
},
},
Spec: k8sapi.PodSpec{
NodeName: "test-node",
Expand Down Expand Up @@ -368,6 +377,7 @@ var _ = Describe("WorkloadEndpointClient", func() {
Profiles: []string{"kns.testNamespace"},
IPNetworks: []string{},
InterfaceName: "caliedff4356bd6",
ContainerID: "abcde12345",
},
}))
})
Expand Down

0 comments on commit 2794579

Please sign in to comment.