Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Ingress awaiter for ExternalName Services #320

Merged
merged 2 commits into from
Jan 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 104 additions & 42 deletions pkg/await/extensions_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/client"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi/pkg/diag"
"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
)

Expand Down Expand Up @@ -44,22 +46,22 @@ import (
// ------------------------------------------------------------------------------------------------

type ingressInitAwaiter struct {
config createAwaitConfig
ingress *unstructured.Unstructured
ingressReady bool
endpointsReady bool
endpointsSettled bool
endpointExists map[string]bool
config createAwaitConfig
ingress *unstructured.Unstructured
ingressReady bool
endpointsSettled bool
knownEndpointObjects sets.String
knownExternalNameServices sets.String
}

func makeIngressInitAwaiter(c createAwaitConfig) *ingressInitAwaiter {
return &ingressInitAwaiter{
config: c,
ingress: c.currentOutputs,
ingressReady: false,
endpointsReady: false,
endpointsSettled: false,
endpointExists: make(map[string]bool),
config: c,
ingress: c.currentOutputs,
ingressReady: false,
endpointsSettled: false,
knownEndpointObjects: sets.NewString(),
knownExternalNameServices: sets.NewString(),
}
}

Expand All @@ -79,9 +81,10 @@ func (iia *ingressInitAwaiter) Await() error {
//
// We succeed only when all of the following are true:
//
// 1. Ingress object exists.
// 2. Endpoint objects exist with matching names for each Ingress path.
// 3. Ingress entry exists for .status.loadBalancer.ingress.
// 1. Ingress object exists.
// 2. Endpoint objects exist with matching names for each Ingress path (except when Service
// type is ExternalName).
// 3. Ingress entry exists for .status.loadBalancer.ingress.
//

// Create ingress watcher.
Expand Down Expand Up @@ -109,7 +112,23 @@ func (iia *ingressInitAwaiter) Await() error {
}
defer endpointWatcher.Stop()

return iia.await(ingressWatcher, endpointWatcher, time.After(10*time.Minute), make(chan struct{}))
servicesClient, err := client.FromGVK(iia.config.pool, iia.config.disco, schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
}, iia.config.currentInputs.GetNamespace())
if err != nil {
glog.V(3).Infof("Failed to initialize Services client: %v", err)
return err
}
serviceWatcher, err := servicesClient.Watch(metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for Service objects associated with Ingress %q",
iia.config.currentInputs.GetName())
}

return iia.await(ingressWatcher, serviceWatcher, endpointWatcher, make(chan struct{}), time.After(10*time.Minute))
}

func (iia *ingressInitAwaiter) Read() error {
Expand Down Expand Up @@ -137,16 +156,36 @@ func (iia *ingressInitAwaiter) Read() error {
endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

return iia.read(ingress, endpointList.(*unstructured.UnstructuredList))
servicesClient, err := client.FromGVK(iia.config.pool, iia.config.disco, schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
}, iia.config.currentInputs.GetNamespace())
if err != nil {
glog.V(3).Infof("Failed to initialize Services client: %v", err)
return err
}
serviceList, err := servicesClient.List(metav1.ListOptions{})
if err != nil {
glog.V(3).Infof("Failed to list services needed for Ingress awaiter: %v", err)
serviceList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

return iia.read(ingress, endpointList.(*unstructured.UnstructuredList), serviceList.(*unstructured.UnstructuredList))
}

func (iia *ingressInitAwaiter) read(
ingress *unstructured.Unstructured,
endpoints *unstructured.UnstructuredList,
) error {
func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoints *unstructured.UnstructuredList,
services *unstructured.UnstructuredList) error {
iia.processIngressEvent(watchAddedEvent(ingress))

var err error
err := services.EachListItem(func(service runtime.Object) error {
iia.processServiceEvent(watchAddedEvent(service.(*unstructured.Unstructured)))
return nil
})
if err != nil {
glog.V(3).Infof("Error iterating over endpoint list for service %q: %v", ingress.GetName(), err)
}

settled := make(chan struct{})

glog.V(3).Infof("Processing endpoint list: %#v", endpoints)
Expand All @@ -170,10 +209,8 @@ func (iia *ingressInitAwaiter) read(
}

// await is a helper companion to `Await` designed to make it easy to test this module.
func (iia *ingressInitAwaiter) await(
ingressWatcher, endpointWatcher watch.Interface, timeout <-chan time.Time,
settled chan struct{},
) error {
func (iia *ingressInitAwaiter) await(ingressWatcher, serviceWatcher, endpointWatcher watch.Interface,
settled chan struct{}, timeout <-chan time.Time) error {
iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path")

for {
Expand All @@ -186,7 +223,7 @@ func (iia *ingressInitAwaiter) await(
select {
case <-iia.config.ctx.Done():
// On cancel, check one last time if the ingress is ready.
if iia.ingressReady && iia.endpointsReady {
if iia.ingressReady && iia.checkIfEndpointsReady() {
return nil
}
return &cancellationError{
Expand All @@ -195,7 +232,7 @@ func (iia *ingressInitAwaiter) await(
}
case <-timeout:
// On timeout, check one last time if the ingress is ready.
if iia.ingressReady && iia.endpointsReady {
if iia.ingressReady && iia.checkIfEndpointsReady() {
return nil
}
return &timeoutError{
Expand All @@ -208,16 +245,39 @@ func (iia *ingressInitAwaiter) await(
iia.processIngressEvent(event)
case event := <-endpointWatcher.ResultChan():
iia.processEndpointEvent(event, settled)
case event := <-serviceWatcher.ResultChan():
iia.processServiceEvent(event)
}
}
}

func (iia *ingressInitAwaiter) processServiceEvent(event watch.Event) {
service, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
glog.V(3).Infof("Service watch received unknown object type %q",
reflect.TypeOf(service))
return
}

name := service.GetName()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if event.Type == watch.Deleted {, don't we want to delete this from iia.externalServices? And now that I think about it, is there a reason we don't do this for iia.endpointExists as well?


if event.Type == watch.Deleted {
iia.knownExternalNameServices.Delete(name)
return
}

t, ok := openapi.Pluck(service.Object, "spec", "type")
if ok && t.(string) == "ExternalName" {
iia.knownExternalNameServices.Insert(name)
}
}

func (iia *ingressInitAwaiter) processIngressEvent(event watch.Event) {
inputIngressName := iia.config.currentInputs.GetName()

ingress, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
glog.V(3).Infof("Ingress watch received unknown object type '%s'",
glog.V(3).Infof("Ingress watch received unknown object type %q",
reflect.TypeOf(ingress))
return
}
Expand All @@ -242,14 +302,12 @@ func (iia *ingressInitAwaiter) processIngressEvent(event watch.Event) {
return
}

iia.endpointsReady = iia.checkIfEndpointsReady()

glog.V(3).Infof("Received status for ingress '%s': %#v", inputIngressName, obj.Status)
glog.V(3).Infof("Received status for ingress %q: %#v", inputIngressName, obj.Status)

// Update status of ingress object so that we can check success.
iia.ingressReady = len(obj.Status.LoadBalancer.Ingress) > 0

glog.V(3).Infof("Waiting for ingress '%q' to update .status.loadBalancer with hostname/IP",
glog.V(3).Infof("Waiting for ingress %q to update .status.loadBalancer with hostname/IP",
inputIngressName)
}

Expand All @@ -276,7 +334,12 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() bool {

for _, rule := range obj.Spec.Rules {
for _, path := range rule.HTTP.Paths {
if !iia.endpointExists[path.Backend.ServiceName] {
// Ignore ExternalName services
if iia.knownExternalNameServices.Has(path.Backend.ServiceName) {
continue
}

if !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
iia.config.logStatus(diag.Error,
fmt.Sprintf("No matching service found for ingress rule: %q", path.Path))
return false
Expand All @@ -299,14 +362,13 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
name := endpoint.GetName()
switch event.Type {
case watch.Added, watch.Modified:
iia.endpointExists[name] = true
iia.knownEndpointObjects.Insert(name)
case watch.Deleted:
iia.endpointExists[name] = false
iia.knownEndpointObjects.Delete(name)
// NOTE: Unlike `processServiceEvent` don't return; we still want to set
// `iia.endpointsSettled` to `false`.
}

// Start over, prove that endpoints are ready.
iia.endpointsReady = iia.checkIfEndpointsReady()

// Every time we get an update to one of our endpoints objects, give it a few seconds
// for them to settle.
iia.endpointsSettled = false
Expand All @@ -319,7 +381,7 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
func (iia *ingressInitAwaiter) errorMessages() []string {
messages := make([]string, 0)

if !iia.endpointsReady {
if !iia.checkIfEndpointsReady() {
messages = append(messages,
"Ingress has at least one rule that does not target any Service. "+
"Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service")
Expand All @@ -335,10 +397,10 @@ func (iia *ingressInitAwaiter) errorMessages() []string {
}

func (iia *ingressInitAwaiter) checkAndLogStatus() bool {
success := iia.ingressReady && iia.endpointsReady
success := iia.ingressReady && iia.checkIfEndpointsReady()
if success {
iia.config.logStatus(diag.Info, "✅ Ingress initialization complete")
} else if iia.endpointsReady {
} else if iia.checkIfEndpointsReady() {
iia.config.logStatus(diag.Info, "[2/3] Waiting for update of .status.loadBalancer with hostname/IP")
}

Expand Down
50 changes: 39 additions & 11 deletions pkg/await/extensions_ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ func Test_Extensions_Ingress(t *testing.T) {
tests := []struct {
description string
ingressInput func(namespace, name, targetService string) *unstructured.Unstructured
do func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time)
do func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time)
expectedError error
}{
{
description: "Should succeed when Ingress is allocated an IP address and all paths match an existing Endpoint",
ingressInput: initializedIngress,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress and endpoint objects back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))
endpoints <- watchAddedEvent(initializedEndpoint("default", "foo-4setj4y6"))
Expand All @@ -29,10 +29,25 @@ func Test_Extensions_Ingress(t *testing.T) {
settled <- struct{}{}
},
},
{
description: "Should succeed when Ingress is allocated an IP address and path references an ExternalName Service",
ingressInput: initializedIngress,
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress and endpoint objects back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))
services <- watchAddedEvent(externalNameService("default", "foo-4setj4y6"))

// Mark endpoint objects as having settled. Success.
settled <- struct{}{}

// Timeout, success.
timeout <- time.Now()
},
},
{
description: "Should fail if the Ingress does not have an IP address allocated, and not all paths match an existing Endpoint",
ingressInput: ingressInput,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// Trigger timeout.
timeout <- time.Now()
},
Expand All @@ -48,7 +63,7 @@ func Test_Extensions_Ingress(t *testing.T) {
{
description: "Should fail if not all Ingress paths match existing Endpoints",
ingressInput: ingressInput,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))

Expand All @@ -66,7 +81,7 @@ func Test_Extensions_Ingress(t *testing.T) {
{
description: "Should fail if Ingress is not allocated an IP address",
ingressInput: ingressInput,
do: func(ingresses, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes uninitialized service back.
ingresses <- watchAddedEvent(ingressInput("default", "foo", "foo-4setj4y6"))

Expand All @@ -92,13 +107,13 @@ func Test_Extensions_Ingress(t *testing.T) {
mockAwaitConfig(test.ingressInput("default", "foo", "foo-4setj4y6")))

ingresses := make(chan watch.Event)
services := make(chan watch.Event)
endpoints := make(chan watch.Event)
settled := make(chan struct{})
timeout := make(chan time.Time)
go test.do(ingresses, endpoints, settled, timeout)
go test.do(ingresses, services, endpoints, settled, timeout)

err := awaiter.await(&chanWatcher{results: ingresses}, &chanWatcher{results: endpoints},
timeout, settled)
err := awaiter.await(&chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints}, settled, timeout)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand All @@ -109,6 +124,7 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
ingressInput func(namespace, name, targetService string) *unstructured.Unstructured
ingress func(namespace, name, targetService string) *unstructured.Unstructured
endpoint func(namespace, name string) *unstructured.Unstructured
service func(namespace, name string) *unstructured.Unstructured
expectedSubErrors []string
}{
{
Expand All @@ -126,6 +142,12 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
"Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service",
},
},
{
description: "Read should succeed when Ingress is allocated an IP address and Service is type ExternalName",
ingressInput: ingressInput,
ingress: initializedIngress,
service: externalNameService,
},
{
description: "Read should fail if Ingress not allocated an IP address",
ingressInput: ingressInput,
Expand Down Expand Up @@ -155,12 +177,18 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
ingress := test.ingress("default", "foo", "foo-4setj4y6")

var err error
endpointList := unstructuredList()
serviceList := unstructuredList()
if test.endpoint != nil {
endpoint := test.endpoint("default", "foo-4setj4y6")
err = awaiter.read(ingress, unstructuredList(*endpoint))
} else {
err = awaiter.read(ingress, unstructuredList())
endpointList = unstructuredList(*endpoint)
}
if test.service != nil {
service := test.service("default", "foo-4setj4y6")
serviceList = unstructuredList(*service)
}
err = awaiter.read(ingress, endpointList, serviceList)

if test.expectedSubErrors != nil {
assert.Equal(t, test.expectedSubErrors, err.(*initializationError).SubErrors(), test.description)
} else {
Expand Down