Skip to content

Commit

Permalink
ovn: retry pod setup in update if it fails during add
Browse files Browse the repository at this point in the history
If a failure occurred during a pod Add the error would be logged
but setup would never be retried because the pod would remain
scheduled. Instead, use the 'ovn' annotation as a marker of
whether the pod has been set up or not. Since setting the annotation
should be the last thing we do, we can mostly use this as an
indicator of setup success.

Signed-off-by: Dan Williams <dcbw@redhat.com>
  • Loading branch information
dcbw committed Nov 10, 2019
1 parent d3b05e8 commit e081c4a
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 50 deletions.
36 changes: 30 additions & 6 deletions go-controller/pkg/ovn/ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,25 +330,49 @@ func (oc *Controller) ovnControllerEventChecker(stopChan chan struct{}) {
}
}

func podScheduledAndNetworked(pod *kapi.Pod) bool {
// Only care about scheduled and networked pods
return pod.Spec.NodeName != "" && !pod.Spec.HostNetwork
}

func podNamespacedName(pod *kapi.Pod) types.NamespacedName {
return types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
}

// WatchPods starts the watching of Pod resource and calls back the appropriate handler logic
func (oc *Controller) WatchPods() error {
handledPods := make(map[types.NamespacedName]bool)
_, err := oc.watchFactory.AddPodHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*kapi.Pod)
if pod.Spec.NodeName != "" {
oc.addLogicalPort(pod)
if !podScheduledAndNetworked(pod) {
return
}
if err := oc.addLogicalPort(pod); err != nil {
logrus.Errorf(err.Error())
} else {
handledPods[podNamespacedName(pod)] = true
}
},
UpdateFunc: func(old, newer interface{}) {
podNew := newer.(*kapi.Pod)
podOld := old.(*kapi.Pod)
if podOld.Spec.NodeName == "" && podNew.Spec.NodeName != "" {
oc.addLogicalPort(podNew)
pod := newer.(*kapi.Pod)
namespacedName := podNamespacedName(pod)
_, podHandled := handledPods[namespacedName]
if podScheduledAndNetworked(pod) && !podHandled {
if err := oc.addLogicalPort(pod); err != nil {
logrus.Errorf(err.Error())
} else {
handledPods[namespacedName] = true
}
}
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*kapi.Pod)
oc.deleteLogicalPort(pod)
delete(handledPods, podNamespacedName(pod))
},
}, oc.syncPods)
return err
Expand Down
48 changes: 20 additions & 28 deletions go-controller/pkg/ovn/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ func (oc *Controller) syncPods(pods []interface{}) {
logrus.Errorf("Spurious object in syncPods: %v", podInterface)
continue
}
logicalPort := podLogicalPortName(pod)
expectedLogicalPorts[logicalPort] = true
if podScheduledAndNetworked(pod) {
logicalPort := podLogicalPortName(pod)
expectedLogicalPorts[logicalPort] = true
}
}

// get the list of logical ports from OVN
Expand Down Expand Up @@ -229,18 +231,17 @@ func (oc *Controller) waitForNodeLogicalSwitch(nodeName string) error {
return nil
}

func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
func (oc *Controller) addLogicalPort(pod *kapi.Pod) error {
var out, stderr string
var err error
if pod.Spec.HostNetwork {
return
return nil
}

logicalSwitch := pod.Spec.NodeName
if logicalSwitch == "" {
logrus.Errorf("Failed to find the logical switch for pod %s/%s",
return fmt.Errorf("Failed to find the logical switch for pod %s/%s",
pod.Namespace, pod.Name)
return
}

// Keep track of how long syncs take.
Expand All @@ -250,7 +251,7 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
}()

if err = oc.waitForNodeLogicalSwitch(pod.Spec.NodeName); err != nil {
return
return err
}

portName := podLogicalPortName(pod)
Expand All @@ -270,10 +271,9 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
"external-ids:pod=true", "--", "--if-exists",
"clear", "logical_switch_port", portName, "dynamic_addresses")
if err != nil {
logrus.Errorf("Failed to add logical port to switch "+
return fmt.Errorf("Failed to add logical port to switch "+
"stdout: %q, stderr: %q (%v)",
out, stderr, err)
return
}
} else {
out, stderr, err = util.RunOVNNbctl("--wait=sb", "--",
Expand All @@ -285,20 +285,17 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
"external-ids:logical_switch="+logicalSwitch,
"external-ids:pod=true")
if err != nil {
logrus.Errorf("Error while creating logical port %s "+
return fmt.Errorf("Error while creating logical port %s "+
"stdout: %q, stderr: %q (%v)",
portName, out, stderr, err)
return
}

}

oc.logicalPortCache[portName] = logicalSwitch

gatewayIP, err := oc.getGatewayFromSwitch(logicalSwitch)
if err != nil {
logrus.Errorf("Error obtaining gateway address for switch %s", logicalSwitch)
return
return fmt.Errorf("Error obtaining gateway address for switch %s", logicalSwitch)
}

var podMac net.HardwareAddr
Expand All @@ -310,17 +307,14 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
break
}
if err != nil {
logrus.Errorf("Error while obtaining addresses for %s - %v", portName,
err)
return
return fmt.Errorf("Error while obtaining addresses for %s - %v", portName, err)
}
time.Sleep(time.Second)
count--
}
if count == 0 {
logrus.Errorf("Error while obtaining addresses for %s "+
return fmt.Errorf("Error while obtaining addresses for %s "+
"stdout: %q, stderr: %q, (%v)", portName, out, stderr, err)
return
}

podCIDR := &net.IPNet{IP: podIP, Mask: gatewayIP.Mask}
Expand All @@ -329,9 +323,8 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
out, stderr, err = util.RunOVNNbctl("lsp-set-port-security", portName,
fmt.Sprintf("%s %s", podMac, podCIDR))
if err != nil {
logrus.Errorf("error while setting port security for logical port %s "+
return fmt.Errorf("error while setting port security for logical port %s "+
"stdout: %q, stderr: %q (%v)", portName, out, stderr, err)
return
}

routes := []util.PodRoute{}
Expand All @@ -355,17 +348,16 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
Routes: routes,
})
if err != nil {
logrus.Errorf("error creating pod network annotation: %v", err)
return
return fmt.Errorf("error creating pod network annotation: %v", err)
}

oc.addPodToNamespace(pod.Namespace, podIP, portName)

logrus.Debugf("Annotation values: ip=%s ; mac=%s ; gw=%s\nAnnotation=%s",
podCIDR, podMac, gatewayIP, annotation)
err = oc.kube.SetAnnotationOnPod(pod, "ovn", marshalledAnnotation)
if err != nil {
logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err)
if err = oc.kube.SetAnnotationOnPod(pod, "ovn", marshalledAnnotation); err != nil {
return fmt.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err)
}
oc.addPodToNamespace(pod.Namespace, podIP, portName)

return
return nil
}
104 changes: 88 additions & 16 deletions go-controller/pkg/ovn/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func newTPod(nodeName, nodeSubnet, nodeMgtIP, nodeGWIP, podName, podIP, podMAC,
return
}

func (p pod) addCmds(fexec *ovntest.FakeExec, exists bool) {
// node setup
func (p pod) addNodeSetupCmds(fexec *ovntest.FakeExec) {
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 get logical_switch " + p.nodeName + " other-config",
Output: `{exclude_ips="10.128.1.2", subnet="` + p.nodeSubnet + `"}`,
Expand All @@ -86,6 +85,9 @@ func (p pod) addCmds(fexec *ovntest.FakeExec, exists bool) {
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --may-exist acl-add " + p.nodeName + " to-lport 1001 ip4.src==" + p.nodeMgtIP + " allow-related",
})
}

func (p pod) addCmds(fexec *ovntest.FakeExec, exists, fail, gatewayCached bool) {
// pod setup
if exists {
fexec.AddFakeCmdsNoOutputNoError([]string{
Expand All @@ -96,25 +98,42 @@ func (p pod) addCmds(fexec *ovntest.FakeExec, exists bool) {
"ovn-nbctl --timeout=15 --wait=sb -- --may-exist lsp-add " + p.nodeName + " " + p.portName + " -- lsp-set-addresses " + p.portName + " dynamic -- set logical_switch_port " + p.portName + " external-ids:namespace=" + p.namespace + " external-ids:logical_switch=" + p.nodeName + " external-ids:pod=true",
})
}
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --if-exists get logical_switch " + p.nodeName + " external_ids:gateway_ip",
Output: fmt.Sprintf("%s/24", p.nodeGWIP),
})
if !gatewayCached {
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --if-exists get logical_switch " + p.nodeName + " external_ids:gateway_ip",
Output: fmt.Sprintf("%s/24", p.nodeGWIP),
})
}
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 get logical_switch_port " + p.portName + " dynamic_addresses",
Output: `"` + p.podMAC + " " + p.podIP + `"`,
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 lsp-set-port-security " + p.portName + " " + p.podMAC + " " + p.podIP + "/24",
})
if fail {
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 lsp-set-port-security " + p.portName + " " + p.podMAC + " " + p.podIP + "/24",
Err: fmt.Errorf("adsfadsfasfdasfd"),
})
} else {
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 lsp-set-port-security " + p.portName + " " + p.podMAC + " " + p.podIP + "/24",
})
}
}

func (p pod) addCmdsForNonExistingPod(fexec *ovntest.FakeExec) {
p.addCmds(fexec, false)
p.addCmds(fexec, false, false, false)
}

func (p pod) addCmdsForNonExistingPodGatewayCached(fexec *ovntest.FakeExec) {
p.addCmds(fexec, false, false, true)
}

func (p pod) addCmdsForExistingPod(fexec *ovntest.FakeExec) {
p.addCmds(fexec, true)
p.addCmds(fexec, true, false, false)
}

func (p pod) addCmdsForNonExistingFailedPod(fexec *ovntest.FakeExec) {
p.addCmds(fexec, false, true, false)
}

func (p pod) delCmds(fexec *ovntest.FakeExec) {
Expand Down Expand Up @@ -176,6 +195,7 @@ var _ = Describe("OVN Pod Operations", func() {

// Assign it and perform the update
t.nodeName = "node1"
t.addNodeSetupCmds(fExec)
t.addCmdsForNonExistingPod(fExec)

_, err = fakeOvn.fakeClient.CoreV1().Pods(t.namespace).Update(newPod(t.namespace, t.podName, t.nodeName, t.podIP))
Expand Down Expand Up @@ -226,6 +246,7 @@ var _ = Describe("OVN Pod Operations", func() {
Expect(pod).To(BeNil())
Expect(fExec.CalledMatchesExpected()).To(BeTrue())

t.addNodeSetupCmds(fExec)
t.addCmdsForNonExistingPod(fExec)

_, err := fakeOvn.fakeClient.CoreV1().Pods(t.namespace).Create(newPod(t.namespace, t.podName, t.nodeName, t.podIP))
Expand Down Expand Up @@ -266,7 +287,7 @@ var _ = Describe("OVN Pod Operations", func() {
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch_port external_ids:pod=true",
Output: t.portName + "\n",
})

t.addNodeSetupCmds(fExec)
t.addCmdsForNonExistingPod(fExec)

fakeOvn := FakeOVN{}
Expand Down Expand Up @@ -303,6 +324,57 @@ var _ = Describe("OVN Pod Operations", func() {
Expect(err).NotTo(HaveOccurred())
})

It("retries a failed pod Add on Update", func() {
app.Action = func(ctx *cli.Context) error {

// Setup an unassigned pod, perform an update later on which assigns it.
t := newTPod(
"node1",
"10.128.1.0/24",
"10.128.1.2",
"10.128.1.1",
"myPod",
"10.128.1.4",
"11:22:33:44:55:66",
"namespace",
)

fExec := ovntest.NewFakeExec()
fExec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch_port external_ids:pod=true",
Output: "\n",
})
t.addNodeSetupCmds(fExec)
t.addCmdsForNonExistingFailedPod(fExec)

fakeOvn := FakeOVN{}
fakeOvn.start(ctx, fExec, &v1.PodList{
Items: []v1.Pod{
*newPod(t.namespace, t.podName, t.nodeName, t.podIP),
},
})
fakeOvn.controller.WatchPods()
Expect(fExec.CalledMatchesExpected()).To(BeTrue())

// Pod creation should be retried on Update event
t.addCmdsForNonExistingPodGatewayCached(fExec)
_, err := fakeOvn.fakeClient.CoreV1().Pods(t.namespace).Update(newPod(t.namespace, t.podName, t.nodeName, t.podIP))
Expect(err).NotTo(HaveOccurred())
Eventually(fExec.CalledMatchesExpected).Should(BeTrue())

pod, err := fakeOvn.fakeClient.CoreV1().Pods(t.namespace).Get(t.podName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

podAnnotation, ok := pod.Annotations["ovn"]
Expect(ok).To(BeTrue())
Expect(podAnnotation).To(MatchJSON(`{"ip_address":"` + t.podIP + `/24", "mac_address":"` + t.podMAC + `", "gateway_ip": "` + t.nodeGWIP + `"}`))

return nil
}

err := app.Run([]string{app.Name})
Expect(err).NotTo(HaveOccurred())
})
})

Context("on startup", func() {
Expand All @@ -326,7 +398,7 @@ var _ = Describe("OVN Pod Operations", func() {
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch_port external_ids:pod=true",
Output: t.portName + "\n",
})

t.addNodeSetupCmds(fExec)
t.addCmdsForNonExistingPod(fExec)

fakeOvn := FakeOVN{}
Expand Down Expand Up @@ -414,7 +486,7 @@ var _ = Describe("OVN Pod Operations", func() {
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch_port external_ids:pod=true",
Output: "\n",
})

t.addNodeSetupCmds(fExec)
t.addCmdsForNonExistingPod(fExec)

fakeOvn := FakeOVN{}
Expand Down Expand Up @@ -462,7 +534,7 @@ var _ = Describe("OVN Pod Operations", func() {
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch_port external_ids:pod=true",
Output: "\n",
})

t.addNodeSetupCmds(fExec)
t.addCmdsForNonExistingPod(fExec)

fakeOvn := FakeOVN{}
Expand All @@ -486,7 +558,7 @@ var _ = Describe("OVN Pod Operations", func() {
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch_port external_ids:pod=true",
Output: "\n",
})

t.addNodeSetupCmds(fExec)
t.addCmdsForExistingPod(fExec)

fakeOvn.restart()
Expand Down

0 comments on commit e081c4a

Please sign in to comment.