Skip to content

Commit

Permalink
chore: add endpointslice cache
Browse files Browse the repository at this point in the history
  • Loading branch information
nilo19 committed Aug 17, 2023
1 parent f50daac commit 4e3536d
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ type Cloud struct {
multipleStandardLoadBalancersActiveServicesLock sync.Mutex
multipleStandardLoadBalancersActiveNodesLock sync.Mutex
localServiceNameToServiceInfoMap sync.Map
endpointSlicesCache sync.Map
}

// NewCloud returns a Cloud with initialized clients
Expand Down
46 changes: 45 additions & 1 deletion pkg/provider/azure_loadbalancer_backendpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func TestEnsureHostsInPoolNodeIP(t *testing.T) {
local bool
notFound bool
skip bool
cache bool
expectedBackendPool network.BackendAddressPool
}{
{
Expand Down Expand Up @@ -293,6 +294,42 @@ func TestEnsureHostsInPoolNodeIP(t *testing.T) {
},
},
},
{
desc: "local service with its endpoint slice in cache",
local: true,
backendPool: network.BackendAddressPool{
Name: pointer.String("default-svc-1"),
BackendAddressPoolPropertiesFormat: &network.BackendAddressPoolPropertiesFormat{
LoadBalancerBackendAddresses: &[]network.LoadBalancerBackendAddress{},
},
},
multiSLBConfigs: []MultipleStandardLoadBalancerConfiguration{
{
Name: "kubernetes",
},
},
expectedBackendPool: network.BackendAddressPool{
Name: pointer.String("default-svc-1"),
BackendAddressPoolPropertiesFormat: &network.BackendAddressPoolPropertiesFormat{
VirtualNetwork: &network.SubResource{ID: pointer.String("/subscriptions/subscription/resourceGroups/rg/providers/Microsoft.Network/virtualNetworks/vnet")},
LoadBalancerBackendAddresses: &[]network.LoadBalancerBackendAddress{
{
Name: pointer.String("vmss-0"),
LoadBalancerBackendAddressPropertiesFormat: &network.LoadBalancerBackendAddressPropertiesFormat{
IPAddress: pointer.String("10.0.0.2"),
},
},
{
Name: pointer.String("vmss-1"),
LoadBalancerBackendAddressPropertiesFormat: &network.LoadBalancerBackendAddressPropertiesFormat{
IPAddress: pointer.String("10.0.0.1"),
},
},
},
},
},
cache: true,
},
}

for _, tc := range testcases {
Expand Down Expand Up @@ -334,7 +371,14 @@ func TestEnsureHostsInPoolNodeIP(t *testing.T) {
az.localServiceNameToServiceInfoMap.Store("default/svc-1", &serviceInfo{lbName: "lb"})
}

kubeClient := fake.NewSimpleClientset(getTestEndpointSlice("eps", "default", "svc-1", "vmss-0", "vmss-1"))
var kubeClient *fake.Clientset
eps := getTestEndpointSlice("eps", "default", "svc-1", "vmss-0", "vmss-1")
if !tc.cache {
kubeClient = fake.NewSimpleClientset(eps)
} else {
kubeClient = fake.NewSimpleClientset()
az.endpointSlicesCache.Store("eps", eps)
}
az.KubeClient = kubeClient
az.nodePrivateIPs = map[string]sets.Set[string]{
"vmss-0": sets.New[string]("1.2.3.4"),
Expand Down
55 changes: 40 additions & 15 deletions pkg/provider/azure_local_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ func (az *Cloud) setUpEndpointSlicesInformer(informerFactory informers.SharedInf
endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices().Informer()
_, _ = endpointSlicesInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
es := obj.(*discovery_v1.EndpointSlice)
az.endpointSlicesCache.Store(strings.ToLower(es.Name), es)
},
UpdateFunc: func(oldObj, newObj interface{}) {
previousES := oldObj.(*discovery_v1.EndpointSlice)
newES := newObj.(*discovery_v1.EndpointSlice)
Expand All @@ -301,6 +305,7 @@ func (az *Cloud) setUpEndpointSlicesInformer(informerFactory informers.SharedInf
}

klog.V(4).Infof("Detecting EndpointSlice %s/%s update", newES.Namespace, newES.Name)
az.endpointSlicesCache.Store(strings.ToLower(newES.Name), newES)

key := strings.ToLower(fmt.Sprintf("%s/%s", newES.Namespace, svcName))
si, found := az.getLocalServiceInfo(key)
Expand Down Expand Up @@ -357,6 +362,10 @@ func (az *Cloud) setUpEndpointSlicesInformer(informerFactory informers.SharedInf
}
}
},
DeleteFunc: func(obj interface{}) {
es := obj.(*discovery_v1.EndpointSlice)
az.endpointSlicesCache.Delete(strings.ToLower(es.Name))
},
})
}

Expand Down Expand Up @@ -461,25 +470,41 @@ func newServiceInfo(ipFamily, lbName string) *serviceInfo {

// getLocalServiceEndpointsNodeNames gets the node names that host all endpoints of the local service.
func (az *Cloud) getLocalServiceEndpointsNodeNames(service *v1.Service) (sets.Set[string], error) {
eps, err := az.KubeClient.DiscoveryV1().EndpointSlices(service.Namespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to list EndpointSlices for service %s/%s: %s", service.Namespace, service.Name, err.Error())
return nil, err
}

for _, endpointSlice := range eps.Items {
endpointSlice := endpointSlice
if strings.EqualFold(getServiceNameOfEndpointSlice(&endpointSlice), service.Name) {
var nodeNames []string
for _, ep := range endpointSlice.Endpoints {
klog.V(4).Infof("EndpointSlice %s/%s has endpoint %s on node %s", endpointSlice.Namespace, endpointSlice.Name, ep.Addresses, pointer.StringDeref(ep.NodeName, ""))
nodeNames = append(nodeNames, pointer.StringDeref(ep.NodeName, ""))
var ep *discovery_v1.EndpointSlice
az.endpointSlicesCache.Range(func(key, value interface{}) bool {
endpointSlice := value.(*discovery_v1.EndpointSlice)
if strings.EqualFold(getServiceNameOfEndpointSlice(endpointSlice), service.Name) {
ep = endpointSlice
return false
}
return true
})
if ep == nil {
klog.Infof("EndpointSlice for service %s/%s not found, try to list EndpointSlices", service.Namespace, service.Name)
eps, err := az.KubeClient.DiscoveryV1().EndpointSlices(service.Namespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to list EndpointSlices for service %s/%s: %s", service.Namespace, service.Name, err.Error())
return nil, err
}
for _, endpointSlice := range eps.Items {
endpointSlice := endpointSlice
if strings.EqualFold(getServiceNameOfEndpointSlice(&endpointSlice), service.Name) {
ep = &endpointSlice
break
}
return sets.New[string](nodeNames...), nil
}
}
if ep == nil {
return nil, fmt.Errorf("failed to find EndpointSlice for service %s/%s", service.Namespace, service.Name)
}

var nodeNames []string
for _, endpoint := range ep.Endpoints {
klog.V(4).Infof("EndpointSlice %s/%s has endpoint %s on node %s", ep.Namespace, ep.Name, endpoint.Addresses, pointer.StringDeref(endpoint.NodeName, ""))
nodeNames = append(nodeNames, pointer.StringDeref(endpoint.NodeName, ""))
}

return nil, nil
return sets.New[string](nodeNames...), nil
}

// cleanupLocalServiceBackendPool cleans up the backend pool of
Expand Down

0 comments on commit 4e3536d

Please sign in to comment.