Refactor Pod await logic #590

Merged · 6 commits · Jul 3, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@

- Enable multiple instances of Helm charts per stack (https://github.com/pulumi/pulumi-kubernetes/pull/599).
- Enable multiple instances of YAML manifests per stack (https://github.com/pulumi/pulumi-kubernetes/pull/594).
- Refactor Pod await logic for easier testing and maintenance (https://github.com/pulumi/pulumi-kubernetes/pull/590).

### Bug fixes

63 changes: 21 additions & 42 deletions pkg/await/apps_deployment.go
@@ -8,8 +8,10 @@ import (

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/kinds"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi/pkg/diag"
@@ -299,13 +301,9 @@ func (dia *deploymentInitAwaiter) await(
subErrors: dia.errorMessages(),
}
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := dia.aggregatePodErrors()
for _, message := range scheduleErrors {
dia.config.logStatus(diag.Warning, message)
}

for _, message := range containerErrors {
dia.config.logStatus(diag.Warning, message)
messages := dia.aggregatePodErrors()
for _, message := range messages {
dia.config.logMessage(message)
}
case event := <-deploymentWatcher.ResultChan():
dia.processDeploymentEvent(event)
@@ -657,50 +655,30 @@ func (dia *deploymentInitAwaiter) processPersistentVolumeClaimsEvent(event watch.Event) {
dia.checkPersistentVolumeClaimStatus()
}

func (dia *deploymentInitAwaiter) aggregatePodErrors() ([]string, []string) {
func (dia *deploymentInitAwaiter) aggregatePodErrors() logging.Messages {
rs, exists := dia.replicaSets[dia.currentGeneration]
if !exists {
return []string{}, []string{}
return nil
}

scheduleErrorCounts := map[string]int{}
containerErrorCounts := map[string]int{}
for _, pod := range dia.pods {
var messages logging.Messages
for _, unstructuredPod := range dia.pods {
// Filter down to only Pods owned by the active ReplicaSet.
if !isOwnedBy(pod, rs) {
if !isOwnedBy(unstructuredPod, rs) {
continue
}

// Check the pod for errors.
checker := makePodChecker()
checker.check(pod)

for reason, message := range checker.podScheduledErrors {
message = fmt.Sprintf("[%s] %s", reason, message)
scheduleErrorCounts[message] = scheduleErrorCounts[message] + 1
}

for reason, messages := range checker.containerErrors {
for _, message := range messages {
message = fmt.Sprintf("[%s] %s", reason, message)
containerErrorCounts[message] = containerErrorCounts[message] + 1
}
checker := states.NewPodChecker()
pod, err := clients.PodFromUnstructured(unstructuredPod)
if err != nil {
glog.V(3).Infof("Failed to unmarshal Pod event: %v", err)
return nil
}
messages = append(messages, checker.Update(pod).Warnings()...)

Contributor comment: Nice.

}

scheduleErrors := make([]string, 0)
for message, count := range scheduleErrorCounts {
message = fmt.Sprintf("%d Pods failed to schedule because: %s", count, message)
scheduleErrors = append(scheduleErrors, message)
}

containerErrors := make([]string, 0)
for message, count := range containerErrorCounts {
message = fmt.Sprintf("%d Pods failed to run because: %s", count, message)
containerErrors = append(containerErrors, message)
}

return scheduleErrors, containerErrors
return messages
}

func (dia *deploymentInitAwaiter) getFailedPersistentValueClaims() []string {
@@ -755,9 +733,10 @@ func (dia *deploymentInitAwaiter) errorMessages() []string {
}
}

scheduleErrors, containerErrors := dia.aggregatePodErrors()
messages = append(messages, scheduleErrors...)
messages = append(messages, containerErrors...)
errorMessages := dia.aggregatePodErrors()
for _, message := range errorMessages {
messages = append(messages, message.S)
}

return messages
}
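For context on the new return type: aggregatePodErrors now returns logging.Messages, a slice of logging.Message values carrying a severity plus the rendered string (the .S field used by errorMessages above). A hedged sketch of how a caller might fan these out; only the Severity and S fields are taken from this diff, and the split between warnings and verbose logs is assumed for illustration:

    // Sketch only: surface warnings to the user, keep anything else in verbose logs.
    for _, m := range dia.aggregatePodErrors() {
        if m.Severity == diag.Warning {
            dia.config.logMessage(m) // deduplicated warning shown in CLI output
        } else {
            glog.V(3).Infof("Pod status: %s", m.S) // assumed handling for non-warning messages
        }
    }
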
4 changes: 2 additions & 2 deletions pkg/await/apps_deployment_test.go
@@ -497,7 +497,7 @@ func Test_Apps_Deployment(t *testing.T) {
object: deploymentProgressing(inputNamespace, deploymentInputName, revision1),
subErrors: []string{
"Minimum number of live Pods was not attained",
`1 Pods failed to run because: [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
`containers with unready status: [nginx] -- Back-off pulling image "sdkjlsdlkj"`,

Contributor comment:
Not a huge thing but as a general rule I think we should keep the ImagePullBackOff because the standard error codes are easier to Google. It's also easier for people who know how this stuff works to understand what's going on.

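A hedged illustration of how the reason code could be kept in the aggregated warning, as the comment suggests. The ContainerStatus fields come from the core/v1 API; the helper name and message format are assumptions, not code from this PR:

    // containerWarning keeps the standard waiting reason (e.g. ImagePullBackOff)
    // in front of the kubelet's message so the error code stays searchable.
    func containerWarning(status v1.ContainerStatus) string {
        if w := status.State.Waiting; w != nil && w.Reason != "" {
            return fmt.Sprintf("[%s] %s -- %s", w.Reason, status.Name, w.Message)
        }
        return fmt.Sprintf("container %q has unready status", status.Name)
    }
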
}},
},
{
@@ -523,7 +523,7 @@ func Test_Apps_Deployment(t *testing.T) {
object: deploymentProgressing(inputNamespace, deploymentInputName, revision2),
subErrors: []string{
"Minimum number of live Pods was not attained",
`1 Pods failed to run because: [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
`containers with unready status: [nginx] -- Back-off pulling image "sdkjlsdlkj"`,
}},
},
{
61 changes: 20 additions & 41 deletions pkg/await/apps_statefulset.go
@@ -7,8 +7,10 @@ import (

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/kinds"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi/pkg/diag"
@@ -240,13 +242,9 @@ func (sia *statefulsetInitAwaiter) await(
subErrors: sia.errorMessages(),
}
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := sia.aggregatePodErrors()
for _, message := range scheduleErrors {
sia.config.logStatus(diag.Warning, message)
}

for _, message := range containerErrors {
sia.config.logStatus(diag.Warning, message)
messages := sia.aggregatePodErrors()
for _, message := range messages {
sia.config.logMessage(message)
}
case event := <-statefulsetWatcher.ResultChan():
sia.processStatefulSetEvent(event)
@@ -392,45 +390,25 @@ func (sia *statefulsetInitAwaiter) processPodEvent(event watch.Event) {
sia.pods[podName] = pod
}

func (sia *statefulsetInitAwaiter) aggregatePodErrors() ([]string, []string) {
scheduleErrorCounts := map[string]int{}
containerErrorCounts := map[string]int{}
for _, pod := range sia.pods {
func (sia *statefulsetInitAwaiter) aggregatePodErrors() logging.Messages {

Contributor comment:
Not a big deal, but an alternative design you might consider here is to have it take a lambda as an argument instead of passing an allocated list back, and then allocating another list, again, in response, which is what we're doing in a lot of the callsites.

sia.aggregatePodErrors(func(message logging.Message) {
    sia.config.logMessage(message)
})

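A minimal sketch of that callback-based shape, built from the checker pipeline in this diff; the name forEachPodError and its signature are illustrative, not part of the PR:

    // forEachPodError invokes report once per warning instead of allocating a
    // logging.Messages slice that callers immediately copy into another list.
    func (sia *statefulsetInitAwaiter) forEachPodError(report func(logging.Message)) {
        for _, unstructuredPod := range sia.pods {
            if !isOwnedBy(unstructuredPod, sia.statefulset) {
                continue // only Pods owned by the active StatefulSet
            }
            pod, err := clients.PodFromUnstructured(unstructuredPod)
            if err != nil {
                glog.V(3).Infof("Failed to unmarshal Pod event: %v", err)
                continue
            }
            for _, message := range states.NewPodChecker().Update(pod).Warnings() {
                report(message)
            }
        }
    }

Call sites could then pass the logger through directly, e.g. sia.forEachPodError(sia.config.logMessage).
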
var messages logging.Messages
for _, unstructuredPod := range sia.pods {
// Filter down to only Pods owned by the active StatefulSet.
if !isOwnedBy(pod, sia.statefulset) {
if !isOwnedBy(unstructuredPod, sia.statefulset) {
continue
}

// Check the pod for errors.
checker := makePodChecker()
checker.check(pod)

for reason, message := range checker.podScheduledErrors {
message = fmt.Sprintf("[%s] %s", reason, message)
scheduleErrorCounts[message] = scheduleErrorCounts[message] + 1
}

for reason, messages := range checker.containerErrors {
for _, message := range messages {
message = fmt.Sprintf("[%s] %s", reason, message)
containerErrorCounts[message] = containerErrorCounts[message] + 1
}
checker := states.NewPodChecker()
pod, err := clients.PodFromUnstructured(unstructuredPod)
if err != nil {
glog.V(3).Infof("Failed to unmarshal Pod event: %v", err)
return nil
}
messages = append(messages, checker.Update(pod).Warnings()...)
}

scheduleErrors := make([]string, 0)
for message, count := range scheduleErrorCounts {
message = fmt.Sprintf("%d Pods failed to schedule because: %s", count, message)
scheduleErrors = append(scheduleErrors, message)
}

containerErrors := make([]string, 0)
for message, count := range containerErrorCounts {
message = fmt.Sprintf("%d Pods failed to run because: %s", count, message)
containerErrors = append(containerErrors, message)
}

return scheduleErrors, containerErrors
return messages
}

func (sia *statefulsetInitAwaiter) errorMessages() []string {
@@ -445,9 +423,10 @@ func (sia *statefulsetInitAwaiter) errorMessages() []string {
".status.currentRevision does not match .status.updateRevision")
}

scheduleErrors, containerErrors := sia.aggregatePodErrors()
messages = append(messages, scheduleErrors...)
messages = append(messages, containerErrors...)
errorMessages := sia.aggregatePodErrors()
for _, message := range errorMessages {
messages = append(messages, message.S)
}

return messages
}
4 changes: 2 additions & 2 deletions pkg/await/apps_statefulset_test.go
@@ -162,7 +162,7 @@ func Test_Apps_StatefulSet(t *testing.T) {
object: statefulsetProgressing(inputNamespace, inputName, targetService),
subErrors: []string{
"Failed to observe the expected number of ready replicas",
"1 Pods failed to run because: [ErrImagePull] manifest for nginx:busted not found",
"containers with unready status: [nginx] -- manifest for nginx:busted not found",
}},
},
{
@@ -187,7 +187,7 @@ func Test_Apps_StatefulSet(t *testing.T) {
subErrors: []string{
"Failed to observe the expected number of ready replicas",
".status.currentRevision does not match .status.updateRevision",
"1 Pods failed to run because: [ErrImagePull] manifest for nginx:busted not found",
"containers with unready status: [nginx] -- manifest for nginx:busted not found",
}},
},
}
10 changes: 7 additions & 3 deletions pkg/await/awaiters.go
@@ -55,6 +55,10 @@ func (cac *createAwaitConfig) logStatus(sev diag.Severity, message string) {
cac.logger.LogMessage(sev, message)
}

func (cac *createAwaitConfig) logMessage(message logging.Message) {

Contributor comment:
Why not just have DedupLogger#LogMessage take a logging.Message instead of having a second implementation here?

cac.logger.LogMessage(message.Severity, message.S)
}
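A rough sketch of the reviewer's suggestion: let the dedup logger accept a logging.Message directly so this wrapper disappears. Go has no method overloading, so the existing string-based method would need a new name; LogSeverityMessage below is a hypothetical placeholder, not an existing API:

    // Hypothetical Message-aware entry point on the dedup logger itself.
    func (l *DedupLogger) LogMessage(msg logging.Message) {
        l.LogSeverityMessage(msg.Severity, msg.S) // assumed rename of the current string-based method
    }
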

// updateAwaitConfig specifies on which conditions we are to consider a resource "fully updated",
// i.e., the spec of the API object has changed and the controllers have reached a steady state. For
// example, we might consider a `Deployment` "fully updated" only when the previous generation of
@@ -173,9 +177,9 @@ var awaiters = map[string]awaitSpec{
awaitCreation: untilCoreV1PersistentVolumeClaimBound,
},
coreV1Pod: {
// NOTE: Because we replace the Pod in most situations, we do not require special logic for
// the update path.
awaitCreation: func(c createAwaitConfig) error { return makePodInitAwaiter(c).Await() },
awaitCreation: awaitPodInit,
awaitRead: awaitPodRead,

Contributor comment: Is this comment no longer true?

Member Author reply: It's true in most cases, but IIRC, it wasn't working quite right as I was testing individual Pod updates. Seemed like it made sense to standardize since the change was trivial.

awaitUpdate: awaitPodUpdate,
awaitDeletion: untilCoreV1PodDeleted,
},
coreV1ReplicationController: {
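The stated goal of the refactor is easier testing: the Pod state logic now sits behind states.NewPodChecker, so it can be exercised without an awaiter or a live watch. A hedged sketch of what a checker-level unit test could look like; loadPodFixture is a hypothetical helper returning the same typed Pod value that clients.PodFromUnstructured produces:

    // Sketch of a checker-level test; only NewPodChecker, Update, Warnings, and the
    // Message.S field are taken from this PR, the rest is illustrative.
    func TestPodCheckerReportsImagePullErrors(t *testing.T) {
        checker := states.NewPodChecker()
        pod := loadPodFixture("pod-imagepullbackoff.json") // hypothetical fixture loader

        warnings := checker.Update(pod).Warnings()
        if len(warnings) == 0 {
            t.Fatal("expected a warning for the ImagePullBackOff container")
        }
        for _, w := range warnings {
            t.Log(w.S) // e.g. `containers with unready status: [nginx] -- Back-off pulling image ...`
        }
    }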