Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setting Hostname from Pods on EndpointSlice to match Endpoints behavior. #84207

Merged
merged 1 commit into from Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/controller/endpoint/endpoints_controller.go
Expand Up @@ -427,11 +427,10 @@ func (e *EndpointController) syncService(key string) error {
klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
continue
}
epa := *ep

hostname := pod.Spec.Hostname
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {
epa.Hostname = hostname
epa := *ep
if endpointutil.ShouldSetHostname(pod, service) {
epa.Hostname = pod.Spec.Hostname
}

// Allow headless service not to have ports.
Expand Down
Expand Up @@ -280,6 +280,7 @@ func TestSyncServiceFull(t *testing.T) {
Protocol: protoPtr(v1.ProtocolSCTP),
Port: int32Ptr(int32(3456)),
}}, slice.Ports)

assert.ElementsMatch(t, []discovery.Endpoint{{
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Addresses: []string{"1.2.3.4"},
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/endpointslice/reconciler.go
Expand Up @@ -90,7 +90,8 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
if err != nil {
return err
}
endpoint := podToEndpoint(pod, node, service.Spec.PublishNotReadyAddresses)

endpoint := podToEndpoint(pod, node, service)
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
numDesiredEndpoints++
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/endpointslice/reconciler_test.go
Expand Up @@ -224,13 +224,13 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
}

// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
}

existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
Expand Down Expand Up @@ -276,13 +276,13 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
}

// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
}

existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
if i%30 == 0 {
existingSlices = append(existingSlices, newEmptyEndpointSlice(sliceNum, namespace, endpointMeta, svc))
}
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
}

createEndpointSlices(t, client, namespace, existingSlices)
Expand Down Expand Up @@ -396,15 +396,15 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 0; i < 80; i++ {
pod := newPod(i, namespace, true, 1)
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice1)

slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 100; i < 120; i++ {
pod := newPod(i, namespace, true, 1)
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice2)
Expand Down
17 changes: 12 additions & 5 deletions pkg/controller/endpointslice/utils.go
Expand Up @@ -30,13 +30,14 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/discovery/validation"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)

// podEndpointChanged returns true if the results of podToEndpoint are different
// for the pods passed to this function.
func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
endpoint1 := podToEndpoint(pod1, &corev1.Node{}, false)
endpoint2 := podToEndpoint(pod2, &corev1.Node{}, false)
endpoint1 := podToEndpoint(pod1, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})
endpoint2 := podToEndpoint(pod2, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})

endpoint1.TargetRef.ResourceVersion = ""
endpoint2.TargetRef.ResourceVersion = ""
Expand All @@ -45,7 +46,7 @@ func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
}

// podToEndpoint returns an Endpoint object generated from a Pod and Node.
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, publishNotReadyAddresses bool) discovery.Endpoint {
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint {
// Build out topology information. This is currently limited to hostname,
// zone, and region, but this will be expanded in the future.
topology := map[string]string{}
Expand All @@ -67,8 +68,8 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, publishNotReadyAddresses
}
}

ready := publishNotReadyAddresses || podutil.IsPodReady(pod)
return discovery.Endpoint{
ready := service.Spec.PublishNotReadyAddresses || podutil.IsPodReady(pod)
ep := discovery.Endpoint{
Addresses: getEndpointAddresses(pod.Status),
Conditions: discovery.EndpointConditions{
Ready: &ready,
Expand All @@ -82,6 +83,12 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, publishNotReadyAddresses
ResourceVersion: pod.ObjectMeta.ResourceVersion,
},
}

if endpointutil.ShouldSetHostname(pod, service) {
ep.Hostname = &pod.Spec.Hostname
}

return ep
}

// getEndpointPorts returns a list of EndpointPorts generated from a Service
Expand Down
50 changes: 42 additions & 8 deletions pkg/controller/endpointslice/utils_test.go
Expand Up @@ -74,11 +74,17 @@ func TestNewEndpointSlice(t *testing.T) {

func TestPodToEndpoint(t *testing.T) {
ns := "test"
svc, _ := newServiceAndEndpointMeta("foo", ns)
svcPublishNotReady, _ := newServiceAndEndpointMeta("publishnotready", ns)
svcPublishNotReady.Spec.PublishNotReadyAddresses = true

readyPod := newPod(1, ns, true, 1)
readyPodHostname := newPod(1, ns, true, 1)
readyPodHostname.Spec.Subdomain = svc.Name
readyPodHostname.Spec.Hostname = "example-hostname"

unreadyPod := newPod(1, ns, false, 1)
multiIPPod := newPod(1, ns, true, 1)

multiIPPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}}

node1 := &v1.Node{
Expand All @@ -95,12 +101,14 @@ func TestPodToEndpoint(t *testing.T) {
name string
pod *v1.Pod
node *v1.Node
svc *v1.Service
expectedEndpoint discovery.Endpoint
publishNotReadyAddresses bool
}{
{
name: "Ready pod",
pod: readyPod,
svc: &svc,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Expand All @@ -115,9 +123,9 @@ func TestPodToEndpoint(t *testing.T) {
},
},
{
name: "Ready pod + publishNotReadyAddresses",
pod: readyPod,
publishNotReadyAddresses: true,
name: "Ready pod + publishNotReadyAddresses",
pod: readyPod,
svc: &svcPublishNotReady,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Expand All @@ -134,6 +142,7 @@ func TestPodToEndpoint(t *testing.T) {
{
name: "Unready pod",
pod: unreadyPod,
svc: &svc,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
Expand All @@ -148,9 +157,9 @@ func TestPodToEndpoint(t *testing.T) {
},
},
{
name: "Unready pod + publishNotReadyAddresses",
pod: unreadyPod,
publishNotReadyAddresses: true,
name: "Unready pod + publishNotReadyAddresses",
pod: unreadyPod,
svc: &svcPublishNotReady,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Expand All @@ -168,6 +177,7 @@ func TestPodToEndpoint(t *testing.T) {
name: "Ready pod + node labels",
pod: readyPod,
node: node1,
svc: &svc,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Expand All @@ -189,6 +199,7 @@ func TestPodToEndpoint(t *testing.T) {
name: "Multi IP Ready pod + node labels",
pod: multiIPPod,
node: node1,
svc: &svc,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4", "1234::5678:0000:0000:9abc:def0"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Expand All @@ -206,11 +217,34 @@ func TestPodToEndpoint(t *testing.T) {
},
},
},
{
name: "Ready pod + hostname",
pod: readyPodHostname,
node: node1,
svc: &svc,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Hostname: &readyPodHostname.Spec.Hostname,
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: readyPodHostname.Name,
UID: readyPodHostname.UID,
ResourceVersion: readyPodHostname.ResourceVersion,
},
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.publishNotReadyAddresses)
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.svc)
if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) {
t.Errorf("Expected endpoint: %v, got: %v", testCase.expectedEndpoint, endpoint)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/util/endpoint/controller_utils.go
Expand Up @@ -143,6 +143,12 @@ func ShouldPodBeInEndpoints(pod *v1.Pod) bool {
return true
}

// ShouldSetHostname returns true if the Hostname attribute should be set on an
// Endpoints Address or EndpointSlice Endpoint.
func ShouldSetHostname(pod *v1.Pod, svc *v1.Service) bool {
robscott marked this conversation as resolved.
Show resolved Hide resolved
return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace
}

// PodChanged returns two boolean values, the first returns true if the pod.
// has changed, the second value returns true if the pod labels have changed.
func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, bool) {
Expand Down
89 changes: 77 additions & 12 deletions pkg/controller/util/endpoint/controller_utils_test.go
Expand Up @@ -81,16 +81,19 @@ func TestDetermineNeededServiceUpdates(t *testing.T) {
union: sets.NewString(),
},
}

for _, testCase := range testCases {
retval := determineNeededServiceUpdates(testCase.a, testCase.b, false)
if !retval.Equal(testCase.xor) {
t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List())
}
t.Run(testCase.name, func(t *testing.T) {
retval := determineNeededServiceUpdates(testCase.a, testCase.b, false)
if !retval.Equal(testCase.xor) {
t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List())
}

retval = determineNeededServiceUpdates(testCase.a, testCase.b, true)
if !retval.Equal(testCase.union) {
t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List())
}
retval = determineNeededServiceUpdates(testCase.a, testCase.b, true)
if !retval.Equal(testCase.union) {
t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List())
}
})
}
}

Expand Down Expand Up @@ -224,11 +227,73 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
expected: true,
},
}

for _, test := range testCases {
result := ShouldPodBeInEndpoints(test.pod)
if result != test.expected {
t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
}
t.Run(test.name, func(t *testing.T) {
result := ShouldPodBeInEndpoints(test.pod)
if result != test.expected {
t.Errorf("expected: %t, got: %t", test.expected, result)
}
})
}
}

func TestShouldSetHostname(t *testing.T) {
testCases := map[string]struct {
pod *v1.Pod
service *v1.Service
expected bool
}{
"all matching": {
pod: genSimplePod("ns", "foo", "svc-name"),
service: genSimpleSvc("ns", "svc-name"),
expected: true,
},
"all matching, hostname not set": {
pod: genSimplePod("ns", "", "svc-name"),
service: genSimpleSvc("ns", "svc-name"),
expected: false,
},
"all set, different name/subdomain": {
pod: genSimplePod("ns", "hostname", "subdomain"),
service: genSimpleSvc("ns", "name"),
expected: false,
},
"all set, different namespace": {
pod: genSimplePod("ns1", "hostname", "svc-name"),
service: genSimpleSvc("ns2", "svc-name"),
expected: false,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
result := ShouldSetHostname(testCase.pod, testCase.service)
if result != testCase.expected {
t.Errorf("expected: %t, got: %t", testCase.expected, result)
}
})
}
}

func genSimplePod(namespace, hostname, subdomain string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
},
Spec: v1.PodSpec{
Hostname: hostname,
Subdomain: subdomain,
},
}
}

func genSimpleSvc(namespace, name string) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}

Expand Down