Skip to content

Commit

Permalink
Delete pbr programming of deleted endpoint right away
Browse files Browse the repository at this point in the history
* When an endpointslice is updated, if it is a delete of ip of endpoint delete it right away.
Given delay should be added only when there is addition of new ip.
* User can configure per service delay for service-graph-endpoint-add-delay

(cherry picked from commit 9b6ac13)
  • Loading branch information
akhilamohanan committed Feb 7, 2023
1 parent 62ad6c5 commit 1495504
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 38 deletions.
1 change: 1 addition & 0 deletions pkg/controller/config.go
Expand Up @@ -30,6 +30,7 @@ type OpflexGroup struct {
}

type delayService struct {
Delay int `json:"delay,omitempty"`
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/controller.go
Expand Up @@ -173,6 +173,7 @@ type AciController struct {
}

type DelayedEpSlice struct {
ServiceKey string
OldEpSlice *v1beta1.EndpointSlice
NewEpSlice *v1beta1.EndpointSlice
DelayedTime time.Time
Expand Down Expand Up @@ -541,6 +542,17 @@ func (cont *AciController) Run(stopCh <-chan struct{}) {
cont.config.SnatDefaultPortRangeEnd = defEnd
}

// Set default value for pbr programming delay if services list is not empty
// and delay value is empty
if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
cont.config.ServiceGraphEndpointAddDelay.Delay = 90
}
if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
}

// Set contract scope for snat svc graph to global by default
if cont.config.SnatSvcContractScope == "" {
cont.config.SnatSvcContractScope = "global"
Expand Down
205 changes: 168 additions & 37 deletions pkg/controller/services.go
Expand Up @@ -409,7 +409,6 @@ func apicRedirectDst(rpDn string, ip string, mac string,
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
nodeMap map[string]*metadata.ServiceEndpoint,
monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {

rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
rp.SetAttr("thresholdDownAction", "deny")
rpDn := rp.GetDn()
Expand Down Expand Up @@ -1527,7 +1526,6 @@ func (cont *AciController) serviceUpdated(old interface{}, new interface{}) {
Error("Could not create service key: ", err)
return
}

oldPorts := getServiceTargetPorts(oldservice)
newPorts := getServiceTargetPorts(newservice)
if !reflect.DeepEqual(oldPorts, newPorts) {
Expand Down Expand Up @@ -1592,10 +1590,29 @@ func (cont *AciController) getEndpointSliceIps(endpointSlice *v1beta1.EndpointSl
return ips
}

func (cont *AciController) notReadyEndpointPresent(endpointSlice *v1beta1.EndpointSlice) bool {
for _, endpoints := range endpointSlice.Endpoints {
if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
(endpoints.Conditions.Terminating != nil && !*endpoints.Conditions.Terminating) {
return true
}
}
return false
}

func (cont *AciController) getEndpointSliceEpIps(endpoints v1beta1.Endpoint) map[string]bool {
ips := make(map[string]bool)
for _, addr := range endpoints.Addresses {
ips[addr] = true
}
return ips
}

func (cont *AciController) processDelayedEpSlices() {
var processEps []DelayedEpSlice
cont.indexMutex.Lock()
for i, delayedepslice := range cont.delayedEpSlices {
for i := 0; i < len(cont.delayedEpSlices); i++ {
delayedepslice := cont.delayedEpSlices[i]
if time.Now().After(delayedepslice.DelayedTime) {
var toprocess DelayedEpSlice
err := util.DeepCopyObj(&delayedepslice, &toprocess)
Expand All @@ -1610,8 +1627,13 @@ func (cont *AciController) processDelayedEpSlices() {

cont.indexMutex.Unlock()
for _, epslice := range processEps {
cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
//ignore the epslice if newly added endpoint is not ready
if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
} else {
cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
}
}
}

Expand Down Expand Up @@ -1664,61 +1686,108 @@ func (cont *AciController) endpointSliceDeleted(obj interface{}) {
cont.queueServiceUpdateByKey(servicekey)
}

func (cont *AciController) svcInAddDelayList(name, ns string) bool {
// Checks if the given service is present in the user configured list of services
// for pbr delay and if present, returns the servie specific delay if configured
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
if svc.Name == name && svc.Namespace == ns {
return true
return svc.Delay, true
}
}
return false
return 0, false
}

func (cont *AciController) endpointSliceUpdated(oldobj interface{}, newobj interface{}) {
oldendpointslice, ok := oldobj.(*v1beta1.EndpointSlice)
if !ok {
cont.log.Error("error processing Endpointslice object: ", oldobj)
return
}
newendpointslice, ok := newobj.(*v1beta1.EndpointSlice)
if !ok {
cont.log.Error("error processing Endpointslice object: ", newobj)
return
// Check if the endpointslice update notification has any deletion of enpoint
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *v1beta1.EndpointSlice) bool {
del := false

// if any endpoint is removed from endpontslice
if len(newendpointslice.Endpoints) < len(newendpointslice.Endpoints) {
del = true
}
if cont.config.NoWaitForServiceEpReadiness == false {
proceed := true

if !del {
// if any one of the endpoint is in terminating state
for _, endpoint := range newendpointslice.Endpoints {
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
proceed = false
if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
del = true
break
}
}
if !proceed {
cont.log.Debug("New enpoints are not in ready state")
return
}
if !del {
// if any one of endpoint moved from ready state to not-ready state
for _, oldendpoint := range oldendpointslice.Endpoints {
oldips := cont.getEndpointSliceEpIps(oldendpoint)
for _, newendpoint := range newendpointslice.Endpoints {
newips := cont.getEndpointSliceEpIps(newendpoint)
if reflect.DeepEqual(oldips, newips) {
if (oldendpoint.Conditions.Ready != nil && *oldendpoint.Conditions.Ready) &&
(newendpoint.Conditions.Ready != nil && !*newendpoint.Conditions.Ready) {
del = true
}
break
}
}
}
}
return del
}

func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *v1beta1.EndpointSlice,
newendpointslice *v1beta1.EndpointSlice) {
svc, ns, valid := getServiceNameAndNs(newendpointslice)
if !valid {
return
}
var delay int
if cont.config.ServiceGraphEndpointAddDelay.Delay != 0 {
if cont.svcInAddDelayList(svc, ns) {
delay = cont.config.ServiceGraphEndpointAddDelay.Delay
cont.log.Debug("Delay of ", delay, " added for service ", svc, " in ns: ", ns)
}
svckey, valid := getServiceKey(newendpointslice)
if !valid {
return
}
if delay > 0 {
delay := cont.config.ServiceGraphEndpointAddDelay.Delay
svcDelay, exists := cont.svcInAddDelayList(svc, ns)
if svcDelay > 0 {
delay = svcDelay
}
var delayedsvc bool
delayedsvc = exists && delay > 0
if delayedsvc {
cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
var delayedepslice DelayedEpSlice
delayedepslice.OldEpSlice = oldendpointslice
delayedepslice.ServiceKey = svckey
delayedepslice.NewEpSlice = newendpointslice
currentTime := time.Now()
delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
cont.indexMutex.Lock()
cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
cont.indexMutex.Unlock()
} else {
cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
}

if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
}
return
}
func (cont *AciController) endpointSliceUpdated(oldobj interface{}, newobj interface{}) {
oldendpointslice, ok := oldobj.(*v1beta1.EndpointSlice)
if !ok {
cont.log.Error("error processing Endpointslice object: ", oldobj)
return
}
cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
newendpointslice, ok := newobj.(*v1beta1.EndpointSlice)
if !ok {
cont.log.Error("error processing Endpointslice object: ", newobj)
return
}
if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
} else {
cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
}
}

func (cont *AciController) doendpointSliceUpdated(oldendpointslice *v1beta1.EndpointSlice,
Expand All @@ -1729,13 +1798,15 @@ func (cont *AciController) doendpointSliceUpdated(oldendpointslice *v1beta1.Endp
}
oldIps := cont.getEndpointSliceIps(oldendpointslice)
newIps := cont.getEndpointSliceIps(newendpointslice)
if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
if !reflect.DeepEqual(oldIps, newIps) {
cont.indexMutex.Lock()
cont.queueIPNetPolUpdates(oldIps)
cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
cont.queueIPNetPolUpdates(newIps)
cont.indexMutex.Unlock()
}

if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
cont.queueEndpointSliceNetPolUpdates(newendpointslice)
}
Expand Down Expand Up @@ -1840,6 +1911,61 @@ func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoi

}

// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
// 1. endpoint not present in delayedEpSlices of the service
// 2. endpoint present in delayedEpSlices of the service but in not ready state
//
// indexMutex lock must be acquired before calling the function
func (cont *AciController) isDelayedEndpoint(endpoint v1beta1.Endpoint, svckey string) bool {
delayed := false
endpointips := cont.getEndpointSliceEpIps(endpoint)
for _, delayedepslices := range cont.delayedEpSlices {
if delayedepslices.ServiceKey == svckey {
var found bool
epslice := delayedepslices.OldEpSlice
for _, ep := range epslice.Endpoints {
epips := cont.getEndpointSliceEpIps(ep)
if reflect.DeepEqual(endpointips, epips) {
// case 2
if ep.Conditions.Ready != nil && !*ep.Conditions.Ready {
delayed = true
}
found = true
}
}
// case 1
if !found {
delayed = true
}
}
}
return delayed
}

// set nodemap only if endoint is ready and not in delayedEpSlices
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
endpoint v1beta1.Endpoint, service *v1.Service) {

svckey, err := cache.MetaNamespaceKeyFunc(service)
if err != nil {
cont.log.Error("Could not create service key: ", err)
return
}
if cont.config.NoWaitForServiceEpReadiness ||
(endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {

nodeName, ok := endpoint.Topology["kubernetes.io/hostname"]
if ok {
// donot setNodeMap for endpoint if:
// endpoint is newly added
// endpoint status changed from not ready to ready
if !cont.isDelayedEndpoint(endpoint, svckey) {
cont.setNodeMap(nodeMap, nodeName)
}
}
}
}

func (sep *serviceEndpoint) GetnodesMetadata(key string,
service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
cont := sep.cont
Expand Down Expand Up @@ -1873,9 +1999,14 @@ func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
func(endpointSliceobj interface{}) {
endpointSlices := endpointSliceobj.(*v1beta1.EndpointSlice)
for _, endpoint := range endpointSlices.Endpoints {
nodeName, ok := endpoint.Topology["kubernetes.io/hostname"]
if ok {
cont.setNodeMap(nodeMap, nodeName)
if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
cont.setNodeMapDelay(nodeMap, endpoint, service)
} else if cont.config.NoWaitForServiceEpReadiness ||
(endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
nodeName, ok := endpoint.Topology["kubernetes.io/hostname"]
if ok {
cont.setNodeMap(nodeMap, nodeName)
}
}
}
})
Expand Down

0 comments on commit 1495504

Please sign in to comment.