Skip to content

Commit

Permalink
Merge pull request #1078 from tssurya/merge-04-05-2022
Browse files Browse the repository at this point in the history
Bug 2070929: Downstream Merge: 04-05-2022
  • Loading branch information
openshift-merge-robot committed May 5, 2022
2 parents e8d2b75 + ac5b231 commit 401cfc2
Show file tree
Hide file tree
Showing 21 changed files with 1,409 additions and 52 deletions.
2 changes: 1 addition & 1 deletion dist/templates/ovnkube-db-raft.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ spec:

# ovndb-raft PodDisruptionBudget to prevent a majority of ovnkube raft cluster
# nodes from disruption
apiVersion: policy/v1beta1
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: ovndb-raft-pdb
Expand Down
1 change: 0 additions & 1 deletion go-controller/pkg/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ func (k *Kube) UpdateEgressIP(eIP *egressipv1.EgressIP) error {
}

// PatchEgressIP applies patchData as a JSON patch (types.JSONPatchType) to the
// EgressIP object called name and returns the error from the client call, if any.
func (k *Kube) PatchEgressIP(name string, patchData []byte) error {
klog.Infof("Patching status on EgressIP %s", name)
// The patched object returned by the client is discarded; only the error is reported.
_, err := k.EIPClient.K8sV1().EgressIPs().Patch(context.TODO(), name, types.JSONPatchType, patchData, metav1.PatchOptions{})
return err
}
Expand Down
14 changes: 13 additions & 1 deletion go-controller/pkg/metrics/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,15 @@ func (cpr *ControlPlaneRecorder) Run(sbClient libovsdbclient.Client, stop <-chan
if table != portBindingTable {
return
}
cpr.queue.Add(item{op: updatePortBinding, old: old, new: new, t: time.Now()})
oldRow := old.(*sbdb.PortBinding)
newRow := new.(*sbdb.PortBinding)
// chassis assigned
if oldRow.Chassis == nil && newRow.Chassis != nil {
cpr.queue.Add(item{op: updatePortBinding, old: old, new: new, t: time.Now()})
// port binding up
} else if oldRow.Up != nil && !*oldRow.Up && newRow.Up != nil && *newRow.Up {
cpr.queue.Add(item{op: updatePortBinding, old: old, new: new, t: time.Now()})
}
},
DeleteFunc: func(table string, model model.Model) {
},
Expand Down Expand Up @@ -620,13 +628,17 @@ func (cpr *ControlPlaneRecorder) updatePortBinding(old, new model.Model, t time.
klog.V(5).Infof("Port binding update expected pod with UID %q in cache", podUID)
return
}

if oldRow.Chassis == nil && newRow.Chassis != nil && r.timestampType == portBinding {
metricPortBindingChassisLatency.Observe(t.Sub(r.timestamp).Seconds())
r.timestamp = t
r.timestampType = portBindingChassis

}

if oldRow.Up != nil && !*oldRow.Up && newRow.Up != nil && *newRow.Up && r.timestampType == portBindingChassis {
metricPortBindingUpLatency.Observe(t.Sub(r.timestamp).Seconds())
delete(cpr.podRecords, podUID)
}
}

Expand Down
21 changes: 19 additions & 2 deletions go-controller/pkg/ovn/address_set/fake_address_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,25 @@ func (as *fakeAddressSets) GetIPs() ([]string, []string) {
}

// SetIPs replaces the contents of the fake address sets with ips. It gathers
// every IP currently held by the IPv4 and IPv6 sets (for whichever of them
// exist), removes them through DeleteIPs and then adds ips through AddIPs.
// The first error returned by either call is propagated to the caller.
func (as *fakeAddressSets) SetIPs(ips []net.IP) error {
	// Collect the current members of both families so they can be removed in
	// a single DeleteIPs call before the new set is installed.
	allIPs := []net.IP{}
	if as.ipv4 != nil {
		for _, ip := range as.ipv4.ips {
			allIPs = append(allIPs, ip)
		}
	}
	if as.ipv6 != nil {
		for _, ip := range as.ipv6.ips {
			allIPs = append(allIPs, ip)
		}
	}

	if err := as.DeleteIPs(allIPs); err != nil {
		return err
	}
	return as.AddIPs(ips)
}

func (as *fakeAddressSets) DeleteIPs(ips []net.IP) error {
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/ovn/controller/services/load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/assert"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
utilpointer "k8s.io/utils/pointer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ import (
"golang.org/x/time/rate"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"

coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
discoverylisters "k8s.io/client-go/listers/discovery/v1beta1"
discoverylisters "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
libovsdbtest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing/libovsdb"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -45,15 +45,15 @@ func newControllerWithDBSetup(dbSetup libovsdbtest.TestSetup) (*serviceControlle
controller := NewController(client,
nbClient,
informerFactory.Core().V1().Services(),
informerFactory.Discovery().V1beta1().EndpointSlices(),
informerFactory.Discovery().V1().EndpointSlices(),
informerFactory.Core().V1().Nodes(),
)
controller.servicesSynced = alwaysReady
controller.endpointSlicesSynced = alwaysReady
return &serviceController{
controller,
informerFactory.Core().V1().Services().Informer().GetStore(),
informerFactory.Discovery().V1beta1().EndpointSlices().Informer().GetStore(),
informerFactory.Discovery().V1().EndpointSlices().Informer().GetStore(),
cleanup,
}, nil
}
Expand Down
72 changes: 57 additions & 15 deletions go-controller/pkg/ovn/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func (oc *Controller) reconcileEgressIP(old, new *egressipv1.EgressIP) (err erro
// empty selectors, matching everything, whereas we would mean the inverse
newNamespaceSelector, _ := metav1.LabelSelectorAsSelector(nil)
oldNamespaceSelector, _ := metav1.LabelSelectorAsSelector(nil)
// Initialize a sets.String which holds egress IPs that were not fully assigned
// but are allocated and they are meant to be removed.
staleEgressIPs := sets.NewString()
if old != nil {
oldEIP = old
oldNamespaceSelector, err = metav1.LabelSelectorAsSelector(&oldEIP.Spec.NamespaceSelector)
Expand All @@ -80,6 +83,7 @@ func (oc *Controller) reconcileEgressIP(old, new *egressipv1.EgressIP) (err erro
}
name = oldEIP.Name
status = oldEIP.Status.Items
staleEgressIPs.Insert(oldEIP.Spec.EgressIPs...)
}
if new != nil {
newEIP = new
Expand All @@ -89,6 +93,13 @@ func (oc *Controller) reconcileEgressIP(old, new *egressipv1.EgressIP) (err erro
}
name = newEIP.Name
status = newEIP.Status.Items
if staleEgressIPs.Len() > 0 {
for _, egressIP := range newEIP.Spec.EgressIPs {
if staleEgressIPs.Has(egressIP) {
staleEgressIPs.Delete(egressIP)
}
}
}
}

// We do not initialize a nothing selector for the podSelector, because
Expand Down Expand Up @@ -219,6 +230,23 @@ func (oc *Controller) reconcileEgressIP(old, new *egressipv1.EgressIP) (err erro
// performed and avoid incorrect future assignments due to a
// de-synchronized cache.
oc.addAllocatorEgressIPAssignments(name, statusToKeep)

// When egress IP is not fully assigned to a node, then statusToRemove may not
// have those entries, hence retrieve it from staleEgressIPs for removing
// the item from cloudprivateipconfig.
for _, toRemove := range statusToRemove {
if !staleEgressIPs.Has(toRemove.EgressIP) {
continue
}
staleEgressIPs.Delete(toRemove.EgressIP)
}
for staleEgressIP := range staleEgressIPs {
if oc.deleteAllocatorEgressIPAssignmentIfExists(name, staleEgressIP) {
statusToRemove = append(statusToRemove,
egressipv1.EgressIPStatusItem{EgressIP: staleEgressIP})
}
}

// Execute CloudPrivateIPConfig changes for assignments which need to be
// added/removed, assignments which don't change do not require any
// further setup.
Expand Down Expand Up @@ -980,9 +1008,6 @@ func (oc *Controller) addPodEgressIPAssignments(name string, statusAssignments [
podIPs: logicalPort.ips,
}
oc.eIPC.podAssignment[podKey] = podState
if err := oc.eIPC.deletePerPodGRSNAT(pod, logicalPort.ips); err != nil {
return err
}
} else {
for _, status := range statusAssignments {
if _, exists := podState.egressStatuses[status]; !exists {
Expand All @@ -1000,6 +1025,20 @@ func (oc *Controller) addPodEgressIPAssignments(name string, statusAssignments [
return nil
}

// deleteAllocatorEgressIPAssignmentIfExists walks the allocator cache while
// holding the allocator lock and drops egressIP from the first node entry
// encountered whose allocations map assigns that IP to the EgressIP object
// called name. It reports true when an entry was deleted and false when no
// node holds such an allocation.
func (oc *Controller) deleteAllocatorEgressIPAssignmentIfExists(name, egressIP string) bool {
	oc.eIPC.allocator.Lock()
	defer oc.eIPC.allocator.Unlock()

	for _, node := range oc.eIPC.allocator.cache {
		owner, found := node.allocations[egressIP]
		if !found || owner != name {
			// Either this node does not hold the IP, or it holds it on
			// behalf of a different EgressIP object.
			continue
		}
		delete(node.allocations, egressIP)
		return true
	}
	return false
}

// deleteAllocatorEgressIPAssignments deletes the allocation as to keep the
// cache state correct, also see addAllocatorEgressIPAssignments
func (oc *Controller) deleteAllocatorEgressIPAssignments(statusAssignments []egressipv1.EgressIPStatusItem) {
Expand Down Expand Up @@ -1027,13 +1066,11 @@ func (oc *Controller) deleteEgressIPAssignments(name string, statusesToRemove []
}
for podKey, podStatus := range oc.eIPC.podAssignment {
delete(podStatus.egressStatuses, statusToRemove)
if len(podStatus.egressStatuses) == 0 {
podNamespace, podName := getPodNamespaceAndNameFromKey(podKey)
if err := oc.eIPC.addPerPodGRSNAT(podNamespace, podName, podStatus.podIPs); err != nil {
return err
}
delete(oc.eIPC.podAssignment, podKey)
podNamespace, podName := getPodNamespaceAndNameFromKey(podKey)
if err := oc.eIPC.addPerPodGRSNAT(podNamespace, podName, podStatus.podIPs); err != nil {
return err
}
delete(oc.eIPC.podAssignment, podKey)
}
}
return nil
Expand Down Expand Up @@ -1077,9 +1114,6 @@ func (oc *Controller) deletePodEgressIPAssignments(name string, statusesToRemove
}
delete(podStatus.egressStatuses, statusToRemove)
}
if len(podStatus.egressStatuses) > 0 {
return nil
}
if err := oc.eIPC.addPerPodGRSNAT(pod.Namespace, pod.Name, podStatus.podIPs); err != nil {
return err
}
Expand Down Expand Up @@ -1337,6 +1371,7 @@ type EgressIPPatchStatus struct {
// object update which risks resetting the EgressIP object's fields to the state
// they had when we started processing the change.
func (oc *Controller) patchReplaceEgressIPStatus(name string, statusItems []egressipv1.EgressIPStatusItem) error {
klog.Infof("Patching status on EgressIP %s: %v", name, statusItems)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
t := []EgressIPPatchStatus{
{
Expand Down Expand Up @@ -1790,6 +1825,9 @@ type egressIPController struct {
// (routing pod traffic to the egress node) and NAT objects on the egress node
// (SNAT-ing to the egress IP).
func (e *egressIPController) addPodEgressIPAssignment(egressIPName string, status egressipv1.EgressIPStatusItem, pod *kapi.Pod, podIPs []*net.IPNet) (err error) {
if err := e.deletePerPodGRSNAT(pod, podIPs, status); err != nil {
return err
}
if err := e.handleEgressReroutePolicy(podIPs, status, egressIPName, e.createEgressReroutePolicy); err != nil {
return fmt.Errorf("unable to create logical router policy, err: %v", err)
}
Expand Down Expand Up @@ -1843,9 +1881,10 @@ func (e *egressIPController) addPerPodGRSNAT(podNamespace, podName string, podIP
return nil
}

func (e *egressIPController) deletePerPodGRSNAT(pod *kapi.Pod, podIPs []*net.IPNet) error {
if config.Gateway.DisableSNATMultipleGWs {
// remove snats to->nodeIP (from the node where pod exists) for these podIPs before adding the snat to->egressIP
func (e *egressIPController) deletePerPodGRSNAT(pod *kapi.Pod, podIPs []*net.IPNet, status egressipv1.EgressIPStatusItem) error {
if config.Gateway.DisableSNATMultipleGWs && status.Node == pod.Spec.NodeName {
// remove snats to->nodeIP (from the node where pod exists if that node is also serving
// as an egress node for this pod) for these podIPs before adding the snat to->egressIP
extIPs, err := getExternalIPsGRSNAT(e.watchFactory, pod.Spec.NodeName)
if err != nil {
return err
Expand All @@ -1854,6 +1893,9 @@ func (e *egressIPController) deletePerPodGRSNAT(pod *kapi.Pod, podIPs []*net.IPN
if err != nil {
return err
}
} else if config.Gateway.DisableSNATMultipleGWs {
// the node hosting the pod is different from the egress node managing the pod, so the SNAT on the pod's node is left in place
klog.V(5).Infof("Not deleting SNAT on %s since egress node managing %s/%s is %s", pod.Spec.NodeName, pod.Namespace, pod.Name, status.Node)
}
return nil
}
Expand Down

0 comments on commit 401cfc2

Please sign in to comment.