Skip to content

Commit

Permalink
Add Job awaiter
Browse files Browse the repository at this point in the history
  • Loading branch information
lblackstone committed Sep 18, 2019
1 parent 3e8dabf commit 8d6658d
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const (
appsV1Beta1StatefulSet = "apps/v1beta1/StatefulSet"
appsV1Beta2StatefulSet = "apps/v1beta2/StatefulSet"
autoscalingV1HorizontalPodAutoscaler = "autoscaling/v1/HorizontalPodAutoscaler"
batchV1Job = "batch/v1/Job"
coreV1ConfigMap = "v1/ConfigMap"
coreV1LimitRange = "v1/LimitRange"
coreV1Namespace = "v1/Namespace"
Expand Down Expand Up @@ -148,6 +149,19 @@ var deploymentAwaiter = awaitSpec{
awaitDeletion: untilAppsDeploymentDeleted,
}

var jobAwaiter = awaitSpec{
awaitCreation: func(c createAwaitConfig) error {
return makeJobInitAwaiter(c).Await()
},
awaitUpdate: func(u updateAwaitConfig) error {
return makeJobInitAwaiter(u.createAwaitConfig).Await()
},
awaitRead: func(c createAwaitConfig) error {
return makeJobInitAwaiter(c).Read()
},
awaitDeletion: untilBatchV1JobDeleted,
}

var statefulsetAwaiter = awaitSpec{
awaitCreation: func(c createAwaitConfig) error {
return makeStatefulSetInitAwaiter(updateAwaitConfig{createAwaitConfig: c}).Await()
Expand All @@ -172,6 +186,7 @@ var awaiters = map[string]awaitSpec{
appsV1Beta1StatefulSet: statefulsetAwaiter,
appsV1Beta2StatefulSet: statefulsetAwaiter,
autoscalingV1HorizontalPodAutoscaler: { /* NONE */ },
batchV1Job: jobAwaiter,
coreV1ConfigMap: { /* NONE */ },
coreV1LimitRange: { /* NONE */ },
coreV1Namespace: {
Expand Down Expand Up @@ -334,6 +349,20 @@ func untilAppsStatefulSetDeleted(config deleteAwaitConfig) error {

// --------------------------------------------------------------------------

// batch/v1/Job

// --------------------------------------------------------------------------

func untilBatchV1JobDeleted(
ctx context.Context, clientForResource dynamic.ResourceInterface, name string,
) error {
// TODO

return nil
}

// --------------------------------------------------------------------------

// core/v1/Namespace

// --------------------------------------------------------------------------
Expand Down
133 changes: 133 additions & 0 deletions pkg/await/batch_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2016-2019, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package await

import (
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/pkg/await/states"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/kinds"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

const (
DefaultJobTimeoutMins = 10
)

type jobInitAwaiter struct {
job *unstructured.Unstructured
config createAwaitConfig
state *states.StateChecker
messages logging.Messages
}

func makeJobInitAwaiter(c createAwaitConfig) *jobInitAwaiter {
return &jobInitAwaiter{
config: c,
job: c.currentOutputs,
state: states.NewJobChecker(),
}
}

func (jia *jobInitAwaiter) Await() error {
jobClient, err := clients.ResourceClient(kinds.Job, jia.config.currentInputs.GetNamespace(), jia.config.clientSet)
if err != nil {
return errors.Wrapf(err,
"Could not make client to watch Job %q",
jia.config.currentInputs.GetName())
}
jobWatcher, err := jobClient.Watch(metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err, "Couldn't set up watch for Job object %q",
jia.config.currentInputs.GetName())
}
defer jobWatcher.Stop()

timeout := metadata.TimeoutDuration(jia.config.timeout, jia.config.currentInputs, DefaultJobTimeoutMins*60)
for {
if jia.state.Ready() {
return nil
}

// Else, wait for updates.
select {
case <-jia.config.ctx.Done():
return &cancellationError{
object: jia.job,
// subErrors: jia.errorMessages(),
}
case <-time.After(timeout):
return &timeoutError{
object: jia.job,
// subErrors: jia.errorMessages(),
}
case event := <-jobWatcher.ResultChan():
jia.processJobEvent(event)
}
}
}

func (jia *jobInitAwaiter) Read() error {
jobClient, err := clients.ResourceClient(kinds.Job, jia.config.currentInputs.GetNamespace(), jia.config.clientSet)
if err != nil {
return errors.Wrapf(err,
"Could not make client to get Job %q",
jia.config.currentInputs.GetName())
}
// Get live version of Job.
job, err := jobClient.Get(jia.config.currentInputs.GetName(), metav1.GetOptions{})
if err != nil {
// IMPORTANT: Do not wrap this error! If this is a 404, the provider need to know so that it
// can mark the Pod as having been deleted.
return err
}

jia.processJobEvent(watchAddedEvent(job))

// Check whether we've succeeded.
if jia.state.Ready() {
return nil
}

return &initializationError{
// subErrors: jia.errorMessages(),
object: job,
}
}

func (jia *jobInitAwaiter) processJobEvent(event watch.Event) {
job, err := clients.JobFromUnstructured(event.Object.(*unstructured.Unstructured))
if err != nil {
glog.V(3).Infof("Failed to unmarshal Job event: %v", err)
return
}

// Do nothing if this is not the job we're waiting for.
if job.GetName() != jia.config.currentInputs.GetName() {
return
}

jia.messages = jia.state.Update(job)
for _, message := range jia.messages {
jia.config.logMessage(message)
}
}
200 changes: 200 additions & 0 deletions pkg/await/recordings/states/job/jobSucceeded.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
[
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"annotations": {
"kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"batch/v1\",\"kind\":\"Job\",\"metadata\":{\"labels\":{\"app.kubernetes.io/managed-by\":\"pulumi\"},\"name\":\"foo\"},\"spec\":{\"backoffLimit\":4,\"template\":{\"spec\":{\"containers\":[{\"command\":[\"perl\",\"-Mbignum=bpi\",\"-wle\",\"print bpi(2000)\"],\"image\":\"perl\",\"name\":\"pi\"}],\"restartPolicy\":\"Never\"}}}}\n"
},
"creationTimestamp": "2019-07-11T19:30:43Z",
"labels": {
"app.kubernetes.io/managed-by": "pulumi"
},
"name": "foo",
"namespace": "default",
"resourceVersion": "764676",
"selfLink": "/apis/batch/v1/namespaces/default/jobs/foo",
"uid": "6000eff6-a412-11e9-a3c5-025000000001"
},
"spec": {
"backoffLimit": 4,
"completions": 1,
"parallelism": 1,
"selector": {
"matchLabels": {
"controller-uid": "6000eff6-a412-11e9-a3c5-025000000001"
}
},
"template": {
"metadata": {
"creationTimestamp": null,
"labels": {
"controller-uid": "6000eff6-a412-11e9-a3c5-025000000001",
"job-name": "foo"
}
},
"spec": {
"containers": [
{
"command": [
"perl",
"-Mbignum=bpi",
"-wle",
"print bpi(2000)"
],
"image": "perl",
"imagePullPolicy": "Always",
"name": "pi",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File"
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"terminationGracePeriodSeconds": 30
}
}
},
"status": {}
},
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"annotations": {
"kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"batch/v1\",\"kind\":\"Job\",\"metadata\":{\"labels\":{\"app.kubernetes.io/managed-by\":\"pulumi\"},\"name\":\"foo\"},\"spec\":{\"backoffLimit\":4,\"template\":{\"spec\":{\"containers\":[{\"command\":[\"perl\",\"-Mbignum=bpi\",\"-wle\",\"print bpi(2000)\"],\"image\":\"perl\",\"name\":\"pi\"}],\"restartPolicy\":\"Never\"}}}}\n"
},
"creationTimestamp": "2019-07-11T19:30:43Z",
"labels": {
"app.kubernetes.io/managed-by": "pulumi"
},
"name": "foo",
"namespace": "default",
"resourceVersion": "764679",
"selfLink": "/apis/batch/v1/namespaces/default/jobs/foo",
"uid": "6000eff6-a412-11e9-a3c5-025000000001"
},
"spec": {
"backoffLimit": 4,
"completions": 1,
"parallelism": 1,
"selector": {
"matchLabels": {
"controller-uid": "6000eff6-a412-11e9-a3c5-025000000001"
}
},
"template": {
"metadata": {
"creationTimestamp": null,
"labels": {
"controller-uid": "6000eff6-a412-11e9-a3c5-025000000001",
"job-name": "foo"
}
},
"spec": {
"containers": [
{
"command": [
"perl",
"-Mbignum=bpi",
"-wle",
"print bpi(2000)"
],
"image": "perl",
"imagePullPolicy": "Always",
"name": "pi",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File"
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"terminationGracePeriodSeconds": 30
}
}
},
"status": {
"active": 1,
"startTime": "2019-07-11T19:30:43Z"
}
},
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"annotations": {
"kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"batch/v1\",\"kind\":\"Job\",\"metadata\":{\"labels\":{\"app.kubernetes.io/managed-by\":\"pulumi\"},\"name\":\"foo\"},\"spec\":{\"backoffLimit\":4,\"template\":{\"spec\":{\"containers\":[{\"command\":[\"perl\",\"-Mbignum=bpi\",\"-wle\",\"print bpi(2000)\"],\"image\":\"perl\",\"name\":\"pi\"}],\"restartPolicy\":\"Never\"}}}}\n"
},
"creationTimestamp": "2019-07-11T19:30:43Z",
"labels": {
"app.kubernetes.io/managed-by": "pulumi"
},
"name": "foo",
"namespace": "default",
"resourceVersion": "764704",
"selfLink": "/apis/batch/v1/namespaces/default/jobs/foo",
"uid": "6000eff6-a412-11e9-a3c5-025000000001"
},
"spec": {
"backoffLimit": 4,
"completions": 1,
"parallelism": 1,
"selector": {
"matchLabels": {
"controller-uid": "6000eff6-a412-11e9-a3c5-025000000001"
}
},
"template": {
"metadata": {
"creationTimestamp": null,
"labels": {
"controller-uid": "6000eff6-a412-11e9-a3c5-025000000001",
"job-name": "foo"
}
},
"spec": {
"containers": [
{
"command": [
"perl",
"-Mbignum=bpi",
"-wle",
"print bpi(2000)"
],
"image": "perl",
"imagePullPolicy": "Always",
"name": "pi",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File"
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"terminationGracePeriodSeconds": 30
}
}
},
"status": {
"completionTime": "2019-07-11T19:30:56Z",
"conditions": [
{
"lastProbeTime": "2019-07-11T19:30:56Z",
"lastTransitionTime": "2019-07-11T19:30:56Z",
"status": "True",
"type": "Complete"
}
],
"startTime": "2019-07-11T19:30:43Z",
"succeeded": 1
}
}
]
Loading

0 comments on commit 8d6658d

Please sign in to comment.