Skip to content

Commit

Permalink
Headless service: retrieve endpoints via Endpoints resource; evaluate…
Browse files Browse the repository at this point in the history
… spec.publishNotReadyAddresses

Currently, the endpoints of headless services are retrieved
by querying pods using the pod selector of the service.
Instead, we now query for an Endpoints resource with the
same name as the Service object to get the endpoints for the
service. This is needed in order to support the
spec.publishNotReadyPods attribute of a service.
  • Loading branch information
alfredkrohmer committed Apr 28, 2019
1 parent a4f40df commit 2b14f6f
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 40 deletions.
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 h1:D+CiwcpGTW6pL6bv6KI3KbyEyCKyS+1JWS2h8PNDnGA=
Expand Down Expand Up @@ -197,7 +198,9 @@ github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.2 h1:Fy0orTDgHdbnzHcsOgfCN4LtHf0ec3wwtiwJqwvf3Gc=
github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9 h1:/Bsw4C+DEdqPjt8vAqaC9LAqpAQnaCQQqmolqq3S1T4=
github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8=
Expand Down Expand Up @@ -317,5 +320,6 @@ k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d h1:MZjlsu9igBoVPZkXpIGoxI
k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0=
k8s.io/client-go v8.0.0+incompatible h1:tTI4hRmb1DRMl4fG6Vclfdi6nTM82oIrTT7HfitmxC4=
k8s.io/client-go v8.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s=
k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c h1:kJCzg2vGCzah5icgkKR7O1Dzn0NA2iGlym27sb0ZfGE=
k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=
71 changes: 51 additions & 20 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type serviceSource struct {
publishInternal bool
publishHostIP bool
serviceInformer coreinformers.ServiceInformer
endpointsInformer coreinformers.EndpointsInformer
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter map[string]struct{}
Expand All @@ -86,6 +87,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
endpointsInformer := informerFactory.Core().V1().Endpoints()
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()

Expand All @@ -97,6 +99,13 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
},
},
)
endpointsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("endpoints added")
},
},
)
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -141,6 +150,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
publishInternal: publishInternal,
publishHostIP: publishHostIP,
serviceInformer: serviceInformer,
endpointsInformer: endpointsInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
serviceTypeFilter: serviceTypes,
Expand Down Expand Up @@ -230,39 +240,60 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
return nil
}

endpointsObject, err := sc.endpointsInformer.Lister().Endpoints(svc.Namespace).Get(svc.GetName())
if err != nil {
log.Errorf("Get endpoints of service[%s] error:%v", svc.GetName(), err)
return endpoints
}

pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
return endpoints
}

targetsByHeadlessDomain := make(map[string][]string)
for _, v := range pods {
headlessDomains := []string{hostname}

if v.Spec.Hostname != "" {
headlessDomains = append(headlessDomains, fmt.Sprintf("%s.%s", v.Spec.Hostname, hostname))
for _, subset := range endpointsObject.Subsets {
addresses := subset.Addresses
if svc.Spec.PublishNotReadyAddresses {
addresses = append(addresses, subset.NotReadyAddresses...)
}
for _, headlessDomain := range headlessDomains {
if sc.publishHostIP == true {
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, v.Status.HostIP)
// To reduce traffice on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], v.Status.HostIP)
} else {
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)

for _, address := range addresses {
// find pod for this address
if address.TargetRef.APIVersion != "" || address.TargetRef.Kind != "Pod" {
log.Debugf("Skipping address because its target is not a pod: %v", address)
continue
}
var pod *v1.Pod
for _, v := range pods {
if v.Name == address.TargetRef.Name {
pod = v
break
}
} else {
log.Debugf("Generating matching endpoint %s with PodIP %s", headlessDomain, v.Status.PodIP)
// To reduce traffice on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], v.Status.PodIP)
}
if pod == nil {
log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address)
continue
}

headlessDomains := []string{hostname}
if pod.Spec.Hostname != "" {
headlessDomains = append(headlessDomains, fmt.Sprintf("%s.%s", pod.Spec.Hostname, hostname))
}

for _, headlessDomain := range headlessDomains {
var endpoint_ string
if sc.publishHostIP == true {
endpoint_ = pod.Status.HostIP
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, endpoint_)
} else {
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
endpoint_ = address.IP
log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, endpoint_)
}
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], endpoint_)
}
}

}

headlessDomains := []string{}
Expand Down
Loading

0 comments on commit 2b14f6f

Please sign in to comment.