Skip to content

Commit

Permalink
Add Pod UID and Nodename parameters to svcwatcher created EndPoints.
Browse files Browse the repository at this point in the history
This is needed to be inline with how K8s in-built Service controller creates its own EPs.
Not doing so might break assumptions hard-coded into external DNS Controllers, such as CoreDNS.
  • Loading branch information
Levovar committed Mar 30, 2020
1 parent 1110535 commit d8f66f0
Showing 1 changed file with 29 additions and 40 deletions.
69 changes: 29 additions & 40 deletions pkg/svccontrol/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,17 @@ func (c *Controller) EpCheckUpdate(ipAddr, ip6Addr string, eps *corev1.Endpoints
(wasIpv6AddressFound || (ip6Addr != "" && ip6Addr != ipam.NoneAllocType && ip6Addr != ipam.DynamicAllocType)) {
return nil
}
host := getPodHost(pod)
targetRef := &corev1.ObjectReference{
Kind: "pod",
Namespace: pod.Namespace,
Name: pod.Name,
ResourceVersion: pod.ResourceVersion,
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
UID: pod.ObjectMeta.UID,
}
if PodReady(pod) || early {
eps.Subsets[0].Addresses = createChangedEpAddressList(ipAddr, ip6Addr, host, eps, targetRef, eps.Subsets[0].Addresses)
eps.Subsets[0].Addresses = createChangedEpAddressList(ipAddr, ip6Addr, pod, eps, targetRef, eps.Subsets[0].Addresses)
} else {
eps.Subsets[0].NotReadyAddresses = createChangedEpAddressList(ipAddr, ip6Addr, host, eps, targetRef, eps.Subsets[0].NotReadyAddresses)
eps.Subsets[0].NotReadyAddresses = createChangedEpAddressList(ipAddr, ip6Addr, pod, eps, targetRef, eps.Subsets[0].NotReadyAddresses)
}
return c.UpdateEndpoints(eps)
}
Expand Down Expand Up @@ -230,15 +230,15 @@ func (c* Controller) UpdatePodRvInEps(epsList []*corev1.Endpoints, pod *corev1.P
// it is not possible that the same pod is in both ready and in not ready
for i, a := range eps.Subsets[0].Addresses {
if a.TargetRef != nil {
if a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace {
if a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace && a.TargetRef.UID == pod.ObjectMeta.UID {
newEps.Subsets[0].Addresses[i].TargetRef.ResourceVersion = pod.ResourceVersion
epList = append(epList, newEps)
}
}
}
for i, a := range eps.Subsets[0].NotReadyAddresses {
if a.TargetRef != nil {
if a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace {
if a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace && a.TargetRef.UID == pod.ObjectMeta.UID {
newEps.Subsets[0].NotReadyAddresses[i].TargetRef.ResourceVersion = pod.ResourceVersion
epList = append(epList, newEps)
}
Expand Down Expand Up @@ -271,15 +271,17 @@ func (c* Controller) UpdatePodStatusInEps(epsList []*corev1.Endpoints, pod *core
early := (svc.Annotations[TolerateUnreadyEps] == "true")
// it is not possible that the same pod is in both ready and in not ready
for _, a := range eps.Subsets[0].Addresses {
if (a.TargetRef != nil) && (oldReady || (newReady && early)) && a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace {
if (a.TargetRef != nil) && (oldReady || (newReady && early)) &&
a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace && a.TargetRef.UID == pod.ObjectMeta.UID {
if !early {
delete(readyAddrs, a.IP)
notReadyAddrs[a.IP] = a
}
}
}
for _, a := range eps.Subsets[0].NotReadyAddresses {
if ( a.TargetRef != nil ) && newReady && a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace {
if ( a.TargetRef != nil ) && newReady &&
a.TargetRef.Name == pod.Name && a.TargetRef.Namespace == pod.Namespace && a.TargetRef.UID == pod.ObjectMeta.UID {
delete(notReadyAddrs, a.IP)
readyAddrs[a.IP] = a
}
Expand All @@ -301,8 +303,8 @@ func (c* Controller) UpdatePodStatusInEps(epsList []*corev1.Endpoints, pod *core
func (c *Controller) MakeNewEps(svc *corev1.Service, des []*danmv1.DanmEp) (corev1.Endpoints) {
epNew := corev1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: svc.Name,
Namespace: svc.Namespace,
Name: svc.ObjectMeta.Name,
Namespace: svc.ObjectMeta.Namespace,
Annotations: svc.GetAnnotations(),
},
}
Expand All @@ -320,29 +322,24 @@ func (c *Controller) MakeNewEps(svc *corev1.Service, des []*danmv1.DanmEp) (core
}
targetRef := &corev1.ObjectReference{
Kind: "pod",
Namespace: pod.Namespace,
Name: pod.Name,
ResourceVersion: pod.ResourceVersion,
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
UID: pod.ObjectMeta.UID,
}
hostName := getPodHost(pod)
if PodReady(pod) || svc.Annotations[TolerateUnreadyEps] == "true" {
readyEpAddrs = createChangedEpAddressList(strings.Split(de.Spec.Iface.Address, "/")[0], strings.Split(de.Spec.Iface.AddressIPv6, "/")[0], hostName, nil, targetRef, readyEpAddrs)
readyEpAddrs = createChangedEpAddressList(strings.Split(de.Spec.Iface.Address, "/")[0], strings.Split(de.Spec.Iface.AddressIPv6, "/")[0], pod, nil, targetRef, readyEpAddrs)
} else {
notReadyEpAddrs = createChangedEpAddressList(strings.Split(de.Spec.Iface.Address, "/")[0], strings.Split(de.Spec.Iface.AddressIPv6, "/")[0], hostName, nil, targetRef, notReadyEpAddrs)
notReadyEpAddrs = createChangedEpAddressList(strings.Split(de.Spec.Iface.Address, "/")[0], strings.Split(de.Spec.Iface.AddressIPv6, "/")[0], pod, nil, targetRef, notReadyEpAddrs)
}
}
var epPorts []corev1.EndpointPort
for _, svcPort := range svc.Spec.Ports {
ep := corev1.EndpointPort{}
if svcPort.Name != "" {
ep.Name = svcPort.Name
}
if svcPort.Port != 0 {
ep.Port = svcPort.Port
}
if svcPort.Protocol != "" {
ep.Protocol = svcPort.Protocol
}
ep := corev1.EndpointPort {
Name: svcPort.Name,
Port: svcPort.Port,
Protocol: svcPort.Protocol,
}
epPorts = append(epPorts, ep)
}
subsets := []corev1.EndpointSubset{
Expand Down Expand Up @@ -558,7 +555,7 @@ func (c *Controller) updatePod(old, new interface{}) {
}
for _, de := range desList {
deNew := de.DeepCopy()
if deNew.Spec.Pod == podName && deNew.Namespace == podNs {
if deNew.Spec.Pod == podName && deNew.Namespace == podNs && deNew.Spec.PodUID == newPod.ObjectMeta.UID {
deLabels := newPod.Labels
deNew.SetLabels(deLabels)
_, err = c.danmclient.DanmV1().DanmEps(deNew.Namespace).Update(deNew)
Expand Down Expand Up @@ -640,26 +637,18 @@ func isIpInEp(ip string, eps *corev1.Endpoints) bool {
return isIpPresent
}

func createChangedEpAddressList(v4Address, v6Address, host string, eps *corev1.Endpoints, targetRef *corev1.ObjectReference, epAddrs []corev1.EndpointAddress) []corev1.EndpointAddress {
func createChangedEpAddressList(v4Address, v6Address string, pod *corev1.Pod, eps *corev1.Endpoints, targetRef *corev1.ObjectReference, epAddrs []corev1.EndpointAddress) []corev1.EndpointAddress {
if (v4Address != "" && v4Address != ipam.NoneAllocType && v4Address != ipam.DynamicAllocType) &&
(eps == nil || !isIpInEp(v4Address, eps)) {
epAddrs = append(epAddrs, corev1.EndpointAddress{IP: v4Address, Hostname: host, TargetRef: targetRef})
epAddrs = append(epAddrs, corev1.EndpointAddress{IP: v4Address, Hostname: pod.Spec.Hostname, NodeName: &pod.Spec.NodeName, TargetRef: targetRef})
}
if (v6Address != "" && v6Address != ipam.NoneAllocType && v6Address != ipam.DynamicAllocType) &&
(eps == nil || !isIpInEp(v6Address, eps)) {
epAddrs = append(epAddrs, corev1.EndpointAddress{IP: v6Address, Hostname: host, TargetRef: targetRef})
epAddrs = append(epAddrs, corev1.EndpointAddress{IP: v6Address, Hostname: pod.Spec.Hostname, NodeName: &pod.Spec.NodeName, TargetRef: targetRef})
}
return epAddrs
}

func getPodHost(pod *corev1.Pod) string {
hostName := pod.Spec.Hostname
if hostName == "" {
hostName = pod.Name
}
return hostName
}

func getIpsFromDanmEp(de *danmv1.DanmEp) (string,string) {
var ipAddr, ip6Addr string
if de.Spec.Iface.Address != "" {
Expand Down

0 comments on commit d8f66f0

Please sign in to comment.