/
reschedule_jobs_with_quota_exceeded_error.go
95 lines (84 loc) · 2.94 KB
/
reschedule_jobs_with_quota_exceeded_error.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package tasks
import (
"context"
"fmt"
"github.com/golang/glog"
"github.com/ottogroup/penelope/pkg/repository"
"github.com/ottogroup/penelope/pkg/secret"
"go.opencensus.io/trace"
"time"
)
// pacificTimeLocation is the time zone name that
// newRescheduleJobsWithQuotaError passes to time.LoadLocation.
// NOTE(review): presumably chosen because the quota these jobs hit resets at
// midnight Pacific Time — confirm against the BigQuery quota documentation.
const pacificTimeLocation = "US/Pacific"
// rescheduleJobsWithQuotaErrorService sets BigQuery jobs that finished with a
// quota error back to NotScheduled once their quota has renewed (see Run).
type rescheduleJobsWithQuotaErrorService struct {
// jobRepository is used to look up jobs by type and status and to patch a
// job's status.
jobRepository repository.JobRepository
// pacificTimeLocation holds the location loaded by the constructor.
// NOTE(review): nothing visible in this file reads this field;
// hasQuotaRenewedForJob loads the location itself — confirm whether the
// field is still needed.
pacificTimeLocation *time.Location
}
// newRescheduleJobsWithQuotaError creates a rescheduleJobsWithQuotaErrorService.
//
// It starts a trace span on ctxIn, creates a JobRepository from
// credentialsProvider and loads the pacificTimeLocation time zone.
//
// On failure it returns an empty (non-nil) service together with an error that
// wraps the underlying cause, so callers can inspect the cause with errors.Is
// or errors.As. The returned service must not be used when the error is
// non-nil.
func newRescheduleJobsWithQuotaError(ctxIn context.Context, credentialsProvider secret.SecretProvider) (*rescheduleJobsWithQuotaErrorService, error) {
	ctx, span := trace.StartSpan(ctxIn, "newRescheduleJobsWithQuotaError")
	defer span.End()

	jobRepository, err := repository.NewJobRepository(ctx, credentialsProvider)
	if err != nil {
		// %w keeps the error chain intact; the rendered message is unchanged.
		return &rescheduleJobsWithQuotaErrorService{}, fmt.Errorf("could not instantiate new JobRepository: %w", err)
	}

	timeLocation, err := time.LoadLocation(pacificTimeLocation)
	if err != nil {
		return &rescheduleJobsWithQuotaErrorService{}, fmt.Errorf("could not load location for %s: %w", pacificTimeLocation, err)
	}

	return &rescheduleJobsWithQuotaErrorService{jobRepository: jobRepository, pacificTimeLocation: timeLocation}, nil
}
// Run fetches all BigQuery jobs in status FinishedQuotaError and sets every
// job whose quota has renewed (as decided by hasQuotaRenewedForJob) back to
// NotScheduled.
//
// Nothing is returned; progress and failures are reported through glog only.
// A job counts as failed when its quota renewal time cannot be calculated or
// when patching its status fails. Jobs whose quota has not renewed yet are
// skipped and not counted as failures.
func (r *rescheduleJobsWithQuotaErrorService) Run(ctxIn context.Context) {
	ctx, span := trace.StartSpan(ctxIn, "(*rescheduleJobsWithQuotaErrorService).Run")
	defer span.End()

	glog.Infof("[START] Reschedule Jobs With Quota Error")

	jobs, err := r.jobRepository.GetByJobTypeAndStatus(ctx, repository.BigQuery, repository.FinishedQuotaError)
	if err != nil {
		glog.Infof("[FAIL] GetByJobTypeAndStatus failed: %s", err)
		return
	}

	if len(jobs) == 0 {
		glog.Infof("[SUCCESS] No jobs has Quota exceeded")
		return
	}

	glog.Infof("Rescheduling %d jobs with Quota error", len(jobs))

	failedToRescheduleCount := 0
	for _, job := range jobs {
		isQuotaRenewed, err := hasQuotaRenewedForJob(job)
		if err != nil {
			// Count the job as failed so the summary below cannot report
			// success after a calculation error, and log the cause.
			glog.Warningf("[FAIL] not able to calculate job next quota time with ID: %s: %s", job.ID, err)
			failedToRescheduleCount++
			continue
		}
		if !isQuotaRenewed {
			// Quota has not renewed yet; a later run picks the job up again.
			continue
		}

		patch := repository.JobPatch{ID: job.ID, Status: repository.NotScheduled}
		if err := r.jobRepository.PatchJobStatus(ctx, patch); err != nil {
			glog.Warningf("[FAIL] not able to reschedule job with ID: %s: %s", job.ID, err)
			failedToRescheduleCount++
		}
	}

	if failedToRescheduleCount != 0 {
		glog.Infof("[FAIL] %d jobs where not rescheduled", failedToRescheduleCount)
		return
	}

	glog.Infof("[SUCCESS] All jobs where rescheduled")
}
// hasQuotaRenewedForJob reports whether the current time is after the first
// midnight, in the "US/Pacific" time zone, that follows job.UpdatedTimestamp.
//
// It returns a non-nil error only when the "US/Pacific" location cannot be
// loaded; the boolean result is false in that case.
func hasQuotaRenewedForJob(job *repository.Job) (bool, error) {
	loc, err := time.LoadLocation("US/Pacific")
	if err != nil {
		return false, err
	}

	// Step one calendar day past the job's last update as seen in Pacific
	// time, then cut that instant down to the start of its day.
	dayAfterUpdate := job.UpdatedTimestamp.In(loc).AddDate(0, 0, 1)
	year, month, day := dayAfterUpdate.Date()
	quotaReset := time.Date(year, month, day, 0, 0, 0, 0, loc)

	now := time.Now().In(loc)
	return now.After(quotaReset), nil
}