Skip to content

Commit

Permalink
Create Ingress awaiter with incremental status updates (#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
lblackstone authored Nov 21, 2018
1 parent a11ab8e commit 3d49f78
Show file tree
Hide file tree
Showing 6 changed files with 605 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 4 additions & 47 deletions pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type createAwaitConfig struct {
currentOutputs *unstructured.Unstructured
}

// nolint
func (cac *createAwaitConfig) eventClient() (dynamic.ResourceInterface, error) {
return client.FromGVK(cac.pool, cac.disco, schema.GroupVersionKind{
Group: "",
Expand Down Expand Up @@ -191,7 +192,9 @@ var awaiters = map[string]awaitSpec{
},
extensionsV1Beta1Deployment: deploymentAwaiter,
extensionsV1Beta1Ingress: {
awaitCreation: untilExtensionsV1Beta1IngressInitialized,
awaitCreation: awaitIngressInit,
awaitRead: awaitIngressRead,
awaitUpdate: awaitIngressUpdate,
},
rbacAuthorizationV1ClusterRole: { /* NONE */ },
rbacAuthorizationV1ClusterRoleBinding: { /* NONE */ },
Expand Down Expand Up @@ -527,52 +530,6 @@ func untilCoreV1ServiceAccountInitialized(c createAwaitConfig) error {

// --------------------------------------------------------------------------

// extensions/v1beta1/Ingress

// --------------------------------------------------------------------------

func untilExtensionsV1Beta1IngressInitialized(c createAwaitConfig) error {
clientForEvents, err := c.eventClient()
if err != nil {
return err
}

name := c.currentInputs.GetName()

externalIPAllocated := func(svc *unstructured.Unstructured) bool {
lbIngress, _ := openapi.Pluck(svc.Object, "status", "loadBalancer", "ingress")
status, _ := openapi.Pluck(svc.Object, "status")

glog.V(3).Infof("Received Ingress status: %#v", status)
if ing, isSlice := lbIngress.([]interface{}); isSlice && len(ing) > 0 {
return true
}

glog.V(3).Infof("Waiting for Ingress '%q' to assign IP/hostname for a load balancer", name)

return false
}

// Await.
glog.V(3).Info("Waiting for load balancer to assign IP/hostname")

err = watcher.ForObject(c.ctx, c.clientForResource, c.currentInputs.GetName()).
WatchUntil(externalIPAllocated, 10*time.Minute)

if err != nil {
lastWarnings, wErr := getLastWarningsForObject(clientForEvents, c.currentInputs.GetNamespace(),
name, "Ingress", 3)
if wErr != nil {
return wErr
}
return fmt.Errorf("%s%s", err, stringifyEvents(lastWarnings))
}

return nil
}

// --------------------------------------------------------------------------

// Awaiter utilities.

// --------------------------------------------------------------------------
Expand Down
5 changes: 4 additions & 1 deletion pkg/await/core_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (sia *serviceInitAwaiter) Await() error {
sia.config.currentInputs.GetName())
}

glog.V(3).Infof("Service Endpoint client namespace: %q", sia.config.currentInputs.GetNamespace())
endpointWatcher, err := endpointClient.Watch(metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
Expand Down Expand Up @@ -185,6 +186,8 @@ func (sia *serviceInitAwaiter) read(

var err error
settled := make(chan struct{})

glog.V(3).Infof("Processing endpoint list: %#v", endpoints)
err = endpoints.EachListItem(func(endpoint runtime.Object) error {
sia.processEndpointEvent(watchAddedEvent(endpoint.(*unstructured.Unstructured)), settled)
return nil
Expand Down Expand Up @@ -336,7 +339,7 @@ func (sia *serviceInitAwaiter) processEndpointEvent(event watch.Event, settledCh
}

func (sia *serviceInitAwaiter) errorMessages() []string {
messages := []string{}
messages := make([]string, 0)
if sia.emptyHeadlessOrExternalName() {
return messages
}
Expand Down
Loading

0 comments on commit 3d49f78

Please sign in to comment.