-
Notifications
You must be signed in to change notification settings - Fork 402
/
chore.go
151 lines (127 loc) · 3.83 KB
/
chore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/storj"
"storj.io/common/sync2"
)
// Chore is the contact chore for nodes announcing themselves to their trusted satellites.
//
// It keeps one ping cycle per trusted satellite and reconciles that set
// whenever the list of trusted satellites is refreshed.
//
// architecture: Chore
type Chore struct {
	// log receives debug messages when a satellite cycle is started or stopped.
	log *zap.Logger
	// service provides the initialization fence, the trusted satellite list
	// and the pingSatellite call used by every cycle.
	service *Service
	// mu guards cycles.
	mu sync.Mutex
	// cycles holds the ping cycle of each satellite, keyed by satellite node ID.
	// Close sets it to nil.
	cycles map[storj.NodeID]*sync2.Cycle
	// started is released by Run once the initial cycles have been configured;
	// Pause, Trigger and TriggerWait wait on it before touching cycles.
	started sync2.Fence
	// interval is the period of each satellite ping cycle and is also passed
	// to pingSatellite.
	interval time.Duration
}
// NewChore creates a new contact chore.
//
// The returned chore has an empty, non-nil cycle map; cycles are only
// created once Run is called.
func NewChore(log *zap.Logger, interval time.Duration, service *Service) *Chore {
	chore := new(Chore)
	chore.log = log
	chore.service = service
	chore.interval = interval
	chore.cycles = map[storj.NodeID]*sync2.Cycle{}
	return chore
}
// Run starts the contact chore and blocks until all of its cycles finish.
//
// It first waits for the contact service to be initialized and returns
// ctx.Err() if that wait does not succeed. It then starts a ping cycle for
// every currently trusted satellite, starts a refresh cycle that re-reads the
// trusted satellite list, releases the started fence, and finally returns the
// result of waiting on the group that runs all cycles.
func (chore *Chore) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	if initialized := chore.service.initialized.Wait(ctx); !initialized {
		return ctx.Err()
	}

	var group errgroup.Group

	// syncCycles brings the running ping cycles in line with the current
	// list of trusted satellites.
	syncCycles := func(ctx context.Context) {
		chore.updateCycles(ctx, &group, chore.service.trust.GetSatellites(ctx))
	}

	// configure the satellite ping cycles before announcing that we started
	syncCycles(ctx)

	// keep the ping cycles up to date on a frequent interval
	refreshCycle := sync2.NewCycle(time.Minute)
	refreshCycle.Start(ctx, &group, func(ctx context.Context) error {
		syncCycles(ctx)
		return nil
	})
	defer refreshCycle.Close()

	chore.started.Release()

	return group.Wait()
}
// updateCycles reconciles the running ping cycles with satellites.
//
// For every satellite in the list that has no cycle yet, a new cycle with the
// chore interval is created, registered and started on group; the cycle calls
// service.pingSatellite for that satellite. Every registered cycle whose
// satellite is not in the list is closed and removed.
//
// It is a no-op once the chore has been closed.
func (chore *Chore) updateCycles(ctx context.Context, group *errgroup.Group, satellites []storj.NodeID) {
	chore.mu.Lock()
	defer chore.mu.Unlock()

	// Close sets chore.cycles to nil. Assigning into a nil map panics, and a
	// cycle started after Close would never be closed, so do nothing here.
	if chore.cycles == nil {
		return
	}

	trustedIDs := make(map[storj.NodeID]struct{}, len(satellites))

	for _, satellite := range satellites {
		satellite := satellite // alias the loop var since it is captured below
		trustedIDs[satellite] = struct{}{}
		if _, ok := chore.cycles[satellite]; ok {
			// Ping cycle has already been started for this satellite
			continue
		}

		// Set up a new ping cycle for the newly trusted satellite
		chore.log.Debug("Starting cycle", zap.Stringer("Satellite ID", satellite))
		cycle := sync2.NewCycle(chore.interval)
		chore.cycles[satellite] = cycle
		cycle.Start(ctx, group, func(ctx context.Context) error {
			return chore.service.pingSatellite(ctx, satellite, chore.interval)
		})
	}

	// Stop the ping cycle for satellites that are no longer trusted.
	// Deleting entries while ranging over a map is safe in Go.
	for satellite, cycle := range chore.cycles {
		if _, ok := trustedIDs[satellite]; !ok {
			chore.log.Debug("Stopping cycle", zap.Stringer("Satellite ID", satellite))
			cycle.Close()
			delete(chore.cycles, satellite)
		}
	}
}
// Pause pauses every ping cycle currently registered in the contact chore.
//
// It waits on the started fence first; the result of that wait is not
// checked, so the cycles are paused even if the wait ends early.
func (chore *Chore) Pause(ctx context.Context) {
	chore.started.Wait(ctx)

	chore.mu.Lock()
	defer chore.mu.Unlock()

	for id := range chore.cycles {
		chore.cycles[id].Pause()
	}
}
// Trigger ensures that each cycle is done at least once.
// If the cycle is currently running it waits for the previous to complete and then runs.
//
// Each cycle is triggered from its own goroutine, so Trigger itself does not
// wait for the triggers to be delivered.
func (chore *Chore) Trigger(ctx context.Context) {
	chore.started.Wait(ctx)

	chore.mu.Lock()
	defer chore.mu.Unlock()

	for _, cycle := range chore.cycles {
		// The receiver is evaluated here, in the calling goroutine, so every
		// spawned goroutine is bound to its own cycle.
		go cycle.Trigger()
	}
}
// TriggerWait ensures that each cycle is done at least once and waits for completion.
// If the cycle is currently running it waits for the previous to complete and then runs.
//
// All cycles are triggered concurrently; the chore mutex is held until every
// one of them has returned.
func (chore *Chore) TriggerWait(ctx context.Context) {
	chore.started.Wait(ctx)

	chore.mu.Lock()
	defer chore.mu.Unlock()

	// triggerAndWait binds a single cycle to the task run by the group.
	triggerAndWait := func(c *sync2.Cycle) func() error {
		return func() error {
			c.TriggerWait()
			return nil
		}
	}

	var waiters errgroup.Group
	for _, cycle := range chore.cycles {
		waiters.Go(triggerAndWait(cycle))
	}

	_ = waiters.Wait() // goroutines aren't returning any errors
}
// Close closes every ping cycle registered in the contact chore and drops
// the cycle map. It always returns nil.
func (chore *Chore) Close() error {
	chore.mu.Lock()
	defer chore.mu.Unlock()

	// Detach the map first; the mutex is held throughout, so nothing can
	// observe the intermediate state.
	registered := chore.cycles
	chore.cycles = nil

	for _, cycle := range registered {
		cycle.Close()
	}

	return nil
}