diff --git a/CHANGELOG.md b/CHANGELOG.md index 7689b612bb..2469194534 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## HEAD (Unreleased) +- Relax ingress await restrictions (https://github.com/pulumi/pulumi-kubernetes/pull/1832) ## 3.12.1 (December 9, 2021) diff --git a/provider/pkg/await/ingress.go b/provider/pkg/await/ingress.go index 56b770597b..b25a14d61f 100644 --- a/provider/pkg/await/ingress.go +++ b/provider/pkg/await/ingress.go @@ -63,6 +63,7 @@ type ingressInitAwaiter struct { ingress *unstructured.Unstructured ingressReady bool endpointsSettled bool + endpointEventsCount uint64 knownEndpointObjects sets.String knownExternalNameServices sets.String } @@ -133,7 +134,7 @@ func (iia *ingressInitAwaiter) Await() error { go serviceInformer.Informer().Run(stopper) timeout := metadata.TimeoutDuration(iia.config.timeout, iia.config.currentInputs, DefaultIngressTimeoutMins*60) - return iia.await(ingressEvents, serviceEvents, endpointsEvents, make(chan struct{}), time.After(timeout)) + return iia.await(ingressEvents, serviceEvents, endpointsEvents, make(chan struct{}), time.After(60*time.Second), time.After(timeout)) } func (iia *ingressInitAwaiter) Read() error { @@ -204,6 +205,7 @@ func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoint func (iia *ingressInitAwaiter) await( ingressEvents, serviceEvents, endpointsEvents <-chan watch.Event, settled chan struct{}, + settlementGracePeriodExpired <-chan time.Time, timeout <-chan time.Time, ) error { iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path") @@ -234,6 +236,13 @@ func (iia *ingressInitAwaiter) await( object: iia.ingress, subErrors: iia.errorMessages(), } + case <-settlementGracePeriodExpired: + // If we don't see any endpoint events in the designated time, assume endpoints have settled. + // This is to account for the distinct possibility of ingress using a resource reference or non-existent + // endpoints - in which case we will never see corresponding endpoint events. + if iia.endpointEventsCount == 0 { + iia.endpointsSettled = true + } case <-settled: iia.endpointsSettled = true case event := <-ingressEvents: @@ -326,6 +335,7 @@ func decodeIngress(u *unstructured.Unstructured, to interface{}) error { func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) { apiVersion := iia.ingress.GetAPIVersion() + switch apiVersion { case "extensions/v1beta1", "networking.k8s.io/v1beta1": var obj networkingv1beta1.Ingress @@ -336,21 +346,27 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) { } for _, rule := range obj.Spec.Rules { - if rule.HTTP == nil { - iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s", - ".spec.rules[*].http", obj.Name)) - return apiVersion, false + var httpIngressPaths []networkingv1beta1.HTTPIngressPath + + if rule.HTTP != nil { + httpIngressPaths = rule.HTTP.Paths } - for _, path := range rule.HTTP.Paths { + for _, path := range httpIngressPaths { // Ignore ExternalName services if path.Backend.ServiceName != "" && iia.knownExternalNameServices.Has(path.Backend.ServiceName) { continue } if path.Backend.ServiceName != "" && !iia.knownEndpointObjects.Has(path.Backend.ServiceName) { - iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s", - expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName))) - return apiVersion, false + if iia.endpointsSettled { + // We haven't seen the target endpoint emit any events within the settlement period + // and there is a chance it may never exist. + iia.config.logStatus(diag.Warning, fmt.Sprintf("No matching service found for ingress rule: %s", + expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName))) + } else { + // We may get more endpoint events, lets wait and retry. + return apiVersion, false + } } } } @@ -362,12 +378,12 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) { } for _, rule := range obj.Spec.Rules { - if rule.HTTP == nil { - iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s", - ".spec.rules[*].http", obj.Name)) - return apiVersion, false + var httpIngressPaths []networkingv1.HTTPIngressPath + + if rule.HTTP != nil { + httpIngressPaths = rule.HTTP.Paths } - for _, path := range rule.HTTP.Paths { + for _, path := range httpIngressPaths { // TODO: Should we worry about "resource" backends? if path.Backend.Service == nil { continue @@ -379,9 +395,16 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) { } if path.Backend.Service.Name != "" && !iia.knownEndpointObjects.Has(path.Backend.Service.Name) { - iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s", - expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name))) - return apiVersion, false + if iia.endpointsSettled { + // We haven't seen the target endpoint emit any events within the settlement period + // and there is a chance it may never exist + // (https://github.com/pulumi/pulumi-kubernetes/issues/1810) + iia.config.logStatus(diag.Warning, fmt.Sprintf("No matching service found for ingress rule: %s", + expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name))) + } else { + // We may get more endpoint events, lets wait and retry. + return apiVersion, false + } } } } @@ -421,6 +444,8 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh return } + iia.endpointEventsCount++ + name := endpoint.GetName() switch event.Type { case watch.Added, watch.Modified: @@ -443,15 +468,8 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh func (iia *ingressInitAwaiter) errorMessages() []string { messages := make([]string, 0) - if apiVersion, ready := iia.checkIfEndpointsReady(); !ready { - field := ".spec.rules[].http.paths[].backend.serviceName" - switch apiVersion { - case "networking.k8s.io/v1": - field = ".spec.rules[].http.paths[].backend.service.name" - } - messages = append(messages, fmt.Sprintf( - "Ingress has at least one rule that does not target any Service. "+ - "Field '%v' may not match any active Service", field)) + if _, ready := iia.checkIfEndpointsReady(); !ready { + messages = append(messages, "Ingress has at least one rule with an unavailable target endpoint.") } if !iia.ingressReady { diff --git a/provider/pkg/await/ingress_test.go b/provider/pkg/await/ingress_test.go index 58b0a5ef4c..56e9530cd6 100644 --- a/provider/pkg/await/ingress_test.go +++ b/provider/pkg/await/ingress_test.go @@ -16,13 +16,13 @@ func Test_Extensions_Ingress(t *testing.T) { tests := []struct { description string ingressInput func(namespace, name, targetService string) *unstructured.Unstructured - do func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) + do func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) expectedError error }{ { description: "Should succeed when Ingress is allocated an IP address and all paths match an existing Endpoint", ingressInput: initializedIngress, - do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) { + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { // API server passes initialized ingress and endpoint objects back. ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6")) endpoints <- watchAddedEvent(initializedEndpoint("default", "foo-4setj4y6")) @@ -34,7 +34,7 @@ func Test_Extensions_Ingress(t *testing.T) { { description: "Should succeed when Ingress (networking/v1) is allocated an IP address and all paths match an existing Endpoint", ingressInput: initializedIngressV1, - do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) { + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { // API server passes initialized ingress and endpoint objects back. ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6")) endpoints <- watchAddedEvent(initializedEndpoint("default", "foo-4setj4y6")) @@ -46,7 +46,7 @@ func Test_Extensions_Ingress(t *testing.T) { { description: "Should succeed when Ingress is allocated an IP address and path references an ExternalName Service", ingressInput: initializedIngress, - do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) { + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { // API server passes initialized ingress and endpoint objects back. ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6")) services <- watchAddedEvent(externalNameService("default", "foo-4setj4y6")) @@ -58,44 +58,74 @@ func Test_Extensions_Ingress(t *testing.T) { timeout <- time.Now() }, }, + { + description: "Should succeed when Ingress V1 is allocated an IP address and path references a Resource", + ingressInput: initializedIngressV1, + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { + // API server passes initialized ingress and endpoint objects back. + ingresses <- watchAddedEvent(initializedIngressV1WithResourceRef("default", "foo", "foo-4setj4y6")) + + // No endpoint events received in grace period. Success. + settlementGracePeriod <- time.Now() + }, + }, + { + description: "Should succeed when Ingress V1 is allocated an IP address and path references an ExternalName Service", + ingressInput: initializedIngressV1, + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { + // API server passes initialized ingress and endpoint objects back. + ingresses <- watchAddedEvent(initializedIngressV1("default", "foo", "foo-4setj4y6")) + services <- watchAddedEvent(externalNameService("default", "foo-4setj4y6")) + + // Mark endpoint objects as having settled. Success. + settled <- struct{}{} + + // Timeout, success. + timeout <- time.Now() + }, + }, { description: "Should fail if the Ingress does not have an IP address allocated, and not all paths match an existing Endpoint", ingressInput: ingressInput, - do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) { + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { // Trigger timeout. timeout <- time.Now() }, expectedError: &timeoutError{ object: ingressInput("default", "foo", "foo-4setj4y6"), subErrors: []string{ - "Ingress has at least one rule that does not target any Service. " + - "Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service", + "Ingress has at least one rule with an unavailable target endpoint.", "Ingress .status.loadBalancer field was not updated with a hostname/IP address. " + "\n for more information about this error, see https://pulumi.io/xdv72s", }}, }, { - description: "Should fail if not all Ingress paths match existing Endpoints", + description: "Should succeed even if not all Ingress paths match existing Endpoints", ingressInput: ingressInput, - do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) { + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { // API server passes initialized ingress back. ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6")) + // Mark endpoint objects as having settled. Success. settled <- struct{}{} + }, + }, + { + description: "Should succeed if no known endpoints exist and no endpoint events are seen", + ingressInput: ingressInput, + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { + // API server passes initialized ingress back. + ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6")) - // Finally, time out. - timeout <- time.Now() + // No events seen in grace period. Success. + settlementGracePeriod <- time.Now() }, - expectedError: &timeoutError{ - object: initializedIngress("default", "foo", "foo-4setj4y6"), - subErrors: []string{ - "Ingress has at least one rule that does not target any Service. " + - "Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service"}}, }, + { description: "Should succeed for Ingress with an unspecified path.", ingressInput: ingressInput, - do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) { + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { // API server passes initialized ingress back. ingresses <- watchAddedEvent(initializedIngressUnspecifiedPath("default", "foo", "foo-4setj4y6")) endpoints <- watchAddedEvent(initializedEndpoint("default", "foo-4setj4y6")) @@ -109,7 +139,7 @@ func Test_Extensions_Ingress(t *testing.T) { { description: "Should fail if Ingress is not allocated an IP address", ingressInput: ingressInput, - do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) { + do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, settlementGracePeriod, timeout chan time.Time) { // API server passes uninitialized service back. ingresses <- watchAddedEvent(ingressInput("default", "foo", "foo-4setj4y6")) @@ -131,18 +161,21 @@ func Test_Extensions_Ingress(t *testing.T) { } for _, test := range tests { - awaiter := makeIngressInitAwaiter( - mockAwaitConfig(test.ingressInput("default", "foo", "foo-4setj4y6"))) - - ingresses := make(chan watch.Event) - services := make(chan watch.Event) - endpoints := make(chan watch.Event) - settled := make(chan struct{}) - timeout := make(chan time.Time) - go test.do(ingresses, services, endpoints, settled, timeout) - - err := awaiter.await(ingresses, services, endpoints, settled, timeout) - assert.Equal(t, test.expectedError, err, test.description) + t.Run(test.description, func(t *testing.T) { + awaiter := makeIngressInitAwaiter( + mockAwaitConfig(test.ingressInput("default", "foo", "foo-4setj4y6"))) + + ingresses := make(chan watch.Event) + services := make(chan watch.Event) + endpoints := make(chan watch.Event) + settled := make(chan struct{}) + settlementGracePeriod := make(chan time.Time) + timeout := make(chan time.Time) + go test.do(ingresses, services, endpoints, settled, settlementGracePeriod, timeout) + + err := awaiter.await(ingresses, services, endpoints, settled, settlementGracePeriod, timeout) + assert.Equal(t, test.expectedError, err, test.description) + }) } } @@ -165,19 +198,11 @@ func Test_Extensions_Ingress_Read(t *testing.T) { description: "Read should fail if not all Ingress paths match existing Endpoints", ingressInput: ingressInput, ingress: initializedIngress, - expectedSubErrors: []string{ - "Ingress has at least one rule that does not target any Service. " + - "Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service", - }, }, { - description: "Read should fail if not all Ingress (networking/v1) paths match existing Endpoints", + description: "Read should succeed even if not all Ingress (networking/v1) paths match existing Endpoints", ingressInput: ingressInput, ingress: initializedIngressV1, - expectedSubErrors: []string{ - "Ingress has at least one rule that does not target any Service. " + - "Field '.spec.rules[].http.paths[].backend.service.name' may not match any active Service", - }, }, { description: "Read should succeed when Ingress is allocated an IP address and Service is type ExternalName", @@ -200,8 +225,6 @@ func Test_Extensions_Ingress_Read(t *testing.T) { ingressInput: ingressInput, ingress: ingressInput, expectedSubErrors: []string{ - "Ingress has at least one rule that does not target any Service. " + - "Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service", "Ingress .status.loadBalancer field was not updated with a hostname/IP address. " + "\n for more information about this error, see https://pulumi.io/xdv72s", }, @@ -209,28 +232,30 @@ func Test_Extensions_Ingress_Read(t *testing.T) { } for _, test := range tests { - awaiter := makeIngressInitAwaiter(mockAwaitConfig( - test.ingressInput("default", "foo", "foo-4setj4y6"))) - ingress := test.ingress("default", "foo", "foo-4setj4y6") - - var err error - endpointList := unstructuredList() - serviceList := unstructuredList() - if test.endpoint != nil { - endpoint := test.endpoint("default", "foo-4setj4y6") - endpointList = unstructuredList(*endpoint) - } - if test.service != nil { - service := test.service("default", "foo-4setj4y6") - serviceList = unstructuredList(*service) - } - err = awaiter.read(ingress, endpointList, serviceList) - - if test.expectedSubErrors != nil { - assert.Equal(t, test.expectedSubErrors, err.(*initializationError).SubErrors(), test.description) - } else { - assert.Nil(t, err, test.description) - } + t.Run(test.description, func(t *testing.T) { + awaiter := makeIngressInitAwaiter(mockAwaitConfig( + test.ingressInput("default", "foo", "foo-4setj4y6"))) + ingress := test.ingress("default", "foo", "foo-4setj4y6") + + var err error + endpointList := unstructuredList() + serviceList := unstructuredList() + if test.endpoint != nil { + endpoint := test.endpoint("default", "foo-4setj4y6") + endpointList = unstructuredList(*endpoint) + } + if test.service != nil { + service := test.service("default", "foo-4setj4y6") + serviceList = unstructuredList(*service) + } + err = awaiter.read(ingress, endpointList, serviceList) + + if test.expectedSubErrors != nil { + assert.Equal(t, test.expectedSubErrors, err.(*initializationError).SubErrors(), test.description) + } else { + assert.Nil(t, err, test.description) + } + }) } } @@ -362,6 +387,50 @@ func initializedIngressV1(namespace, name, targetService string) *unstructured.U return obj } +func initializedIngressV1WithResourceRef(namespace, name, targetService string) *unstructured.Unstructured { + obj, err := decodeUnstructured(fmt.Sprintf(`{ + "apiVersion": "networking.k8s.io/v1", + "kind": "Ingress", + "metadata": { + "name": "%s", + "namespace": "%s" + }, + "spec": { + "rules": [ + { + "http": { + "paths": [ + { + "backend": { + "resource": { + "apiGroup": "k8s.example.com", + "kind": "StorageBucket", + "name": "icon-assets" + } + }, + "path": "/nginx" + } + ] + } + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "hostname": "localhost" + } + ] + } + } +}`, name, namespace)) + if err != nil { + panic(err) + } + return obj +} + func initializedIngressUnspecifiedPath(namespace, name, targetService string) *unstructured.Unstructured { obj, err := decodeUnstructured(fmt.Sprintf(`{ "apiVersion": "extensions/v1beta1", diff --git a/provider/pkg/provider/helm_release.go b/provider/pkg/provider/helm_release.go index 3facc779f8..31d6464784 100644 --- a/provider/pkg/provider/helm_release.go +++ b/provider/pkg/provider/helm_release.go @@ -212,7 +212,7 @@ func newHelmReleaseProvider( } func debug(format string, a ...interface{}) { - logger.V(3).Infof("[DEBUG] %s", fmt.Sprintf(format, a...)) + logger.V(6).Infof("[DEBUG] %s", fmt.Sprintf(format, a...)) } func (r *helmReleaseProvider) getActionConfig(namespace string) (*action.Configuration, error) {