Skip to content

Commit

Permalink
Relax ingress await restrictions (#1832)
Browse files Browse the repository at this point in the history
* Relax ingress await restrictions

* Add a settlement grace period

* Relax ingress await restrictions

* Add a settlement grace period

* Changelog

* Review comments
  • Loading branch information
Vivek Lakshmanan committed Dec 10, 2021
1 parent 434a8df commit 839fa82
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,4 +1,5 @@
## HEAD (Unreleased)
- Relax ingress await restrictions (https://github.com/pulumi/pulumi-kubernetes/pull/1832)

## 3.12.1 (December 9, 2021)

Expand Down
70 changes: 44 additions & 26 deletions provider/pkg/await/ingress.go
Expand Up @@ -63,6 +63,7 @@ type ingressInitAwaiter struct {
ingress *unstructured.Unstructured
ingressReady bool
endpointsSettled bool
endpointEventsCount uint64
knownEndpointObjects sets.String
knownExternalNameServices sets.String
}
Expand Down Expand Up @@ -133,7 +134,7 @@ func (iia *ingressInitAwaiter) Await() error {
go serviceInformer.Informer().Run(stopper)

timeout := metadata.TimeoutDuration(iia.config.timeout, iia.config.currentInputs, DefaultIngressTimeoutMins*60)
return iia.await(ingressEvents, serviceEvents, endpointsEvents, make(chan struct{}), time.After(timeout))
return iia.await(ingressEvents, serviceEvents, endpointsEvents, make(chan struct{}), time.After(60*time.Second), time.After(timeout))
}

func (iia *ingressInitAwaiter) Read() error {
Expand Down Expand Up @@ -204,6 +205,7 @@ func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoint
func (iia *ingressInitAwaiter) await(
ingressEvents, serviceEvents, endpointsEvents <-chan watch.Event,
settled chan struct{},
settlementGracePeriodExpired <-chan time.Time,
timeout <-chan time.Time,
) error {
iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path")
Expand Down Expand Up @@ -234,6 +236,13 @@ func (iia *ingressInitAwaiter) await(
object: iia.ingress,
subErrors: iia.errorMessages(),
}
case <-settlementGracePeriodExpired:
// If we don't see any endpoint events in the designated time, assume endpoints have settled.
// This is to account for the distinct possibility of ingress using a resource reference or non-existent
// endpoints - in which case we will never see corresponding endpoint events.
if iia.endpointEventsCount == 0 {
iia.endpointsSettled = true
}
case <-settled:
iia.endpointsSettled = true
case event := <-ingressEvents:
Expand Down Expand Up @@ -326,6 +335,7 @@ func decodeIngress(u *unstructured.Unstructured, to interface{}) error {

func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
apiVersion := iia.ingress.GetAPIVersion()

switch apiVersion {
case "extensions/v1beta1", "networking.k8s.io/v1beta1":
var obj networkingv1beta1.Ingress
Expand All @@ -336,21 +346,27 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
}

for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
var httpIngressPaths []networkingv1beta1.HTTPIngressPath

if rule.HTTP != nil {
httpIngressPaths = rule.HTTP.Paths
}
for _, path := range rule.HTTP.Paths {
for _, path := range httpIngressPaths {
// Ignore ExternalName services
if path.Backend.ServiceName != "" && iia.knownExternalNameServices.Has(path.Backend.ServiceName) {
continue
}

if path.Backend.ServiceName != "" && !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
return apiVersion, false
if iia.endpointsSettled {
// We haven't seen the target endpoint emit any events within the settlement period
// and there is a chance it may never exist.
iia.config.logStatus(diag.Warning, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
} else {
// We may get more endpoint events, lets wait and retry.
return apiVersion, false
}
}
}
}
Expand All @@ -362,12 +378,12 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
}

for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
var httpIngressPaths []networkingv1.HTTPIngressPath

if rule.HTTP != nil {
httpIngressPaths = rule.HTTP.Paths
}
for _, path := range rule.HTTP.Paths {
for _, path := range httpIngressPaths {
// TODO: Should we worry about "resource" backends?
if path.Backend.Service == nil {
continue
Expand All @@ -379,9 +395,16 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
}

if path.Backend.Service.Name != "" && !iia.knownEndpointObjects.Has(path.Backend.Service.Name) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name)))
return apiVersion, false
if iia.endpointsSettled {
// We haven't seen the target endpoint emit any events within the settlement period
// and there is a chance it may never exist
// (https://github.com/pulumi/pulumi-kubernetes/issues/1810)
iia.config.logStatus(diag.Warning, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name)))
} else {
// We may get more endpoint events, lets wait and retry.
return apiVersion, false
}
}
}
}
Expand Down Expand Up @@ -421,6 +444,8 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
return
}

iia.endpointEventsCount++

name := endpoint.GetName()
switch event.Type {
case watch.Added, watch.Modified:
Expand All @@ -443,15 +468,8 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
func (iia *ingressInitAwaiter) errorMessages() []string {
messages := make([]string, 0)

if apiVersion, ready := iia.checkIfEndpointsReady(); !ready {
field := ".spec.rules[].http.paths[].backend.serviceName"
switch apiVersion {
case "networking.k8s.io/v1":
field = ".spec.rules[].http.paths[].backend.service.name"
}
messages = append(messages, fmt.Sprintf(
"Ingress has at least one rule that does not target any Service. "+
"Field '%v' may not match any active Service", field))
if _, ready := iia.checkIfEndpointsReady(); !ready {
messages = append(messages, "Ingress has at least one rule with an unavailable target endpoint.")
}

if !iia.ingressReady {
Expand Down

0 comments on commit 839fa82

Please sign in to comment.