Skip to content

Commit

Permalink
Add job ready action test
Browse files Browse the repository at this point in the history
  • Loading branch information
thegridman committed May 22, 2023
1 parent 282c319 commit 4b41ce3
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 15 deletions.
3 changes: 2 additions & 1 deletion api/v1/coherencejobresource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ type CoherenceJobResourceSpec struct {
// is ready. Once the required number of ready Pods is reached the probe
// will also be executed on every Pod that becomes ready after that time.
// +optional
ReadyAction *CoherenceJobProbe `json:"ReadyAction,omitempty"`
ReadyAction *CoherenceJobProbe `json:"readyAction,omitempty"`
}

// GetRestartPolicy returns the name of the application image to use
Expand Down Expand Up @@ -505,4 +505,5 @@ type CoherenceJobProbeStatus struct {
LastReadyTime *metav1.Time `json:"lastReadyTime,omitempty"`
LastProbeTime *metav1.Time `json:"lastProbeTime,omitempty"`
Success *bool `json:"success,omitempty"`
Error *string `json:"error,omitempty"`
}
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 24 additions & 14 deletions controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func (in *ReconcileJob) ReconcileAllResourceOfKind(ctx context.Context, request
}

logger := in.GetLog().WithValues("Namespace", request.Namespace, "Name", request.Name)
logger.Info("Reconciling Job for deployment")
logger.Info("Reconciling Job")

// Fetch the Job's current state
jobCurrent, jobExists, err := in.MaybeFindJob(ctx, request.Namespace, request.Name)
if err != nil {
logger.Info("Finished reconciling Job for deployment. Error getting Job", "error", err.Error())
logger.Info("Finished reconciling Job. Error getting Job", "error", err.Error())
return result, errors.Wrapf(err, "getting Job %s/%s", request.Namespace, request.Name)
}

Expand Down Expand Up @@ -134,13 +134,13 @@ func (in *ReconcileJob) ReconcileAllResourceOfKind(ctx context.Context, request
err = in.UpdateDeploymentStatusActionsState(ctx, request.NamespacedName, false)
// TODO: what to do with error?
if err != nil {
logger.Info("Error updating deployment status", "error", err.Error())
logger.Info("Error updating CoherenceJob status", "error", err.Error())
}
}
// delete the Job
err = in.Delete(ctx, request.Namespace, request.Name, logger)
} else {
// The Job and parent resource has been deleted so no more to do
// The Job and parent resource have been deleted so no more to do
err = in.updateDeploymentStatus(ctx, request, nil)
return reconcile.Result{}, err
}
Expand All @@ -153,7 +153,7 @@ func (in *ReconcileJob) ReconcileAllResourceOfKind(ctx context.Context, request
return reconcile.Result{}, err
default:
// Both the Job and the deployment exist, so this may be an update
result, err = in.updateJob(ctx, deployment, jobCurrent, storage, logger)
result, err = in.updateJob(ctx, deployment, jobCurrent.DeepCopy(), storage, logger)
if err == nil {
statuses, err = in.maybeExecuteProbe(ctx, jobCurrent, deployment, logger)
}
Expand All @@ -169,7 +169,7 @@ func (in *ReconcileJob) ReconcileAllResourceOfKind(ctx context.Context, request
return result, err
}

logger.Info("Finished reconciling Job for deployment")
logger.Info("Finished reconciling Job")
return result, nil
}

Expand Down Expand Up @@ -207,11 +207,11 @@ func (in *ReconcileJob) createJob(ctx context.Context, deployment coh.CoherenceR
return reconcile.Result{}, err
}

func (in *ReconcileJob) updateJob(ctx context.Context, deployment coh.CoherenceResource, current *batchv1.Job, storage utils.Storage, logger logr.Logger) (reconcile.Result, error) {
func (in *ReconcileJob) updateJob(ctx context.Context, deployment coh.CoherenceResource, job *batchv1.Job, storage utils.Storage, logger logr.Logger) (reconcile.Result, error) {
logger.Info("Updating job")

// get the desired resource state from the store
resource, found := storage.GetLatest().GetResource(coh.ResourceTypeJob, current.Name)
resource, found := storage.GetLatest().GetResource(coh.ResourceTypeJob, job.Name)
if !found {
// Desired state not found requeue and the request should sort itself out next time around
logger.Info("Cannot locate desired state for Job, possibly a deletion, re-queuing request")
Expand All @@ -224,13 +224,14 @@ func (in *ReconcileJob) updateJob(ctx context.Context, deployment coh.CoherenceR
}

desired := resource.Spec.(*batchv1.Job)
return in.patchJob(ctx, deployment, current, desired, storage, logger)
// patchJob takes its own copy of the job, so the job passed in is not altered
return in.patchJob(ctx, deployment, job, desired, storage, logger)
}

// patchJob patches the Job if required, returning the reconcile result and any error that occurred.
func (in *ReconcileJob) patchJob(ctx context.Context, deployment coh.CoherenceResource, current, desired *batchv1.Job, storage utils.Storage, logger logr.Logger) (reconcile.Result, error) {
hashMatches := in.HashLabelsMatch(current, storage)
resource, _ := storage.GetPrevious().GetResource(coh.ResourceTypeJob, current.GetName())
func (in *ReconcileJob) patchJob(ctx context.Context, deployment coh.CoherenceResource, job, desired *batchv1.Job, storage utils.Storage, logger logr.Logger) (reconcile.Result, error) {
hashMatches := in.HashLabelsMatch(job, storage)
resource, _ := storage.GetPrevious().GetResource(coh.ResourceTypeJob, job.GetName())
original := &batchv1.Job{}

if resource.IsPresent() {
Expand All @@ -251,6 +252,9 @@ func (in *ReconcileJob) patchJob(ctx context.Context, deployment coh.CoherenceRe
return reconcile.Result{Requeue: false}, fmt.Errorf(msg)
}

// copy the job, so we do not alter the passed in job
current := job.DeepCopy()

// We NEVER patch finalizers
original.ObjectMeta.Finalizers = current.ObjectMeta.Finalizers
desired.ObjectMeta.Finalizers = current.ObjectMeta.Finalizers
Expand Down Expand Up @@ -386,13 +390,18 @@ func (in *ReconcileJob) maybeExecuteProbe(ctx context.Context, job *batchv1.Job,

// get the number of ready (or succeeded) Pods required before the probe is executed
var readyCount int32
if action.ReadyCount == nil {
if action.ReadyCount != nil {
readyCount = *action.ReadyCount
} else {
readyCount = deployment.GetReplicas()
}

if job.Status.Ready == nil || *job.Status.Ready < readyCount {
count := job.Status.Succeeded
if job.Status.Ready != nil {
count += *job.Status.Ready
}

if count < readyCount {
return statuses, nil
}

Expand Down Expand Up @@ -431,6 +440,7 @@ func (in *ReconcileJob) maybeExecuteProbe(ctx context.Context, job *batchv1.Job,
} else {
logger.Info(fmt.Sprintf("Executed probe using pod %s", name), "Error", err)
probeStatus.Success = pointer.Bool(false)
probeStatus.Error = pointer.String(err.Error())
}
now := metav1.Now()
probeStatus.LastProbeTime = &now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.tangosol.net.CacheFactory;
Expand All @@ -33,6 +34,8 @@
*/
public class RestServer {

private static final AtomicLong COUNTER = new AtomicLong();

/**
* Private constructor.
*/
Expand Down Expand Up @@ -60,6 +63,9 @@ public static void main(String[] args) throws Exception {
server.createContext("/canaryCheck", RestServer::canaryCheck);
server.createContext("/canaryClear", RestServer::canaryClear);
server.createContext("/shutdown", RestServer::shutdown);
// Test counter endpoints: /test increments the counter, /testGet reads it
// and /testReset sets it back to zero.
server.createContext("/test", RestServer::test);
server.createContext("/testGet", RestServer::getTest);
server.createContext("/testReset", RestServer::resetTest);

server.setExecutor(null); // creates a default executor
server.start();
Expand Down Expand Up @@ -175,6 +181,25 @@ private static void shutdown(HttpExchange t) throws IOException {
System.exit(exitCode);
}

/**
 * Handle a request to the test endpoint.
 * <p>
 * Atomically increments the shared counter and replies with a 200 status
 * whose body is the value the counter held <em>before</em> the increment.
 *
 * @param t  the exchange to send the response to
 *
 * @throws IOException if sending the response fails
 */
private static void test(HttpExchange t) throws IOException {
    final long previous = COUNTER.getAndIncrement();
    final String body = Long.toString(previous);
    System.out.println("Test: incremented count=" + previous);
    send(t, 200, body);
}

/**
 * Handle a request for the current value of the test counter.
 * <p>
 * Reads the shared counter without modifying it and replies with a
 * 200 status whose body is the counter value.
 *
 * @param t  the exchange to send the response to
 *
 * @throws IOException if sending the response fails
 */
private static void getTest(HttpExchange t) throws IOException {
    final long current = COUNTER.get();
    final String body = Long.toString(current);
    System.out.println("Test: get count=" + current);
    send(t, 200, body);
}

/**
 * Handle a request to reset the test counter.
 * <p>
 * Sets the shared counter back to zero and replies with a 200 status
 * and an empty body.
 *
 * @param t  the exchange to send the response to
 *
 * @throws IOException if sending the response fails
 */
private static void resetTest(HttpExchange t) throws IOException {
    final String emptyBody = "";
    COUNTER.set(0L);
    System.out.println("Test: reset");
    send(t, 200, emptyBody);
}


private static String getMainClass() {
try {
return Coherence.class.getCanonicalName();
Expand Down
18 changes: 18 additions & 0 deletions test/e2e/local/job-with-ready-action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Test fixture: a CoherenceJob named "test-job" that declares a readyAction.
# Loaded by TestJobWithReadyAction as job-with-ready-action.yaml.
# NOTE(review): the readyAction appears to be an HTTP GET of path "test" on the
# port named "rest" (8080) - confirm the nesting against the original file.
apiVersion: coherence.oracle.com/v1
kind: CoherenceJob
metadata:
name: test-job
spec:
cluster: test
replicas: 3
image: ${TEST_APPLICATION_IMAGE}
readyAction:
httpGet:
port: rest
path: test
ports:
- name: rest
port: 8080
readinessProbe:
initialDelaySeconds: 10
periodSeconds: 10
40 changes: 40 additions & 0 deletions test/e2e/local/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,23 @@ func TestJobWithSingleFailedReplica(t *testing.T) {
g.Expect(err).NotTo(HaveOccurred())
}

// TestJobWithReadyAction deploys the CoherenceJob described in
// job-with-ready-action.yaml and then waits for the jobProbesExecuted
// condition, with count set to the job's replica count, to be met.
func TestJobWithReadyAction(t *testing.T) {
	// Arrange for clean-up once the test is done.
	testContext.CleanupAfterTest(t)
	g := NewWithT(t)

	const jobName = "test-job"

	deployed, _ := helper.AssertCoherenceJobs(testContext, t, "job-with-ready-action.yaml")

	// The job under test must be among the resources that were deployed.
	job, found := deployed[jobName]
	g.Expect(found).To(BeTrue(), fmt.Sprintf("did not find expected '%s' deployment", jobName))

	// Wait for as many successful probes as the job has replicas.
	allProbed := jobProbesExecuted{count: int(job.GetReplicas())}
	_, err := helper.WaitForCoherenceJobCondition(testContext, job.Namespace, job.Name, allProbed, 10*time.Second, 5*time.Minute)
	g.Expect(err).NotTo(HaveOccurred())
}

func deployJob(t *testing.T, ns, name string, replicas int32) []corev1.Pod {
g := NewWithT(t)

Expand All @@ -124,3 +141,26 @@ func deployJob(t *testing.T, ns, name string, replicas int32) []corev1.Pod {

return pods
}

// jobProbesExecuted is a condition that is met when the number of successful
// job probes recorded in a resource's status equals count.
type jobProbesExecuted struct {
// count is the exact number of successful probes required.
count int
}

// Test reports whether the number of job probes in the status of d that
// have a non-nil, true Success flag is exactly in.count. It always returns
// false when the status holds no job probes at all.
func (in jobProbesExecuted) Test(d coh.CoherenceResource) bool {
	probes := d.GetStatus().JobProbes
	if len(probes) == 0 {
		return false
	}

	passed := 0
	for _, probe := range probes {
		// Probes with no recorded outcome, or a failed outcome, do not count.
		if probe.Success == nil || !*probe.Success {
			continue
		}
		passed++
	}
	return passed == in.count
}

// String returns a human-readable description of the condition that
// includes the required probe count.
func (in jobProbesExecuted) String() string {
	const format = "Job ready probes executed on %d pods"
	return fmt.Sprintf(format, in.count)
}

0 comments on commit 4b41ce3

Please sign in to comment.