-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
hub.go
132 lines (111 loc) · 2.72 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package udp
import (
"context"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol/udp"
"github.com/xtls/xray-core/transport/internet"
)
// HubOption is a function that adjusts the settings of a Hub. ListenUDP
// invokes every supplied option on the new Hub before the socket is opened.
type HubOption func(h *Hub)
// HubCapacity returns a HubOption that stores capacity in the Hub. ListenUDP
// uses that value as the buffer size of the channel holding received packets.
func HubCapacity(capacity int) HubOption {
	return HubOption(func(hub *Hub) {
		hub.capacity = capacity
	})
}
// HubReceiveOriginalDestination returns a HubOption that sets whether the
// Hub's read loop should try to recover each packet's original destination
// from the out-of-band data delivered with it.
func HubReceiveOriginalDestination(r bool) HubOption {
	return HubOption(func(hub *Hub) {
		hub.recvOrigDest = r
	})
}
// Hub couples a UDP connection with a buffered channel of the packets read
// from it.
type Hub struct {
	// conn is the UDP connection opened by ListenUDP.
	conn *net.UDPConn
	// cache carries packets from the read loop (start) to callers of Receive.
	cache chan *udp.Packet
	// capacity is the buffer size used when cache is created.
	capacity int
	// recvOrigDest makes the read loop try to extract the original
	// destination of each packet from its out-of-band data.
	recvOrigDest bool
}
// ListenUDP opens a UDP socket on address:port and returns a Hub that reads
// packets from it in a background goroutine.
//
// The Hub starts with a capacity of 256 and original-destination retrieval
// disabled; each option is then applied in order. If streamSettings carries
// socket settings with ReceiveOriginalDestAddress set, original-destination
// retrieval is switched on regardless of the options. The socket settings
// (possibly nil) are handed to internet.ListenSystemPacket unchanged.
//
// An error is returned when the configured capacity is negative, when the
// socket cannot be opened, or when the opened connection is not a
// *net.UDPConn. In the last case the connection is closed before returning.
func ListenUDP(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, options ...HubOption) (*Hub, error) {
	hub := &Hub{
		capacity:     256,
		recvOrigDest: false,
	}
	for _, opt := range options {
		opt(hub)
	}
	// make(chan T, n) panics at run time for n < 0, so reject a bad option
	// value before any socket is opened.
	if hub.capacity < 0 {
		return nil, newError("invalid UDP hub capacity: ", hub.capacity)
	}

	var sockopt *internet.SocketConfig
	if streamSettings != nil {
		sockopt = streamSettings.SocketSettings
	}
	if sockopt != nil && sockopt.ReceiveOriginalDestAddress {
		hub.recvOrigDest = true
	}

	packetConn, err := internet.ListenSystemPacket(ctx, &net.UDPAddr{
		IP:   address.IP(),
		Port: int(port),
	}, sockopt)
	if err != nil {
		return nil, err
	}

	// Use the checked form of the type assertion: the unchecked form panics
	// when the listener hands back anything other than *net.UDPConn, and the
	// freshly opened socket would be leaked.
	udpConn, ok := packetConn.(*net.UDPConn)
	if !ok {
		if packetConn != nil {
			// Best-effort cleanup; the type mismatch is the error worth reporting.
			_ = packetConn.Close()
		}
		return nil, newError("unexpected connection type for UDP listener on ", address, ":", port)
	}
	newError("listening UDP on ", address, ":", port).WriteToLog()

	hub.conn = udpConn
	hub.cache = make(chan *udp.Packet, hub.capacity)
	go hub.start()
	return hub, nil
}
// Close closes the underlying UDP connection and returns the error, if any,
// reported by that close. The read loop in start exits once reading from the
// connection returns an error.
func (h *Hub) Close() error {
	return h.conn.Close()
}
// WriteTo sends payload over the hub's UDP connection to the IP address and
// port held in dest, returning whatever the connection's WriteToUDP reports.
func (h *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) {
	target := &net.UDPAddr{
		IP:   dest.Address.IP(),
		Port: int(dest.Port),
	}
	return h.conn.WriteToUDP(payload, target)
}
// start is the hub's read loop. It keeps reading datagrams from the
// connection, wraps each non-empty one in a udp.Packet and offers it to the
// cache channel without blocking. The loop ends on the first read error, and
// the cache channel is closed when the function returns.
func (h *Hub) start() {
	out := h.cache
	defer close(out)

	// Scratch space for out-of-band data, reused by every read.
	oob := make([]byte, 256)
	for {
		b := buf.New()
		data := b.Extend(buf.Size)

		size, oobLen, _, src, err := ReadUDPMsg(h.conn, data, oob)
		if err != nil {
			newError("failed to read UDP msg").Base(err).WriteToLog()
			b.Release()
			return
		}

		// Trim the buffer down to the bytes actually received.
		b.Resize(0, int32(size))
		if b.IsEmpty() {
			b.Release()
			continue
		}

		packet := &udp.Packet{
			Payload: b,
			Source:  net.UDPDestination(net.IPAddress(src.IP), net.Port(src.Port)),
		}

		if h.recvOrigDest && oobLen > 0 {
			packet.Target = RetrieveOriginalDest(oob[:oobLen])
			if !packet.Target.IsValid() {
				newError("failed to read UDP original destination").WriteToLog()
			} else {
				newError("UDP original destination: ", packet.Target).AtDebug().WriteToLog()
			}
		}

		select {
		case out <- packet:
		default:
			// The channel is full: drop the packet and release its buffer
			// instead of blocking the read loop.
			b.Release()
			packet.Payload = nil
		}
	}
}
// Addr returns the local address of the hub's UDP connection.
func (h *Hub) Addr() net.Addr {
	local := h.conn.LocalAddr()
	return local
}
// Receive returns the receive-only view of the channel on which the read
// loop delivers packets. The read loop closes this channel when it exits.
func (h *Hub) Receive() <-chan *udp.Packet {
	var packets <-chan *udp.Packet = h.cache
	return packets
}