-
Notifications
You must be signed in to change notification settings - Fork 210
/
sync.go
80 lines (71 loc) · 1.6 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package validation
import (
"context"
"fmt"
"time"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"golang.org/x/sync/errgroup"
"github.com/spacemeshos/go-spacemesh/systest/cluster"
)
// Periodic invokes f right away and then once more on every tick of period.
// It stops at the first non-nil error returned by f and hands that error back
// to the caller; once ctx is done it returns ctx.Err() instead.
//
// period is passed to time.NewTicker and therefore has to be positive.
func Periodic(ctx context.Context, period time.Duration, f Validation) error {
	// First run happens before any waiting.
	err := f(ctx)
	if err != nil {
		return err
	}

	tick := time.NewTicker(period)
	defer tick.Stop()

	// ctx.Err() is non-nil once Done is closed, so either branch can end the loop.
	for err == nil {
		select {
		case <-ctx.Done():
			err = ctx.Err()
		case <-tick.C:
			err = f(ctx)
		}
	}
	return err
}

// Validation is a single check that can be scheduled with Periodic; a non-nil
// error reports that the check failed.
type Validation func(context.Context) error
// isSynced queries the node's status over its public connection and reports
// the IsSynced flag from the response. The request is bounded by a five-second
// timeout derived from ctx.
//
// Any request error (including the timeout) and a response that carries no
// status are both reported as "not synced" rather than surfaced to the caller.
func isSynced(ctx context.Context, node *cluster.NodeClient) bool {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	svc := pb.NewNodeServiceClient(node.PubConn())
	resp, err := svc.Status(ctx, &pb.StatusRequest{})
	if err != nil {
		return false
	}
	// Use the generated getters: they tolerate a nil receiver, so a response
	// without a Status message yields false instead of a nil-pointer panic.
	return resp.GetStatus().GetIsSynced()
}
// Sync builds a Validation that, on every run, asks each node of the cluster
// concurrently whether it is synced and feeds the answers into a shared
// SyncValidation. The run fails with the first error returned by OnData, i.e.
// when some node has been seen not synced more than tolerate times in a row.
//
// The returned Validation keeps per-node counters between runs, so it is meant
// to be invoked sequentially (as Periodic does), not from several goroutines
// at once.
func Sync(c *cluster.Cluster, tolerate int) Validation {
	sv := &SyncValidation{
		failures: make([]int, c.Total()),
		tolerate: tolerate,
	}
	return func(ctx context.Context) error {
		// Read the cluster size once per run and make sure there is a counter
		// for every node. Without this, nodes added after Sync was called would
		// make OnData index past the end of the slice and panic.
		total := c.Total()
		if missing := total - len(sv.failures); missing > 0 {
			sv.failures = append(sv.failures, make([]int, missing)...)
		}

		var eg errgroup.Group
		for i := range total {
			node := c.Client(i)
			// Each goroutine touches only its own counter slot.
			eg.Go(func() error {
				return sv.OnData(i, isSynced(ctx, node))
			})
		}
		return eg.Wait()
	}
}
// SyncValidation counts, per node, how many observations in a row found the
// node not synced, and flags nodes whose streak exceeds the tolerated amount.
type SyncValidation struct {
	// failures holds the current not-synced streak, indexed by node id.
	failures []int
	// tolerate is the longest streak that is still accepted.
	tolerate int
}

// OnData records one observation for node id. A synced observation clears the
// node's streak. A not-synced observation extends it, and an error is returned
// as soon as the streak is longer than the tolerated number of periods.
func (s *SyncValidation) OnData(id int, synced bool) error {
	if synced {
		s.failures[id] = 0
		return nil
	}

	s.failures[id]++
	streak := s.failures[id]
	if streak <= s.tolerate {
		return nil
	}
	return fmt.Errorf("node %d not synced in %d periods", id, streak)
}