BZ2117310: Fix race when adding and removing pod with same name
Adding and removing a pod back to back while it changes nodes can end up in a race where
the corresponding logical switch port remains in the wrong logical switch and never gets
properly removed. For this to happen, the logical switch port has to keep the same name,
which is <namespace>_<podName>.
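
A minimal sketch of the collision, for illustration only, using a hypothetical logicalPortName
helper in place of util.GetLogicalPortName: because the port name is derived solely from the
namespace and pod name, a pod deleted from one node and recreated back to back on another reuses
the same LSP name, so a delete that trusts the stale pod.Spec.NodeName can target the wrong
logical switch.

    package main

    import "fmt"

    // Hypothetical stand-in for util.GetLogicalPortName: the LSP name depends only
    // on the namespace and pod name, never on the node the pod runs on.
    func logicalPortName(namespace, podName string) string {
        return namespace + "_" + podName
    }

    func main() {
        // Pod namespace1/myPod is deleted from node1 and immediately recreated on node2.
        stale := logicalPortName("namespace1", "myPod") // LSP still referenced by node1's switch
        fresh := logicalPortName("namespace1", "myPod") // LSP expected in node2's switch

        // Same key for two different switches: if deletion trusts the (possibly stale)
        // pod.Spec.NodeName instead of asking OVN which logical switch actually owns
        // the port, the old LSP can be left behind in the wrong switch.
        fmt.Println(stale == fresh) // true
    }

The change below therefore resolves the owning switch from the NB database
(lookupPortUUIDAndNodeName) instead of relying on pod.Spec.NodeName when the cached port
info is unavailable.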

Conflicts:
    go-controller/pkg/ovn/pods_test.go

Signed-off-by: Flavio Fernandes <flaviof@redhat.com>
Co-authored-by: Tim Rozet <trozet@redhat.com>
(cherry picked from commit be8786a)
flavio-fernandes committed Aug 17, 2022
1 parent 679c713 commit cd2a1cb
Showing 2 changed files with 252 additions and 6 deletions.
80 changes: 74 additions & 6 deletions go-controller/pkg/ovn/pods.go
@@ -137,6 +137,35 @@ func (oc *Controller) syncPodsRetriable(pods []interface{}) error {
return nil
}

// lookupPortUUIDAndNodeName will use libovsdb to locate the logical switch port uuid as well as the logical switch
// that owns such port (aka nodeName), based on the logical port name.
func (oc *Controller) lookupPortUUIDAndNodeName(logicalPort string) (portUUID string, logicalSwitch string, err error) {

ctx, cancel := context.WithTimeout(context.Background(), ovntypes.OVSDBTimeout)
defer cancel()
lsp := &nbdb.LogicalSwitchPort{Name: logicalPort}
err = oc.nbClient.Get(ctx, lsp)
if err != nil {
return "", "", fmt.Errorf("error getting logical port %+v: %w", lsp, err)
}
p := func(item *nbdb.LogicalSwitch) bool {
for _, currPortUUID := range item.Ports {
if currPortUUID == lsp.UUID {
return true
}
}
return false
}
nodeSwitches, err := libovsdbops.FindLogicalSwitchesWithPredicate(oc.nbClient, p)
if err != nil {
return "", "", fmt.Errorf("failed to get node logical switch for logical port %s (%s): %w", logicalPort, lsp.UUID, err)
}
if len(nodeSwitches) != 1 {
return "", "", fmt.Errorf("found %d node logical switches for logical port %s (%s)", len(nodeSwitches), logicalPort, lsp.UUID)
}
return lsp.UUID, nodeSwitches[0].Name, nil
}

func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err error) {
podDesc := pod.Namespace + "/" + pod.Name
klog.Infof("Deleting pod: %s", podDesc)
@@ -152,7 +181,8 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
}

logicalPort := util.GetLogicalPortName(pod.Namespace, pod.Name)
portUUID := ""
var portUUID string
var nodeName string
var podIfAddrs []*net.IPNet
if portInfo == nil {
// If ovnkube-master restarts, it is also possible the Pod's logical switch port
@@ -161,24 +191,40 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
if err != nil {
return fmt.Errorf("unable to unmarshal pod annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}

// Since portInfo is not available, use ovn to locate the logical switch (named after the node name) for the logical port.
portUUID, nodeName, err = oc.lookupPortUUIDAndNodeName(logicalPort)
if err != nil {
return fmt.Errorf("unable to locate portUUID+nodeName for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
podIfAddrs = annotation.IPs

klog.Warningf("No cached port info for deleting pod: %s. Using logical switch %s port uuid %s and addrs %v",
podDesc, nodeName, portUUID, podIfAddrs)
} else {
portUUID = portInfo.uuid
nodeName = portInfo.logicalSwitch // ls <==> nodeName
podIfAddrs = portInfo.ips
}

// Sanity check. The nodeName from pod spec is expected to be the same as the logical switch obtained from the port.
if nodeName != pod.Spec.NodeName {
klog.Errorf("Deleting pod %s has an unexpected node name in spec: %s, ovn expects it to be %s for port uuid %s",
podDesc, pod.Spec.NodeName, nodeName, portUUID)
}

shouldRelease := true
// check to make sure no other pods are using this IP before we try to release it if this is a completed pod.
if util.PodCompleted(pod) {
if shouldRelease, err = oc.lsManager.ConditionalIPRelease(pod.Spec.NodeName, podIfAddrs, func() (bool, error) {
if shouldRelease, err = oc.lsManager.ConditionalIPRelease(nodeName, podIfAddrs, func() (bool, error) {
pods, err := oc.watchFactory.GetAllPods()
if err != nil {
return false, fmt.Errorf("unable to get pods to determine if completed pod IP is in use by another pod. "+
"Will not release pod %s/%s IP: %#v from allocator", pod.Namespace, pod.Name, podIfAddrs)
}
// iterate through all pods, ignore pods on other nodes
for _, p := range pods {
if util.PodCompleted(p) || !util.PodWantsNetwork(p) || !util.PodScheduled(p) || p.Spec.NodeName != pod.Spec.NodeName {
if util.PodCompleted(p) || !util.PodWantsNetwork(p) || !util.PodScheduled(p) || p.Spec.NodeName != nodeName {
continue
}
// check if the pod addresses match in the OVN annotation
@@ -214,7 +260,7 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
}
allOps = append(allOps, ops...)
}
ops, err = oc.delLSPOps(logicalPort, pod.Spec.NodeName, portUUID)
ops, err = oc.delLSPOps(logicalPort, nodeName, portUUID)
if err != nil {
return fmt.Errorf("failed to create delete ops for the lsp: %s: %s", logicalPort, err)
}
@@ -231,7 +277,7 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
}

if config.Gateway.DisableSNATMultipleGWs {
if err := deletePerPodGRSNAT(oc.nbClient, pod.Spec.NodeName, []*net.IPNet{}, podIfAddrs); err != nil {
if err := deletePerPodGRSNAT(oc.nbClient, nodeName, []*net.IPNet{}, podIfAddrs); err != nil {
return fmt.Errorf("cannot delete GR SNAT for pod %s: %w", podDesc, err)
}
}
@@ -246,7 +292,7 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
// while it is now on another pod
klog.Infof("Attempting to release IPs for pod: %s/%s, ips: %s", pod.Namespace, pod.Name,
util.JoinIPNetIPs(podIfAddrs, " "))
if err := oc.lsManager.ReleaseIPs(pod.Spec.NodeName, podIfAddrs); err != nil {
if err := oc.lsManager.ReleaseIPs(nodeName, podIfAddrs); err != nil {
return fmt.Errorf("cannot release IPs for pod %s: %w", podDesc, err)
}

@@ -424,6 +470,28 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) (err error) {
lspExist = true
}

// Sanity check. If port exists, it should be in the logical switch obtained from the pod spec.
if lspExist {
portFound := false
ls, err = libovsdbops.FindSwitchByName(oc.nbClient, logicalSwitch)
// ls, err = libovsdbops.GetLogicalSwitch(oc.nbClient, ls)
if err != nil {
return fmt.Errorf("[%s/%s] unable to find logical switch %s in NBDB", pod.Namespace, pod.Name,
logicalSwitch)
}
for _, currPortUUID := range ls.Ports {
if currPortUUID == getLSP.UUID {
portFound = true
break
}
}
if !portFound {
// This should never happen and indicates we failed to clean up an LSP for a pod that was recreated
return fmt.Errorf("[%s/%s] failed to locate existing logical port %s (%s) in logical switch %s",
pod.Namespace, pod.Name, getLSP.Name, getLSP.UUID, logicalSwitch)
}
}

lsp.Options = make(map[string]string)
// Unique identifier to distinguish interfaces for recreated pods, also set by ovnkube-node
// ovn-controller will claim the OVS interface only if external_ids:iface-id
178 changes: 178 additions & 0 deletions go-controller/pkg/ovn/pods_test.go
@@ -876,6 +876,109 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

ginkgo.It("correctly remove a LSP from a pod that has stale nodeName annotation", func() {
app.Action = func(ctx *cli.Context) error {
namespace1 := *newNamespace("namespace1")
podTest := newTPod(
"node1",
"10.128.1.0/24",
"10.128.1.2",
"10.128.1.1",
"myPod",
"10.128.1.3",
"0a:58:0a:80:01:03",
namespace1.Name,
)
pod := newPod(podTest.namespace, podTest.podName, podTest.nodeName, podTest.podIP)
expectedData := []libovsdbtest.TestData{getExpectedDataPodsAndSwitches([]testPod{podTest}, []string{"node1"})}
fakeOvn.startWithDBSetup(initialDB,
&v1.NamespaceList{
Items: []v1.Namespace{
namespace1,
},
},
&v1.PodList{
Items: []v1.Pod{*pod},
},
)

podTest.populateLogicalSwitchCache(fakeOvn, getLogicalSwitchUUID(fakeOvn.controller.nbClient, "node1"))
fakeOvn.controller.WatchNamespaces()
fakeOvn.controller.WatchPods()

gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(expectedData...))
fakeOvn.asf.ExpectAddressSetWithIPs(podTest.namespace, []string{podTest.podIP})

// Get pod from api with its metadata filled in
pod, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(podTest.namespace).Get(context.TODO(), podTest.podName, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// Fudge nodename from pod's spec, to ensure it is not used by deleteLogicalPort
pod.Spec.NodeName = "this_is_the_wrong_nodeName"

// Deleting port from a pod that has no portInfo and the wrong nodeName should still be okay!
err = fakeOvn.controller.deleteLogicalPort(pod, nil)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// OVN db should be empty now
fakeOvn.asf.ExpectEmptyAddressSet(podTest.namespace)
gomega.Eventually(fakeOvn.controller.nbClient).Should(
libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{}, []string{"node1"})...))

err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, *metav1.NewDeleteOptions(0))
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// check the retry cache has no entry
gomega.Eventually(func() *retryEntry {
return fakeOvn.controller.getPodRetryEntry(pod)
}).Should(gomega.BeNil())
return nil
}

err := app.Run([]string{app.Name})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

ginkgo.It("remove a LSP from a pod that has no OVN annotations", func() {
app.Action = func(ctx *cli.Context) error {
namespaceT := *newNamespace("namespace1")
t := newTPod(
"node1",
"10.128.1.0/24",
"10.128.1.2",
"10.128.1.1",
"myPod",
"10.128.1.3",
"0a:58:0a:80:01:03",
namespaceT.Name,
)
pod := newPod(t.namespace, t.podName, t.nodeName, t.podIP)
fakeOvn.startWithDBSetup(initialDB,
&v1.NamespaceList{
Items: []v1.Namespace{
namespaceT,
},
},
&v1.PodList{
Items: []v1.Pod{
*pod,
},
},
)
annotations := getPodAnnotations(fakeOvn.fakeClient.KubeClient, t.namespace, t.podName)
gomega.Expect(annotations).To(gomega.Equal(""))

// Deleting port from a pod that has no annotations should be okay
err := fakeOvn.controller.deleteLogicalPort(pod, nil)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

return nil
}

err := app.Run([]string{app.Name})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

ginkgo.It("reconciles a deleted pod", func() {
app.Action = func(ctx *cli.Context) error {

@@ -1240,6 +1343,81 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() {
err := app.Run([]string{app.Name})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

ginkgo.It("Negative test: fails to add existing pod with an existing logical switch port on wrong node", func() {
app.Action = func(ctx *cli.Context) error {
namespaceT := *newNamespace("namespace1")
// use 2 pods for different test options
t1 := newTPod(
"node1",
"10.128.1.0/24",
"10.128.1.2",
"10.128.1.1",
"myPod1",
"10.128.1.3",
"0a:58:0a:80:01:03",
namespaceT.Name,
)

initialDB = libovsdbtest.TestSetup{
NBData: []libovsdbtest.TestData{
&nbdb.LogicalSwitchPort{
UUID: t1.portUUID,
Name: util.GetLogicalPortName(t1.namespace, t1.podName),
Addresses: []string{t1.podMAC, t1.podIP},
ExternalIDs: map[string]string{
"pod": "true",
"namespace": t1.namespace,
},
Options: map[string]string{
// check requested-chassis will be updated to correct t1.nodeName value
"requested-chassis": t1.nodeName,
// check old value for iface-id-ver will be updated to pod.UID
"iface-id-ver": "wrong_value",
},
PortSecurity: []string{fmt.Sprintf("%s %s", t1.podMAC, t1.podIP)},
},
&nbdb.LogicalSwitch{
Name: "node1",
Ports: []string{},
},
&nbdb.LogicalSwitch{
Name: "node2",
Ports: []string{t1.portUUID},
},
},
}

pod1 := newPod(t1.namespace, t1.podName, t1.nodeName, t1.podIP)
setPodAnnotations(pod1, t1)
fakeOvn.startWithDBSetup(initialDB,
&v1.NamespaceList{
Items: []v1.Namespace{
namespaceT,
},
},
&v1.PodList{
Items: []v1.Pod{
*pod1,
},
},
)
t1.populateLogicalSwitchCache(fakeOvn, getLogicalSwitchUUID(fakeOvn.controller.nbClient, "node1"))
// pod annotations and lsp exist now

fakeOvn.controller.WatchNamespaces()
fakeOvn.controller.WatchPods()

// should fail to update a port on the wrong switch
gomega.Eventually(func() *retryEntry {
return fakeOvn.controller.getPodRetryEntry(pod1)
}).ShouldNot(gomega.BeNil())
return nil
}

err := app.Run([]string{app.Name})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})
})

ginkgo.Context("with hybrid overlay gw mode", func() {
