/
helpers.go
86 lines (75 loc) · 1.84 KB
/
helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package reader
import (
"context"
"sort"
"sync"
"github.com/ydb-platform/jaeger-ydb-store/schema"
"github.com/ydb-platform/jaeger-ydb-store/storage/spanstore/dbmodel"
)
// bucketOperation is the callback signature used by runBucketOperation; it
// receives the caller's context and the number of the bucket to work on.
type bucketOperation func(ctx context.Context, bucket uint8)

// runBucketOperation invokes callbackFunc once for every bucket number in
// [0, numBuckets), each call in its own goroutine, and blocks until all of
// them have returned. With numBuckets == 0 it returns immediately.
func runBucketOperation(ctx context.Context, numBuckets uint8, callbackFunc bucketOperation) {
	var pending sync.WaitGroup
	// Count in int so the loop bound is independent of uint8 wrap-around.
	for n := 0; n < int(numBuckets); n++ {
		bucket := uint8(n) // per-iteration copy captured by the goroutine
		pending.Add(1)
		go func() {
			defer pending.Done()
			callbackFunc(ctx, bucket)
		}()
	}
	pending.Wait()
}
// partitionOperation is the callback signature used by runPartitionOperation;
// it receives the caller's context and the partition key to work on.
type partitionOperation func(ctx context.Context, key schema.PartitionKey)

// runPartitionOperation invokes opFunc once for every element of parts, each
// call in its own goroutine, and blocks until all of them have returned.
// An empty parts slice results in no calls.
func runPartitionOperation(ctx context.Context, parts []schema.PartitionKey, opFunc partitionOperation) {
	var pending sync.WaitGroup
	pending.Add(len(parts))
	for idx := range parts {
		key := parts[idx] // per-iteration copy captured by the goroutine
		go func() {
			defer pending.Done()
			opFunc(ctx, key)
		}()
	}
	pending.Wait()
}
// sharedResult collects index rows and the first error reported through
// AddRows. AddRows serializes its updates with mx; ProcessRows reads the
// fields without taking the lock.
type sharedResult struct {
	// Rows holds the rows appended by AddRows; AddRows sets it to nil when it
	// is given an error.
	Rows []dbmodel.IndexResult

	// Error is the first non-nil error passed to AddRows, or nil if none.
	Error error

	// mx is locked by AddRows while it updates Rows and Error.
	mx *sync.Mutex

	// cancelCtx is called by AddRows whenever it is given a non-nil error.
	cancelCtx context.CancelFunc
}
// newSharedResult returns a sharedResult with a fresh mutex, an empty
// (non-nil) Rows slice, and cancelFunc stored as the function to call when an
// error is reported.
func newSharedResult(cancelFunc context.CancelFunc) *sharedResult {
	result := new(sharedResult)
	result.Rows = []dbmodel.IndexResult{}
	result.mx = &sync.Mutex{}
	result.cancelCtx = cancelFunc
	return result
}
// AddRows merges the outcome of one operation into the shared result while
// holding the result's mutex.
//
// If err is non-nil, the first such error is kept in Error, Rows is cleared,
// and the stored cancel function (if any) is invoked. Once an error has been
// recorded, rows from later successful calls are dropped, so Rows is never
// repopulated alongside a non-nil Error.
//
// If err is nil and no error has been recorded, rows are appended to Rows.
func (r *sharedResult) AddRows(rows []dbmodel.IndexResult, err error) {
	r.mx.Lock()
	defer r.mx.Unlock()

	if err != nil {
		if r.Error == nil {
			r.Error = err
		}
		r.Rows = nil
		// Guard against a result constructed without a cancel function.
		if r.cancelCtx != nil {
			r.cancelCtx()
		}
		return
	}

	if r.Error != nil {
		// A previous call already failed; ProcessRows will return that error,
		// so accumulating more rows would only waste memory.
		return
	}

	r.Rows = append(r.Rows, rows...)
}
// ProcessRows turns the accumulated rows into a set of unique trace IDs.
//
// If an error was recorded it is returned with a nil set. Otherwise Rows is
// sorted in place by ascending RevTs and every id of every row is added to a
// new UniqueTraceIDs in that order.
//
// The fields are read without taking the mutex.
// NOTE(review): presumably called only after all AddRows callers have
// finished — confirm against the call sites.
func (r *sharedResult) ProcessRows() (*dbmodel.UniqueTraceIDs, error) {
	if err := r.Error; err != nil {
		return nil, err
	}

	byRevTs := func(a, b int) bool {
		return r.Rows[a].RevTs < r.Rows[b].RevTs
	}
	sort.Slice(r.Rows, byRevTs)

	unique := dbmodel.NewUniqueTraceIDs()
	for i := range r.Rows {
		for _, traceID := range r.Rows[i].Ids {
			unique.Add(traceID)
		}
	}
	return unique, nil
}