forked from v2ray/v2ray-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
128 lines (109 loc) · 3.47 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package socks
import (
"context"
"time"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/retry"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/ray"
)
// Client is a Socks5 client.
// It holds the server picker that Process consults to choose which
// configured server to dial.
type Client struct {
// serverPicker is set by NewClient to a round-robin picker over the
// configured servers; Process calls PickServer on it for every dial attempt.
serverPicker protocol.ServerPicker
}
// NewClient creates a new Socks5 client based on the given config.
//
// Each entry of config.Server is converted with protocol.NewServerSpecFromPB
// and added to a fresh server list, which is then wrapped in a round-robin
// server picker. An error is returned when the resulting list is empty.
// ctx is not used by this function.
func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
	servers := protocol.NewServerList()
	for i := range config.Server {
		spec := protocol.NewServerSpecFromPB(*config.Server[i])
		servers.AddServer(spec)
	}

	// A client without any upstream server cannot do anything useful.
	if servers.Size() == 0 {
		return nil, newError("0 target server")
	}

	client := new(Client)
	client.serverPicker = protocol.NewRoundRobinServerPicker(servers)
	return client, nil
}
// Process implements proxy.Outbound.Process.
//
// It reads the target destination from ctx, dials one of the configured
// servers through dialer (retried via retry.ExponentialBackoff(5, 100)),
// performs the client handshake on that connection, and then starts two
// asynchronous copy tasks between ray and the server, waiting for them with
// signal.ErrorOrFinish2. For a UDP destination the payload is carried over a
// second connection dialed to the destination returned by the handshake.
//
// An error is returned when ctx carries no target, when no server could be
// dialed, when the handshake fails, when the UDP connection cannot be
// created, or when the wait on the copy tasks reports an error.
func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.Dialer) error {
	destination, ok := proxy.TargetFromContext(ctx)
	if !ok {
		return newError("target not specified.")
	}

	var server *protocol.ServerSpec
	var conn internet.Connection

	// Every attempt picks a server again, so a retry may land on a different one.
	err := retry.ExponentialBackoff(5, 100).On(func() error {
		server = c.serverPicker.PickServer()
		dest := server.Destination()
		rawConn, err := dialer.Dial(ctx, dest)
		if err != nil {
			return err
		}
		conn = rawConn

		return nil
	})
	if err != nil {
		return newError("failed to find an available destination").Base(err)
	}
	defer conn.Close()

	// The request defaults to a TCP command and is switched to UDP when the
	// target network is UDP.
	request := &protocol.RequestHeader{
		Version: socks5Version,
		Command: protocol.RequestCommandTCP,
		Address: destination.Address,
		Port:    destination.Port,
	}
	if destination.Network == net.Network_UDP {
		request.Command = protocol.RequestCommandUDP
	}

	if user := server.PickUser(); user != nil {
		request.User = user
	}

	udpRequest, err := ClientHandshake(request, conn, conn)
	if err != nil {
		return newError("failed to establish connection to server").AtWarning().Base(err)
	}

	ctx, cancel := context.WithCancel(ctx)
	// Release the derived context on every return path (including the UDP
	// dial failure below) instead of leaving that solely to the activity
	// timer, which is configured with a five minute timeout.
	defer cancel()
	timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*5)

	var requestFunc func() error
	var responseFunc func() error

	switch request.Command {
	case protocol.RequestCommandTCP:
		// TCP payload flows directly over the handshake connection.
		requestFunc = func() error {
			return buf.Copy(ray.OutboundInput(), buf.NewWriter(conn), buf.UpdateActivity(timer))
		}
		responseFunc = func() error {
			defer ray.OutboundOutput().Close()
			return buf.Copy(buf.NewReader(conn), ray.OutboundOutput(), buf.UpdateActivity(timer))
		}
	case protocol.RequestCommandUDP:
		// UDP payload uses a separate connection to the destination the
		// handshake returned; conn stays open until Process returns.
		udpConn, err := dialer.Dial(ctx, udpRequest.Destination())
		if err != nil {
			return newError("failed to create UDP connection").Base(err)
		}
		defer udpConn.Close()

		requestFunc = func() error {
			return buf.Copy(ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer))
		}
		responseFunc = func() error {
			defer ray.OutboundOutput().Close()
			reader := &UDPReader{reader: udpConn}
			return buf.Copy(reader, ray.OutboundOutput(), buf.UpdateActivity(timer))
		}
	}

	requestDone := signal.ExecuteAsync(requestFunc)
	responseDone := signal.ExecuteAsync(responseFunc)
	if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
		return newError("connection ends").Base(err)
	}

	return nil
}
func init() {
common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return NewClient(ctx, config.(*ClientConfig))
}))
}