Add logic to check for Job readiness
lblackstone committed Sep 20, 2019
1 parent c9feaab · commit 377ce21
Showing 29 changed files with 2,003 additions and 46 deletions.
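For context: the readiness rules themselves live in the new states.NewJobChecker (in pkg/await/states, one of the files not rendered below). As a rough sketch of the rule it encodes (an assumption based on the awaiter code and the backoffLimit.json recording in this commit, not on the checker source), a Job is treated as ready once it reports a Complete condition, and as failed once it reports a Failed condition such as BackoffLimitExceeded:

```go
package example

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// jobStatus is an illustrative sketch only; the real checker is
// states.NewJobChecker in pkg/await/states, which is not shown in this diff.
func jobStatus(job *batchv1.Job) (ready, failed bool) {
	for _, c := range job.Status.Conditions {
		if c.Status != corev1.ConditionTrue {
			continue
		}
		switch c.Type {
		case batchv1.JobComplete: // the Job ran to completion
			ready = true
		case batchv1.JobFailed: // e.g. reason "BackoffLimitExceeded"
			failed = true
		}
	}
	return ready, failed
}
```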
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@

### Improvements

- Add logic to check for Job readiness. (https://github.com/pulumi/pulumi-kubernetes/pull/633).
- Automatically mark Secret data and stringData as secret. (https://github.com/pulumi/pulumi-kubernetes/pull/803).
- Provide detailed error for removed apiVersions. (https://github.com/pulumi/pulumi-kubernetes/pull/809).

3 changes: 2 additions & 1 deletion pkg/await/apps_deployment.go
@@ -753,7 +753,8 @@ func (dia *deploymentInitAwaiter) aggregatePodErrors() logging.Messages {
glog.V(3).Infof("Failed to unmarshal Pod event: %v", err)
return nil
}
messages = append(messages, checker.Update(pod).Warnings()...)
m := checker.Update(pod)
messages = append(messages, m.MessagesWithSeverity(diag.Warning, diag.Error)...)
}

return messages
4 changes: 2 additions & 2 deletions pkg/await/apps_deployment_test.go
@@ -512,7 +512,7 @@ func Test_Apps_Deployment(t *testing.T) {
object: deploymentProgressing(inputNamespace, deploymentInputName, revision1),
subErrors: []string{
"Minimum number of live Pods was not attained",
`containers with unready status: [nginx] -- [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
`[Pod foo-4setj4y6-7cdf7ddc54-kvh2w]: containers with unready status: [nginx] -- [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
}},
},
{
@@ -538,7 +538,7 @@ func Test_Apps_Deployment(t *testing.T) {
object: deploymentProgressing(inputNamespace, deploymentInputName, revision2),
subErrors: []string{
"Minimum number of live Pods was not attained",
`containers with unready status: [nginx] -- [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
`[Pod foo-4setj4y6-7cdf7ddc54-kvh2w]: containers with unready status: [nginx] -- [ImagePullBackOff] Back-off pulling image "sdkjlsdlkj"`,
}},
},
{
4 changes: 2 additions & 2 deletions pkg/await/apps_statefulset_test.go
@@ -160,7 +160,7 @@ func Test_Apps_StatefulSet(t *testing.T) {
object: statefulsetProgressing(inputNamespace, inputName, targetService),
subErrors: []string{
"1 out of 2 replicas succeeded readiness checks",
"containers with unready status: [nginx] -- [ErrImagePull] manifest for nginx:busted not found",
"[Pod foo-0]: containers with unready status: [nginx] -- [ErrImagePull] manifest for nginx:busted not found",
}},
},
{
@@ -185,7 +185,7 @@ func Test_Apps_StatefulSet(t *testing.T) {
subErrors: []string{
"0 out of 2 replicas succeeded readiness checks",
"StatefulSet controller failed to advance from revision \"foo-7b5cf87b78\" to revision \"foo-789c4b994f\"",
"containers with unready status: [nginx] -- [ErrImagePull] manifest for nginx:busted not found",
"[Pod foo-0]: containers with unready status: [nginx] -- [ErrImagePull] manifest for nginx:busted not found",
}},
},
}
4 changes: 4 additions & 0 deletions pkg/await/await.go
@@ -99,6 +99,10 @@ func (r ResourceId) String() string {
return r.Name
}

func (r ResourceId) GVKString() string {
return fmt.Sprintf(`'[%s] %s'`, r.GVK, r.String())
}

func ResourceIdFromUnstructured(uns *unstructured.Unstructured) ResourceId {
return ResourceId{
Namespace: clients.NamespaceOrDefault(uns.GetNamespace()),
38 changes: 38 additions & 0 deletions pkg/await/awaiters.go
@@ -100,6 +100,7 @@ const (
appsV1Beta1StatefulSet = "apps/v1beta1/StatefulSet"
appsV1Beta2StatefulSet = "apps/v1beta2/StatefulSet"
autoscalingV1HorizontalPodAutoscaler = "autoscaling/v1/HorizontalPodAutoscaler"
batchV1Job = "batch/v1/Job"
coreV1ConfigMap = "v1/ConfigMap"
coreV1LimitRange = "v1/LimitRange"
coreV1Namespace = "v1/Namespace"
@@ -148,6 +149,19 @@ var deploymentAwaiter = awaitSpec{
awaitDeletion: untilAppsDeploymentDeleted,
}

var jobAwaiter = awaitSpec{
awaitCreation: func(c createAwaitConfig) error {
return makeJobInitAwaiter(c).Await()
},
awaitUpdate: func(u updateAwaitConfig) error {
return makeJobInitAwaiter(u.createAwaitConfig).Await()
},
awaitRead: func(c createAwaitConfig) error {
return makeJobInitAwaiter(c).Read()
},
awaitDeletion: untilBatchV1JobDeleted,
}

var statefulsetAwaiter = awaitSpec{
awaitCreation: func(c createAwaitConfig) error {
return makeStatefulSetInitAwaiter(updateAwaitConfig{createAwaitConfig: c}).Await()
@@ -172,6 +186,7 @@ var awaiters = map[string]awaitSpec{
appsV1Beta1StatefulSet: statefulsetAwaiter,
appsV1Beta2StatefulSet: statefulsetAwaiter,
autoscalingV1HorizontalPodAutoscaler: { /* NONE */ },
batchV1Job: jobAwaiter,
coreV1ConfigMap: { /* NONE */ },
coreV1LimitRange: { /* NONE */ },
coreV1Namespace: {
@@ -334,6 +349,29 @@ func untilAppsStatefulSetDeleted(config deleteAwaitConfig) error {

// --------------------------------------------------------------------------

// batch/v1/Job

// --------------------------------------------------------------------------

func untilBatchV1JobDeleted(config deleteAwaitConfig) error {
jobMissingOrKilled := func(pod *unstructured.Unstructured, err error) error {
if is404(err) {
return nil
} else if err != nil {
return err
}

e := fmt.Errorf("job %q still exists", config.currentInputs.GetName())
return watcher.RetryableError(e)
}

timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 300)
return watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()).
RetryUntil(jobMissingOrKilled, timeout)
}

// --------------------------------------------------------------------------

// core/v1/Namespace

// --------------------------------------------------------------------------
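Both Job await paths are bounded by metadata.TimeoutDuration: creation and update default to DefaultJobTimeoutMins*60 = 600 seconds (see batch_job.go below), deletion to 300. A minimal sketch of a per-resource override, assuming the pulumi.com/timeoutSeconds annotation is what TimeoutDuration consults (the metadata package is not part of this diff):

```go
package example

import (
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// A Job whose await timeout is shortened to two minutes. The
// "pulumi.com/timeoutSeconds" annotation name is an assumption about what
// metadata.TimeoutDuration reads; verify against the metadata package.
var exampleJob = batchv1.Job{
	ObjectMeta: metav1.ObjectMeta{
		Name: "pi",
		Annotations: map[string]string{
			"pulumi.com/timeoutSeconds": "120",
		},
	},
}
```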
171 changes: 171 additions & 0 deletions pkg/await/batch_job.go
@@ -0,0 +1,171 @@
// Copyright 2016-2019, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package await

import (
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/kinds"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi/pkg/diag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

const (
DefaultJobTimeoutMins = 10
)

type jobInitAwaiter struct {
job *unstructured.Unstructured
config createAwaitConfig
state *states.StateChecker
errors logging.TimeOrderedLogSet
resource ResourceId
}

func makeJobInitAwaiter(c createAwaitConfig) *jobInitAwaiter {
return &jobInitAwaiter{
config: c,
job: c.currentOutputs,
state: states.NewJobChecker(),
resource: ResourceIdFromUnstructured(c.currentOutputs),
}
}

func (jia *jobInitAwaiter) Await() error {
jobClient, err := clients.ResourceClient(kinds.Job, jia.config.currentInputs.GetNamespace(), jia.config.clientSet)
if err != nil {
return errors.Wrapf(err,
"Could not make client to watch Job %q",
jia.config.currentInputs.GetName())
}
jobWatcher, err := jobClient.Watch(metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err, "Couldn't set up watch for Job object %q",
jia.config.currentInputs.GetName())
}
defer jobWatcher.Stop()

podAggregator, err := NewPodAggregator(ResourceIdFromUnstructured(jia.job), jia.config.clientSet)
if err != nil {
return errors.Wrapf(err, "Could not create PodAggregator for %s", jia.resource.GVKString())
}
defer podAggregator.Stop()

timeout := metadata.TimeoutDuration(jia.config.timeout, jia.config.currentInputs, DefaultJobTimeoutMins*60)
for {
if jia.state.Ready() {
return nil
}

// Else, wait for updates.
select {
case <-jia.config.ctx.Done():
return &cancellationError{
object: jia.job,
subErrors: jia.errorMessages(),
}
case <-time.After(timeout):
return &timeoutError{
object: jia.job,
subErrors: jia.errorMessages(),
}
case event := <-jobWatcher.ResultChan():
err := jia.processJobEvent(event)
if err != nil {
return err
}
case messages := <-podAggregator.ResultChan():
for _, message := range messages {
jia.errors.Add(message)
jia.config.logMessage(message)
}
}
}
}

func (jia *jobInitAwaiter) Read() error {
jobClient, err := clients.ResourceClient(kinds.Job, jia.config.currentInputs.GetNamespace(), jia.config.clientSet)
if err != nil {
return errors.Wrapf(err,
"Could not make client to get Job %q",
jia.config.currentInputs.GetName())
}
// Get live version of Job.
job, err := jobClient.Get(jia.config.currentInputs.GetName(), metav1.GetOptions{})
if err != nil {
// IMPORTANT: Do not wrap this error! If this is a 404, the provider needs to know so that it
// can mark the Job as having been deleted.
return err
}

_ = jia.processJobEvent(watchAddedEvent(job))

// Check whether we've succeeded.
if jia.state.Ready() {
return nil
}

return &initializationError{
subErrors: jia.errorMessages(),
object: job,
}
}

func (jia *jobInitAwaiter) processJobEvent(event watch.Event) error {
job, err := clients.JobFromUnstructured(event.Object.(*unstructured.Unstructured))
if err != nil {
glog.V(3).Infof("Failed to unmarshal Job event: %v", err)
return nil
}

// Do nothing if this is not the job we're waiting for.
if job.GetName() != jia.config.currentInputs.GetName() {
return nil
}

messages := jia.state.Update(job)
for _, message := range messages.MessagesWithSeverity(diag.Warning, diag.Error) {
jia.errors.Add(message)
}
for _, message := range messages {
jia.config.logMessage(message)
}

if len(messages.Errors()) > 0 {
return &initializationError{
subErrors: jia.errorMessages(),
object: jia.job,
}
}

return nil
}

func (jia *jobInitAwaiter) errorMessages() []string {
messages := make([]string, 0)
for _, message := range jia.errors.Messages {
messages = append(messages, message.S)
}

return messages
}
75 changes: 75 additions & 0 deletions pkg/await/recordings/states/job/backoffLimit.json
@@ -0,0 +1,75 @@
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"annotations": {
"kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"batch/v1\",\"kind\":\"Job\",\"metadata\":{\"labels\":{\"app.kubernetes.io/managed-by\":\"pulumi\"},\"name\":\"foo\"},\"spec\":{\"backoffLimit\":1,\"template\":{\"spec\":{\"activeDeadlineSeconds\":10,\"containers\":[{\"command\":[\"perl\",\"-Mbignum=bpi\",\"-wle\",\"print bpi(2000)\"],\"image\":\"perl-fail\",\"name\":\"pi\"}],\"restartPolicy\":\"Never\"}}}}\n"
},
"creationTimestamp": "2019-09-04T18:19:35Z",
"labels": {
"app.kubernetes.io/managed-by": "pulumi"
},
"name": "foo",
"namespace": "default",
"resourceVersion": "1127282",
"selfLink": "/apis/batch/v1/namespaces/default/jobs/foo",
"uid": "8c668c31-cf40-11e9-8c3a-025000000001"
},
"spec": {
"backoffLimit": 1,
"completions": 1,
"parallelism": 1,
"selector": {
"matchLabels": {
"controller-uid": "8c668c31-cf40-11e9-8c3a-025000000001"
}
},
"template": {
"metadata": {
"creationTimestamp": null,
"labels": {
"controller-uid": "8c668c31-cf40-11e9-8c3a-025000000001",
"job-name": "foo"
}
},
"spec": {
"activeDeadlineSeconds": 10,
"containers": [
{
"command": [
"perl",
"-Mbignum=bpi",
"-wle",
"print bpi(2000)"
],
"image": "perl-fail",
"imagePullPolicy": "Always",
"name": "pi",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File"
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"terminationGracePeriodSeconds": 30
}
}
},
"status": {
"conditions": [
{
"lastProbeTime": "2019-09-04T18:19:55Z",
"lastTransitionTime": "2019-09-04T18:19:55Z",
"message": "Job has reached the specified backoff limit",
"reason": "BackoffLimitExceeded",
"status": "True",
"type": "Failed"
}
],
"failed": 2,
"startTime": "2019-09-04T18:19:35Z"
}
}
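The recording above captures a Job that hit its backoffLimit: status.conditions carries a Failed condition with reason BackoffLimitExceeded. A hedged sketch of how a recording like this might drive a state-checker test (the actual harness in pkg/await/states is not shown; the file path and exact assertions here are assumptions, while the calls themselves appear in this diff):

```go
package example

import (
	"encoding/json"
	"io/ioutil"
	"testing"

	"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
	"github.com/pulumi/pulumi-kubernetes/pkg/clients"
	"github.com/stretchr/testify/assert"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Sketch only: feed the recorded Job to the checker added in this commit.
func TestJobBackoffLimitExceeded(t *testing.T) {
	data, err := ioutil.ReadFile("recordings/states/job/backoffLimit.json")
	assert.NoError(t, err)

	obj := &unstructured.Unstructured{Object: map[string]interface{}{}}
	assert.NoError(t, json.Unmarshal(data, &obj.Object))

	job, err := clients.JobFromUnstructured(obj)
	assert.NoError(t, err)

	checker := states.NewJobChecker()
	messages := checker.Update(job)

	// A Failed/BackoffLimitExceeded condition should block readiness and
	// surface at least one error-severity message.
	assert.False(t, checker.Ready())
	assert.NotEmpty(t, messages.Errors())
}
```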
(Diffs for the remaining changed files are not rendered here.)
