Skip to content

Commit

Permalink
await: add PersistentVolumeClaim event processing to apps_deployment
Browse files Browse the repository at this point in the history
ref: #133
  • Loading branch information
metral committed Jan 17, 2019
1 parent 1bd96ed commit 098bf43
Show file tree
Hide file tree
Showing 2 changed files with 610 additions and 14 deletions.
149 changes: 139 additions & 10 deletions pkg/await/apps_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package await
import (
"fmt"
"reflect"
"strings"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -88,6 +89,7 @@ type deploymentInitAwaiter struct {
config updateAwaitConfig
deploymentAvailable bool
replicaSetAvailable bool
pvcsAvailable bool
updatedReplicaSetReady bool
currentGeneration string

Expand All @@ -96,6 +98,7 @@ type deploymentInitAwaiter struct {
deployment *unstructured.Unstructured
replicaSets map[string]*unstructured.Unstructured
pods map[string]*unstructured.Unstructured
pvcs map[string]*unstructured.Unstructured
}

func makeDeploymentInitAwaiter(c updateAwaitConfig) *deploymentInitAwaiter {
Expand All @@ -112,6 +115,7 @@ func makeDeploymentInitAwaiter(c updateAwaitConfig) *deploymentInitAwaiter {
deployment: c.currentOutputs,
pods: map[string]*unstructured.Unstructured{},
replicaSets: map[string]*unstructured.Unstructured{},
pvcs: map[string]*unstructured.Unstructured{},
}
}

Expand All @@ -133,7 +137,7 @@ func (dia *deploymentInitAwaiter) Await() error {
// corresponding ReplicaSet), and therefore there is no rollout to mark as "Progressing".
//

replicaSetClient, podClient, err := dia.makeClients()
replicaSetClient, podClient, pvcClient, err := dia.makeClients()
if err != nil {
return err
}
Expand Down Expand Up @@ -164,15 +168,24 @@ func (dia *deploymentInitAwaiter) Await() error {
}
defer podWatcher.Stop()

// Create PersistentVolumeClaims watcher.
pvcWatcher, err := pvcClient.Watch(metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for PersistentVolumeClaims objects associated with Deployment %q",
dia.config.currentInputs.GetName())
}
defer pvcWatcher.Stop()

period := time.NewTicker(10 * time.Second)
defer period.Stop()

return dia.await(deploymentWatcher, replicaSetWatcher, podWatcher, time.After(5*time.Minute), period.C)
return dia.await(deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(5*time.Minute), period.C)
}

func (dia *deploymentInitAwaiter) Read() error {
// Get clients needed to retrieve live versions of relevant Deployments, ReplicaSets, and Pods.
replicaSetClient, podClient, err := dia.makeClients()
replicaSetClient, podClient, pvcClient, err := dia.makeClients()
if err != nil {
return err
}
Expand Down Expand Up @@ -206,12 +219,19 @@ func (dia *deploymentInitAwaiter) Read() error {
podList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

pvcList, err := pvcClient.List(metav1.ListOptions{})
if err != nil {
glog.V(3).Infof("Error retrieving PersistentVolumeClaims list for Deployment %q: %v",
deployment.GetName(), err)
pvcList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

return dia.read(deployment, rsList.(*unstructured.UnstructuredList),
podList.(*unstructured.UnstructuredList))
podList.(*unstructured.UnstructuredList), pvcList.(*unstructured.UnstructuredList))
}

func (dia *deploymentInitAwaiter) read(
deployment *unstructured.Unstructured, replicaSets, pods *unstructured.UnstructuredList,
deployment *unstructured.Unstructured, replicaSets, pods, pvcs *unstructured.UnstructuredList,
) error {
dia.processDeploymentEvent(watchAddedEvent(deployment))

Expand All @@ -233,6 +253,15 @@ func (dia *deploymentInitAwaiter) read(
deployment.GetName(), err)
}

err = pvcs.EachListItem(func(pvc runtime.Object) error {
dia.processPersistentVolumeClaimsEvent(watchAddedEvent(pvc.(*unstructured.Unstructured)))
return nil
})
if err != nil {
glog.V(3).Infof("Error iterating over PersistentVolumeClaims list for Deployment %q: %v",
deployment.GetName(), err)
}

if dia.checkAndLogStatus() {
return nil
}
Expand All @@ -245,7 +274,7 @@ func (dia *deploymentInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (dia *deploymentInitAwaiter) await(
deploymentWatcher, replicaSetWatcher, podWatcher watch.Interface, timeout, period <-chan time.Time,
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface, timeout, period <-chan time.Time,
) error {
dia.config.logStatus(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")

Expand Down Expand Up @@ -281,6 +310,8 @@ func (dia *deploymentInitAwaiter) await(
dia.processReplicaSetEvent(event)
case event := <-podWatcher.ResultChan():
dia.processPodEvent(event)
case event := <-pvcWatcher.ResultChan():
dia.processPersistentVolumeClaimsEvent(event)
}
}
}
Expand All @@ -295,14 +326,30 @@ func (dia *deploymentInitAwaiter) await(
// rolled out. This means there is no rollout to be marked as "progressing", so we need only
// check that the Deployment was created, and the corresponding ReplicaSet needs to be marked
// available.
func (dia *deploymentInitAwaiter) isEveryPVCReady() bool {
if len(dia.pvcs) == 0 || (len(dia.pvcs) > 0 && dia.pvcsAvailable) {
return true
}

return false
}

func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
if dia.currentGeneration == "1" {
if dia.deploymentAvailable && dia.updatedReplicaSetReady {
if !dia.isEveryPVCReady() {
return false
}

dia.config.logStatus(diag.Info, "✅ Deployment initialization complete")
return true
}
} else {
if dia.deploymentAvailable && dia.replicaSetAvailable && dia.updatedReplicaSetReady {
if !dia.isEveryPVCReady() {
return false
}

dia.config.logStatus(diag.Info, "✅ Deployment initialization complete")
return true
}
Expand Down Expand Up @@ -403,6 +450,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
}

dia.checkReplicaSetStatus()
dia.checkPersistentVolumeClaimStatus()
}

func (dia *deploymentInitAwaiter) processReplicaSetEvent(event watch.Event) {
Expand Down Expand Up @@ -503,6 +551,30 @@ func (dia *deploymentInitAwaiter) changeTriggeredRollout() bool {
return len(fields) > 0
}

func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() {
inputs := dia.config.currentInputs

glog.V(3).Infof("Checking PersistentVolumeClaims status for Deployment %q", inputs.GetName())

allPVCsReady := true
for _, pvc := range dia.pvcs {
phase, hasConditions := openapi.Pluck(pvc.Object, "status", "phase")
if !hasConditions {
return
}

// Success only occurs when there are no PersistentVolumeClaims
// defined, or when all PVCs have a status of 'Bound'
if phase != statusBound {
allPVCsReady = false
message := fmt.Sprintf("PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
dia.config.logStatus(diag.Warning, message)
}
}

dia.pvcsAvailable = allPVCsReady
}

func (dia *deploymentInitAwaiter) processPodEvent(event watch.Event) {
pod, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
Expand All @@ -527,6 +599,26 @@ func (dia *deploymentInitAwaiter) processPodEvent(event watch.Event) {
dia.pods[podName] = pod
}

func (dia *deploymentInitAwaiter) processPersistentVolumeClaimsEvent(event watch.Event) {
pvc, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
glog.V(3).Infof("PersistentVolumeClaim watch received unknown object type %q",
reflect.TypeOf(pvc))
return
}

glog.V(3).Infof("Received update for PersistentVolumeClaim %q", pvc.GetName())

// If Pod was deleted, remove it from our aggregated checkers.
uid := string(pvc.GetUID())
if event.Type == watch.Deleted {
delete(dia.pvcs, uid)
return
}
dia.pvcs[uid] = pvc
dia.checkPersistentVolumeClaimStatus()
}

func (dia *deploymentInitAwaiter) aggregatePodErrors() ([]string, []string) {
rs, exists := dia.replicaSets[dia.currentGeneration]
if !exists {
Expand Down Expand Up @@ -573,13 +665,33 @@ func (dia *deploymentInitAwaiter) aggregatePodErrors() ([]string, []string) {
return scheduleErrors, containerErrors
}

func (dia *deploymentInitAwaiter) getFailedPersistentValueClaims() []string {
if dia.isEveryPVCReady() {
return nil
}

failed := []string{}
for _, pvc := range dia.pvcs {
phase, _ := openapi.Pluck(pvc.Object, "status", "phase")
if phase != statusBound {
failed = append(failed, pvc.GetName())
}
}
return failed
}

func (dia *deploymentInitAwaiter) errorMessages() []string {
messages := []string{}
for _, message := range dia.deploymentErrors {
messages = append(messages, message)
}

if dia.currentGeneration == "1" {
if !dia.isEveryPVCReady() {
failed := dia.getFailedPersistentValueClaims()
msg := fmt.Sprintf("Failed to bind PersistentVolumeClaim(s): %q", strings.Join(failed, ","))
messages = append(messages, msg)
}
if !dia.deploymentAvailable {
messages = append(messages,
"Minimum number of live Pods was not attained")
Expand All @@ -588,6 +700,11 @@ func (dia *deploymentInitAwaiter) errorMessages() []string {
"Minimum number of Pods to consider the application live was not attained")
}
} else {
if !dia.isEveryPVCReady() {
failed := dia.getFailedPersistentValueClaims()
msg := fmt.Sprintf("Failed to bind PersistentVolumeClaim(s): %q", strings.Join(failed, ","))
messages = append(messages, msg)
}
if !dia.deploymentAvailable {
messages = append(messages,
"Minimum number of live Pods was not attained")
Expand All @@ -608,7 +725,7 @@ func (dia *deploymentInitAwaiter) errorMessages() []string {
}

func (dia *deploymentInitAwaiter) makeClients() (
replicaSetClient, podClient dynamic.ResourceInterface, err error,
replicaSetClient, podClient, pvcClient dynamic.ResourceInterface, err error,
) {
replicaSetClient, err = client.FromGVK(dia.config.pool, dia.config.disco,
schema.GroupVersionKind{
Expand All @@ -617,7 +734,7 @@ func (dia *deploymentInitAwaiter) makeClients() (
Kind: "ReplicaSet",
}, dia.config.currentInputs.GetNamespace())
if err != nil {
return nil, nil, errors.Wrapf(err,
return nil, nil, nil, errors.Wrapf(err,
"Could not make client to watch ReplicaSets associated with Deployment %q",
dia.config.currentInputs.GetName())
}
Expand All @@ -629,10 +746,22 @@ func (dia *deploymentInitAwaiter) makeClients() (
Kind: "Pod",
}, dia.config.currentInputs.GetNamespace())
if err != nil {
return nil, nil, errors.Wrapf(err,
return nil, nil, nil, errors.Wrapf(err,
"Could not make client to watch Pods associated with Deployment %q",
dia.config.currentInputs.GetName())
}

return replicaSetClient, podClient, nil
pvcClient, err = client.FromGVK(dia.config.pool, dia.config.disco,
schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PersistentVolumeClaim",
}, dia.config.currentInputs.GetNamespace())
if err != nil {
return nil, nil, nil, errors.Wrapf(err,
"Could not make client to watch PersistentVolumeClaims associated with Deployment %q",
dia.config.currentInputs.GetName())
}

return replicaSetClient, podClient, pvcClient, nil
}
Loading

0 comments on commit 098bf43

Please sign in to comment.