-
Notifications
You must be signed in to change notification settings - Fork 2
/
job_repository.go
316 lines (261 loc) · 10.3 KB
/
job_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
package repository
import (
"context"
"fmt"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/ottogroup/penelope/pkg/secret"
"github.com/ottogroup/penelope/pkg/service"
"go.opencensus.io/trace"
"time"
)
// JobStatistics maps a job status to the number of jobs that currently
// have that status. It is the result type of GetStatisticsForBackupID.
type JobStatistics map[JobStatus]uint64
// AllJobs is the sentinel value for JobPage.Size that disables paging,
// so that no offset/limit is applied and all jobs are fetched.
const AllJobs = -101
// JobPage describes which subset (page) of jobs to fetch.
type JobPage struct {
// Size is the number of elements to fetch per page; the value AllJobs
// fetches all jobs without paging.
Size int
// Number is the zero-based page index; the row offset is computed as
// Number * Size.
Number int
}
// JobRepository describes the persistence operations available for
// backup jobs.
type JobRepository interface {
	// AddJob stores a single job.
	AddJob(ctx context.Context, job *Job) error
	// AddJobs stores several jobs within one transaction.
	AddJobs(ctx context.Context, jobs []*Job) error
	// DeleteJob removes the job row with the given id.
	DeleteJob(ctx context.Context, jobID string) error
	// GetJob returns the non-deleted job with the given id, or nil if
	// no such job exists.
	GetJob(ctx context.Context, jobID string) (*Job, error)
	// MarkDeleted flags the job with the given id as deleted without
	// removing its row.
	MarkDeleted(ctx context.Context, id string) error
	// GetByJobTypeAndStatus returns non-deleted jobs of the given type
	// whose status is one of the given statuses.
	GetByJobTypeAndStatus(ctx context.Context, backupType BackupType, status ...JobStatus) ([]*Job, error)
	// GetByStatusAndBefore returns non-deleted jobs in one of the given
	// statuses that were last touched more than deltaHours hours ago.
	GetByStatusAndBefore(ctx context.Context, status []JobStatus, deltaHours int) ([]*Job, error)
	// PatchJobStatus updates status and foreign job ids of a job.
	PatchJobStatus(ctx context.Context, patch JobPatch) error
	// GetJobsForBackupID returns the jobs of a backup, optionally paged.
	GetJobsForBackupID(ctx context.Context, backupID string, jobPage JobPage) ([]*Job, error)
	// GetBackupRestoreJobs returns the jobs relevant for restoring a backup.
	GetBackupRestoreJobs(ctx context.Context, backupID, jobID string) ([]*Job, error)
	// GetStatisticsForBackupID returns the number of jobs per status
	// for a backup.
	GetStatisticsForBackupID(ctx context.Context, backupID string) (JobStatistics, error)
}
// defaultJobRepository implements JobRepository on top of the database
// handle exposed by the storage service.
type defaultJobRepository struct {
// storageService provides the DB() handle used by every query in this repository.
storageService *service.Service
}
// NewJobRepository builds a JobRepository backed by a storage service
// that is created from the given credentials provider. If the storage
// service cannot be created, its error is returned unchanged.
func NewJobRepository(ctxIn context.Context, credentialsProvider secret.SecretProvider) (JobRepository, error) {
	ctx, span := trace.StartSpan(ctxIn, "NewJobRepository")
	defer span.End()

	svc, err := service.NewStorageService(ctx, credentialsProvider)
	if err != nil {
		return nil, err
	}

	repo := &defaultJobRepository{storageService: svc}
	return repo, nil
}
// AddJob inserts a single backup job.
//
// The context returned by the tracing span is handed to the query so
// that the database call is bound to the caller's context instead of
// running detached from it. A database error is wrapped (and can be
// inspected with errors.Is / errors.As) and returned.
func (d *defaultJobRepository) AddJob(ctxIn context.Context, job *Job) error {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).AddJob")
	defer span.End()

	if _, err := d.storageService.DB().ModelContext(ctx, job).Insert(); err != nil {
		return fmt.Errorf("error during executing add job statement: %w", err)
	}
	return nil
}
// AddJobs inserts all given backup jobs inside a single transaction
// started with the traced context. The first insert that fails stops
// the loop and its error is returned from the transaction callback.
func (d *defaultJobRepository) AddJobs(ctxIn context.Context, jobs []*Job) error {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).AddJobs")
	defer span.End()

	// insertAll is the transaction body: one INSERT per job, in order.
	insertAll := func(tx *pg.Tx) error {
		for i := range jobs {
			if _, err := tx.Model(jobs[i]).Insert(); err != nil {
				return err
			}
		}
		return nil
	}

	return d.storageService.DB().RunInTransaction(ctx, insertAll)
}
// DeleteJob removes the job row whose primary key equals jobID.
//
// The traced context is passed to the query so the DELETE is bound to
// the caller's context. A database error is wrapped and returned.
func (d *defaultJobRepository) DeleteJob(ctxIn context.Context, jobID string) error {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).DeleteJob")
	defer span.End()

	_, err := d.storageService.DB().
		ModelContext(ctx, &Job{ID: jobID}).
		WherePK().
		Delete()
	if err != nil {
		return fmt.Errorf("delete job with id %s failed: %w", jobID, err)
	}
	return nil
}
// GetJob loads the job with the given id, ignoring jobs that carry a
// deletion timestamp.
//
// It returns (nil, nil) when no matching row exists, the job on
// success, and a wrapped error for any other database failure. The
// traced context is passed to the query so the SELECT is bound to the
// caller's context.
func (d *defaultJobRepository) GetJob(ctxIn context.Context, jobID string) (*Job, error) {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).GetJob")
	defer span.End()

	job := &Job{ID: jobID}
	err := d.storageService.DB().
		ModelContext(ctx, job).
		Where("id = ?", jobID).
		Where("audit_deleted_timestamp is null").
		Select()

	switch {
	case err == pg.ErrNoRows:
		// A missing job is not an error for callers; signal it with a nil job.
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("get job with id %s failed: %w", jobID, err)
	}
	return job, nil
}
// GetJobsForBackupID returns the jobs that belong to the given backup.
//
// When jobPage.Size equals AllJobs every job is returned; otherwise the
// result is limited to jobPage.Size rows starting at offset
// jobPage.Number * jobPage.Size. The traced context is passed to the
// query so the SELECT is bound to the caller's context. On failure a
// wrapped error and a nil slice are returned.
func (d *defaultJobRepository) GetJobsForBackupID(ctxIn context.Context, backupID string, jobPage JobPage) ([]*Job, error) {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).GetJobsForBackupID")
	defer span.End()

	var jobs []*Job
	query := d.storageService.DB().
		ModelContext(ctx, &jobs).
		Where("backup_id = ?", backupID)

	if jobPage.Size != AllJobs {
		// NOTE(review): no ORDER BY is applied, so the database is free to
		// return rows in any order and pages may overlap between calls —
		// confirm whether a stable ordering is required here.
		query = query.Offset(jobPage.Number * jobPage.Size).Limit(jobPage.Size)
	}

	if err := query.Select(); err != nil {
		return nil, fmt.Errorf("error during executing GetJobsForBackupID statement: %w", err)
	}
	return jobs, nil
}
// GetBackupRestoreJobs returns the jobs linked to the most recent
// source_metadata entry of every source of the given backup.
//
// The inner query ranks the backup's source_metadata rows per source by
// audit_created_timestamp (newest first). Only rows created no later
// than a cut-off are considered, compared at minute precision: the
// cut-off is the creation timestamp of job jobID when jobID is
// non-empty, and NOW() otherwise. The outer query keeps the top-ranked
// row per source, drops rows whose operation is 'Delete', and joins
// through source_metadata_jobs to the jobs table.
//
// The traced context is passed to the executed query so the SELECT is
// bound to the caller's context. On failure a wrapped error and a nil
// slice are returned.
func (d *defaultJobRepository) GetBackupRestoreJobs(ctxIn context.Context, backupID, jobID string) ([]*Job, error) {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).GetBackupRestoreJobs")
	defer span.End()

	var jobs []*Job
	db := d.storageService.DB()

	// subselect is only embedded into the outer query below; it is never
	// executed on its own.
	subselect := db.Model().
		Table("source_metadata").
		Column("*").
		ColumnExpr("rank() over (partition by source order by audit_created_timestamp desc) as inner_rank").
		Where("backup_id = ?", backupID)

	if jobID != "" {
		auditCreatedTimestamp := db.Model(&Job{}).Column("audit_created_timestamp").Where("id = ?", jobID)
		subselect = subselect.Where("to_timestamp(to_char(audit_created_timestamp, 'YYYY-MM-DD HH24:MI'), 'YYYY-MM-DD HH24:MI') <= (?)", auditCreatedTimestamp)
	} else {
		subselect = subselect.Where("to_timestamp(to_char(audit_created_timestamp, 'YYYY-MM-DD HH24:MI'), 'YYYY-MM-DD HH24:MI') <= NOW()")
	}

	err := db.ModelContext(ctx).TableExpr("(?) AS s", subselect).
		Column("j.*").
		Join("LEFT JOIN source_metadata_jobs as smj on s.id=smj.source_metadata_id").
		Join("LEFT JOIN jobs j on smj.job_id = j.id").
		Where("inner_rank = 1 AND j.backup_id is NOT NULL").
		Where("s.operation != 'Delete'").
		Select(&jobs)
	if err != nil {
		return nil, fmt.Errorf("error during executing GetBackupRestoreJobs statement: %w", err)
	}
	return jobs, nil
}
// GetByJobTypeAndStatus returns the non-deleted jobs of the given
// backup type whose status is one of the given statuses.
//
// The traced context is passed to the query so the SELECT is bound to
// the caller's context. On failure a wrapped error and a nil slice are
// returned.
func (d *defaultJobRepository) GetByJobTypeAndStatus(ctxIn context.Context, backupType BackupType, status ...JobStatus) ([]*Job, error) {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).GetByJobTypeAndStatus")
	defer span.End()

	var jobs []*Job
	err := d.storageService.DB().
		ModelContext(ctx, &jobs).
		Where("type = ?", backupType.String()).
		Where("audit_deleted_timestamp is null").
		Where("status in (?)", pg.In(status)).
		Select()
	if err != nil {
		return nil, fmt.Errorf("error during executing get job by status statement: %w", err)
	}
	return jobs, nil
}
// GetByStatusAndBefore returns the non-deleted jobs whose status is one
// of the given statuses and which were last touched more than
// deltaHours hours ago.
//
// "Last touched" is audit_updated_timestamp when it is set, and
// audit_created_timestamp when the job was never updated. The traced
// context is passed to the query so the SELECT is bound to the caller's
// context. On failure a wrapped error and a nil slice are returned.
func (d *defaultJobRepository) GetByStatusAndBefore(ctxIn context.Context, status []JobStatus, deltaHours int) ([]*Job, error) {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).GetByStatusAndBefore")
	defer span.End()

	var jobs []*Job
	err := d.storageService.DB().
		ModelContext(ctx, &jobs).
		Where("audit_deleted_timestamp is null").
		WhereGroup(func(age *orm.Query) (*orm.Query, error) {
			return age.
				// Never updated: compare against the creation time.
				WhereGroup(func(q *orm.Query) (*orm.Query, error) {
					return q.
						Where("audit_updated_timestamp is null").
						Where("audit_created_timestamp < NOW()-interval '1 hour'*?", deltaHours), nil
				}).
				// Updated at least once: compare against the last update.
				WhereOrGroup(func(q *orm.Query) (*orm.Query, error) {
					return q.
						Where("audit_updated_timestamp is not null").
						Where("audit_updated_timestamp < NOW()-interval '1 hour'*?", deltaHours), nil
				}), nil
		}).
		Where("status in (?)", pg.In(status)).
		Select()
	if err != nil {
		return nil, fmt.Errorf("error during executing get job by status statement: %w", err)
	}
	return jobs, nil
}
// PatchJobStatus updates the status, the BigQuery / Cloud Storage
// foreign job ids and the update timestamp of the non-deleted job
// identified by jobPatcher.ID.
//
// Only the listed columns are written. The traced context is passed to
// the query so the UPDATE is bound to the caller's context. A database
// error is wrapped and returned.
func (d *defaultJobRepository) PatchJobStatus(ctxIn context.Context, jobPatcher JobPatch) error {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).PatchJobStatus")
	defer span.End()

	job := &Job{
		Status: jobPatcher.Status,
		ForeignJobID: ForeignJobID{
			BigQueryID:     jobPatcher.ForeignJobID.BigQueryID,
			CloudStorageID: jobPatcher.ForeignJobID.CloudStorageID,
		},
		EntityAudit: EntityAudit{
			UpdatedTimestamp: time.Now(),
		},
	}

	_, err := d.storageService.DB().
		ModelContext(ctx, job).
		Column("status", "audit_updated_timestamp", "bigquery_extract_job_id", "cloudstorage_transfer_job_id").
		Where("audit_deleted_timestamp IS NULL").
		Where("id = ?", jobPatcher.ID).
		Update()
	if err != nil {
		return fmt.Errorf("error during executing updating job statement: %w", err)
	}
	return nil
}
// MarkDeleted flags the job with the given id as deleted: it sets the
// status to JobDeleted and stamps both the update and the deletion
// timestamp. Jobs that already carry a deletion timestamp are left
// untouched.
//
// The traced context is passed to the query so the UPDATE is bound to
// the caller's context. A database error is wrapped and returned.
func (d *defaultJobRepository) MarkDeleted(ctxIn context.Context, id string) error {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).MarkDeleted")
	defer span.End()

	// Read the clock once so both audit timestamps are exactly equal.
	now := time.Now()
	job := &Job{
		ID:     id,
		Status: JobDeleted,
		EntityAudit: EntityAudit{
			UpdatedTimestamp: now,
			DeletedTimestamp: now,
		},
	}

	_, err := d.storageService.DB().
		ModelContext(ctx, job).
		Column("status", "audit_updated_timestamp", "audit_deleted_timestamp").
		WherePK().
		Where("audit_deleted_timestamp IS NULL").
		Update()
	if err != nil {
		return fmt.Errorf("error during executing updating job statement: %w", err)
	}
	return nil
}
// GetStatisticsForBackupID counts the jobs of the given backup grouped
// by status and returns the counts keyed by status.
//
// Statuses without any job are absent from the result. The traced
// context is passed to the query so the SELECT is bound to the caller's
// context. On failure a wrapped error and a nil map are returned.
func (d *defaultJobRepository) GetStatisticsForBackupID(ctxIn context.Context, backupID string) (JobStatistics, error) {
	ctx, span := trace.StartSpan(ctxIn, "(*defaultJobRepository).GetStatisticsForBackupID")
	defer span.End()

	var results []struct {
		Count  uint64
		Status JobStatus
	}
	err := d.storageService.DB().
		ModelContext(ctx, (*Job)(nil)).
		ColumnExpr("count(*) AS count").
		Column("status").
		Where("backup_id = ?", backupID).
		Group("status").
		Select(&results)
	if err != nil {
		return nil, fmt.Errorf("error during executing GetStatisticsForBackupID statement: %w", err)
	}

	// One row per status, so the row count is the final map size.
	jobStatistics := make(JobStatistics, len(results))
	for _, result := range results {
		jobStatistics[result.Status] = result.Count
	}
	return jobStatistics, nil
}