Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax ingress await restrictions #1832

Merged
merged 7 commits into from Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 44 additions & 26 deletions provider/pkg/await/ingress.go
Expand Up @@ -63,6 +63,7 @@ type ingressInitAwaiter struct {
ingress *unstructured.Unstructured
ingressReady bool
endpointsSettled bool
endpointEventsCount uint64
knownEndpointObjects sets.String
knownExternalNameServices sets.String
}
Expand Down Expand Up @@ -133,7 +134,7 @@ func (iia *ingressInitAwaiter) Await() error {
go serviceInformer.Informer().Run(stopper)

timeout := metadata.TimeoutDuration(iia.config.timeout, iia.config.currentInputs, DefaultIngressTimeoutMins*60)
return iia.await(ingressEvents, serviceEvents, endpointsEvents, make(chan struct{}), time.After(timeout))
return iia.await(ingressEvents, serviceEvents, endpointsEvents, make(chan struct{}), time.After(60*time.Second), time.After(timeout))
}

func (iia *ingressInitAwaiter) Read() error {
Expand Down Expand Up @@ -204,6 +205,7 @@ func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoint
func (iia *ingressInitAwaiter) await(
ingressEvents, serviceEvents, endpointsEvents <-chan watch.Event,
settled chan struct{},
settlementGracePeriodExpired <-chan time.Time,
timeout <-chan time.Time,
) error {
iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path")
Expand Down Expand Up @@ -234,6 +236,13 @@ func (iia *ingressInitAwaiter) await(
object: iia.ingress,
subErrors: iia.errorMessages(),
}
case <-settlementGracePeriodExpired:
// If we don't see any endpoint events in the designated time, assume endpoints have settled.
// This is to account for the distinct possibility of ingress using a resource reference or non-existent
// endpoints - in which case we will never see corresponding endpoint events.
if iia.endpointEventsCount == 0 {
iia.endpointsSettled = true
}
case <-settled:
iia.endpointsSettled = true
case event := <-ingressEvents:
Expand Down Expand Up @@ -326,6 +335,7 @@ func decodeIngress(u *unstructured.Unstructured, to interface{}) error {

func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
apiVersion := iia.ingress.GetAPIVersion()

switch apiVersion {
case "extensions/v1beta1", "networking.k8s.io/v1beta1":
var obj networkingv1beta1.Ingress
Expand All @@ -336,21 +346,27 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
}

for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
var httpRules []networkingv1beta1.HTTPIngressPath
lblackstone marked this conversation as resolved.
Show resolved Hide resolved

if rule.HTTP != nil {
httpRules = rule.HTTP.Paths
lblackstone marked this conversation as resolved.
Show resolved Hide resolved
}
for _, path := range rule.HTTP.Paths {
for _, path := range httpRules {
// Ignore ExternalName services
if path.Backend.ServiceName != "" && iia.knownExternalNameServices.Has(path.Backend.ServiceName) {
continue
}

if path.Backend.ServiceName != "" && !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
return apiVersion, false
if iia.endpointsSettled {
// We haven't seen the target endpoint emit any events within the settlement period
// and there is a chance it may never exist.
iia.config.logStatus(diag.Warning, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
} else {
// We may get more endpoint events, lets wait and retry.
return apiVersion, false
}
}
}
}
Expand All @@ -362,12 +378,12 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
}

for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
var httpRules []networkingv1.HTTPIngressPath

if rule.HTTP != nil {
httpRules = rule.HTTP.Paths
}
for _, path := range rule.HTTP.Paths {
for _, path := range httpRules {
// TODO: Should we worry about "resource" backends?
if path.Backend.Service == nil {
continue
Expand All @@ -379,9 +395,16 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
}

if path.Backend.Service.Name != "" && !iia.knownEndpointObjects.Has(path.Backend.Service.Name) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name)))
return apiVersion, false
if iia.endpointsSettled {
// We haven't seen the target endpoint emit any events within the settlement period
// and there is a chance it may never exist
// (https://github.com/pulumi/pulumi-kubernetes/issues/1810)
iia.config.logStatus(diag.Warning, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name)))
} else {
// We may get more endpoint events, lets wait and retry.
return apiVersion, false
}
}
}
}
Expand Down Expand Up @@ -421,6 +444,8 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
return
}

iia.endpointEventsCount++

name := endpoint.GetName()
switch event.Type {
case watch.Added, watch.Modified:
Expand All @@ -443,15 +468,8 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
func (iia *ingressInitAwaiter) errorMessages() []string {
messages := make([]string, 0)

if apiVersion, ready := iia.checkIfEndpointsReady(); !ready {
field := ".spec.rules[].http.paths[].backend.serviceName"
switch apiVersion {
case "networking.k8s.io/v1":
field = ".spec.rules[].http.paths[].backend.service.name"
}
messages = append(messages, fmt.Sprintf(
"Ingress has at least one rule that does not target any Service. "+
"Field '%v' may not match any active Service", field))
if _, ready := iia.checkIfEndpointsReady(); !ready {
messages = append(messages, "Ingress has at least one rule who's target endpoint didn't become available in time.")
lblackstone marked this conversation as resolved.
Show resolved Hide resolved
}

if !iia.ingressReady {
Expand Down