-
Notifications
You must be signed in to change notification settings - Fork 390
/
observer_sync.go
171 lines (143 loc) · 4.94 KB
/
observer_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package bloomfilter
import (
"context"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/bloomfilter"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/overlay"
)
// SyncObserver implements a rangedloop observer to collect bloom filters for the garbage collection.
//
// Unlike an observer with independent per-fork partials, Fork returns the
// observer itself, so every range writes into the shared retainInfos map,
// guarded by mu.
type SyncObserver struct {
	log     *zap.Logger
	config  Config
	overlay overlay.DB
	upload  *Upload

	// The following fields are reset for each loop.
	startTime       time.Time                // loop start; segments newer than this fail a sanity check in Process
	lastPieceCounts map[storj.NodeID]int64   // piece count per node from overlay, used to size each bloom filter
	seed            byte                     // shared bloom filter seed for this loop
	mu              sync.Mutex               // guards retainInfos and latestCreationTime
	retainInfos     map[storj.NodeID]*RetainInfo

	// latestCreationTime will be used to set bloom filter CreationDate.
	// Because bloom filter service needs to be run against immutable database snapshot
	// we can set CreationDate for bloom filters as a latest segment CreatedAt value.
	latestCreationTime time.Time
}
var _ (rangedloop.Observer) = (*Observer)(nil)
// NewSyncObserver creates a new instance of the gc rangedloop observer.
func NewSyncObserver(log *zap.Logger, config Config, overlay overlay.DB) *SyncObserver {
	obs := &SyncObserver{
		log:     log,
		config:  config,
		overlay: overlay,
		upload:  NewUpload(log, config),
	}
	return obs
}
// Start is called at the beginning of each segment loop. It validates the
// upload configuration, loads the last known per-node piece counts from the
// overlay database, and resets all per-loop state on the observer.
func (obs *SyncObserver) Start(ctx context.Context, startTime time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	if obs.config.AccessGrant == "" {
		return errs.New("Access Grant is not set")
	}
	if obs.config.Bucket == "" {
		return errs.New("Bucket is not set")
	}

	obs.log.Debug("collecting bloom filters started")

	// Loading piece counts is best-effort: on failure we log and continue
	// with an empty map, which makes add fall back to default sizing.
	pieceCounts, err := obs.overlay.ActiveNodesPieceCounts(ctx)
	if err != nil {
		obs.log.Error("error getting last piece counts", zap.Error(err))
		err = nil
	}
	if pieceCounts == nil {
		pieceCounts = make(map[storj.NodeID]int64)
	}

	obs.startTime = startTime
	obs.lastPieceCounts = pieceCounts
	obs.retainInfos = make(map[storj.NodeID]*RetainInfo, len(pieceCounts))
	obs.latestCreationTime = time.Time{}
	obs.seed = bloomfilter.GenerateSeed()
	return nil
}
// Fork creates a Partial to build bloom filters over a chunk of all the segments.
//
// The observer returns itself: every range processes into the same shared
// state, with synchronization handled by the mutex used in Process and add.
func (obs *SyncObserver) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
	return obs, nil
}
// Join merges the bloom filters gathered by each Partial.
//
// It is a no-op here: Fork hands out the shared observer itself, so there is
// no per-fork state to merge back.
func (obs *SyncObserver) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
	return nil
}
// Finish uploads the collected bloom filters to the configured bucket.
func (obs *SyncObserver) Finish(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = obs.upload.UploadBloomFilters(ctx, obs.latestCreationTime, obs.retainInfos)
	if err != nil {
		return err
	}

	obs.log.Debug("collecting bloom filters finished")
	return nil
}
// Process adds pieces to the bloom filter from remote segments.
// Inline segments carry no remote pieces and are skipped.
func (obs *SyncObserver) Process(ctx context.Context, segments []rangedloop.Segment) error {
	maxCreatedAt := time.Time{}

	for i := range segments {
		segment := &segments[i]
		if segment.Inline() {
			continue
		}

		// sanity check to detect if loop is not running against live database
		if segment.CreatedAt.After(obs.startTime) {
			obs.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID),
				zap.Time("loop started", obs.startTime),
				zap.Time("segment created", segment.CreatedAt))
			return errs.New("segment created after loop started")
		}

		if segment.CreatedAt.After(maxCreatedAt) {
			maxCreatedAt = segment.CreatedAt
		}

		deriver := segment.RootPieceID.Deriver()
		for _, piece := range segment.Pieces {
			obs.add(piece.StorageNode, deriver.Derive(piece.StorageNode, int32(piece.Number)))
		}
	}

	// Fold this chunk's newest segment time into the shared maximum.
	obs.mu.Lock()
	defer obs.mu.Unlock()
	if obs.latestCreationTime.Before(maxCreatedAt) {
		obs.latestCreationTime = maxCreatedAt
	}
	return nil
}
// add adds a pieceID to the relevant node's RetainInfo, lazily creating the
// RetainInfo (and its bloom filter) on first sight of the node. Nodes absent
// from lastPieceCounts are skipped entirely. Safe for concurrent use.
func (obs *SyncObserver) add(nodeID storj.NodeID, pieceID storj.PieceID) {
	obs.mu.Lock()
	defer obs.mu.Unlock()

	info := obs.retainInfos[nodeID]
	if info == nil {
		count, known := obs.lastPieceCounts[nodeID]
		if !known {
			// node was not in lastPieceCounts which means it was disqualified
			// and we won't generate bloom filter for it
			return
		}

		// If we know how many pieces a node should be storing, use that
		// number; otherwise fall back to the configured default.
		expectedPieces := obs.config.InitialPieces
		if count > 0 {
			expectedPieces = count
		}

		// limit size of bloom filter to ensure we are under the limit for RPC
		hashCount, tableSize := bloomfilter.OptimalParameters(expectedPieces, obs.config.FalsePositiveRate, 2*memory.MiB)
		info = &RetainInfo{Filter: bloomfilter.NewExplicit(obs.seed, hashCount, tableSize)}
		obs.retainInfos[nodeID] = info
	}

	info.Filter.Add(pieceID)
	info.Count++
}