Skip to content

Commit

Permalink
OCPBUGS-31031: use retrywatcher when watching job (#921)
Browse files Browse the repository at this point in the history
  • Loading branch information
tremes committed Mar 25, 2024
1 parent d00f74f commit 636bb48
Show file tree
Hide file tree
Showing 6 changed files with 629 additions and 8 deletions.
21 changes: 14 additions & 7 deletions pkg/controller/periodic/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
apiWatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
)

// JobController type responsible for
Expand Down Expand Up @@ -107,24 +109,29 @@ func (j *JobController) CreateGathererJob(ctx context.Context, dataGatherName, i
}

// WaitForJobCompletion listen the Kubernetes events to check if job finished.
func (j *JobController) WaitForJobCompletion(ctx context.Context, jobName string) error {
watcher, err := j.kubeClient.BatchV1().Jobs(insightsNamespace).
Watch(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", jobName)})
func (j *JobController) WaitForJobCompletion(ctx context.Context, job *batchv1.Job) error {
watcherFnc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
return j.kubeClient.BatchV1().Jobs(insightsNamespace).
Watch(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", job.Name)})
}

retryWatcher, err := watch.NewRetryWatcher(job.ResourceVersion, &cache.ListWatch{WatchFunc: watcherFnc})
if err != nil {
return err
}
defer watcher.Stop()

defer retryWatcher.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-watcher.ResultChan():
case event, ok := <-retryWatcher.ResultChan():
if !ok {
return fmt.Errorf("watcher channel was closed unexpectedly")
}

if event.Type != watch.Modified {
if event.Type != apiWatch.Modified {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (c *Controller) runJobAndCheckResults(ctx context.Context, dataGather *insi
}

klog.Infof("Created new gathering job %v", gj.Name)
err = c.jobController.WaitForJobCompletion(ctx, gj.Name)
err = c.jobController.WaitForJobCompletion(ctx, gj)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
klog.Errorf("Failed to read job status: %v", err)
Expand Down
150 changes: 150 additions & 0 deletions vendor/k8s.io/client-go/tools/watch/informerwatcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 636bb48

Please sign in to comment.