Skip to content

Commit

Permalink
Process an endpoint set that has a desired port name (#88)
Browse files Browse the repository at this point in the history
If K8s svc has several ports, K8s API may return Endpoints of that svc
in arbitrary order. Processing only the first element of EndpointSet can
cause error similar to this one:

```
E0423 15:00:32.251020       1 endpoints_watch.go:86] error while
building backend list: port 'varnish-port' not found in endpoint list
```

Instead of working with the first element only, this commit introduces
an additional function that tries to find an EndpointSet with the
desired port name.

Co-authored-by: mqmr <me@mqmr.com>
Co-authored-by: Martin Helmich <m.helmich@mittwald.de>
  • Loading branch information
3 people committed Aug 25, 2021
1 parent f9bf0e8 commit 2265347
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
14 changes: 14 additions & 0 deletions pkg/watcher/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,17 @@ func EndpointListFromSubset(ep v1.EndpointSubset, portName string) (EndpointList

return l, nil
}

// EndpointSubsetIndex returns the index of EndpointSubset for which port.Name
// matches provided portName
func EndpointSubsetIndex(es []v1.EndpointSubset, portName string) (int, error) {
for i, set := range es {
for _, port := range set.Ports {
if port.Name == portName {
return i, nil
}
}
}

return -1, fmt.Errorf("port %q is not found in endpoint subsets", portName)
}
25 changes: 20 additions & 5 deletions pkg/watcher/endpoints_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,36 @@ func (v *EndpointWatcher) watch(updates chan *EndpointConfig, errors chan error)

endpoint := ev.Object.(*v1.Endpoints)

if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {
if len(endpoint.Subsets) == 0 {
glog.Warningf("service '%s' has no endpoints", v.serviceName)

v.endpointConfig = NewEndpointConfig()

continue
}

if v.endpointConfig.Endpoints.EqualsEndpoints(endpoint.Subsets[0]) {
endpointSubsetIndex, err := EndpointSubsetIndex(endpoint.Subsets, v.portName)
if err != nil {
glog.Warning(err)
v.endpointConfig = NewEndpointConfig()

continue
}

if len(endpoint.Subsets[endpointSubsetIndex].Addresses) == 0 {
glog.Warningf("service '%s' has no endpoints", v.serviceName)
v.endpointConfig = NewEndpointConfig()

continue
}

if v.endpointConfig.Endpoints.EqualsEndpoints(endpoint.Subsets[endpointSubsetIndex]) {
glog.Infof("endpoints did not change")
continue
}

var addresses []v1.EndpointAddress
for _, a := range endpoint.Subsets[0].Addresses {
for _, a := range endpoint.Subsets[endpointSubsetIndex].Addresses {
puid := string(a.TargetRef.UID)

po, err := v.client.CoreV1().Pods(v.namespace).Get(a.TargetRef.Name, metav1.GetOptions{})
Expand All @@ -84,11 +99,11 @@ func (v *EndpointWatcher) watch(updates chan *EndpointConfig, errors chan error)
continue
}

endpoint.Subsets[0].Addresses = addresses
endpoint.Subsets[endpointSubsetIndex].Addresses = addresses

newConfig := NewEndpointConfig()

newBackendList, err := EndpointListFromSubset(endpoint.Subsets[0], v.portName)
newBackendList, err := EndpointListFromSubset(endpoint.Subsets[endpointSubsetIndex], v.portName)
if err != nil {
glog.Errorf("error while building backend list: %s", err.Error())
continue
Expand Down

0 comments on commit 2265347

Please sign in to comment.