Commit

Refactor Pod await logic (#590)
- Refactor the Pod await logic into a collection of stateless Condition functions. This makes testing much easier, and should improve our ability to maintain and reason about the related logic.
- Revise the related tests to be a combination of unit tests for the Condition functions, and integration tests validating the behavior of the state checker on recorded sequences of k8s Events corresponding to typical workflows.
lblackstone committed Jul 3, 2019
1 parent a30c8ea commit 46b1bce
Showing 33 changed files with 5,190 additions and 1,507 deletions.
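
The refactor described in the commit message replaces a stateful pod checker with a set of stateless condition functions composed by a checker object. Below is a minimal, self-contained Go sketch of that shape; every name in it (Pod, Condition, Checker, NewPodChecker, Update) is illustrative only and is not the exact API that pkg/await/states introduces.

package main

import "fmt"

// Pod is a stand-in for the Kubernetes Pod object the real checker receives.
// (Hypothetical; the real code works with a typed Pod converted from unstructured data.)
type Pod struct {
	Name            string
	Scheduled       bool
	ContainersReady bool
}

// Condition is a stateless predicate over a Pod. Because it holds no state,
// each Condition can be unit-tested with a hand-built Pod value.
type Condition func(pod Pod) (ok bool, warning string)

func podScheduled(pod Pod) (bool, string) {
	if pod.Scheduled {
		return true, ""
	}
	return false, fmt.Sprintf("Pod %q has not been scheduled", pod.Name)
}

func containersReady(pod Pod) (bool, string) {
	if pod.ContainersReady {
		return true, ""
	}
	return false, fmt.Sprintf("Pod %q has containers with unready status", pod.Name)
}

// Checker applies a fixed list of Conditions to the latest observed Pod state.
type Checker struct {
	conditions []Condition
}

func NewPodChecker() *Checker {
	return &Checker{conditions: []Condition{podScheduled, containersReady}}
}

// Update evaluates every condition against the given Pod and returns the
// warnings produced by conditions that are not yet satisfied.
func (c *Checker) Update(pod Pod) (warnings []string) {
	for _, cond := range c.conditions {
		if ok, warning := cond(pod); !ok {
			warnings = append(warnings, warning)
		}
	}
	return warnings
}

func main() {
	checker := NewPodChecker()
	pod := Pod{Name: "nginx", Scheduled: true, ContainersReady: false}
	for _, w := range checker.Update(pod) {
		fmt.Println("warning:", w)
	}
}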
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -49,6 +49,7 @@

- Enable multiple instances of Helm charts per stack (https://github.com/pulumi/pulumi-kubernetes/pull/599).
- Enable multiple instances of YAML manifests per stack (https://github.com/pulumi/pulumi-kubernetes/pull/594).
+ - Refactor Pod await logic for easier testing and maintenance (https://github.com/pulumi/pulumi-kubernetes/pull/590).

### Bug fixes

63 changes: 21 additions & 42 deletions pkg/await/apps_deployment.go
@@ -8,8 +8,10 @@ import (

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/kinds"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi/pkg/diag"
@@ -299,13 +301,9 @@ func (dia *deploymentInitAwaiter) await(
subErrors: dia.errorMessages(),
}
case <-aggregateErrorTicker:
- scheduleErrors, containerErrors := dia.aggregatePodErrors()
- for _, message := range scheduleErrors {
- dia.config.logStatus(diag.Warning, message)
- }
-
- for _, message := range containerErrors {
- dia.config.logStatus(diag.Warning, message)
+ messages := dia.aggregatePodErrors()
+ for _, message := range messages {
+ dia.config.logMessage(message)
}
case event := <-deploymentWatcher.ResultChan():
dia.processDeploymentEvent(event)
@@ -657,50 +655,30 @@ func (dia *deploymentInitAwaiter) processPersistentVolumeClaimsEvent(event watch
dia.checkPersistentVolumeClaimStatus()
}

- func (dia *deploymentInitAwaiter) aggregatePodErrors() ([]string, []string) {
+ func (dia *deploymentInitAwaiter) aggregatePodErrors() logging.Messages {
rs, exists := dia.replicaSets[dia.currentGeneration]
if !exists {
- return []string{}, []string{}
+ return nil
}

- scheduleErrorCounts := map[string]int{}
- containerErrorCounts := map[string]int{}
- for _, pod := range dia.pods {
+ var messages logging.Messages
+ for _, unstructuredPod := range dia.pods {
// Filter down to only Pods owned by the active ReplicaSet.
- if !isOwnedBy(pod, rs) {
+ if !isOwnedBy(unstructuredPod, rs) {
continue
}

// Check the pod for errors.
- checker := makePodChecker()
- checker.check(pod)
-
- for reason, message := range checker.podScheduledErrors {
- message = fmt.Sprintf("[%s] %s", reason, message)
- scheduleErrorCounts[message] = scheduleErrorCounts[message] + 1
- }
-
- for reason, messages := range checker.containerErrors {
- for _, message := range messages {
- message = fmt.Sprintf("[%s] %s", reason, message)
- containerErrorCounts[message] = containerErrorCounts[message] + 1
- }
+ checker := states.NewPodChecker()
+ pod, err := clients.PodFromUnstructured(unstructuredPod)
+ if err != nil {
+ glog.V(3).Infof("Failed to unmarshal Pod event: %v", err)
+ return nil
}
+ messages = append(messages, checker.Update(pod).Warnings()...)
}

- scheduleErrors := make([]string, 0)
- for message, count := range scheduleErrorCounts {
- message = fmt.Sprintf("%d Pods failed to schedule because: %s", count, message)
- scheduleErrors = append(scheduleErrors, message)
- }
-
- containerErrors := make([]string, 0)
- for message, count := range containerErrorCounts {
- message = fmt.Sprintf("%d Pods failed to run because: %s", count, message)
- containerErrors = append(containerErrors, message)
- }
-
- return scheduleErrors, containerErrors
+ return messages
}

func (dia *deploymentInitAwaiter) getFailedPersistentValueClaims() []string {
@@ -755,9 +733,10 @@ func (dia *deploymentInitAwaiter) errorMessages() []string {
}
}

- scheduleErrors, containerErrors := dia.aggregatePodErrors()
- messages = append(messages, scheduleErrors...)
- messages = append(messages, containerErrors...)
+ errorMessages := dia.aggregatePodErrors()
+ for _, message := range errorMessages {
+ messages = append(messages, message.S)
+ }

return messages
}
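Stripped of the awaiter plumbing, the new aggregation path in this file boils down to roughly the sketch below. The calls to clients.PodFromUnstructured, states.NewPodChecker, and checker.Update(pod).Warnings() mirror the diff above; the standalone collectPodWarnings helper and the assumption that Pods arrive as *unstructured.Unstructured values are mine, and the snippet assumes the pulumi-kubernetes module at this commit is available on the import path.

package example

import (
	"github.com/golang/glog"
	"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
	"github.com/pulumi/pulumi-kubernetes/pkg/clients"
	"github.com/pulumi/pulumi-kubernetes/pkg/logging"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// collectPodWarnings converts each unstructured Pod to a typed Pod and runs it
// through a fresh PodChecker, collecting the resulting warning messages, just
// as aggregatePodErrors now does for the Pods owned by the active ReplicaSet.
func collectPodWarnings(pods []*unstructured.Unstructured) logging.Messages {
	var messages logging.Messages
	for _, unstructuredPod := range pods {
		checker := states.NewPodChecker()
		pod, err := clients.PodFromUnstructured(unstructuredPod)
		if err != nil {
			// The awaiter logs and bails out on a conversion failure.
			glog.V(3).Infof("Failed to unmarshal Pod event: %v", err)
			return nil
		}
		messages = append(messages, checker.Update(pod).Warnings()...)
	}
	return messages
}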
4 changes: 2 additions & 2 deletions pkg/await/apps_deployment_test.go
@@ -497,7 +497,7 @@ func Test_Apps_Deployment(t *testing.T) {
object: deploymentProgressing(inputNamespace, deploymentInputName, revision1),
subErrors: []string{
"Minimum number of live Pods was not attained",
- `1 Pods failed to run because: [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
+ `containers with unready status: [nginx] -- Back-off pulling image "sdkjlsdlkj"`,
}},
},
{
@@ -523,7 +523,7 @@ func Test_Apps_Deployment(t *testing.T) {
object: deploymentProgressing(inputNamespace, deploymentInputName, revision2),
subErrors: []string{
"Minimum number of live Pods was not attained",
- `1 Pods failed to run because: [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
+ `containers with unready status: [nginx] -- Back-off pulling image "sdkjlsdlkj"`,
}},
},
{
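The updated expectations above ("containers with unready status: ...") are the warning strings produced by the new checker. Testing a stateless condition function needs nothing more than a table of hand-built inputs, along the lines of the sketch below; the pod type and condition here are illustrative stand-ins, not the real states package API, and the expected string is only modeled on the format seen in these tests.

package example

import "testing"

// pod and containersReadyCondition are illustrative stand-ins for the typed Pod
// and the stateless condition functions that the states package introduces.
type pod struct {
	name            string
	containersReady bool
}

func containersReadyCondition(p pod) (bool, string) {
	if p.containersReady {
		return true, ""
	}
	return false, "containers with unready status: [" + p.name + "]"
}

func TestContainersReadyCondition(t *testing.T) {
	tests := []struct {
		name    string
		pod     pod
		wantOK  bool
		wantMsg string
	}{
		{"ready pod", pod{name: "nginx", containersReady: true}, true, ""},
		{"unready pod", pod{name: "nginx", containersReady: false}, false, "containers with unready status: [nginx]"},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ok, msg := containersReadyCondition(tt.pod)
			if ok != tt.wantOK || msg != tt.wantMsg {
				t.Errorf("got (%v, %q), want (%v, %q)", ok, msg, tt.wantOK, tt.wantMsg)
			}
		})
	}
}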
61 changes: 20 additions & 41 deletions pkg/await/apps_statefulset.go
@@ -7,8 +7,10 @@ import (

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/kinds"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi/pkg/diag"
@@ -240,13 +242,9 @@ func (sia *statefulsetInitAwaiter) await(
subErrors: sia.errorMessages(),
}
case <-aggregateErrorTicker:
- scheduleErrors, containerErrors := sia.aggregatePodErrors()
- for _, message := range scheduleErrors {
- sia.config.logStatus(diag.Warning, message)
- }
-
- for _, message := range containerErrors {
- sia.config.logStatus(diag.Warning, message)
+ messages := sia.aggregatePodErrors()
+ for _, message := range messages {
+ sia.config.logMessage(message)
}
case event := <-statefulsetWatcher.ResultChan():
sia.processStatefulSetEvent(event)
@@ -392,45 +390,25 @@ func (sia *statefulsetInitAwaiter) processPodEvent(event watch.Event) {
sia.pods[podName] = pod
}

- func (sia *statefulsetInitAwaiter) aggregatePodErrors() ([]string, []string) {
- scheduleErrorCounts := map[string]int{}
- containerErrorCounts := map[string]int{}
- for _, pod := range sia.pods {
+ func (sia *statefulsetInitAwaiter) aggregatePodErrors() logging.Messages {
+ var messages logging.Messages
+ for _, unstructuredPod := range sia.pods {
// Filter down to only Pods owned by the active StatefulSet.
- if !isOwnedBy(pod, sia.statefulset) {
+ if !isOwnedBy(unstructuredPod, sia.statefulset) {
continue
}

// Check the pod for errors.
- checker := makePodChecker()
- checker.check(pod)
-
- for reason, message := range checker.podScheduledErrors {
- message = fmt.Sprintf("[%s] %s", reason, message)
- scheduleErrorCounts[message] = scheduleErrorCounts[message] + 1
- }
-
- for reason, messages := range checker.containerErrors {
- for _, message := range messages {
- message = fmt.Sprintf("[%s] %s", reason, message)
- containerErrorCounts[message] = containerErrorCounts[message] + 1
- }
+ checker := states.NewPodChecker()
+ pod, err := clients.PodFromUnstructured(unstructuredPod)
+ if err != nil {
+ glog.V(3).Infof("Failed to unmarshal Pod event: %v", err)
+ return nil
}
+ messages = append(messages, checker.Update(pod).Warnings()...)
}

- scheduleErrors := make([]string, 0)
- for message, count := range scheduleErrorCounts {
- message = fmt.Sprintf("%d Pods failed to schedule because: %s", count, message)
- scheduleErrors = append(scheduleErrors, message)
- }
-
- containerErrors := make([]string, 0)
- for message, count := range containerErrorCounts {
- message = fmt.Sprintf("%d Pods failed to run because: %s", count, message)
- containerErrors = append(containerErrors, message)
- }
-
- return scheduleErrors, containerErrors
+ return messages
}

func (sia *statefulsetInitAwaiter) errorMessages() []string {
@@ -445,9 +423,10 @@ func (sia *statefulsetInitAwaiter) errorMessages() []string {
".status.currentRevision does not match .status.updateRevision")
}

- scheduleErrors, containerErrors := sia.aggregatePodErrors()
- messages = append(messages, scheduleErrors...)
- messages = append(messages, containerErrors...)
+ errorMessages := sia.aggregatePodErrors()
+ for _, message := range errorMessages {
+ messages = append(messages, message.S)
+ }

return messages
}
4 changes: 2 additions & 2 deletions pkg/await/apps_statefulset_test.go
@@ -162,7 +162,7 @@ func Test_Apps_StatefulSet(t *testing.T) {
object: statefulsetProgressing(inputNamespace, inputName, targetService),
subErrors: []string{
"Failed to observe the expected number of ready replicas",
"1 Pods failed to run because: [ErrImagePull] manifest for nginx:busted not found",
"containers with unready status: [nginx] -- manifest for nginx:busted not found",
}},
},
{
@@ -187,7 +187,7 @@ func Test_Apps_StatefulSet(t *testing.T) {
subErrors: []string{
"Failed to observe the expected number of ready replicas",
".status.currentRevision does not match .status.updateRevision",
"1 Pods failed to run because: [ErrImagePull] manifest for nginx:busted not found",
"containers with unready status: [nginx] -- manifest for nginx:busted not found",
}},
},
}
10 changes: 7 additions & 3 deletions pkg/await/awaiters.go
@@ -55,6 +55,10 @@ func (cac *createAwaitConfig) logStatus(sev diag.Severity, message string) {
cac.logger.LogMessage(sev, message)
}

+ func (cac *createAwaitConfig) logMessage(message logging.Message) {
+ cac.logger.LogMessage(message.Severity, message.S)
+ }

// updateAwaitConfig specifies on which conditions we are to consider a resource "fully updated",
// i.e., the spec of the API object has changed and the controllers have reached a steady state. For
// example, we might consider a `Deployment` "fully updated" only when the previous generation of
@@ -173,9 +177,9 @@ var awaiters = map[string]awaitSpec{
awaitCreation: untilCoreV1PersistentVolumeClaimBound,
},
coreV1Pod: {
- // NOTE: Because we replace the Pod in most situations, we do not require special logic for
- // the update path.
- awaitCreation: func(c createAwaitConfig) error { return makePodInitAwaiter(c).Await() },
+ awaitCreation: awaitPodInit,
+ awaitRead: awaitPodRead,
+ awaitUpdate: awaitPodUpdate,
awaitDeletion: untilCoreV1PodDeleted,
},
coreV1ReplicationController: {
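The new logMessage helper above forwards a structured message to the existing logger, reading message.Severity and message.S. Those two field reads are all this commit reveals about logging.Message, so the sketch below reconstructs a plausible shape for it with a stand-in Severity type; the real definition lives in pkg/logging and may differ.

package example

import "fmt"

// Severity stands in for pulumi's diag.Severity in this sketch.
type Severity string

const Warning Severity = "warning"

// Message pairs a log string with its severity, matching the two fields
// (S and Severity) that the awaiters read in this commit. The real type in
// pkg/logging may carry more than is shown here.
type Message struct {
	S        string
	Severity Severity
}

// Messages is a convenience slice type, mirroring logging.Messages.
type Messages []Message

// logMessage mirrors the createAwaitConfig.logMessage helper added above:
// it unpacks the structured message and hands it to a severity/string logger.
func logMessage(log func(sev Severity, s string), message Message) {
	log(message.Severity, message.S)
}

func demo() {
	msgs := Messages{{S: "containers with unready status: [nginx]", Severity: Warning}}
	for _, m := range msgs {
		logMessage(func(sev Severity, s string) { fmt.Printf("[%s] %s\n", sev, s) }, m)
	}
}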