-
Notifications
You must be signed in to change notification settings - Fork 5
/
client.go
256 lines (221 loc) · 5.45 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package p2pclient
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio"
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"
)
// Client is responsible for sending requests and receiving responses to and
// from libp2p peers. Each instance of Client communicates with a single peer
// using a single protocolID.
type Client struct {
host host.Host
ownHost bool
peerID peer.ID
protoID protocol.ID
r msgio.ReadCloser
stream network.Stream
sendLock sync.Mutex
}
// Response is returned by SendRequest and contains the response to the
// request. It is the caller's responsibility to call Response.Close() after
// reading the data, to free the message buffer.
type Response struct {
Data []byte
Err error
msgReader msgio.Reader
}
// Close frees the message buffer that holds the response data.
func (r *Response) Close() {
if r.Data != nil {
r.msgReader.ReleaseMsg(r.Data)
r.Data = nil
r.msgReader = nil
}
}
const (
// default IPNI port for libp2p client to connect to
defaultLibp2pPort = 3003
// Timeout to wait for a response after a request is sent
readMessageTimeout = 10 * time.Second
)
// ErrReadTimeout is an error that occurs when no message is read within the
// timeout period.
var ErrReadTimeout = fmt.Errorf("timed out reading response")
// New creates a new Client that communicates with a specific peer identified
// by protocolID. If host is nil, then one is created.
func New(p2pHost host.Host, peerID peer.ID, protoID protocol.ID) (*Client, error) {
// If no host was given, create one.
var ownHost bool
if p2pHost == nil {
var err error
p2pHost, err = libp2p.New()
if err != nil {
return nil, err
}
ownHost = true
}
// Start a client
return &Client{
host: p2pHost,
ownHost: ownHost,
peerID: peerID,
protoID: protoID,
}, nil
}
// Connect connects the client to the host at the location specified by
// hostname. The value of hostname is a host or host:port, where the host is a
// hostname or IP address.
func (c *Client) Connect(ctx context.Context, hostname string) error {
port := defaultLibp2pPort
var netProto string
if hostname == "" {
hostname = "127.0.0.1"
netProto = "ip4"
} else {
hostport := strings.SplitN(hostname, ":", 2)
if len(hostport) > 1 {
hostname = hostport[0]
var err error
port, err = strconv.Atoi(hostport[1])
if err != nil {
return err
}
}
// Determine if hostname is a host name or IP address.
ip := net.ParseIP(hostname)
if ip == nil {
netProto = "dns"
} else if ip.To4() != nil {
netProto = "ip4"
} else if ip.To16() != nil {
netProto = "ip6"
} else {
return fmt.Errorf("host %q does not appear to be a hostname or ip address", hostname)
}
}
maddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d", netProto, hostname, port))
if err != nil {
return err
}
return c.ConnectAddrs(ctx, maddr)
}
func (c *Client) ConnectAddrs(ctx context.Context, maddrs ...multiaddr.Multiaddr) error {
addrInfo := peer.AddrInfo{
ID: c.peerID,
Addrs: maddrs,
}
return c.host.Connect(ctx, addrInfo)
}
// Self return the peer ID of this client.
func (c *Client) Self() peer.ID {
return c.host.ID()
}
// Close resets and closes the network stream if one exists.
func (c *Client) Close() error {
c.sendLock.Lock()
defer c.sendLock.Unlock()
if c.stream != nil {
c.closeStream()
}
if c.ownHost {
return c.host.Close()
}
return nil
}
// SendRequest sends out a request and reads a response.
func (c *Client) SendRequest(ctx context.Context, msg proto.Message) *Response {
c.sendLock.Lock()
defer c.sendLock.Unlock()
err := c.sendMessage(ctx, msg)
if err != nil {
return &Response{
Err: fmt.Errorf("cannot sent request: %w", err),
}
}
rsp := c.readResponse(ctx)
if rsp.Err != nil {
c.closeStream()
}
return rsp
}
// SendMessage sends out a message.
func (c *Client) SendMessage(ctx context.Context, msg proto.Message) error {
c.sendLock.Lock()
defer c.sendLock.Unlock()
return c.sendMessage(ctx, msg)
}
func (c *Client) sendMessage(ctx context.Context, msg proto.Message) error {
if ctx.Err() != nil {
return ctx.Err()
}
err := c.prepStreamReader(ctx)
if err != nil {
return err
}
if err = writeMsg(c.stream, msg); err != nil {
c.closeStream()
return err
}
return nil
}
func (c *Client) prepStreamReader(ctx context.Context) error {
if c.stream == nil {
nstr, err := c.host.NewStream(ctx, c.peerID, c.protoID)
if err != nil {
return err
}
c.r = msgio.NewVarintReaderSize(nstr, network.MessageSizeMax)
c.stream = nstr
}
return nil
}
func (c *Client) closeStream() {
_ = c.stream.Reset()
c.stream = nil
c.r = nil
}
func (c *Client) readResponse(ctx context.Context) *Response {
rspCh := make(chan *Response, 1)
go func(r msgio.ReadCloser, rsp chan<- *Response) {
data, err := r.ReadMsg()
if err != nil {
if data != nil {
r.ReleaseMsg(data)
}
rsp <- &Response{
Err: err,
}
return
}
rsp <- &Response{
Data: data,
msgReader: r,
}
}(c.r, rspCh)
t := time.NewTimer(readMessageTimeout)
defer t.Stop()
select {
case response := <-rspCh:
return response
case <-ctx.Done():
return &Response{
Err: ctx.Err(),
}
case <-t.C:
return &Response{
Err: ErrReadTimeout,
}
}
}