-
Notifications
You must be signed in to change notification settings - Fork 246
/
rtt.go
98 lines (83 loc) · 2.19 KB
/
rtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package rtt
import (
"context"
"sync"
"time"
errors "github.com/pkg/errors"
tcp "github.com/status-im/tcp-shaker"
)
// Result is the outcome of a single TCP round-trip check against one address.
type Result struct {
	// Addr is the address that was checked, exactly as it was supplied.
	Addr string
	// RTTMs is the measured round-trip time in whole milliseconds.
	// runCheck sets it to -1 when the check failed.
	RTTMs int
	// Err is the error produced by the check, or nil when it succeeded.
	Err error
}
// timeoutError is satisfied by errors that can report whether they were
// caused by a timeout. tcp-shaker returns an error implementing this
// interface when a TCP connection attempt times out.
type timeoutError interface {
	// Timeout reports whether the error is a timeout.
	Timeout() bool
}
// runCheck performs a single TCP check against address using c and reports
// the outcome as a Result.
//
// On success RTTMs holds the elapsed wall-clock time of the check in whole
// milliseconds. On failure RTTMs is -1 and Err carries the error, annotated
// with a short description for timeouts and connection failures.
func runCheck(c *tcp.Checker, address string, timeout time.Duration) Result {
	// Time the TCP ping as a whole.
	began := time.Now()
	checkErr := c.CheckAddr(address, timeout)
	took := time.Since(began)

	if checkErr == nil {
		return Result{Addr: address, RTTMs: int(took / time.Millisecond)}
	}

	// Annotate the failure kinds we recognise; any other error is passed
	// through untouched.
	if _, isTimeout := checkErr.(timeoutError); isTimeout {
		checkErr = errors.Wrap(checkErr, "tcp check timeout")
	} else if _, isConnect := checkErr.(tcp.ErrConnect); isConnect {
		checkErr = errors.Wrap(checkErr, "unable to connect")
	}

	// Don't confuse users with a valid-looking latency value on error.
	return Result{Addr: address, RTTMs: -1, Err: checkErr}
}
// waitForResults collects values from resCh until that channel is closed and
// returns them in the order they were received.
//
// If a value arrives on errCh first, collection stops immediately: the
// results gathered so far are discarded and that value is returned as the
// error.
func waitForResults(errCh <-chan error, resCh <-chan Result) (results []Result, err error) {
	for {
		select {
		case loopErr := <-errCh:
			return nil, loopErr
		case r, open := <-resCh:
			if open {
				results = append(results, r)
				continue
			}
			// Channel closed: every result has been delivered.
			return results, nil
		}
	}
}
// CheckHosts runs a TCP check against every address in addresses
// concurrently, passing timeout to each individual check.
//
// It returns one Result per address, in no particular order; per-address
// failures are reported through Result.Err. A non-nil error is returned only
// when the checker's event loop itself stops with an error, in which case no
// results are returned.
func CheckHosts(addresses []string, timeout time.Duration) ([]Result, error) {
	c := tcp.NewChecker()

	// Channel for receiving a possible checking loop failure. It is buffered
	// so the loop goroutine can always deliver its exit status and terminate,
	// even when nobody is receiving any more.
	errCh := make(chan error, 1)

	// Stop the checking loop when this function exits.
	ctx, stopChecker := context.WithCancel(context.Background())
	defer stopChecker()

	// Loop that queries epoll/kqueue and pipes events to CheckAddr() calls.
	go func() {
		errCh <- c.CheckingLoop(ctx)
	}()

	// Wait for CheckingLoop to prepare the epoll/kqueue. If the loop exits
	// before it signals readiness, return its error instead of blocking
	// forever on a ready signal that will never come.
	select {
	case err := <-errCh:
		return nil, err
	case <-c.WaitReady():
	}

	// Channel for returning results from concurrent checks. It is buffered to
	// len(addresses) so no check goroutine ever blocks on send, even if this
	// function has already returned because of a checking loop error.
	resCh := make(chan Result, len(addresses))

	var wg sync.WaitGroup
	wg.Add(len(addresses))
	for _, address := range addresses {
		go func(address string) {
			defer wg.Done()
			resCh <- runCheck(c, address, timeout)
		}(address)
	}

	// Close the results channel once every check has reported. This happens
	// in the background so that a checking loop failure can be returned right
	// away rather than only after all checks have finished or timed out.
	go func() {
		wg.Wait()
		close(resCh)
	}()

	// Wait for the results for all addresses or a checking loop error.
	return waitForResults(errCh, resCh)
}