Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Ingress awaiter for ExternalName Services #320

Merged
merged 2 commits into from
Jan 4, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 99 additions & 17 deletions pkg/await/extensions_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/client"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi/pkg/diag"
"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -50,6 +51,7 @@ type ingressInitAwaiter struct {
endpointsReady bool
endpointsSettled bool
endpointExists map[string]bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better name for this is now probably something like serviceReady, as Services with type ExternalName don't have endpoints.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arg, dammit. I'm testing out the GH extension and it apparently doesn't let you batch up review comments?

externalServices map[string]bool
}

func makeIngressInitAwaiter(c createAwaitConfig) *ingressInitAwaiter {
Expand All @@ -60,6 +62,7 @@ func makeIngressInitAwaiter(c createAwaitConfig) *ingressInitAwaiter {
endpointsReady: false,
endpointsSettled: false,
endpointExists: make(map[string]bool),
externalServices: make(map[string]bool),
}
}

Expand All @@ -79,9 +82,10 @@ func (iia *ingressInitAwaiter) Await() error {
//
// We succeed only when all of the following are true:
//
// 1. Ingress object exists.
// 2. Endpoint objects exist with matching names for each Ingress path.
// 3. Ingress entry exists for .status.loadBalancer.ingress.
// 1. Ingress object exists.
// 2. Endpoint objects exist with matching names for each Ingress path.
// 2.1 Alternatively, a Service with type: ExternalName must path the Ingress path.
// 3. Ingress entry exists for .status.loadBalancer.ingress.
//

// Create ingress watcher.
Expand Down Expand Up @@ -109,7 +113,23 @@ func (iia *ingressInitAwaiter) Await() error {
}
defer endpointWatcher.Stop()

return iia.await(ingressWatcher, endpointWatcher, time.After(10*time.Minute), make(chan struct{}))
servicesClient, err := client.FromGVK(iia.config.pool, iia.config.disco, schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
}, iia.config.currentInputs.GetNamespace())
if err != nil {
glog.V(3).Infof("Failed to initialize Services client: %v", err)
return err
}
serviceWatcher, err := servicesClient.Watch(metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for Service objects associated with Ingress %q",
iia.config.currentInputs.GetName())
}

return iia.await(ingressWatcher, serviceWatcher, endpointWatcher, make(chan struct{}), time.After(10*time.Minute))
}

func (iia *ingressInitAwaiter) Read() error {
Expand Down Expand Up @@ -137,16 +157,36 @@ func (iia *ingressInitAwaiter) Read() error {
endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

return iia.read(ingress, endpointList.(*unstructured.UnstructuredList))
servicesClient, err := client.FromGVK(iia.config.pool, iia.config.disco, schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
}, iia.config.currentInputs.GetNamespace())
if err != nil {
glog.V(3).Infof("Failed to initialize Services client: %v", err)
return err
}
serviceList, err := servicesClient.List(metav1.ListOptions{})
if err != nil {
glog.V(3).Infof("Failed to list services needed for Ingress awaiter: %v", err)
serviceList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

return iia.read(ingress, endpointList.(*unstructured.UnstructuredList), serviceList.(*unstructured.UnstructuredList))
}

func (iia *ingressInitAwaiter) read(
ingress *unstructured.Unstructured,
endpoints *unstructured.UnstructuredList,
) error {
func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoints *unstructured.UnstructuredList,
services *unstructured.UnstructuredList) error {
iia.processIngressEvent(watchAddedEvent(ingress))

var err error
err := services.EachListItem(func(service runtime.Object) error {
iia.processServiceEvent(watchAddedEvent(service.(*unstructured.Unstructured)))
return nil
})
if err != nil {
glog.V(3).Infof("Error iterating over endpoint list for service %q: %v", ingress.GetName(), err)
}

settled := make(chan struct{})

glog.V(3).Infof("Processing endpoint list: %#v", endpoints)
Expand All @@ -158,6 +198,7 @@ func (iia *ingressInitAwaiter) read(
glog.V(3).Infof("Error iterating over endpoint list for ingress %q: %v", ingress.GetName(), err)
}

iia.endpointsReady = iia.checkIfEndpointsReady()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm missing something -- seems like it would be better to just get rid of iia.endpointsReady and simply call iia.checkIfEndpointsReady() wherever you would check iia.endpointsReady? Seems redundant to have the function and the variable.

iia.endpointsSettled = true
if iia.checkAndLogStatus() {
return nil
Expand All @@ -170,10 +211,8 @@ func (iia *ingressInitAwaiter) read(
}

// await is a helper companion to `Await` designed to make it easy to test this module.
func (iia *ingressInitAwaiter) await(
ingressWatcher, endpointWatcher watch.Interface, timeout <-chan time.Time,
settled chan struct{},
) error {
func (iia *ingressInitAwaiter) await(ingressWatcher, serviceWatcher, endpointWatcher watch.Interface,
settled chan struct{}, timeout <-chan time.Time) error {
iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path")

for {
Expand All @@ -195,6 +234,7 @@ func (iia *ingressInitAwaiter) await(
}
case <-timeout:
// On timeout, check one last time if the ingress is ready.
iia.endpointsReady = iia.checkIfEndpointsReady()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we have to do this also for case <-iia.config.ctx.Done():?

if iia.ingressReady && iia.endpointsReady {
return nil
}
Expand All @@ -208,16 +248,36 @@ func (iia *ingressInitAwaiter) await(
iia.processIngressEvent(event)
case event := <-endpointWatcher.ResultChan():
iia.processEndpointEvent(event, settled)
case event := <-serviceWatcher.ResultChan():
iia.processServiceEvent(event)
}
}
}

func (iia *ingressInitAwaiter) processServiceEvent(event watch.Event) {
service, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
glog.V(3).Infof("Service watch received unknown object type %q",
reflect.TypeOf(service))
return
}

name := service.GetName()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if event.Type == watch.Deleted {, don't we want to delete this from iia.externalServices? And now that I think about it, is there a reason we don't do this for iia.endpointExists as well?


t, ok := openapi.Pluck(service.Object, "spec", "type")
if ok && t.(string) == "ExternalName" {
iia.externalServices[name] = true
} else {
iia.externalServices[name] = false
}
}

func (iia *ingressInitAwaiter) processIngressEvent(event watch.Event) {
inputIngressName := iia.config.currentInputs.GetName()

ingress, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
glog.V(3).Infof("Ingress watch received unknown object type '%s'",
glog.V(3).Infof("Ingress watch received unknown object type %q",
reflect.TypeOf(ingress))
return
}
Expand All @@ -242,17 +302,34 @@ func (iia *ingressInitAwaiter) processIngressEvent(event watch.Event) {
return
}

var serviceNames []string
for _, rule := range obj.Spec.Rules {
for _, path := range rule.HTTP.Paths {
serviceNames = append(serviceNames, path.Backend.ServiceName)
}
}
iia.ignoreExternalNameServices(serviceNames)

iia.endpointsReady = iia.checkIfEndpointsReady()

glog.V(3).Infof("Received status for ingress '%s': %#v", inputIngressName, obj.Status)
glog.V(3).Infof("Received status for ingress %q: %#v", inputIngressName, obj.Status)

// Update status of ingress object so that we can check success.
iia.ingressReady = len(obj.Status.LoadBalancer.Ingress) > 0

glog.V(3).Infof("Waiting for ingress '%q' to update .status.loadBalancer with hostname/IP",
glog.V(3).Infof("Waiting for ingress %q to update .status.loadBalancer with hostname/IP",
inputIngressName)
}

func (iia *ingressInitAwaiter) ignoreExternalNameServices(names []string) {
// Services with type: ExternalName do not have associated Pods/Endpoints to wait for, so mark as ready.
for _, name := range names {
if iia.externalServices[name] {
iia.endpointExists[name] = true
}
}
}

func decodeIngress(u *unstructured.Unstructured) (*v1beta1.Ingress, error) {
b, err := u.MarshalJSON()
if err != nil {
Expand All @@ -276,6 +353,11 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() bool {

for _, rule := range obj.Spec.Rules {
for _, path := range rule.HTTP.Paths {
// Ignore ExternalName services
if iia.externalServices[path.Backend.ServiceName] {
continue
}

if !iia.endpointExists[path.Backend.ServiceName] {
iia.config.logStatus(diag.Error,
fmt.Sprintf("No matching service found for ingress rule: %q", path.Path))
Expand Down
51 changes: 35 additions & 16 deletions pkg/await/extensions_ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ func Test_Extensions_Ingress(t *testing.T) {
tests := []struct {
description string
ingressInput func(namespace, name, targetService string) *unstructured.Unstructured
do func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time)
do func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time)
expectedError error
}{
{
description: "Should succeed when Ingress is allocated an IP address and all paths match an existing Endpoint",
ingressInput: initializedIngress,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress and endpoint objects back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))
endpoints <- watchAddedEvent(initializedEndpoint("default", "foo-4setj4y6"))
Expand All @@ -29,10 +29,25 @@ func Test_Extensions_Ingress(t *testing.T) {
settled <- struct{}{}
},
},
{
description: "Should succeed when Ingress is allocated an IP address and path references an ExternalName Service",
ingressInput: initializedIngress,
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress and endpoint objects back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))
services <- watchAddedEvent(externalNameService("default", "foo-4setj4y6"))

// Mark endpoint objects as having settled. Success.
settled <- struct{}{}

// Timeout, success.
timeout <- time.Now()
},
},
{
description: "Should fail if the Ingress does not have an IP address allocated, and not all paths match an existing Endpoint",
ingressInput: ingressInput,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// Trigger timeout.
timeout <- time.Now()
},
Expand All @@ -48,7 +63,7 @@ func Test_Extensions_Ingress(t *testing.T) {
{
description: "Should fail if not all Ingress paths match existing Endpoints",
ingressInput: ingressInput,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))

Expand All @@ -66,7 +81,7 @@ func Test_Extensions_Ingress(t *testing.T) {
{
description: "Should fail if Ingress is not allocated an IP address",
ingressInput: ingressInput,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes uninitialized service back.
ingresses <- watchAddedEvent(ingressInput("default", "foo", "foo-4setj4y6"))

Expand All @@ -92,13 +107,13 @@ func Test_Extensions_Ingress(t *testing.T) {
mockAwaitConfig(test.ingressInput("default", "foo", "foo-4setj4y6")))

ingresses := make(chan watch.Event)
services := make(chan watch.Event)
endpoints := make(chan watch.Event)
settled := make(chan struct{})
timeout := make(chan time.Time)
go test.do(ingresses, endpoints, settled, timeout)
go test.do(ingresses, services, endpoints, settled, timeout)

err := awaiter.await(&chanWatcher{results: ingresses}, &chanWatcher{results: endpoints},
timeout, settled)
err := awaiter.await(&chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints}, settled, timeout)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand All @@ -109,6 +124,7 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
ingressInput func(namespace, name, targetService string) *unstructured.Unstructured
ingress func(namespace, name, targetService string) *unstructured.Unstructured
endpoint func(namespace, name string) *unstructured.Unstructured
service func(namespace, name string) *unstructured.Unstructured
expectedSubErrors []string
}{
{
Expand All @@ -118,13 +134,10 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
endpoint: initializedEndpoint,
},
{
description: "Read should fail if not all Ingress paths match existing Endpoints",
description: "Read should succeed when Ingress is allocated an IP address and all paths match an existing Endpoint",
ingressInput: ingressInput,
ingress: initializedIngress,
expectedSubErrors: []string{
"Ingress has at least one rule that does not target any Service. " +
"Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service",
},
service: externalNameService,
},
{
description: "Read should fail if Ingress not allocated an IP address",
Expand Down Expand Up @@ -155,12 +168,18 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
ingress := test.ingress("default", "foo", "foo-4setj4y6")

var err error
endpointList := unstructuredList()
serviceList := unstructuredList()
if test.endpoint != nil {
endpoint := test.endpoint("default", "foo-4setj4y6")
err = awaiter.read(ingress, unstructuredList(*endpoint))
} else {
err = awaiter.read(ingress, unstructuredList())
endpointList = unstructuredList(*endpoint)
}
if test.service != nil {
service := test.service("default", "foo-4setj4y6")
serviceList = unstructuredList(*service)
}
err = awaiter.read(ingress, endpointList, serviceList)

if test.expectedSubErrors != nil {
assert.Equal(t, test.expectedSubErrors, err.(*initializationError).SubErrors(), test.description)
} else {
Expand Down