Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle buggy case for headless Service with no port #366

Merged
merged 11 commits into from
Jan 25, 2019
22 changes: 6 additions & 16 deletions pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ type ProviderConfig struct {

type CreateConfig struct {
ProviderConfig
Inputs *unstructured.Unstructured
Inputs *unstructured.Unstructured
}

type ReadConfig struct {
ProviderConfig
Inputs *unstructured.Unstructured
Name string
Inputs *unstructured.Unstructured
Name string
}

type UpdateConfig struct {
ProviderConfig
Previous *unstructured.Unstructured
Inputs *unstructured.Unstructured
Previous *unstructured.Unstructured
Inputs *unstructured.Unstructured
}

type DeleteConfig struct {
Expand Down Expand Up @@ -336,17 +336,7 @@ func Deletion(c DeleteConfig) error {
return err
}

// Attempt to retrieve k8s server version. Use default version in case this fails.
var version serverVersion
if sv, err := c.ClientSet.DiscoveryClientCached.ServerVersion(); err == nil {
if v, err := parseVersion(sv); err == nil {
version = v
} else {
version = defaultVersion()
}
} else {
version = defaultVersion()
}
version := ServerVersion(c.ClientSet.DiscoveryClientCached)

// Manually set delete propagation for Kubernetes versions < 1.6 to avoid bugs.
deleteOpts := metav1.DeleteOptions{}
Expand Down
75 changes: 64 additions & 11 deletions pkg/await/core_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (sia *serviceInitAwaiter) Await() error {
}
defer endpointWatcher.Stop()

return sia.await(serviceWatcher, endpointWatcher, time.After(10*time.Minute), make(chan struct{}))
version := ServerVersion(sia.config.clientSet.DiscoveryClientCached)

return sia.await(serviceWatcher, endpointWatcher, time.After(10*time.Minute), make(chan struct{}), version)
}

func (sia *serviceInitAwaiter) Read() error {
Expand Down Expand Up @@ -163,11 +165,14 @@ func (sia *serviceInitAwaiter) Read() error {
endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

return sia.read(service, endpointList)
version := ServerVersion(sia.config.clientSet.DiscoveryClientCached)

return sia.read(service, endpointList, version)
}

func (sia *serviceInitAwaiter) read(
service *unstructured.Unstructured, endpoints *unstructured.UnstructuredList,
version serverVersion,
) error {
sia.processServiceEvent(watchAddedEvent(service))

Expand All @@ -186,7 +191,7 @@ func (sia *serviceInitAwaiter) read(

sia.endpointsSettled = true

if sia.checkAndLogStatus() {
if sia.checkAndLogStatus(version) {
return nil
}

Expand All @@ -199,13 +204,13 @@ func (sia *serviceInitAwaiter) read(
// await is a helper companion to `Await` designed to make it easy to test this module.
func (sia *serviceInitAwaiter) await(
serviceWatcher, endpointWatcher watch.Interface, timeout <-chan time.Time,
settled chan struct{},
settled chan struct{}, version serverVersion,
) error {
sia.config.logStatus(diag.Info, "[1/3] Finding Pods to direct traffic to")

for {
// Check whether we've succeeded.
if sia.checkAndLogStatus() {
if sia.checkAndLogStatus(version) {
return nil
}

Expand Down Expand Up @@ -275,7 +280,7 @@ func (sia *serviceInitAwaiter) processServiceEvent(event watch.Event) {
// Update status of service object so that we can check success.
sia.serviceReady = isSlice && len(ing) > 0

glog.V(3).Infof("Waiting for service '%q' to assign IP/hostname for a load balancer",
glog.V(3).Infof("Waiting for service %q to assign IP/hostname for a load balancer",
inputServiceName)
} else {
// If it's not type `LoadBalancer`, report success.
Expand Down Expand Up @@ -345,24 +350,72 @@ func (sia *serviceInitAwaiter) errorMessages() []string {
return messages
}

// isHeadlessService checks if the Service has a defined .spec.clusterIP
func (sia *serviceInitAwaiter) isHeadlessService() bool {
clusterIP, _ := openapi.Pluck(sia.service.Object, "spec", "clusterIP")
return clusterIP == v1.ClusterIPNone
}

// isExternalNameService checks if the Service type is "ExternalName"
func (sia *serviceInitAwaiter) isExternalNameService() bool {
lblackstone marked this conversation as resolved.
Show resolved Hide resolved
return sia.serviceType == string(v1.ServiceTypeExternalName)
}

// emptyHeadlessOrExternalName checks whether the current `Service` is either an "empty" headless
// `Service`[1] (i.e., it targets 0 `Pod`s) or a `Service` with `.spec.type: ExternalName` (which
// also targets 0 `Pod`s). This is useful to know when deciding whether to wait for a `Service` to
// target some number of `Pod`s.
//
// [1]: https://kubernetes.io/docs/concepts/services-networking/service/#headless-services
func (sia *serviceInitAwaiter) emptyHeadlessOrExternalName() bool {
clusterIP, _ := openapi.Pluck(sia.service.Object, "spec", "clusterIP")
selectorI, _ := openapi.Pluck(sia.service.Object, "spec", "selector")
selector, _ := selectorI.(map[string]interface{})

headlessEmpty := len(selector) == 0 && clusterIP == v1.ClusterIPNone
return headlessEmpty || sia.serviceType == string(v1.ServiceTypeExternalName)
headlessEmpty := len(selector) == 0 && sia.isHeadlessService()
return headlessEmpty || sia.isExternalNameService()

}

func (sia *serviceInitAwaiter) checkAndLogStatus() bool {
if sia.emptyHeadlessOrExternalName() {
// hasHeadlessServicePortBug checks whether the current `Service` is affected by a bug [1][2]
// that prevents endpoints from being populated when ports are not specified for a headless
// or external name Service.
//
// [1]: https://github.com/kubernetes/dns/issues/174
// [2]: https://github.com/kubernetes/kubernetes/commit/1c0137252465574519baf99252df8d75048f1304
func (sia *serviceInitAwaiter) hasHeadlessServicePortBug(version serverVersion) bool {
// This bug only affects headless or external name Services.
if !sia.isHeadlessService() && !sia.isExternalNameService() {
return false
}

// k8s versions < 1.12 have the bug.
if version.Compare(1, 12) < 0 {
portsI, _ := openapi.Pluck(sia.service.Object, "spec", "ports")
ports, _ := portsI.([]map[string]interface{})
hasPorts := len(ports) > 0

// The bug affects Services with no specified ports.
if !hasPorts {
return true
}
}

return false
}

// shouldWaitForPods determines whether to wait for Pods to be ready before marking the Service ready.
func (sia *serviceInitAwaiter) shouldWaitForPods(version serverVersion) bool {
// For these special cases, skip the wait for Pod logic.
if sia.emptyHeadlessOrExternalName() || sia.hasHeadlessServicePortBug(version) {
sia.endpointsReady = true
return false
}

return true
}

func (sia *serviceInitAwaiter) checkAndLogStatus(version serverVersion) bool {
if !sia.shouldWaitForPods(version) {
return sia.serviceReady
}

Expand Down
27 changes: 24 additions & 3 deletions pkg/await/core_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func Test_Core_Service(t *testing.T) {
description string
serviceInput func(namespace, name string) *unstructured.Unstructured
do func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time)
version serverVersion
expectedError error
}{
{
Expand Down Expand Up @@ -186,9 +187,21 @@ func Test_Core_Service(t *testing.T) {
timeout <- time.Now()
},
},
{
description: "Should succeed if non-empty headless service doesn't target any Pods before k8s 1.12",
serviceInput: headlessNonemptyServiceInput,
version: serverVersion{1, 11},
do: func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
services <- watchAddedEvent(headlessNonemptyServiceOutput("default", "foo-4setj4y6"))

// Finally, time out.
timeout <- time.Now()
},
},
{
description: "Should fail if non-empty headless service doesn't target any Pods",
serviceInput: headlessNonemptyServiceInput,
version: serverVersion{1, 12},
do: func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
services <- watchAddedEvent(headlessNonemptyServiceOutput("default", "foo-4setj4y6"))

Expand All @@ -214,7 +227,7 @@ func Test_Core_Service(t *testing.T) {
go test.do(services, endpoints, settled, timeout)

err := awaiter.await(&chanWatcher{results: services}, &chanWatcher{results: endpoints},
timeout, settled)
timeout, settled, test.version)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand All @@ -225,6 +238,7 @@ func Test_Core_Service_Read(t *testing.T) {
serviceInput func(namespace, name string) *unstructured.Unstructured
service func(namespace, name string) *unstructured.Unstructured
endpoint func(namespace, name string) *unstructured.Unstructured
version serverVersion
expectedSubErrors []string
}{
{
Expand Down Expand Up @@ -261,10 +275,17 @@ func Test_Core_Service_Read(t *testing.T) {
serviceInput: headlessEmptyServiceInput,
service: headlessEmptyServiceInput,
},
{
description: "Read succeed if headless non-empty Service doesn't target any Pods before k8s 1.12",
serviceInput: headlessNonemptyServiceInput,
service: headlessNonemptyServiceInput,
version: serverVersion{1, 11},
},
{
description: "Read fail if headless non-empty Service doesn't target any Pods",
serviceInput: headlessNonemptyServiceInput,
service: headlessNonemptyServiceInput,
version: serverVersion{1, 12},
expectedSubErrors: []string{
"Service does not target any Pods. Selected Pods may not be ready, or " +
"field '.spec.selector' may not match labels on any Pods"},
Expand All @@ -278,9 +299,9 @@ func Test_Core_Service_Read(t *testing.T) {
var err error
if test.endpoint != nil {
endpoint := test.endpoint("default", "foo-4setj4y6")
err = awaiter.read(service, unstructuredList(*endpoint))
err = awaiter.read(service, unstructuredList(*endpoint), test.version)
} else {
err = awaiter.read(service, unstructuredList())
err = awaiter.read(service, unstructuredList(), test.version)
}
if test.expectedSubErrors != nil {
assert.Equal(t, test.expectedSubErrors, err.(*initializationError).SubErrors(), test.description)
Expand Down
24 changes: 24 additions & 0 deletions pkg/await/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
)

Expand Down Expand Up @@ -151,6 +152,29 @@ func getLastWarningsForObject(

// --------------------------------------------------------------------------

// Version helpers.

// --------------------------------------------------------------------------

// ServerVersion attempts to retrieve the server version from k8s.
// Returns the configured default version in case this fails.
func ServerVersion(cdi discovery.CachedDiscoveryInterface) serverVersion {
var version serverVersion
if sv, err := cdi.ServerVersion(); err == nil {
if v, err := parseVersion(sv); err == nil {
version = v
} else {
version = defaultVersion()
}
} else {
version = defaultVersion()
}

return version
}

// --------------------------------------------------------------------------

// Response helpers.

// --------------------------------------------------------------------------
Expand Down
Loading