Skip to content

Commit

Permalink
fix network policy issues (#2652)
Browse files Browse the repository at this point in the history
* network policy: fix race condition

* ci: add missing build flags

* libovsdb: fix port group ports operations

we should update ports directly instead of clearing all ports and then adding ports

* controller: remove port group key mutex
  • Loading branch information
zhangzujian committed Apr 17, 2023
1 parent 148f1bf commit f84343e
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 86 deletions.
38 changes: 20 additions & 18 deletions Makefile.e2e
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
E2E_BUILD_FLAGS = -ldflags "-w -s"

KUBECONFIG = $(shell echo $${KUBECONFIG:-$(HOME)/.kube/config})

E2E_BRANCH := $(shell echo $${E2E_BRANCH:-master})
Expand Down Expand Up @@ -55,33 +57,33 @@ e2e: kube-ovn-conformance-e2e

.PHONY: e2e-build
e2e-build:
ginkgo build -ldflags "-w -s" ./test/e2e/k8s-network
ginkgo build -ldflags "-w -s" ./test/e2e/kube-ovn
ginkgo build -ldflags "-w -s" ./test/e2e/ovn-ic
ginkgo build -ldflags "-w -s" ./test/e2e/lb-svc
ginkgo build -ldflags "-w -s" ./test/e2e/ovn-eip
ginkgo build -ldflags "-w -s" ./test/e2e/security
ginkgo build -ldflags "-w -s" ./test/e2e/kubevirt
ginkgo build -ldflags "-w -s" ./test/e2e/webhook
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/k8s-network
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/kube-ovn
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/ovn-ic
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/lb-svc
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/ovn-eip
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/security
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/kubevirt
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/webhook

.PHONY: k8s-conformance-e2e
k8s-conformance-e2e:
ginkgo build ./test/e2e/k8s-network
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/k8s-network
ginkgo $(GINKGO_PARALLEL_OPT) --randomize-all --always-emit-ginkgo-writer --timeout=1h \
$(call ginkgo_option,focus,$(K8S_CONFORMANCE_E2E_FOCUS)) \
$(call ginkgo_option,skip,$(K8S_CONFORMANCE_E2E_SKIP)) \
./test/e2e/k8s-network/k8s-network.test

.PHONY: k8s-netpol-legacy-e2e
k8s-netpol-legacy-e2e:
ginkgo build ./test/e2e/k8s-network
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/k8s-network
ginkgo -p --randomize-all --always-emit-ginkgo-writer --timeout=2h \
$(call ginkgo_option,focus,$(K8S_NETPOL_LEGACY_E2E_FOCUS)) \
./test/e2e/k8s-network/k8s-network.test

.PHONY: k8s-netpol-e2e
k8s-netpol-e2e:
ginkgo build ./test/e2e/k8s-network
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/k8s-network
ginkgo -p --randomize-all --always-emit-ginkgo-writer --timeout=2h \
$(call ginkgo_option,focus,$(K8S_NETPOL_E2E_FOCUS)) \
$(call ginkgo_option,skip,$(K8S_NETPOL_E2E_SKIP)) \
Expand All @@ -103,7 +105,7 @@ cyclonus-netpol-e2e:

.PHONY: kube-ovn-conformance-e2e
kube-ovn-conformance-e2e:
ginkgo build ./test/e2e/kube-ovn
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/kube-ovn
E2E_BRANCH=$(E2E_BRANCH) \
E2E_IP_FAMILY=$(E2E_IP_FAMILY) \
E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \
Expand All @@ -112,7 +114,7 @@ kube-ovn-conformance-e2e:

.PHONY: kube-ovn-ic-conformance-e2e
kube-ovn-ic-conformance-e2e:
ginkgo build ./test/e2e/ovn-ic
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/ovn-ic
E2E_BRANCH=$(E2E_BRANCH) \
E2E_IP_FAMILY=$(E2E_IP_FAMILY) \
E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \
Expand All @@ -127,7 +129,7 @@ kube-ovn-submariner-conformance-e2e:

.PHONY: kube-ovn-lb-svc-conformance-e2e
kube-ovn-lb-svc-conformance-e2e:
ginkgo build ./test/e2e/lb-svc
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/lb-svc
E2E_BRANCH=$(E2E_BRANCH) \
E2E_IP_FAMILY=$(E2E_IP_FAMILY) \
E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \
Expand All @@ -136,7 +138,7 @@ kube-ovn-lb-svc-conformance-e2e:

.PHONY: kube-ovn-eip-conformance-e2e
kube-ovn-eip-conformance-e2e:
ginkgo build ./test/e2e/ovn-eip
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/ovn-eip
E2E_BRANCH=$(E2E_BRANCH) \
E2E_IP_FAMILY=$(E2E_IP_FAMILY) \
E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \
Expand All @@ -145,7 +147,7 @@ kube-ovn-eip-conformance-e2e:

.PHONY: kube-ovn-security-e2e
kube-ovn-security-e2e:
ginkgo build ./test/e2e/security
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/security
E2E_BRANCH=$(E2E_BRANCH) \
E2E_IP_FAMILY=$(E2E_IP_FAMILY) \
E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \
Expand All @@ -154,7 +156,7 @@ kube-ovn-security-e2e:

.PHONY: kube-ovn-kubevirt-e2e
kube-ovn-kubevirt-e2e:
ginkgo build ./test/e2e/kubevirt
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/kubevirt
E2E_BRANCH=$(E2E_BRANCH) \
E2E_IP_FAMILY=$(E2E_IP_FAMILY) \
E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \
Expand All @@ -163,7 +165,7 @@ kube-ovn-kubevirt-e2e:

.PHONY: kube-ovn-webhook-e2e
kube-ovn-webhook-e2e:
ginkgo build ./test/e2e/webhook
ginkgo build $(E2E_BUILD_FLAGS) ./test/e2e/webhook
E2E_BRANCH=$(E2E_BRANCH) \
E2E_IP_FAMILY=$(E2E_IP_FAMILY) \
E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \
Expand Down
24 changes: 12 additions & 12 deletions mocks/pkg/ovs/interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type Controller struct {

ovnLegacyClient *ovs.LegacyClient
ovnClient ovs.OvnClient
ovnPgKeyMutex *keymutex.KeyMutex

// ExternalGatewayType define external gateway type, centralized
ExternalGatewayType string
Expand Down Expand Up @@ -209,6 +208,7 @@ type Controller struct {
npsSynced cache.InformerSynced
updateNpQueue workqueue.RateLimitingInterface
deleteNpQueue workqueue.RateLimitingInterface
npKeyMutex *keymutex.KeyMutex

sgsLister kubeovnlister.SecurityGroupLister
sgSynced cache.InformerSynced
Expand Down Expand Up @@ -285,7 +285,6 @@ func NewController(config *Configuration) *Controller {
vpcs: &sync.Map{},
podSubnetMap: &sync.Map{},
ovnLegacyClient: ovs.NewLegacyClient(config.OvnNbAddr, config.OvnTimeout, config.OvnSbAddr, config.ClusterRouter, config.ClusterTcpLoadBalancer, config.ClusterUdpLoadBalancer, config.ClusterTcpSessionLoadBalancer, config.ClusterUdpSessionLoadBalancer, config.NodeSwitch, config.NodeSwitchCIDR),
ovnPgKeyMutex: keymutex.New(97),
ipam: ovnipam.NewIPAM(),
namedPort: NewNamedPort(),

Expand Down Expand Up @@ -541,6 +540,7 @@ func NewController(config *Configuration) *Controller {
controller.npsSynced = npInformer.Informer().HasSynced
controller.updateNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp")
controller.deleteNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp")
controller.npKeyMutex = keymutex.New(97)
if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddNp,
UpdateFunc: controller.enqueueUpdateNp,
Expand Down
27 changes: 15 additions & 12 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *Controller) processNextUpdateNpWorkItem() bool {
}
if err := c.handleUpdateNp(key); err != nil {
c.updateNpQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
return fmt.Errorf("error syncing network policy %s: %v, requeuing", key, err)
}
c.updateNpQueue.Forget(obj)
return nil
Expand Down Expand Up @@ -138,6 +138,11 @@ func (c *Controller) handleUpdateNp(key string) error {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

c.npKeyMutex.Lock(key)
defer c.npKeyMutex.Unlock(key)
klog.Infof("handle add/update network policy %s", key)

np, err := c.npsLister.NetworkPolicies(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -177,8 +182,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

npName := np.Name
nameArray := []rune(np.Name)
if !unicode.IsLetter(nameArray[0]) {
if nameArray := []rune(np.Name); !unicode.IsLetter(nameArray[0]) {
npName = "np" + np.Name
}

Expand All @@ -191,11 +195,6 @@ func (c *Controller) handleUpdateNp(key string) error {
egressAllowAsNamePrefix := strings.Replace(fmt.Sprintf("%s.%s.egress.allow", np.Name, np.Namespace), "-", ".", -1)
egressExceptAsNamePrefix := strings.Replace(fmt.Sprintf("%s.%s.egress.except", np.Name, np.Namespace), "-", ".", -1)

// delete existing pg to update acl
if err = c.ovnClient.DeletePortGroup(pgName); err != nil {
klog.Errorf("delete port group %s before networkpolicy update process: %v", pgName, err)
}

if err = c.ovnClient.CreatePortGroup(pgName, map[string]string{networkPolicyKey: np.Namespace + "/" + np.Name}); err != nil {
klog.Errorf("create port group for np %s: %v", key, err)
return err
Expand All @@ -208,8 +207,8 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}

if err := c.ovnClient.PortGroupAddPorts(pgName, ports...); err != nil {
klog.Errorf("add ports to port group %s: %v", pgName, err)
if err = c.ovnClient.PortGroupSetPorts(pgName, ports); err != nil {
klog.Errorf("failed to set ports of port group %s to %v: %v", pgName, ports, err)
return err
}

Expand Down Expand Up @@ -395,8 +394,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

var egressAclCmd []string
exist, err = c.ovnClient.PortGroupExists(pgName)
if err != nil {
if exist, err = c.ovnClient.PortGroupExists(pgName); err != nil {
klog.Errorf("failed to query np %s port group, %v", key, err)
return err
}
Expand Down Expand Up @@ -537,6 +535,11 @@ func (c *Controller) handleDeleteNp(key string) error {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

c.npKeyMutex.Lock(key)
defer c.npKeyMutex.Unlock(key)
klog.Infof("handle delete network policy %s", key)

npName := name
nameArray := []rune(name)
if !unicode.IsLetter(nameArray[0]) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func (c *Controller) checkAndUpdateNodePortGroup() error {
return err
}

if err := c.ovnClient.PortGroupAddPorts(pgName, nodePorts...); err != nil {
if err = c.ovnClient.PortGroupSetPorts(pgName, nodePorts); err != nil {
klog.Errorf("add ports to port group %s: %v", pgName, err)
return err
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) {
if len(needRoutePodNets) > 0 {
if c.config.EnableNP {
for _, np := range c.podMatchNetworkPolicies(pod) {
klog.V(3).Infof("enqueue update pod %s' network policy", key)
klog.V(3).Infof("enqueue update network policy %s for pod %s", np, key)
c.updateNpQueue.Add(np)
}
}
Expand Down Expand Up @@ -742,12 +742,9 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN

// remove lsp from port group to make EIP/SNAT work
portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName)
c.ovnPgKeyMutex.Lock(pgName)
if err = c.ovnClient.PortGroupRemovePorts(pgName, portName); err != nil {
c.ovnPgKeyMutex.Unlock(pgName)
return err
}
c.ovnPgKeyMutex.Unlock(pgName)

} else {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType && pod.Annotations[util.NorthGatewayAnnotation] == "" {
Expand All @@ -764,16 +761,11 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN
}

portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName)
c.ovnPgKeyMutex.Lock(pgName)

if err := c.ovnClient.PortGroupAddPorts(pgName, portName); err != nil {
c.ovnPgKeyMutex.Unlock(pgName)
klog.Errorf("add port to port group %s: %v", pgName, err)
return err
}

c.ovnPgKeyMutex.Unlock(pgName)

added = true
break
}
Expand Down
17 changes: 3 additions & 14 deletions pkg/controller/security_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,23 +216,12 @@ func (c *Controller) updateDenyAllSgPorts() error {
}
pgName := ovs.GetSgPortGroupName(util.DenyAllSecurityGroup)

// reset pg ports
if err := c.ovnClient.PortGroupResetPorts(pgName); err != nil {
klog.V(6).Infof("setting ports of port group %s to %v", pgName, addPorts)
if err = c.ovnClient.PortGroupSetPorts(pgName, addPorts); err != nil {
klog.Error(err)
return err
}

// add port
if len(addPorts) == 0 {
return nil
}

klog.V(6).Infof("add ports %v to port group %s", addPorts, pgName)
if err := c.ovnClient.PortGroupAddPorts(pgName, addPorts...); err != nil {
klog.Errorf("add ports to port group %s: %v", pgName, err)
return err
}

return nil
}

Expand Down Expand Up @@ -439,7 +428,7 @@ func (c *Controller) syncSgLogicalPort(key string) error {
}
}

if err := c.ovnClient.PortGroupAddPorts(sg.Status.PortGroup, ports...); err != nil {
if err = c.ovnClient.PortGroupSetPorts(sg.Status.PortGroup, ports); err != nil {
klog.Errorf("add ports to port group %s: %v", sg.Status.PortGroup, err)
return err
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,8 +1471,6 @@ func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error
}

pgName := getOverlaySubnetsPortGroupName(subnet.Name, pod.Spec.NodeName)
c.ovnPgKeyMutex.Lock(pgName)

portsToAdd := make([]string, 0, len(podPorts))
for _, port := range podPorts {
exist, err := c.ovnClient.LogicalSwitchPortExists(port)
Expand All @@ -1488,12 +1486,10 @@ func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error
portsToAdd = append(portsToAdd, port)
}

if err := c.ovnClient.PortGroupAddPorts(pgName, portsToAdd...); err != nil {
if err = c.ovnClient.PortGroupAddPorts(pgName, portsToAdd...); err != nil {
klog.Errorf("add ports to port group %s: %v", pgName, err)
return err
}

c.ovnPgKeyMutex.Unlock(pgName)
}
return nil
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ovs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type PortGroup interface {
CreatePortGroup(pgName string, externalIDs map[string]string) error
PortGroupAddPorts(pgName string, lspNames ...string) error
PortGroupRemovePorts(pgName string, lspNames ...string) error
PortGroupResetPorts(pgName string) error
PortGroupSetPorts(pgName string, ports []string) error
DeletePortGroup(pgName string) error
ListPortGroups(externalIDs map[string]string) ([]ovnnb.PortGroup, error)
GetPortGroup(pgName string, ignoreNotFound bool) (*ovnnb.PortGroup, error)
Expand Down
Loading

0 comments on commit f84343e

Please sign in to comment.