Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle buggy case for headless Service with no port #366

Merged
merged 11 commits into from
Jan 25, 2019
26 changes: 8 additions & 18 deletions pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ type ProviderConfig struct {

type CreateConfig struct {
ProviderConfig
Inputs *unstructured.Unstructured
Inputs *unstructured.Unstructured
}

type ReadConfig struct {
ProviderConfig
Inputs *unstructured.Unstructured
Name string
Inputs *unstructured.Unstructured
Name string
}

type UpdateConfig struct {
ProviderConfig
Previous *unstructured.Unstructured
Inputs *unstructured.Unstructured
Previous *unstructured.Unstructured
Inputs *unstructured.Unstructured
}

type DeleteConfig struct {
Expand Down Expand Up @@ -336,26 +336,16 @@ func Deletion(c DeleteConfig) error {
return err
}

// Attempt to retrieve k8s server version. Use default version in case this fails.
var version serverVersion
if sv, err := c.ClientSet.DiscoveryClientCached.ServerVersion(); err == nil {
if v, err := parseVersion(sv); err == nil {
version = v
} else {
version = defaultVersion()
}
} else {
version = defaultVersion()
}
version := ServerVersion(c.ClientSet.DiscoveryClientCached)

// Manually set delete propagation for Kubernetes versions < 1.6 to avoid bugs.
deleteOpts := metav1.DeleteOptions{}
if version.Compare(1, 6) < 0 {
if version.Compare(1, 6, 0) < 0 {
// 1.5.x option.
boolFalse := false
// nolint
deleteOpts.OrphanDependents = &boolFalse
} else if version.Compare(1, 7) < 0 {
} else if version.Compare(1, 7, 0) < 0 {
// 1.6.x option. Background delete propagation is broken in k8s v1.6.
fg := metav1.DeletePropagationForeground
deleteOpts.PropagationPolicy = &fg
Expand Down
61 changes: 56 additions & 5 deletions pkg/await/core_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (sia *serviceInitAwaiter) processServiceEvent(event watch.Event) {
// Update status of service object so that we can check success.
sia.serviceReady = isSlice && len(ing) > 0

glog.V(3).Infof("Waiting for service '%q' to assign IP/hostname for a load balancer",
glog.V(3).Infof("Waiting for service %q to assign IP/hostname for a load balancer",
inputServiceName)
} else {
// If it's not type `LoadBalancer`, report success.
Expand Down Expand Up @@ -345,24 +345,75 @@ func (sia *serviceInitAwaiter) errorMessages() []string {
return messages
}

// isHeadlessService checks if the Service has a defined .spec.clusterIP
func (sia *serviceInitAwaiter) isHeadlessService() bool {
clusterIP, _ := openapi.Pluck(sia.service.Object, "spec", "clusterIP")
return clusterIP == v1.ClusterIPNone
}

// isExternalNameService checks if the Service type is "ExternalName"
func (sia *serviceInitAwaiter) isExternalNameService() bool {
lblackstone marked this conversation as resolved.
Show resolved Hide resolved
return sia.serviceType == string(v1.ServiceTypeExternalName)
}

// emptyHeadlessOrExternalName checks whether the current `Service` is either an "empty" headless
// `Service`[1] (i.e., it targets 0 `Pod`s) or a `Service` with `.spec.type: ExternalName` (which
// also targets 0 `Pod`s). This is useful to know when deciding whether to wait for a `Service` to
// target some number of `Pod`s.
//
// [1]: https://kubernetes.io/docs/concepts/services-networking/service/#headless-services
func (sia *serviceInitAwaiter) emptyHeadlessOrExternalName() bool {
clusterIP, _ := openapi.Pluck(sia.service.Object, "spec", "clusterIP")
selectorI, _ := openapi.Pluck(sia.service.Object, "spec", "selector")
selector, _ := selectorI.(map[string]interface{})

headlessEmpty := len(selector) == 0 && clusterIP == v1.ClusterIPNone
return headlessEmpty || sia.serviceType == string(v1.ServiceTypeExternalName)
headlessEmpty := len(selector) == 0 && sia.isHeadlessService()
return headlessEmpty || sia.isExternalNameService()

}

// hasHeadlessServicePortBug checks whether the current `Service` is affected by a bug [1][2]
// that prevents endpoints from being populated when ports are not specified for a headless Service.
//
// [1]: https://github.com/kubernetes/dns/issues/174
// [2]: https://github.com/kubernetes/kubernetes/commit/1c0137252465574519baf99252df8d75048f1304
func (sia *serviceInitAwaiter) hasHeadlessServicePortBug() bool {
isBuggy := false

// This bug only affects headless or external name Services.
if !sia.isHeadlessService() || sia.isExternalNameService() {
return isBuggy
}

version := ServerVersion(sia.config.clientSet.DiscoveryClientCached)

portsI, _ := openapi.Pluck(sia.service.Object, "spec", "ports")
ports, _ := portsI.([]map[string]interface{})
hasPorts := len(ports) > 0

// k8s versions < 1.12.1 have the bug.
if version.Compare(1, 12, 1) < 0 {
lblackstone marked this conversation as resolved.
Show resolved Hide resolved
// The bug affects Services with no specified ports.
if !hasPorts {
isBuggy = true
}
}

return isBuggy
}

// waitForPods determines whether to wait for Pods to be ready before marking the Service ready.
func (sia *serviceInitAwaiter) waitForPods() bool {
lblackstone marked this conversation as resolved.
Show resolved Hide resolved
// For these special cases, skip the wait for Pod logic.
if sia.emptyHeadlessOrExternalName() || sia.hasHeadlessServicePortBug() {
sia.endpointsReady = true
return false
}

return true
}

func (sia *serviceInitAwaiter) checkAndLogStatus() bool {
if sia.emptyHeadlessOrExternalName() {
if !sia.waitForPods() {
return sia.serviceReady
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/await/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
)

Expand Down Expand Up @@ -151,6 +152,29 @@ func getLastWarningsForObject(

// --------------------------------------------------------------------------

// Version helpers.

// --------------------------------------------------------------------------

// ServerVersion attempts to retrieve the server version from k8s.
// Returns the configured default version in case this fails.
func ServerVersion(cdi discovery.CachedDiscoveryInterface) serverVersion {
var version serverVersion
if sv, err := cdi.ServerVersion(); err == nil {
if v, err := parseVersion(sv); err == nil {
version = v
} else {
version = defaultVersion()
}
} else {
version = defaultVersion()
}

return version
}

// --------------------------------------------------------------------------

// Response helpers.

// --------------------------------------------------------------------------
Expand Down
42 changes: 28 additions & 14 deletions pkg/await/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
)

// Format v0.0.0(-master+$Format:%h$)
var gitVersionRe = regexp.MustCompile(`v([0-9])+.([0-9])+.[0-9]+.*`)
var gitVersionRe = regexp.MustCompile(`v([0-9])+.([0-9])+.([0-9])+.*`)

// serverVersion captures k8s major.minor version in a parsed form
type serverVersion struct {
Major int
Minor int
Patch int
}

// DefaultVersion takes a wild guess (v1.9) at the version of a Kubernetes cluster.
Expand All @@ -53,7 +54,7 @@ func defaultVersion() serverVersion {

func parseGitVersion(gitVersion string) (serverVersion, error) {
parsedVersion := gitVersionRe.FindStringSubmatch(gitVersion)
if len(parsedVersion) != 3 {
if len(parsedVersion) != 4 {
return serverVersion{}, fmt.Errorf("unable to parse git version %q", gitVersion)
}
var ret serverVersion
Expand All @@ -66,29 +67,37 @@ func parseGitVersion(gitVersion string) (serverVersion, error) {
if err != nil {
return serverVersion{}, err
}
ret.Patch, err = strconv.Atoi(parsedVersion[3])
if err != nil {
return serverVersion{}, err
}
return ret, nil
}

// parseVersion parses version.Info into a serverVersion struct
func parseVersion(v *version.Info) (ret serverVersion, err error) {
ret.Major, err = strconv.Atoi(v.Major)
ret, err = parseGitVersion(v.GitVersion)
lblackstone marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return parseGitVersion(v.GitVersion)
}
ret.Major, err = strconv.Atoi(v.Major)
if err != nil {
return serverVersion{}, fmt.Errorf("unable to parse server version: %#v", v)
}

// trim "+" in minor version (happened on GKE)
v.Minor = strings.TrimSuffix(v.Minor, "+")
// trim "+" in minor version (happened on GKE)
v.Minor = strings.TrimSuffix(v.Minor, "+")

ret.Minor, err = strconv.Atoi(v.Minor)
if err != nil {
return serverVersion{}, fmt.Errorf("unable to parse server version: %#v", v)
}

ret.Minor, err = strconv.Atoi(v.Minor)
if err != nil {
return parseGitVersion(v.GitVersion)
}

return ret, nil
return
}

// Compare returns -1/0/+1 iff v is less than / equal / greater than major.minor
func (v serverVersion) Compare(major, minor int) int {
// Compare returns -1/0/+1 iff v is less than / equal / greater than major.minor.patch
func (v serverVersion) Compare(major, minor, patch int) int {
a := v.Major
b := major

Expand All @@ -97,6 +106,11 @@ func (v serverVersion) Compare(major, minor int) int {
b = minor
}

if a == b {
a = v.Patch
b = patch
}

var res int
if a > b {
res = 1
Expand All @@ -109,7 +123,7 @@ func (v serverVersion) Compare(major, minor int) int {
}

func (v serverVersion) String() string {
return fmt.Sprintf("%d.%d", v.Major, v.Minor)
return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
}

// canonicalizeDeploymentAPIVersion unifies the various pre-release apiVerion values for a
Expand Down
22 changes: 15 additions & 7 deletions pkg/await/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,15 @@ func TestParseVersion(t *testing.T) {
},
{
input: version.Info{Major: "", Minor: "", GitVersion: "v1.8.8-test.0"},
expected: serverVersion{Major: 1, Minor: 8},
expected: serverVersion{Major: 1, Minor: 8, Patch: 8},
},
{
input: version.Info{Major: "1", Minor: "8", GitVersion: "v1.9.0"},
expected: serverVersion{Major: 1, Minor: 8},
expected: serverVersion{Major: 1, Minor: 9},
},
{
input: version.Info{Major: "1", Minor: "9", GitVersion: "v1.9.1"},
expected: serverVersion{Major: 1, Minor: 9, Patch: 1},
},
{
input: version.Info{Major: "", Minor: "", GitVersion: "v1.a"},
Expand All @@ -81,27 +85,31 @@ func TestParseVersion(t *testing.T) {
continue
}
if v != test.expected {
t.Errorf("Expected %v, got %v", test.expected, v)
t.Errorf("Expected %#v, got %#v", test.expected, v)
}
}
}

func TestVersionCompare(t *testing.T) {
v := serverVersion{Major: 2, Minor: 3}
v := serverVersion{Major: 2, Minor: 3, Patch: 0}
tests := []struct {
major, minor, result int
major, minor, patch, result int
}{
{major: 1, minor: 0, result: 1},
{major: 2, minor: 0, result: 1},
{major: 2, minor: 2, result: 1},
{major: 2, minor: 2, result: 1},
{major: 2, minor: 2, patch: 2, result: 1},
{major: 2, minor: 3, result: 0},
{major: 2, minor: 3, patch: 0, result: 0},
{major: 2, minor: 4, result: -1},
{major: 3, minor: 0, result: -1},
{major: 2, minor: 3, patch: 1, result: -1},
}
for _, test := range tests {
res := v.Compare(test.major, test.minor)
res := v.Compare(test.major, test.minor, test.patch)
if res != test.result {
t.Errorf("%d.%d => Expected %d, got %d", test.major, test.minor, test.result, res)
t.Errorf("%d.%d.%d => Expected %d, got %d", test.major, test.minor, test.patch, test.result, res)
}
}
}