-
Notifications
You must be signed in to change notification settings - Fork 119
/
range_operations.go
107 lines (82 loc) · 2.35 KB
/
range_operations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package yt
import (
"context"
"go.ytsaurus.tech/library/go/ptr"
)
// rangeJobsDefaultLimit is the page size (jobs per ListJobs request) that
// RangeJobs uses when the caller's options carry no Limit.
const rangeJobsDefaultLimit = 1000
// RangeOperationsCallback is the function RangeOperations calls once for every
// operation it receives.
type RangeOperationsCallback func(op OperationStatus)
// RangeOperations iterates over operations with pagination and calls cb on each operation.
//
// Pages are requested with yc.ListOperations. After every page the cursor is
// moved to the StartTime of the last operation received, and iteration goes on
// while the response is marked Incomplete.
//
// opts may be nil. The caller's opts is never modified: the cursor is advanced
// on a private shallow copy, so the same options value can be reused for
// another call.
//
// The first error returned by ListOperations stops the iteration and is
// returned as is; operations from earlier pages have already been passed to cb.
func RangeOperations(ctx context.Context, yc Client, opts *ListOperationsOptions, cb RangeOperationsCallback) error {
	// Work on a shallow copy so that advancing the cursor does not leak into
	// the options struct owned by the caller.
	var pageOpts ListOperationsOptions
	if opts != nil {
		pageOpts = *opts
	}

	for {
		rsp, err := yc.ListOperations(ctx, &pageOpts)
		if err != nil {
			return err
		}

		for _, op := range rsp.Operations {
			cb(op)
		}

		// An empty page leaves the cursor where it was, so asking again would
		// repeat the very same request; stop even if the page is flagged
		// incomplete.
		if !rsp.Incomplete || len(rsp.Operations) == 0 {
			return nil
		}

		lastOp := rsp.Operations[len(rsp.Operations)-1]
		pageOpts.Cursor = &lastOp.StartTime
	}
}
// ListAllOperations gathers every operation reported by RangeOperations into
// one slice and returns it.
//
// The whole result is held in memory, which can be large for broad filters;
// use RangeOperations directly to process operations one at a time instead.
//
// If RangeOperations fails, the error is returned together with a nil slice.
func ListAllOperations(ctx context.Context, yc Client, opts *ListOperationsOptions) ([]OperationStatus, error) {
	var collected []OperationStatus
	collect := func(status OperationStatus) {
		collected = append(collected, status)
	}

	if err := RangeOperations(ctx, yc, opts, collect); err != nil {
		return nil, err
	}
	return collected, nil
}
// RangeJobsCallback is the function RangeJobs calls once for every job it
// receives.
type RangeJobsCallback func(job JobStatus)
// RangeJobs iterates over operation jobs with pagination and calls cb on each job.
//
// Pages are requested with yc.ListJobs for operation opID, starting at
// opts.Offset (0 when unset) and advancing by the page size after every
// request. The page size is opts.Limit; when Limit is unset or not positive,
// rangeJobsDefaultLimit is used instead, because a non-positive page size
// could never satisfy the termination check below. Iteration ends with the
// first page that holds fewer jobs than the page size.
//
// opts may be nil. The caller's opts is never modified: Limit and Offset are
// set on a private shallow copy.
//
// The first error returned by ListJobs stops the iteration and is returned as
// is; jobs from earlier pages have already been passed to cb.
func RangeJobs(ctx context.Context, yc Client, opID OperationID, opts *ListJobsOptions, cb RangeJobsCallback) error {
	// Work on a shallow copy so that paging state does not leak into the
	// options struct owned by the caller.
	var pageOpts ListJobsOptions
	if opts != nil {
		pageOpts = *opts
	}

	limit := rangeJobsDefaultLimit
	if pageOpts.Limit != nil && *pageOpts.Limit > 0 {
		limit = *pageOpts.Limit
	}
	pageOpts.Limit = ptr.Int(limit)

	start := 0
	if pageOpts.Offset != nil {
		start = *pageOpts.Offset
	}

	for offset := start; ; offset += limit {
		// A fresh pointer per request keeps earlier requests' offsets intact.
		pageOpts.Offset = ptr.Int(offset)

		rsp, err := yc.ListJobs(ctx, opID, &pageOpts)
		if err != nil {
			return err
		}

		for _, job := range rsp.Jobs {
			cb(job)
		}

		// A short page means there is nothing left to fetch.
		if len(rsp.Jobs) < limit {
			return nil
		}
	}
}
// ListAllJobs lists operation jobs with pagination.
//
// Every job reported by RangeJobs for operation opID is appended to a single
// slice, which is returned once iteration finishes.
//
// Depending on the filters used the result might be quite big.
// Consider using RangeJobs to limit memory consumption.
//
// If RangeJobs fails, the error is returned together with a nil slice.
func ListAllJobs(ctx context.Context, yc Client, opID OperationID, opts *ListJobsOptions) ([]JobStatus, error) {
	var jobs []JobStatus
	err := RangeJobs(ctx, yc, opID, opts, func(job JobStatus) {
		jobs = append(jobs, job)
	})
	if err != nil {
		return nil, err
	}
	return jobs, nil
}