-
Notifications
You must be signed in to change notification settings - Fork 402
/
observer.go
161 lines (137 loc) · 4.36 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit
import (
"context"
"math/rand"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
)
// Observer populates reservoirs and the audit queue.
//
// It implements rangedloop.Observer: Start resets the per-node reservoirs,
// Fork hands out an independent collector per range, Join merges each
// collector's reservoirs back in, and Finish pushes the deduplicated segments
// to the verify queue.
//
// architecture: Observer
type Observer struct {
	log    *zap.Logger
	queue  VerifyQueue
	config Config
	// seedRand is only used to draw the seeds for the RNGs of the forks
	// created by Fork.
	seedRand *rand.Rand

	// The following fields are reset on each ranged loop cycle (by Start).
	Reservoirs map[metabase.NodeAlias]*Reservoir
}
// Compile-time assertions that Observer and observerFork satisfy the ranged
// loop interfaces they are used through.
var (
	_ rangedloop.Observer = (*Observer)(nil)
	_ rangedloop.Partial  = (*observerFork)(nil)
)
// NewObserver instantiates Observer.
//
// config is received by value, so clamping VerificationPushBatchSize to at
// least 1 only affects the observer's copy. That clamped value is what Finish
// later passes to queue.Push. The observer's seed RNG is seeded from the
// current Unix time in seconds.
func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer {
	const minPushBatchSize = 1
	if config.VerificationPushBatchSize < minPushBatchSize {
		config.VerificationPushBatchSize = minPushBatchSize
	}

	seed := time.Now().Unix()
	obs := &Observer{
		log:      log,
		queue:    queue,
		config:   config,
		seedRand: rand.New(rand.NewSource(seed)),
	}
	return obs
}
// Start prepares the observer for audit segment collection.
//
// It discards any reservoirs left over from a previous cycle by installing an
// empty map, which Join then fills. startTime is not used.
func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	obs.Reservoirs = map[metabase.NodeAlias]*Reservoir{}
	return nil
}
// Fork returns a new audit reservoir collector for the range.
//
// Every fork receives its own RNG, seeded from the observer's seed RNG, and
// reservoirs sized to config.Slots.
func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
	defer mon.Task()(&ctx)(&err)

	// Seeding each fork straight from the clock risks handing the same seed
	// to several forks. On platforms with coarse timer resolution (Windows
	// ticks about every 15ms) this can happen even with nanosecond precision.
	// Drawing each fork's seed from the observer's own RNG keeps them distinct.
	//
	// NOTE(review): a *rand.Rand from rand.New is not safe for concurrent use,
	// so this assumes the ranged loop never calls Fork concurrently — confirm.
	seed := obs.seedRand.Int63()
	forkRand := rand.New(rand.NewSource(seed))

	return newObserverFork(obs.config.Slots, forkRand), nil
}
// Join merges the audit reservoir collector into the per-node reservoirs.
//
// For each node alias in the fork, the fork's reservoir is adopted as-is if
// the observer has none yet. Otherwise it is merged into the existing one.
// Join returns an error if partial is not an *observerFork or if a merge
// fails.
func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
	defer mon.Task()(&ctx)(&err)

	fork, ok := partial.(*observerFork)
	if !ok {
		return errs.New("expected partial type %T but got %T", fork, partial)
	}

	for alias, incoming := range fork.reservoirs {
		if current, found := obs.Reservoirs[alias]; found {
			if mergeErr := current.Merge(incoming); mergeErr != nil {
				return mergeErr
			}
			continue
		}
		obs.Reservoirs[alias] = incoming
	}
	return nil
}
// Finish builds and dedups an audit queue from the merged per-node reservoirs
// and pushes it to the verify queue.
//
// Segments are collected slot-major: slot 0 of every reservoir, then slot 1,
// and so on up to config.Slots. The first sampled segment of each reservoir is
// therefore queued before the second of any. Within a slot, reservoirs are
// visited in Go's randomized map iteration order. A segment sampled by several
// reservoirs (same stream ID and encoded position) is queued only once. The
// queue is pushed in batches of config.VerificationPushBatchSize.
func (obs *Observer) Finish(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	type segmentID struct {
		streamID uuid.UUID
		position uint64
	}

	var queue []Segment
	seen := map[segmentID]struct{}{}

	for slot := 0; slot < obs.config.Slots; slot++ {
		for _, reservoir := range obs.Reservoirs {
			sampled := reservoir.Segments()
			if slot >= len(sampled) {
				// This reservoir holds fewer segments than there are slots.
				continue
			}

			seg := sampled[slot]
			id := segmentID{streamID: seg.StreamID, position: seg.Position.Encode()}
			if _, dup := seen[id]; dup {
				continue
			}
			seen[id] = struct{}{}
			queue = append(queue, NewSegment(seg))
		}
	}

	// Hand the queue over so it can be fetched by the workers.
	return obs.queue.Push(ctx, queue, obs.config.VerificationPushBatchSize)
}
// observerFork is the per-range collector (rangedloop.Partial) returned by
// Observer.Fork. It samples segments into its own per-node reservoirs, which
// Observer.Join later merges into the observer.
type observerFork struct {
	// reservoirs holds one reservoir per node alias seen by Process.
	reservoirs map[metabase.NodeAlias]*Reservoir
	// slotCount is passed to NewReservoir whenever a reservoir is created for
	// a node alias not seen before.
	slotCount int
	// rand drives the sampling in Reservoir.Sample. Observer.Fork gives every
	// fork its own instance.
	rand *rand.Rand
}
// newObserverFork returns a collector with no reservoirs yet. Reservoirs it
// creates later get reservoirSlots slots and are sampled using r.
func newObserverFork(reservoirSlots int, r *rand.Rand) *observerFork {
	fork := new(observerFork)
	fork.reservoirs = map[metabase.NodeAlias]*Reservoir{}
	fork.slotCount = reservoirSlots
	fork.rand = r
	return fork
}
// Process performs per-node reservoir sampling on remote segments for addition into the audit queue.
//
// Inline segments are skipped. Every other segment is offered to the
// reservoir of each node alias listed in its AliasPieces, and a reservoir is
// created the first time an alias is seen. Process never returns an error.
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
	for i := range segments {
		// Take a per-iteration copy rather than sharing a loop variable. The
		// reservoir ends up dereferencing and copying the segment internally,
		// which is not obvious from this call site.
		segment := segments[i]
		if segment.Inline() {
			continue
		}

		for _, piece := range segment.AliasPieces {
			reservoir, exists := fork.reservoirs[piece.Alias]
			if !exists {
				reservoir = NewReservoir(fork.slotCount)
				fork.reservoirs[piece.Alias] = reservoir
			}
			reservoir.Sample(fork.rand, segment)
		}
	}
	return nil
}