-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
udp.go
129 lines (109 loc) · 2.41 KB
/
udp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package portforward
import (
"net"
"sync"
"github.com/golang/glog"
)
const bufSize = 1500
func (p *portForwarder) startForwardingUDP(address *net.IPAddr, port forwardedPort) error {
listener, err := net.ListenUDP(
"udp",
&net.UDPAddr{
IP: address.IP,
Zone: address.Zone,
Port: port.local,
},
)
if err != nil {
return err
}
proxy := udpProxy{
listener: listener,
remoteDialer: func() (net.Conn, error) {
glog.Infof("opening new udp tunnel to %d", port.remote)
stream, err := p.resource.PortForward(p.name, port.remote, port.protocol)
if err != nil {
glog.Errorf("can't access %s/%s.%s: %v", p.kind, p.name, p.namespace, err)
return nil, err
}
return stream.AsConn(), nil
},
clients: make(map[string]*udpProxyConn),
}
go proxy.Run()
return nil
}
type udpProxy struct {
listener *net.UDPConn
remoteDialer func() (net.Conn, error)
sync.Mutex
clients map[string]*udpProxyConn
}
func (p *udpProxy) Run() {
buf := make([]byte, bufSize)
for {
if err := p.handleRead(buf); err != nil {
glog.Errorln(err)
}
}
}
func (p *udpProxy) handleRead(buf []byte) error {
n, clientAddr, err := p.listener.ReadFromUDP(buf[0:])
if err != nil {
return err
}
clientID := clientAddr.String()
p.Lock()
defer p.Unlock()
client, isKnownClient := p.clients[clientID]
if !isKnownClient {
remoteConn, err := p.remoteDialer()
if err != nil {
return err
}
client = &udpProxyConn{
localConn: p.listener,
clientAddr: clientAddr,
remoteConn: remoteConn,
close: make(chan struct{}),
}
p.clients[clientID] = client
go client.handleRemoteReads()
go p.cleanupClient(clientID, client)
}
_, err = client.remoteConn.Write(buf[0:n])
return err
}
func (p *udpProxy) cleanupClient(clientID string, client *udpProxyConn) {
<-client.close
p.Lock()
defer p.Unlock()
delete(p.clients, clientID)
}
type udpProxyConn struct {
localConn *net.UDPConn
clientAddr *net.UDPAddr
remoteConn net.Conn
close chan struct{}
}
func (c *udpProxyConn) handleRemoteReads() {
defer close(c.close)
buf := make([]byte, bufSize)
for {
if err := c.handleRemoteRead(buf); err != nil {
glog.Errorf("closing client: %v\n", err)
return
}
}
}
func (c *udpProxyConn) handleRemoteRead(buf []byte) error {
n, err := c.remoteConn.Read(buf[0:])
if err != nil {
return err
}
_, err = c.localConn.WriteToUDP(buf[0:n], c.clientAddr)
if err != nil {
return err
}
return nil
}