forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
net.go
105 lines (92 loc) · 2.41 KB
/
net.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package dialchain
import (
"fmt"
"net"
"strconv"
"time"
"github.com/elastic/beats/heartbeat/look"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/transport"
)
// TCPDialer returns a NetDialer whose connection attempts are bounded by the
// timeout to.
//
// The returned dialer is built by netDialer, which derives the event namespace
// from the network name used when dialing. When used with a TCP network
// ("tcp", "tcp4", "tcp6") the active event is updated with:
//
//	{
//	  "tcp": {
//	    "port": ...,
//	    "rtt": { "connect": { "us": ... }}
//	  }
//	}
func TCPDialer(to time.Duration) NetDialer {
	dialer := netDialer(to)
	return dialer
}
// UDPDialer returns a NetDialer whose connection attempts are bounded by the
// timeout to.
//
// The returned dialer is built by netDialer, which derives the event namespace
// from the network name used when dialing. When used with a UDP network
// ("udp", "udp4", "udp6") the active event is updated with:
//
//	{
//	  "udp": {
//	    "port": ...,
//	    "rtt": { "connect": { "us": ... }}
//	  }
//	}
func UDPDialer(to time.Duration) NetDialer {
	dialer := netDialer(to)
	return dialer
}
// netDialer builds a NetDialer that dials TCP or UDP endpoints with the given
// connection timeout and records connection details in the event passed to it.
//
// On every dial the returned dialer:
//
//   - maps the network onto the "tcp" or "udp" event namespace and returns an
//     error for any other network type,
//   - splits address into host and port and rejects ports outside 0..65535,
//   - stores the port in the event under <namespace>.port,
//   - resolves the host with net.LookupHost; a failure is logged as a warning
//     and returned,
//   - dials via transport.DialWith and, on success, stores the time spent in
//     that call under <namespace>.rtt.connect.
//
// The port is written to the event before the lookup and dial, so it is
// present in the event even when those later steps fail.
func netDialer(timeout time.Duration) NetDialer {
	// maxPort is the largest port number that fits into a uint16. Anything
	// larger would silently wrap around in the uint16 conversion below.
	const maxPort = 1<<16 - 1

	return func(event common.MapStr) (transport.Dialer, error) {
		return makeDialer(func(network, address string) (net.Conn, error) {
			namespace := ""
			switch network {
			case "tcp", "tcp4", "tcp6":
				namespace = "tcp"
			case "udp", "udp4", "udp6":
				namespace = "udp"
			default:
				return nil, fmt.Errorf("unsupported network type %v", network)
			}

			host, port, err := net.SplitHostPort(address)
			if err != nil {
				return nil, err
			}

			portNum, err := strconv.Atoi(port)
			if err != nil || portNum < 0 || portNum > maxPort {
				return nil, fmt.Errorf("invalid port number '%v' used", port)
			}

			event.DeepUpdate(common.MapStr{
				namespace: common.MapStr{
					"port": uint16(portNum),
				},
			})

			addresses, err := net.LookupHost(host)
			if err != nil {
				logp.Warn(`DNS lookup failure "%s": %v`, host, err)
				return nil, err
			}

			// dial via host IP by randomized iteration of known IPs
			dialer := &net.Dialer{Timeout: timeout}

			// The timer starts after name resolution, so the recorded
			// duration covers only the transport.DialWith call.
			start := time.Now()
			conn, err := transport.DialWith(dialer, network, host, addresses, port)
			if err != nil {
				return nil, err
			}
			end := time.Now()

			event.DeepUpdate(common.MapStr{
				namespace: common.MapStr{
					"rtt": common.MapStr{
						"connect": look.RTT(end.Sub(start)),
					},
				},
			})
			return conn, nil
		}), nil
	}
}