Skip to content

Commit

Permalink
libovsdb: fix race condition in OVN LB operations (#2625)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Apr 10, 2023
1 parent cfff2db commit 1fc5d85
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 138 deletions.
2 changes: 0 additions & 2 deletions dist/images/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ RUN cd /usr/src/ && git clone -b branch-22.12 --depth=1 https://github.com/ovn-o
curl -s https://github.com/kubeovn/ovn/commit/cd31bebd1c15fb8e50107861a9397e672fcbad27.patch | git apply && \
# fix reaching resubmit limit in underlay
curl -s https://github.com/kubeovn/ovn/commit/993147dd6ab2dd4565379315a6d96ab12f4a294d.patch | git apply && \
# do not remove LB if vips is empty
curl -s https://github.com/kubeovn/ovn/commit/8693053ed3d5ecd5cd70658315d3079f33e9f8e0.patch | git apply && \
sed -i 's/OVN/ovn/g' debian/changelog && \
rm -rf .git && \
./boot.sh && \
Expand Down
68 changes: 34 additions & 34 deletions mocks/pkg/ovs/interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controller
import (
"context"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -192,17 +191,17 @@ func (c *Controller) handleUpdateEndpoint(key string) error {

// for performance reason delete lb with no backends
if len(backends) != 0 {
if err = c.ovnClient.LoadBalancerAddVips(lb, map[string]string{vip: backends}); err != nil {
if err = c.ovnClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", vip, backends, lb, err)
return err
}
} else {
if err := c.ovnClient.LoadBalancerDeleteVips(lb, vip); err != nil {
if err := c.ovnClient.LoadBalancerDeleteVip(lb, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
return err
}

if err := c.ovnClient.LoadBalancerDeleteVips(oldLb, vip); err != nil {
if err := c.ovnClient.LoadBalancerDeleteVip(oldLb, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
return err
}
Expand All @@ -213,7 +212,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
return nil
}

func getServicePortBackends(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP string) string {
func getServicePortBackends(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP string) []string {
backends := []string{}
protocol := util.CheckProtocol(serviceIP)
for _, subset := range endpoints.Subsets {
Expand Down Expand Up @@ -261,5 +260,5 @@ func getServicePortBackends(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort
}
}

return strings.Join(backends, ",")
return backends
}
2 changes: 1 addition & 1 deletion pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (c *Controller) gcLoadBalancer() error {

for vip := range lb.Vips {
if _, ok := svcVips[vip]; !ok {
if err = c.ovnClient.LoadBalancerDeleteVips(lbName, vip); err != nil {
if err = c.ovnClient.LoadBalancerDeleteVip(lbName, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lbName, err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/ovn_dnat.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func (c *Controller) AddDnatRule(vpcName, dnatName, externalIp, internalIp, exte
return err
}

if err := c.ovnClient.LoadBalancerAddVips(dnatName, map[string]string{externalEndpoint: internalEndpoint}); err != nil {
if err := c.ovnClient.LoadBalancerAddVip(dnatName, externalEndpoint, internalEndpoint); err != nil {
klog.Errorf("add vip %s with backends %s to LB %s: %v", externalEndpoint, internalEndpoint, dnatName, err)
return err
}
Expand All @@ -609,7 +609,7 @@ func (c *Controller) AddDnatRule(vpcName, dnatName, externalIp, internalIp, exte
func (c *Controller) DelDnatRule(vpcName, dnatName, externalIp, externalPort string) error {
externalEndpoint := net.JoinHostPort(externalIp, externalPort)

if err := c.ovnClient.LoadBalancerDeleteVips(dnatName, externalEndpoint); err != nil {
if err := c.ovnClient.LoadBalancerDeleteVip(dnatName, externalEndpoint); err != nil {
klog.Errorf("delete loadBalancer vips %s: %v", externalEndpoint, err)
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (c *Controller) handleDeleteService(service *vpcService) error {
}

for _, lb := range vpcLB {
if err := c.ovnClient.LoadBalancerDeleteVips(lb, vip); err != nil {
if err := c.ovnClient.LoadBalancerDeleteVip(lb, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
return err
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func (c *Controller) handleUpdateService(key string) error {
}
klog.V(3).Infof("existing vips of LB %s: %v", lbName, lb.Vips)
for _, vip := range svcVips {
if err := c.ovnClient.LoadBalancerDeleteVips(oLbName, vip); err != nil {
if err := c.ovnClient.LoadBalancerDeleteVip(oLbName, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oLbName, err)
return err
}
Expand All @@ -366,7 +366,7 @@ func (c *Controller) handleUpdateService(key string) error {
for vip := range lb.Vips {
if ip := parseVipAddr(vip); (util.ContainsString(ips, ip) && !util.IsStringIn(vip, svcVips)) || util.ContainsString(ipsToDel, ip) {
klog.Infof("remove stale vip %s from LB %s", vip, lb)
if err := c.ovnClient.LoadBalancerDeleteVips(lbName, vip); err != nil {
if err := c.ovnClient.LoadBalancerDeleteVip(lbName, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
return err
}
Expand All @@ -386,7 +386,7 @@ func (c *Controller) handleUpdateService(key string) error {
for vip := range oLb.Vips {
if ip := parseVipAddr(vip); util.ContainsString(ips, ip) || util.ContainsString(ipsToDel, ip) {
klog.Infof("remove stale vip %s from LB %s", vip, oLbName)
if err = c.ovnClient.LoadBalancerDeleteVips(oLbName, vip); err != nil {
if err = c.ovnClient.LoadBalancerDeleteVip(oLbName, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oLbName, err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ovs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type LogicalSwitchPort interface {

type LoadBalancer interface {
CreateLoadBalancer(lbName, protocol, selectFields string) error
LoadBalancerAddVips(lbName string, vips map[string]string) error
LoadBalancerDeleteVips(lbName string, vips ...string) error
LoadBalancerAddVip(lbName, vip string, backends ...string) error
LoadBalancerDeleteVip(lbName, vip string) error
SetLoadBalancerAffinityTimeout(lbName string, timeout int) error
DeleteLoadBalancers(filter func(lb *ovnnb.LoadBalancer) bool) error
GetLoadBalancer(lbName string, ignoreNotFound bool) (*ovnnb.LoadBalancer, error)
Expand Down
108 changes: 58 additions & 50 deletions pkg/ovs/ovn-nb-load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package ovs
import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"strings"

"github.com/ovn-org/libovsdb/model"
"github.com/ovn-org/libovsdb/ovsdb"
Expand Down Expand Up @@ -61,59 +62,66 @@ func (c *ovnClient) UpdateLoadBalancer(lb *ovnnb.LoadBalancer, fields ...interfa
return nil
}

func (c *ovnClient) loadBalancerUpdateVips(lbName string, vips interface{}) error {
var toAdd map[string]string
var toDelete []string
value := reflect.ValueOf(vips)
switch value.Type().Kind() {
case reflect.Slice:
toDelete = vips.([]string)
case reflect.Map:
toAdd = vips.(map[string]string)
default:
return fmt.Errorf("program error: invalid data type of vips %+v", vips)
}
if value.Len() == 0 {
return nil
}

lb, err := c.GetLoadBalancer(lbName, false)
// LoadBalancerAddVips adds or updates a vip
func (c *ovnClient) LoadBalancerAddVip(lbName, vip string, backends ...string) error {
sort.Strings(backends)
ops, err := c.LoadBalancerOp(lbName, func(lb *ovnnb.LoadBalancer) []model.Mutation {
mutations := make([]model.Mutation, 0, 2)
value := strings.Join(backends, ",")
if len(lb.Vips) != 0 {
if lb.Vips[vip] == value {
return nil
}
mutations = append(mutations, model.Mutation{
Field: &lb.Vips,
Value: map[string]string{vip: lb.Vips[vip]},
Mutator: ovsdb.MutateOperationDelete,
})
}
mutations = append(mutations, model.Mutation{
Field: &lb.Vips,
Value: map[string]string{vip: value},
Mutator: ovsdb.MutateOperationInsert,
})
return mutations
})
if err != nil {
return err
return fmt.Errorf("failed to generate operations when adding vip %s with backends %v to load balancers %s: %v", vip, backends, lbName, err)
}

m := make(map[string]string, len(lb.Vips)+len(toAdd))
for k, v := range lb.Vips {
m[k] = v
if err = c.Transact("lb-add", ops); err != nil {
return fmt.Errorf("failed to add vip %s with backends %v to load balancers %s: %v", vip, backends, lbName, err)
}
for k, v := range toAdd {
m[k] = v
}
for _, k := range toDelete {
delete(m, k)
return nil
}

// LoadBalancerDeleteVip deletes load balancer vip
func (c *ovnClient) LoadBalancerDeleteVip(lbName string, vip string) error {
ops, err := c.LoadBalancerOp(lbName, func(lb *ovnnb.LoadBalancer) []model.Mutation {
if len(lb.Vips) == 0 {
return nil
}
if _, ok := lb.Vips[vip]; !ok {
return nil
}

return []model.Mutation{{
Field: &lb.Vips,
Value: map[string]string{vip: lb.Vips[vip]},
Mutator: ovsdb.MutateOperationDelete,
}}
})
if err != nil {
return fmt.Errorf("failed to generate operations when deleting vip %s from load balancers %s: %v", vip, lbName, err)
}
if reflect.DeepEqual(m, lb.Vips) {
if len(ops) == 0 {
return nil
}

lb.Vips = m
if err := c.UpdateLoadBalancer(lb, &lb.Vips); err != nil {
return fmt.Errorf("add vips %v to lb %s: %v", vips, lbName, err)
if err = c.Transact("lb-add", ops); err != nil {
return fmt.Errorf("failed to delete vip %s from load balancers %s: %v", vip, lbName, err)
}

return nil
}

// LoadBalancerAddVips adds or updates vips
func (c *ovnClient) LoadBalancerAddVips(lbName string, vips map[string]string) error {
return c.loadBalancerUpdateVips(lbName, vips)
}

// LoadBalancerDeleteVips deletes load balancer vips
func (c *ovnClient) LoadBalancerDeleteVips(lbName string, vips ...string) error {
return c.loadBalancerUpdateVips(lbName, vips)
}

// SetLoadBalancerAffinityTimeout sets the LB's affinity timeout in seconds
func (c *ovnClient) SetLoadBalancerAffinityTimeout(lbName string, timeout int) error {
lb, err := c.GetLoadBalancer(lbName, false)
Expand Down Expand Up @@ -224,7 +232,7 @@ func (c *ovnClient) ListLoadBalancers(filter func(lb *ovnnb.LoadBalancer) bool)
return lbList, nil
}

func (c *ovnClient) LoadBalancerOp(lbName string, mutationsFunc ...func(lb *ovnnb.LoadBalancer) *model.Mutation) ([]ovsdb.Operation, error) {
func (c *ovnClient) LoadBalancerOp(lbName string, mutationsFunc ...func(lb *ovnnb.LoadBalancer) []model.Mutation) ([]ovsdb.Operation, error) {
lb, err := c.GetLoadBalancer(lbName, false)
if err != nil {
return nil, err
Expand All @@ -235,14 +243,14 @@ func (c *ovnClient) LoadBalancerOp(lbName string, mutationsFunc ...func(lb *ovnn
}

mutations := make([]model.Mutation, 0, len(mutationsFunc))

for _, f := range mutationsFunc {
mutation := f(lb)

if mutation != nil {
mutations = append(mutations, *mutation)
if m := f(lb); len(m) != 0 {
mutations = append(mutations, m...)
}
}
if len(mutations) == 0 {
return nil, nil
}

ops, err := c.ovnNbClient.Where(lb).Mutate(lb, mutations...)
if err != nil {
Expand Down
Loading

0 comments on commit 1fc5d85

Please sign in to comment.