/
heartbeat.go
122 lines (94 loc) · 2.2 KB
/
heartbeat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package heartbeat
import (
"context"
"errors"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)
// HeartbeatEvery is the period of the heartbeat timer: every time it elapses,
// the service cancels the ping in flight and starts a new one. It is a
// variable rather than a constant, so it can be reassigned.
var HeartbeatEvery = time.Second
// status is the last known liveness of the monitored peer.
type status int

// Liveness states. unknown is deliberately the zero value, so a freshly
// created service has not yet decided whether its peer is alive or dead.
const (
	unknown = status(iota)
	alive
	dead
)
// PeerStatus is the message a HeartbeatService sends on its report channel
// when the liveness of its peer changes.
type PeerStatus struct {
// Peer is the ID of the peer the report is about.
Peer peer.ID
// Alive is true when the report follows a successful ping result and
// false when it follows a ping error.
Alive bool
}
// HeartbeatService periodically pings a single peer and reports changes in
// its liveness on a channel. Create it with NewHeartbeat and stop it with
// Close.
type HeartbeatService struct {
// done carries the shutdown handshake: Close sends one value on it, and
// the run loop closes it once it has stopped.
done chan struct{}
// ping is the ping service used to probe the peer.
ping *ping.PingService
// peer is the ID of the monitored peer.
peer peer.ID
// peerStatus is the status most recently recorded by the run loop; it
// starts as unknown.
peerStatus status
// reportCh receives a PeerStatus each time peerStatus changes.
reportCh chan PeerStatus
}
// NewHeartbeat creates a heartbeat service that monitors peer p through the
// given ping service and immediately starts its background loop.
//
// Status changes are sent on outCh. The loop sends on it directly, so outCh
// should be non-nil and have a reader; a send on a nil channel never
// completes.
//
// It returns an error, and no service, when ping is nil. The returned service
// should be stopped with Close.
func NewHeartbeat(ping *ping.PingService, p peer.ID, outCh chan PeerStatus) (*HeartbeatService, error) {
	if ping == nil {
		return nil, errors.New("ping service is nil")
	}

	hb := new(HeartbeatService)
	hb.ping = ping
	hb.peer = p
	hb.peerStatus = unknown
	hb.reportCh = outCh
	hb.done = make(chan struct{})

	// The loop runs until Close is called.
	go hb.run()

	return hb, nil
}
// run is the heartbeat loop started by NewHeartbeat. It starts a ping, takes
// the first result of that ping as the verdict for the current period, and
// sends a PeerStatus on reportCh only when the verdict differs from the
// previously recorded status. Every HeartbeatEvery the current ping is
// cancelled and a fresh one is started.
//
// The loop exits when a value arrives on h.done (sent by Close). Before
// returning it cancels the ping in flight and closes h.done, which is the
// acknowledgement Close waits for.
func (h *HeartbeatService) run() {
	ctx, cancel := context.WithCancel(context.Background())

	timer := time.NewTimer(HeartbeatEvery)
	defer timer.Stop()

	// shutdown releases the ping in flight and acknowledges Close. The closure
	// reads the cancel variable at call time, so it always cancels the most
	// recently started ping.
	shutdown := func() {
		cancel()
		close(h.done)
	}

	resCh := h.ping.Ping(ctx, h.peer)

	for {
		select {
		case <-h.done:
			shutdown()
			return

		case res, ok := <-resCh:
			// Only the first result of each period matters. Receiving from a
			// nil channel blocks forever, so setting resCh to nil disables this
			// case until the timer installs a new ping.
			resCh = nil
			if !ok {
				// The ping stream was closed without delivering a result; do
				// not mistake the zero-value Result for a successful ping.
				continue
			}

			next := alive
			if res.Error != nil {
				next = dead
			}
			if h.peerStatus == next {
				continue
			}
			h.peerStatus = next

			// The receiver of reportCh may already have stopped listening and
			// be blocked in Close. Watching h.done while sending lets the
			// shutdown proceed instead of deadlocking both sides.
			select {
			case h.reportCh <- PeerStatus{Peer: h.peer, Alive: next == alive}:
			case <-h.done:
				shutdown()
				return
			}

		case <-timer.C:
			// Start a new period: drop the previous ping and begin another.
			cancel()
			ctx, cancel = context.WithCancel(context.Background())
			resCh = h.ping.Ping(ctx, h.peer)
			timer.Reset(HeartbeatEvery)
		}
	}
}
// Close stops the heartbeat loop and blocks until the loop has acknowledged
// the request by closing the done channel.
//
// Close must be called at most once: a second call would send on the
// already-closed done channel and panic.
func (h *HeartbeatService) Close() {
	var stop struct{}

	// Ask the loop to stop...
	h.done <- stop
	// ...and wait for it to close done on its way out.
	<-h.done
}