Skip to content

Commit

Permalink
Add Job test
Browse files Browse the repository at this point in the history
  • Loading branch information
thegridman committed May 15, 2023
1 parent 43a7cc6 commit 0864ea3
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 72 deletions.
26 changes: 25 additions & 1 deletion api/v1/coherencejobresource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
// +kubebuilder:printcolumn:name="Active",priority=1,type="integer",JSONPath=".status.active",description="When the Coherence resource is running a Job, the number of pending and running pods"
// +kubebuilder:printcolumn:name="Succeeded",priority=1,type="integer",JSONPath=".status.succeeded",description="When the Coherence resource is running a Job, the number of pods which reached phase Succeeded"
// +kubebuilder:printcolumn:name="Failed",priority=1,type="integer",JSONPath=".status.failed",description="When the Coherence resource is running a Job, the number of pods which reached phase Failed"
// +kubebuilder:printcolumn:name="Image",priority=1,type="string",JSONPath=".spec.image",description="The image name"
type CoherenceJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down Expand Up @@ -463,6 +462,13 @@ func (in *CoherenceJobResourceSpec) CreateJob(deployment CoherenceResource) batc
return job
}

// ----- CoherenceResourceStatus type ---------------------------------------

type CoherenceJobStatus struct {
CoherenceResourceStatus `json:",inline"`
ProbeStatus []CoherenceJobProbeStatus `json:"probeStatus,omitempty"`
}

// ----- CoherenceJobList type ----------------------------------------------

// +kubebuilder:object:root=true
Expand All @@ -473,3 +479,21 @@ type CoherenceJobList struct {
metav1.ListMeta `json:"metadata,omitempty"`
Items []CoherenceJob `json:"items"`
}

// ----- CoherenceJobProbe type ---------------------------------------------

type CoherenceJobProbe struct {
Probe `json:",inline"`
// The number of job Pods that should be ready before executing the Probe.
// If not set the default will be the same as the job's Completions value.
// The probe will be executed on all Pods
// +optional
ReadyCount *int32 `json:"readyCount,omitempty"`
}

// ----- CoherenceJobProbeStatus type ----------------------------------------

type CoherenceJobProbeStatus struct {
Pod string `json:"pod,omitempty"`
Conditions Conditions `json:"conditions,omitempty"`
}
30 changes: 11 additions & 19 deletions api/v1/coherenceresource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ var _ CoherenceResource = &Coherence{}
// +kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".status.replicas",description="The number of Coherence deployments for this deployment"
// +kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas",description="The number of ready Coherence deployments for this deployment"
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase",description="The status of this deployment"
// +kubebuilder:printcolumn:name="Image",priority=1,type="string",JSONPath=".spec.image",description="The image name"
type Coherence struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down Expand Up @@ -626,15 +625,9 @@ type CoherenceResourceStatus struct {
ActionsExecuted bool `json:"actionsExecuted,omitempty"`
}

// UpdatePhase updates the current Phase
// TODO not used?
func (in *CoherenceResourceStatus) UpdatePhase(deployment *Coherence, phase ConditionType) bool {
return in.SetCondition(deployment, Condition{Type: phase, Status: corev1.ConditionTrue})
}

// SetCondition sets the current Status Condition
func (in *CoherenceResourceStatus) SetCondition(deployment *Coherence, c Condition) bool {
deployment.Status.DeepCopyInto(in)
func (in *CoherenceResourceStatus) SetCondition(deployment CoherenceResource, c Condition) bool {
deployment.GetStatus().DeepCopyInto(in)
updated := in.ensureInitialized(deployment)
if in.Phase != "" && in.Phase == c.Type {
// already at the desired phase
Expand Down Expand Up @@ -688,7 +681,7 @@ func (in *CoherenceResourceStatus) Update(deployment *Coherence, sts *appsv1.Sta
}
}

if deployment.Spec.GetReplicas() == 0 {
if deployment.GetSpec().GetReplicas() == 0 {
// scaled to zero
if in.Phase != ConditionTypeStopped {
updated = in.setPhase(ConditionTypeStopped)
Expand All @@ -699,12 +692,10 @@ func (in *CoherenceResourceStatus) Update(deployment *Coherence, sts *appsv1.Sta
}

// UpdateFromJob the status based on the condition of the Job status.
func (in *CoherenceResourceStatus) UpdateFromJob(deployment *Coherence, jobStatus *batchv1.JobStatus) bool {
func (in *CoherenceResourceStatus) UpdateFromJob(deployment *CoherenceJob, jobStatus *batchv1.JobStatus) bool {
// ensure that there is an Initialized condition
updated := in.ensureInitialized(deployment)

fmt.Printf("***** UpdateFromJob %v\n", jobStatus)

if jobStatus != nil {
count := jobStatus.Active + jobStatus.Succeeded
// update CurrentReplicas from Job if required
Expand Down Expand Up @@ -813,18 +804,19 @@ func (in *CoherenceResourceStatus) setPhase(phase ConditionType) bool {
}

// ensure that the initial state conditions are present
func (in *CoherenceResourceStatus) ensureInitialized(deployment *Coherence) bool {
func (in *CoherenceResourceStatus) ensureInitialized(deployment CoherenceResource) bool {
updated := false

// update Hash if required
if in.Hash != deployment.Status.Hash {
in.Hash = deployment.Status.Hash
hash := deployment.GetStatus().Hash
if in.Hash != hash {
in.Hash = hash
updated = true
}

// update Replicas if required
if in.Replicas != deployment.Spec.GetReplicas() {
in.Replicas = deployment.Spec.GetReplicas()
if in.Replicas != deployment.GetReplicas() {
in.Replicas = deployment.GetReplicas()
updated = true
}

Expand All @@ -842,7 +834,7 @@ func (in *CoherenceResourceStatus) ensureInitialized(deployment *Coherence) bool

// update Selector if required
if in.Selector == "" {
in.Selector = fmt.Sprintf(StatusSelectorTemplate, deployment.GetCoherenceClusterName(), deployment.Name)
in.Selector = fmt.Sprintf(StatusSelectorTemplate, deployment.GetCoherenceClusterName(), deployment.GetName())
updated = true
}

Expand Down
4 changes: 2 additions & 2 deletions api/v1/coherenceresourcespec_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ type Action struct {
// +optional
Name string `json:"name,omitempty"`

// This is the spec of some sort of probe to fire when the StatefulSet becomes ready
// This is the spec of some sort of probe to fire when the Coherence resource becomes ready
Probe *Probe `json:"probe,omitempty"`
// or this is the spec of a Job to create when the StatefulSet becomes ready
// or this is the spec of a Job to create when the Coherence resource becomes ready
Job *ActionJob `json:"job,omitempty"`
}

Expand Down
66 changes: 66 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions controllers/coherence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (in *CoherenceReconciler) Reconcile(ctx context.Context, request ctrl.Reque

// ensure that the deployment has an initial status
if deployment.Status.Phase == "" {
err := in.UpdateDeploymentStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeInitialized)
err := in.UpdateCoherenceStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeInitialized)
if err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -284,7 +284,7 @@ func (in *CoherenceReconciler) Reconcile(ctx context.Context, request ctrl.Reque

// if replica count is zero update the status to Stopped
if deployment.GetReplicas() == 0 {
if err = in.UpdateDeploymentStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeStopped); err != nil {
if err = in.UpdateCoherenceStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeStopped); err != nil {
return result, errors.Wrap(err, "error updating deployment status")
}
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/coherencejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (in *CoherenceJobReconciler) ReconcileDeployment(ctx context.Context, reque
// ensure that the deployment has an initial status
status := deployment.GetStatus()
if status.Phase == "" {
err := in.UpdateDeploymentStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeInitialized)
err := in.UpdateCoherenceJobStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeInitialized)
if err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func (in *CoherenceJobReconciler) ReconcileDeployment(ctx context.Context, reque

// if replica count is zero update the status to Stopped
if deployment.GetReplicas() == 0 {
if err = in.UpdateDeploymentStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeStopped); err != nil {
if err = in.UpdateCoherenceJobStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeStopped); err != nil {
return result, errors.Wrap(err, "error updating deployment status")
}
}
Expand Down
28 changes: 14 additions & 14 deletions controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ func (in *ReconcileJob) ReconcileAllResourceOfKind(ctx context.Context, request
err = in.Delete(ctx, request.Namespace, request.Name, logger)
} else {
// The Job and parent resource has been deleted so no more to do
_, err = in.updateDeploymentStatus(ctx, request)
err = in.updateDeploymentStatus(ctx, request)
return reconcile.Result{}, err
}
case !jobExists:
// Job does not exist but deployment does so create the Job (checking any start quorum)
result, err = in.createJob(ctx, deployment, storage, logger)
case jobCompleted:
// Nothing to do, the job is complete
_, err = in.updateDeploymentStatus(ctx, request)
err = in.updateDeploymentStatus(ctx, request)
return reconcile.Result{}, err
default:
// Both Job and deployment exists so this is maybe an update
Expand All @@ -156,7 +156,7 @@ func (in *ReconcileJob) ReconcileAllResourceOfKind(ctx context.Context, request
return result, err
}

_, err = in.updateDeploymentStatus(ctx, request)
err = in.updateDeploymentStatus(ctx, request)
if err != nil {
return result, err
}
Expand All @@ -173,7 +173,7 @@ func (in *ReconcileJob) createJob(ctx context.Context, deployment coh.CoherenceR
if !ok {
// start quorum not met, send event and update deployment status
in.GetEventRecorder().Event(deployment, corev1.EventTypeNormal, "Waiting", reason)
_ = in.UpdateDeploymentStatusCondition(ctx, deployment.GetNamespacedName(), coh.Condition{
_ = in.UpdateCoherenceJobStatusCondition(ctx, deployment.GetNamespacedName(), coh.Condition{
Type: coh.ConditionTypeWaiting,
Status: corev1.ConditionTrue,
Reason: "StatusQuorum",
Expand All @@ -185,7 +185,7 @@ func (in *ReconcileJob) createJob(ctx context.Context, deployment coh.CoherenceR
err := in.Create(ctx, deployment.GetName(), storage, logger)
if err == nil {
// ensure that the deployment has a Created status
err := in.UpdateDeploymentStatusPhase(ctx, deployment.GetNamespacedName(), coh.ConditionTypeCreated)
err := in.UpdateCoherenceJobStatusPhase(ctx, deployment.GetNamespacedName(), coh.ConditionTypeCreated)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "updating deployment status")
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func (in *ReconcileJob) patchJob(ctx context.Context, deployment coh.CoherenceRe
// if there is any patch to apply, this will check StatusHA if required and update the deployment status
callback := func() {
// ensure that the deployment has an "Upgrading" status
if err := in.UpdateDeploymentStatusPhase(ctx, deployment.GetNamespacedName(), coh.ConditionTypeRollingUpgrade); err != nil {
if err := in.UpdateCoherenceJobStatusPhase(ctx, deployment.GetNamespacedName(), coh.ConditionTypeRollingUpgrade); err != nil {
logger.Error(err, "Error updating deployment status to Upgrading")
}
}
Expand Down Expand Up @@ -330,39 +330,39 @@ func (in *ReconcileJob) patchJob(ctx context.Context, deployment coh.CoherenceRe
}

// updateDeploymentStatus updates the Coherence resource's status.
func (in *ReconcileJob) updateDeploymentStatus(ctx context.Context, request reconcile.Request) (*coh.Coherence, error) {
func (in *ReconcileJob) updateDeploymentStatus(ctx context.Context, request reconcile.Request) error {
var err error
var job *batchv1.Job
job, _, err = in.MaybeFindJob(ctx, request.Namespace, request.Name)
if err != nil {
// an error occurred
err = errors.Wrapf(err, "getting Job %s", request.Name)
return nil, err
return err
}

deployment := &coh.Coherence{}
err = in.GetClient().Get(ctx, request.NamespacedName, deployment)
cj := &coh.CoherenceJob{}
err = in.GetClient().Get(ctx, request.NamespacedName, cj)
switch {
case err != nil && apierrors.IsNotFound(err):
// deployment not found - possibly deleted
err = nil
case err != nil:
// an error occurred
err = errors.Wrapf(err, "getting deployment %s", request.Name)
case deployment.GetDeletionTimestamp() != nil:
case cj.GetDeletionTimestamp() != nil:
// deployment is being deleted
err = nil
default:
updated := deployment.DeepCopy()
updated := cj.DeepCopy()
var jobStatus *batchv1.JobStatus
if job == nil {
jobStatus = nil
} else {
jobStatus = &job.Status
}
if updated.Status.UpdateFromJob(deployment, jobStatus) {
if updated.Status.UpdateFromJob(cj, jobStatus) {
err = in.GetClient().Status().Update(ctx, updated)
}
}
return deployment, err
return err
}
Loading

0 comments on commit 0864ea3

Please sign in to comment.