Skip to content

Commit

Permalink
Add await support for networking.k8s.io/v1 variant of ingress (#1795)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vivek Lakshmanan committed Nov 12, 2021
1 parent 88f81f5 commit 1530785
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 152 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## HEAD (Unreleased)
- Add await support for networking.k8s.io/v1 variant of ingress (https://github.com/pulumi/pulumi-kubernetes/pull/1795)

- Schematize overlay types (https://github.com/pulumi/pulumi-kubernetes/pull/1793)

Expand Down
18 changes: 13 additions & 5 deletions provider/pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ const (
coreV1ServiceAccount = "v1/ServiceAccount"
extensionsV1Beta1Deployment = "extensions/v1beta1/Deployment"
extensionsV1Beta1Ingress = "extensions/v1beta1/Ingress"
networkingV1Ingress = "networking.k8s.io/v1/Ingress"
networkingV1Beta1Ingress = "networking.k8s.io/v1beta1/Ingress"
rbacAuthorizationV1ClusterRole = "rbac.authorization.k8s.io/v1/ClusterRole"
rbacAuthorizationV1ClusterRoleBinding = "rbac.authorization.k8s.io/v1/ClusterRoleBinding"
rbacAuthorizationV1Role = "rbac.authorization.k8s.io/v1/Role"
Expand Down Expand Up @@ -150,6 +152,12 @@ var deploymentAwaiter = awaitSpec{
awaitDeletion: untilAppsDeploymentDeleted,
}

var ingressAwaiter = awaitSpec{
awaitCreation: awaitIngressInit,
awaitRead: awaitIngressRead,
awaitUpdate: awaitIngressUpdate,
}

var jobAwaiter = awaitSpec{
awaitCreation: func(c createAwaitConfig) error {
return makeJobInitAwaiter(c).Await()
Expand Down Expand Up @@ -224,11 +232,11 @@ var awaiters = map[string]awaitSpec{
awaitCreation: untilCoreV1ServiceAccountInitialized,
},
extensionsV1Beta1Deployment: deploymentAwaiter,
extensionsV1Beta1Ingress: {
awaitCreation: awaitIngressInit,
awaitRead: awaitIngressRead,
awaitUpdate: awaitIngressUpdate,
},

extensionsV1Beta1Ingress: ingressAwaiter,
networkingV1Beta1Ingress: ingressAwaiter,
networkingV1Ingress: ingressAwaiter,

rbacAuthorizationV1ClusterRole: { /* NONE */ },
rbacAuthorizationV1ClusterRoleBinding: { /* NONE */ },
rbacAuthorizationV1Role: { /* NONE */ },
Expand Down
126 changes: 86 additions & 40 deletions provider/pkg/await/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
networkingv1b1 "k8s.io/api/networking/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
networkingv1beta1 "k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -217,7 +218,7 @@ func (iia *ingressInitAwaiter) await(
select {
case <-iia.config.ctx.Done():
// On cancel, check one last time if the ingress is ready.
if iia.ingressReady && iia.checkIfEndpointsReady() {
if _, ready := iia.checkIfEndpointsReady(); ready && iia.ingressReady {
return nil
}
return &cancellationError{
Expand All @@ -226,7 +227,7 @@ func (iia *ingressInitAwaiter) await(
}
case <-timeout:
// On timeout, check one last time if the ingress is ready.
if iia.ingressReady && iia.checkIfEndpointsReady() {
if _, ready := iia.checkIfEndpointsReady(); ready && iia.ingressReady {
return nil
}
return &timeoutError{
Expand Down Expand Up @@ -290,64 +291,103 @@ func (iia *ingressInitAwaiter) processIngressEvent(event watch.Event) {
}

iia.ingress = ingress
obj, err := decodeIngress(ingress)
if err != nil {

// To the best of my knowledge, this works across all known ingress api version variations.
ingressesRaw, ok := openapi.Pluck(ingress.Object, "status", "loadBalancer", "ingress")
if !ok {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", ingress)
return
}

logger.V(3).Infof("Received status for ingress %q: %#v", inputIngressName, obj.Status)
ingresses, ok := ingressesRaw.([]interface{})
if !ok {
logger.V(3).Infof("Unexpected ingress object structure from unstructured: %#v", ingress)
return
}

// Update status of ingress object so that we can check success.
iia.ingressReady = len(obj.Status.LoadBalancer.Ingress) > 0
iia.ingressReady = len(ingresses) > 0

logger.V(3).Infof("Waiting for ingress %q to update .status.loadBalancer with hostname/IP",
inputIngressName)
}

func decodeIngress(u *unstructured.Unstructured) (*networkingv1b1.Ingress, error) {
func decodeIngress(u *unstructured.Unstructured, to interface{}) error {
b, err := u.MarshalJSON()
if err != nil {
return nil, err
return err
}
var obj networkingv1b1.Ingress
err = json.Unmarshal(b, &obj)
err = json.Unmarshal(b, to)
if err != nil {
return nil, err
return err
}

return &obj, nil
return nil
}

func (iia *ingressInitAwaiter) checkIfEndpointsReady() bool {
obj, err := decodeIngress(iia.ingress)
if err != nil {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", iia.ingress)
return false
}
func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
apiVersion := iia.ingress.GetAPIVersion()
switch apiVersion {
case "extensions/v1beta1", "networking.k8s.io/v1beta1":
var obj networkingv1beta1.Ingress

for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return false
if err := decodeIngress(iia.ingress, &obj); err != nil {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", iia.ingress)
return apiVersion, false
}
for _, path := range rule.HTTP.Paths {
// Ignore ExternalName services
if iia.knownExternalNameServices.Has(path.Backend.ServiceName) {
continue
}

if !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
}
for _, path := range rule.HTTP.Paths {
// Ignore ExternalName services
if path.Backend.ServiceName != "" && iia.knownExternalNameServices.Has(path.Backend.ServiceName) {
continue
}

if path.Backend.ServiceName != "" && !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
return apiVersion, false
}
}
}
case "networking.k8s.io/v1":
var obj networkingv1.Ingress
if err := decodeIngress(iia.ingress, &obj); err != nil {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", iia.ingress)
return apiVersion, false
}

return false
for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
}
for _, path := range rule.HTTP.Paths {
// TODO: Should we worry about "resource" backends?
if path.Backend.Service == nil {
continue
}

// Ignore ExternalName services
if path.Backend.Service.Name != "" && iia.knownExternalNameServices.Has(path.Backend.Service.Name) {
continue
}

if path.Backend.Service.Name != "" && !iia.knownEndpointObjects.Has(path.Backend.Service.Name) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name)))
return apiVersion, false
}
}
}
}

return true
return apiVersion, true
}

// expectedIngressPath is a helper to print a useful error message.
Expand Down Expand Up @@ -403,10 +443,15 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
func (iia *ingressInitAwaiter) errorMessages() []string {
messages := make([]string, 0)

if !iia.checkIfEndpointsReady() {
messages = append(messages,
if apiVersion, ready := iia.checkIfEndpointsReady(); !ready {
field := ".spec.rules[].http.paths[].backend.serviceName"
switch apiVersion {
case "networking.k8s.io/v1":
field = ".spec.rules[].http.paths[].backend.service.name"
}
messages = append(messages, fmt.Sprintf(
"Ingress has at least one rule that does not target any Service. "+
"Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service")
"Field '%v' may not match any active Service", field))
}

if !iia.ingressReady {
Expand All @@ -419,11 +464,12 @@ func (iia *ingressInitAwaiter) errorMessages() []string {
}

func (iia *ingressInitAwaiter) checkAndLogStatus() bool {
success := iia.ingressReady && iia.checkIfEndpointsReady()
_, ready := iia.checkIfEndpointsReady()
success := iia.ingressReady && ready
if success {
iia.config.logStatus(diag.Info,
fmt.Sprintf("%sIngress initialization complete", cmdutil.EmojiOr("✅ ", "")))
} else if iia.checkIfEndpointsReady() {
} else if ready {
iia.config.logStatus(diag.Info, "[2/3] Waiting for update of .status.loadBalancer with hostname/IP")
}

Expand Down
68 changes: 67 additions & 1 deletion provider/pkg/await/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ func Test_Extensions_Ingress(t *testing.T) {
settled <- struct{}{}
},
},
{
description: "Should succeed when Ingress (networking/v1) is allocated an IP address and all paths match an existing Endpoint",
ingressInput: initializedIngressV1,
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress and endpoint objects back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))
endpoints <- watchAddedEvent(initializedEndpoint("default", "foo-4setj4y6"))

// Mark endpoint objects as having settled. Success.
settled <- struct{}{}
},
},
{
description: "Should succeed when Ingress is allocated an IP address and path references an ExternalName Service",
ingressInput: initializedIngress,
Expand Down Expand Up @@ -158,6 +170,15 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
"Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service",
},
},
{
description: "Read should fail if not all Ingress (networking/v1) paths match existing Endpoints",
ingressInput: ingressInput,
ingress: initializedIngressV1,
expectedSubErrors: []string{
"Ingress has at least one rule that does not target any Service. " +
"Field '.spec.rules[].http.paths[].backend.service.name' may not match any active Service",
},
},
{
description: "Read should succeed when Ingress is allocated an IP address and Service is type ExternalName",
ingressInput: ingressInput,
Expand Down Expand Up @@ -257,7 +278,7 @@ func ingressInput(namespace, name, targetService string) *unstructured.Unstructu

func initializedIngress(namespace, name, targetService string) *unstructured.Unstructured {
obj, err := decodeUnstructured(fmt.Sprintf(`{
"apiVersion": "extensions/v1beta1",
"apiVersion": "networking.k8s.io/v1beta1",
"kind": "Ingress",
"metadata": {
"name": "%s",
Expand Down Expand Up @@ -296,6 +317,51 @@ func initializedIngress(namespace, name, targetService string) *unstructured.Uns
return obj
}

func initializedIngressV1(namespace, name, targetService string) *unstructured.Unstructured {
obj, err := decodeUnstructured(fmt.Sprintf(`{
"apiVersion": "networking.k8s.io/v1",
"kind": "Ingress",
"metadata": {
"name": "%s",
"namespace": "%s"
},
"spec": {
"rules": [
{
"http": {
"paths": [
{
"backend": {
"service": {
"name": "%s",
"port": {
"number": 80
}
}
},
"path": "/nginx"
}
]
}
}
]
},
"status": {
"loadBalancer": {
"ingress": [
{
"hostname": "localhost"
}
]
}
}
}`, name, namespace, targetService))
if err != nil {
panic(err)
}
return obj
}

func initializedIngressUnspecifiedPath(namespace, name, targetService string) *unstructured.Unstructured {
obj, err := decodeUnstructured(fmt.Sprintf(`{
"apiVersion": "extensions/v1beta1",
Expand Down
4 changes: 2 additions & 2 deletions provider/pkg/clients/unstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1b1 "k8s.io/api/networking/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,7 +35,7 @@ func FromUnstructured(obj *unstructured.Unstructured) (metav1.Object, error) {
case kinds.Job:
output = new(batchv1.Job)
case kinds.Ingress:
output = new(networkingv1b1.Ingress)
output = new(networkingv1.Ingress)
case kinds.PersistentVolume:
output = new(corev1.PersistentVolume)
case kinds.PersistentVolumeClaim:
Expand Down
1 change: 0 additions & 1 deletion tests/sdk/go/helm-local/step1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func main() {
return err
}


return nil
})
}
Loading

0 comments on commit 1530785

Please sign in to comment.