-
Notifications
You must be signed in to change notification settings - Fork 397
/
ranged_loop.go
158 lines (126 loc) · 4.43 KB
/
ranged_loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package nodetally
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
// Compile-time assertions that the observer and partial types defined in this
// file implement the ranged loop Observer and Partial interfaces.
var _ rangedloop.Observer = (*RangedLoopObserver)(nil)
var _ rangedloop.Partial = (*RangedLoopPartial)(nil)
// RangedLoopObserver implements node tally ranged loop observer.
//
// It collects per node storage usage from the partials it forks and, when the
// loop finishes, stores the usage as byte*hours through the accounting DB.
type RangedLoopObserver struct {
// log is handed to every partial created by Fork.
log *zap.Logger
// accounting is used to read the last at-rest tally timestamp in Start and
// to save the tallies in Finish.
accounting accounting.StoragenodeAccounting
// nowFn returns the current time; it defaults to time.Now and can be
// replaced with SetNow.
nowFn func() time.Time
// lastTallyTime is set by Start: the last at-rest tally timestamp reported
// by accounting, or the value of nowFn when that timestamp is zero. Finish
// measures the elapsed hours from it.
lastTallyTime time.Time
// Node maps a node ID to the sum of piece sizes observed for that node. It
// is reset by Start, added to by Join and read by Finish.
Node map[storj.NodeID]float64
}
// NewRangedLoopObserver returns a RangedLoopObserver that logs to log, reads
// and writes tallies through accounting, uses time.Now as its clock and starts
// with an empty per node usage map.
func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAccounting) *RangedLoopObserver {
	observer := new(RangedLoopObserver)
	observer.log = log
	observer.accounting = accounting
	observer.nowFn = time.Now
	observer.Node = make(map[storj.NodeID]float64)
	return observer
}
// Start implements ranged loop observer start method.
//
// It clears the per node usage map and records the time of the last at-rest
// tally as reported by the accounting DB. When no such time is recorded (the
// timestamp is zero) the current time from nowFn is used instead. The time
// argument is not used. An error from the accounting DB is returned as is.
func (observer *RangedLoopObserver) Start(ctx context.Context, time time.Time) (err error) {
	observer.Node = make(map[storj.NodeID]float64)

	lastTally, err := observer.accounting.LastTimestamp(ctx, accounting.LastAtRestTally)
	// Store the value before checking err, so the field always mirrors what
	// the accounting DB returned.
	observer.lastTallyTime = lastTally
	if err != nil {
		return err
	}

	if lastTally.IsZero() {
		observer.lastTallyTime = observer.nowFn()
	}
	return nil
}
// Fork creates a new node tally ranged loop partial that shares the
// observer's logger and clock function. It never returns an error.
func (observer *RangedLoopObserver) Fork(ctx context.Context) (rangedloop.Partial, error) {
	partial := NewRangedLoopPartial(observer.log, observer.nowFn)
	return partial, nil
}
// Join merges the per node usage collected by a partial into the observer's
// own per node usage map. It returns an error when partial is not a
// *RangedLoopPartial.
func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop.Partial) error {
	switch tally := partial.(type) {
	case *RangedLoopPartial:
		for id, usage := range tally.Node {
			observer.Node[id] += usage
		}
		return nil
	default:
		// The typed nil pointer is only there so %T prints the expected type.
		return Error.New("expected partial type %T but got %T", (*RangedLoopPartial)(nil), partial)
	}
}
// monRangedTally is a monkit scope named after the accounting/tally package
// rather than this package. The explicit name is kept for backwards
// compatibility (presumably so that previously published metric names stay
// the same — confirm). Finish reports "nodetallies.totalsum" on this scope.
var monRangedTally = monkit.ScopeNamed("storj.io/storj/satellite/accounting/tally")
// Finish calculates byte*hours from per node storage usage and saves the
// tallies to the DB.
//
// The number of hours is the time elapsed between the last tally time recorded
// by Start and the current time from nowFn; the same current time is used as
// the timestamp of the saved tallies. The sum of all per node usage is
// reported as the "nodetallies.totalsum" metric. An error is returned when
// saving the tallies fails.
func (observer *RangedLoopObserver) Finish(ctx context.Context) error {
	finishTime := observer.nowFn()

	// calculate byte hours, not just bytes
	hours := finishTime.Sub(observer.lastTallyTime).Hours()

	// byteHours gets exactly one entry per observed node, so allocate it at
	// its final size instead of letting it grow while it is filled.
	byteHours := make(map[storj.NodeID]float64, len(observer.Node))
	var totalSum float64
	for id, pieceSize := range observer.Node {
		totalSum += pieceSize
		byteHours[id] = pieceSize * hours
	}

	monRangedTally.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked

	err := observer.accounting.SaveTallies(ctx, finishTime, byteHours)
	if err != nil {
		// NOTE(review): err is formatted with %v, which presumably flattens it
		// into the message so the cause cannot be unwrapped — confirm whether
		// callers need the original error.
		return Error.New("StorageNodeAccounting.SaveTallies failed: %v", err)
	}
	return nil
}
// SetNow overrides the function the observer uses to obtain the current time.
//
// The function provides the timestamp under which Finish stores the tallies,
// the fallback last tally time in Start, and is passed to partials created by
// Fork. Partials created before the call keep the function they were given.
func (observer *RangedLoopObserver) SetNow(nowFn func() time.Time) {
observer.nowFn = nowFn
}
// RangedLoopPartial implements node tally ranged loop partial.
//
// It accumulates per node storage usage for the segments passed to Process.
type RangedLoopPartial struct {
// log reports segments that fail the required shares sanity check.
log *zap.Logger
// nowFn is called once per Process call; the result is used to skip
// expired segments.
nowFn func() time.Time
// Node maps a node ID to the sum of piece sizes this partial has seen for
// that node.
Node map[storj.NodeID]float64
}
// NewRangedLoopPartial returns a node tally ranged loop partial that logs to
// log, takes the current time from nowFn and starts with an empty per node
// usage map.
func NewRangedLoopPartial(log *zap.Logger, nowFn func() time.Time) *RangedLoopPartial {
	partial := new(RangedLoopPartial)
	partial.log = log
	partial.nowFn = nowFn
	partial.Node = make(map[storj.NodeID]float64)
	return partial
}
// Process adds the usage of every segment in the batch to the partial's per
// node usage map. It always returns nil.
func (partial *RangedLoopPartial) Process(ctx context.Context, segments []segmentloop.Segment) error {
	// Read the clock once so the whole batch is checked for expiration
	// against the same instant.
	batchTime := partial.nowFn()
	for i := range segments {
		partial.processSegment(batchTime, segments[i])
	}
	return nil
}
// processSegment adds the piece size of a single segment to the usage of every
// node that holds one of its pieces.
//
// Inline segments and segments that are expired at now are ignored. A segment
// whose redundancy scheme has no positive number of required shares is logged
// as an error and ignored as well.
func (partial *RangedLoopPartial) processSegment(now time.Time, segment segmentloop.Segment) {
	if segment.Inline() || segment.Expired(now) {
		return
	}

	// add node info
	required := segment.Redundancy.RequiredShares
	if required <= 0 {
		partial.log.Error("failed sanity check",
			zap.String("StreamID", segment.StreamID.String()),
			zap.Uint64("Position", segment.Position.Encode()),
		)
		return
	}

	// The division is done on integers before the conversion to float64, so
	// any remainder is dropped.
	// TODO: Add this as a method to RedundancyScheme
	sizePerPiece := float64(segment.EncryptedSize / int32(required))
	for _, p := range segment.Pieces {
		partial.Node[p.StorageNode] += sizePerPiece
	}
}