-
Notifications
You must be signed in to change notification settings - Fork 0
/
hub.go
136 lines (113 loc) · 2.8 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package udp
import (
"context"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
)
// Payload represents a single UDP payload.
type Payload struct {
Content *buf.Buffer
Source net.Destination
OriginalDestination net.Destination
}
type HubOption func(h *Hub)
func HubCapacity(capacity int) HubOption {
return func(h *Hub) {
h.capacity = capacity
}
}
func HubReceiveOriginalDestination(r bool) HubOption {
return func(h *Hub) {
h.recvOrigDest = r
}
}
type Hub struct {
conn *net.UDPConn
cache chan *Payload
capacity int
recvOrigDest bool
}
func ListenUDP(ctx context.Context, address net.Address, port net.Port, options ...HubOption) (*Hub, error) {
hub := &Hub{
capacity: 256,
recvOrigDest: false,
}
for _, opt := range options {
opt(hub)
}
streamSettings := internet.StreamSettingsFromContext(ctx)
if streamSettings != nil && streamSettings.SocketSettings != nil && streamSettings.SocketSettings.ReceiveOriginalDestAddress {
hub.recvOrigDest = true
}
udpConn, err := internet.ListenSystemPacket(ctx, &net.UDPAddr{
IP: address.IP(),
Port: int(port),
})
if err != nil {
return nil, err
}
newError("listening UDP on ", address, ":", port).WriteToLog()
hub.conn = udpConn.(*net.UDPConn)
hub.cache = make(chan *Payload, hub.capacity)
go hub.start()
return hub, nil
}
// Close implements net.Listener.
func (h *Hub) Close() error {
h.conn.Close()
return nil
}
func (h *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) {
return h.conn.WriteToUDP(payload, &net.UDPAddr{
IP: dest.Address.IP(),
Port: int(dest.Port),
})
}
func (h *Hub) start() {
c := h.cache
defer close(c)
oobBytes := make([]byte, 256)
for {
buffer := buf.New()
var noob int
var addr *net.UDPAddr
rawBytes := buffer.Extend(buf.Size)
n, noob, _, addr, err := ReadUDPMsg(h.conn, rawBytes, oobBytes)
if err != nil {
newError("failed to read UDP msg").Base(err).WriteToLog()
buffer.Release()
break
}
buffer.Resize(0, int32(n))
if buffer.IsEmpty() {
buffer.Release()
continue
}
payload := &Payload{
Content: buffer,
Source: net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)),
}
if h.recvOrigDest && noob > 0 {
payload.OriginalDestination = RetrieveOriginalDest(oobBytes[:noob])
if payload.OriginalDestination.IsValid() {
newError("UDP original destination: ", payload.OriginalDestination).AtDebug().WriteToLog()
} else {
newError("failed to read UDP original destination").WriteToLog()
}
}
select {
case c <- payload:
default:
buffer.Release()
payload.Content = nil
}
}
}
// Addr implements net.Listener.
func (h *Hub) Addr() net.Addr {
return h.conn.LocalAddr()
}
func (h *Hub) Receive() <-chan *Payload {
return h.cache
}