diff --git a/pkg/controller/config.go b/pkg/controller/config.go index 6a4b1b669b..0b3f9529af 100644 --- a/pkg/controller/config.go +++ b/pkg/controller/config.go @@ -30,6 +30,7 @@ type OpflexGroup struct { } type delayService struct { + Delay int `json:"delay,omitempty"` Name string `json:"name,omitempty"` Namespace string `json:"namespace,omitempty"` } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e7f7d0ccdd..cae6a30713 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -173,6 +173,7 @@ type AciController struct { } type DelayedEpSlice struct { + ServiceKey string OldEpSlice *v1beta1.EndpointSlice NewEpSlice *v1beta1.EndpointSlice DelayedTime time.Time @@ -541,6 +542,17 @@ func (cont *AciController) Run(stopCh <-chan struct{}) { cont.config.SnatDefaultPortRangeEnd = defEnd } + // Set a default value (90) for the pbr programming delay if the services list is not empty + // and no global delay value is configured (Delay == 0) + if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 && + cont.config.ServiceGraphEndpointAddDelay.Services != nil && + len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 { + cont.config.ServiceGraphEndpointAddDelay.Delay = 90 + } + if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 { + cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay) + } + // Set contract scope for snat svc graph to global by default if cont.config.SnatSvcContractScope == "" { cont.config.SnatSvcContractScope = "global" diff --git a/pkg/controller/services.go b/pkg/controller/services.go index ecb5a879f7..797ea6a62b 100644 --- a/pkg/controller/services.go +++ b/pkg/controller/services.go @@ -409,7 +409,6 @@ func apicRedirectDst(rpDn string, ip string, mac string, func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string, nodeMap map[string]*metadata.ServiceEndpoint, monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) { - rp := 
apicapi.NewVnsSvcRedirectPol(tenantName, name) rp.SetAttr("thresholdDownAction", "deny") rpDn := rp.GetDn() @@ -1527,7 +1526,6 @@ func (cont *AciController) serviceUpdated(old interface{}, new interface{}) { Error("Could not create service key: ", err) return } - oldPorts := getServiceTargetPorts(oldservice) newPorts := getServiceTargetPorts(newservice) if !reflect.DeepEqual(oldPorts, newPorts) { @@ -1592,10 +1590,29 @@ func (cont *AciController) getEndpointSliceIps(endpointSlice *v1beta1.EndpointSl return ips } +func (cont *AciController) notReadyEndpointPresent(endpointSlice *v1beta1.EndpointSlice) bool { + for _, endpoints := range endpointSlice.Endpoints { + if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) && + (endpoints.Conditions.Terminating != nil && !*endpoints.Conditions.Terminating) { + return true + } + } + return false +} + +func (cont *AciController) getEndpointSliceEpIps(endpoints v1beta1.Endpoint) map[string]bool { + ips := make(map[string]bool) + for _, addr := range endpoints.Addresses { + ips[addr] = true + } + return ips +} + func (cont *AciController) processDelayedEpSlices() { var processEps []DelayedEpSlice cont.indexMutex.Lock() - for i, delayedepslice := range cont.delayedEpSlices { + for i := 0; i < len(cont.delayedEpSlices); i++ { + delayedepslice := cont.delayedEpSlices[i] if time.Now().After(delayedepslice.DelayedTime) { var toprocess DelayedEpSlice err := util.DeepCopyObj(&delayedepslice, &toprocess) @@ -1610,8 +1627,13 @@ func (cont *AciController) processDelayedEpSlices() { cont.indexMutex.Unlock() for _, epslice := range processEps { - cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice) - cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice) + // ignore the epslice if it still has an endpoint reported as not ready and not terminating + if cont.notReadyEndpointPresent(epslice.NewEpSlice) { + cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice) + } else { + 
cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice) + cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice) + } } } @@ -1664,61 +1686,108 @@ func (cont *AciController) endpointSliceDeleted(obj interface{}) { cont.queueServiceUpdateByKey(servicekey) } -func (cont *AciController) svcInAddDelayList(name, ns string) bool { +// Checks if the given service is present in the user configured list of services +// for pbr delay and if present, returns the service specific delay if configured +func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) { for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services { if svc.Name == name && svc.Namespace == ns { - return true + return svc.Delay, true } } - return false + return 0, false } -func (cont *AciController) endpointSliceUpdated(oldobj interface{}, newobj interface{}) { - oldendpointslice, ok := oldobj.(*v1beta1.EndpointSlice) - if !ok { - cont.log.Error("error processing Endpointslice object: ", oldobj) - return - } - newendpointslice, ok := newobj.(*v1beta1.EndpointSlice) - if !ok { - cont.log.Error("error processing Endpointslice object: ", newobj) - return +// Check if the endpointslice update notification has any deletion of endpoint +func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *v1beta1.EndpointSlice) bool { + del := false + + // if any endpoint is removed from the endpointslice (new slice has fewer endpoints than old) + if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) { + del = true } - if cont.config.NoWaitForServiceEpReadiness == false { - proceed := true + + if !del { + // if any one of the endpoints is in terminating state for _, endpoint := range newendpointslice.Endpoints { - if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { - proceed = false + if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating { + del = true + break } } - if !proceed { - cont.log.Debug("New enpoints are not in ready state") - 
return + } + if !del { + // if any endpoint (matched by its address set) moved from ready state to not-ready state + for _, oldendpoint := range oldendpointslice.Endpoints { + oldips := cont.getEndpointSliceEpIps(oldendpoint) + for _, newendpoint := range newendpointslice.Endpoints { + newips := cont.getEndpointSliceEpIps(newendpoint) + if reflect.DeepEqual(oldips, newips) { + if (oldendpoint.Conditions.Ready != nil && *oldendpoint.Conditions.Ready) && + (newendpoint.Conditions.Ready != nil && !*newendpoint.Conditions.Ready) { + del = true + } + break + } + } } } + return del +} + +func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *v1beta1.EndpointSlice, + newendpointslice *v1beta1.EndpointSlice) { svc, ns, valid := getServiceNameAndNs(newendpointslice) if !valid { return } - var delay int - if cont.config.ServiceGraphEndpointAddDelay.Delay != 0 { - if cont.svcInAddDelayList(svc, ns) { - delay = cont.config.ServiceGraphEndpointAddDelay.Delay - cont.log.Debug("Delay of ", delay, " added for service ", svc, " in ns: ", ns) - } + svckey, valid := getServiceKey(newendpointslice) + if !valid { + return } - if delay > 0 { + delay := cont.config.ServiceGraphEndpointAddDelay.Delay + svcDelay, exists := cont.svcInAddDelayList(svc, ns) + if svcDelay > 0 { + delay = svcDelay + } + var delayedsvc bool + delayedsvc = exists && delay > 0 + if delayedsvc { + cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns) var delayedepslice DelayedEpSlice delayedepslice.OldEpSlice = oldendpointslice + delayedepslice.ServiceKey = svckey delayedepslice.NewEpSlice = newendpointslice currentTime := time.Now() delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second) cont.indexMutex.Lock() cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice) cont.indexMutex.Unlock() + } else { + cont.doendpointSliceUpdated(oldendpointslice, newendpointslice) + } + + if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, 
newendpointslice) { + cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint") + cont.doendpointSliceUpdated(oldendpointslice, newendpointslice) + } + return +} +func (cont *AciController) endpointSliceUpdated(oldobj interface{}, newobj interface{}) { + oldendpointslice, ok := oldobj.(*v1beta1.EndpointSlice) + if !ok { + cont.log.Error("error processing Endpointslice object: ", oldobj) return } - cont.doendpointSliceUpdated(oldendpointslice, newendpointslice) + newendpointslice, ok := newobj.(*v1beta1.EndpointSlice) + if !ok { + cont.log.Error("error processing Endpointslice object: ", newobj) + return + } + if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 { + cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice) + } else { + cont.doendpointSliceUpdated(oldendpointslice, newendpointslice) + } } func (cont *AciController) doendpointSliceUpdated(oldendpointslice *v1beta1.EndpointSlice, @@ -1729,13 +1798,15 @@ func (cont *AciController) doendpointSliceUpdated(oldendpointslice *v1beta1.Endp } oldIps := cont.getEndpointSliceIps(oldendpointslice) newIps := cont.getEndpointSliceIps(newendpointslice) - if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) { + if !reflect.DeepEqual(oldIps, newIps) { cont.indexMutex.Lock() cont.queueIPNetPolUpdates(oldIps) cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey) cont.queueIPNetPolUpdates(newIps) cont.indexMutex.Unlock() + } + if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) { cont.queueEndpointSliceNetPolUpdates(oldendpointslice) cont.queueEndpointSliceNetPolUpdates(newendpointslice) } @@ -1840,6 +1911,61 @@ func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoi } +// 2 cases when epslices corresponding to the given service are present in delayedEpSlices: +// 1. endpoint not present in delayedEpSlices of the service +// 2. 
endpoint present in delayedEpSlices of the service but in not-ready state +// +// indexMutex lock must be acquired before calling the function +func (cont *AciController) isDelayedEndpoint(endpoint v1beta1.Endpoint, svckey string) bool { + delayed := false + endpointips := cont.getEndpointSliceEpIps(endpoint) + for _, delayedepslices := range cont.delayedEpSlices { + if delayedepslices.ServiceKey == svckey { + var found bool + epslice := delayedepslices.OldEpSlice + for _, ep := range epslice.Endpoints { + epips := cont.getEndpointSliceEpIps(ep) + if reflect.DeepEqual(endpointips, epips) { + // case 2 + if ep.Conditions.Ready != nil && !*ep.Conditions.Ready { + delayed = true + } + found = true + } + } + // case 1 + if !found { + delayed = true + } + } + } + return delayed +} + +// set nodemap only if endpoint is ready (or readiness wait is disabled) and not in delayedEpSlices +func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint, + endpoint v1beta1.Endpoint, service *v1.Service) { + + svckey, err := cache.MetaNamespaceKeyFunc(service) + if err != nil { + cont.log.Error("Could not create service key: ", err) + return + } + if cont.config.NoWaitForServiceEpReadiness || + (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) { + + nodeName, ok := endpoint.Topology["kubernetes.io/hostname"] + if ok { + // do not setNodeMap for endpoint if: + // endpoint is newly added + // endpoint status changed from not ready to ready + if !cont.isDelayedEndpoint(endpoint, svckey) { + cont.setNodeMap(nodeMap, nodeName) + } + } + } +} + func (sep *serviceEndpoint) GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) { cont := sep.cont @@ -1873,9 +1999,14 @@ func (seps *serviceEndpointSlice) GetnodesMetadata(key string, func(endpointSliceobj interface{}) { endpointSlices := endpointSliceobj.(*v1beta1.EndpointSlice) for _, endpoint := range endpointSlices.Endpoints { - nodeName, ok := endpoint.Topology["kubernetes.io/hostname"] - 
if ok { - cont.setNodeMap(nodeMap, nodeName) + if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 { + cont.setNodeMapDelay(nodeMap, endpoint, service) + } else if cont.config.NoWaitForServiceEpReadiness || + (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) { + nodeName, ok := endpoint.Topology["kubernetes.io/hostname"] + if ok { + cont.setNodeMap(nodeMap, nodeName) + } } } }) diff --git a/pkg/controller/services_test.go b/pkg/controller/services_test.go index 6975d365db..da34b58662 100644 --- a/pkg/controller/services_test.go +++ b/pkg/controller/services_test.go @@ -842,6 +842,7 @@ func TestEndpointsliceIpIndex(t *testing.T) { // Service annotation test with EndPointSlice func TestServiceAnnotationWithEps(t *testing.T) { cont := sgCont() + ready := true name := "kube_svc_testns_service1" nameS2 := "kube_svc_testns_service2" graphName := "kube_svc_global" @@ -918,6 +919,9 @@ func TestServiceAnnotationWithEps(t *testing.T) { Addresses: []string{ "1.1.1.1", }, + Conditions: v1beta1.EndpointConditions{ + Ready: &ready, + }, Topology: map[string]string{ "kubernetes.io/hostname": "node1", }, @@ -926,6 +930,9 @@ func TestServiceAnnotationWithEps(t *testing.T) { Addresses: []string{ "1.1.1.2", }, + Conditions: v1beta1.EndpointConditions{ + Ready: &ready, + }, Topology: map[string]string{ "kubernetes.io/hostname": "node2", }, @@ -1032,9 +1039,10 @@ func TestServiceAnnotationWithEps(t *testing.T) { cont.stop() } -//Service graph test with EndPoint slices +// Service graph test with EndPoint slices func TestServiceGraphiWithEps(t *testing.T) { cont := sgCont() + ready := true graphName := "kube_svc_global" cluster := func(nmap map[string]string) apicapi.ApicObject { var nodes []string @@ -1122,6 +1130,9 @@ func TestServiceGraphiWithEps(t *testing.T) { Addresses: []string{ "1.1.1.1", }, + Conditions: v1beta1.EndpointConditions{ + Ready: &ready, + }, Topology: map[string]string{ "kubernetes.io/hostname": "node1", }, @@ -1130,6 +1141,9 @@ func 
TestServiceGraphiWithEps(t *testing.T) { Addresses: []string{ "1.1.1.2", }, + Conditions: v1beta1.EndpointConditions{ + Ready: &ready, + }, Topology: map[string]string{ "kubernetes.io/hostname": "node2", }, @@ -1141,6 +1155,9 @@ func TestServiceGraphiWithEps(t *testing.T) { Addresses: []string{ "1.1.1.2", }, + Conditions: v1beta1.EndpointConditions{ + Ready: &ready, + }, Topology: map[string]string{ "kubernetes.io/hostname": "node2", },