-
Notifications
You must be signed in to change notification settings - Fork 77
/
client.go
252 lines (220 loc) · 6.39 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package rpcclient
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// Fallback durations that initClient substitutes for Options values that are
// not set to a positive duration.
const (
	// defaultDialTimeout bounds establishing a connection to the endpoint.
	defaultDialTimeout = 4 * time.Second
	// defaultRequestTimeout bounds a whole HTTP request.
	defaultRequestTimeout = 4 * time.Second
)
// Client represents the middleman for executing JSON RPC calls
// to remote NEO RPC nodes. Client is thread-safe and can be used from
// multiple goroutines.
//
// A Client is expected to be set up via New (which calls initClient); the
// methods in this file dereference cli, endpoint and ctxCancel without
// nil checks.
type Client struct {
// cli is the HTTP client used to send requests and whose idle connections
// are closed by Close.
cli *http.Client
// endpoint is the parsed URL of the RPC node given to New.
endpoint *url.URL
// ctx is the client instance context returned by Context(). It is derived
// (via context.WithCancel) from the context passed to New.
ctx context.Context
// ctxCancel is a cancel function aimed to send closing signal to the users of
// ctx. It is invoked by Close.
ctxCancel func()
// opts holds the options given to New with the default timeouts already
// substituted for non-positive durations.
opts Options
// requestF performs a single request/response exchange. initClient points it
// at makeHTTPRequest; performRequest sends every call through it.
requestF func(*neorpc.Request) (*neorpc.Response, error)
// reader is an Invoker that has no signers and uses current state,
// it's used to implement various getters. It'll be removed eventually,
// but for now it keeps Client's API compatibility.
reader *invoker.Invoker
// cacheLock is held by Init while it fills in cache.
// NOTE(review): presumably every other access to cache takes it as well —
// those accessors are not in this part of the file.
cacheLock sync.RWMutex
// cache stores RPC node related information the client is bound to.
// cache is mostly filled in during Init(), but can also be updated
// during regular Client lifecycle.
cache cache
// latestReqID is the counter that getRequestID atomically increments to
// produce request IDs.
latestReqID atomic.Uint64
// getNextRequestID returns an ID to be used for the subsequent request creation.
// It is defined on Client, so that our testing code can override this method
// for the sake of more predictable request IDs generation behavior.
getNextRequestID func() uint64
}
// Options defines options for the RPC client.
// All values are optional. If any duration is not specified (or is not
// positive), a default of 4 seconds will be used.
type Options struct {
// Cert is a client-side certificate. Cert, Key and CACert do not work at the
// moment: initClient only carries a TODO for enabling SSL and never reads
// these fields.
Cert string
// Key is presumably the private key matching Cert — it is not used in this
// file, confirm before relying on it.
Key string
// CACert is presumably a CA certificate for verifying the server — it is
// not used in this file, confirm before relying on it.
CACert string
// DialTimeout limits how long establishing a connection may take; it is
// passed to the net.Dialer used by the HTTP transport.
DialTimeout time.Duration
// RequestTimeout is set as the Timeout of the underlying http.Client.
RequestTimeout time.Duration
// Limit total number of connections per host. No limit by default.
// The value is passed as is to http.Transport.MaxConnsPerHost.
MaxConnsPerHost int
}
// cache stores cache values for the RPC client methods.
type cache struct {
// initDone is set to true at the very end of a successful Init.
initDone bool
// network is the network magic reported by the node's version response.
network netmode.Magic
// stateRootInHeader mirrors the StateRootInHeader protocol setting reported
// by the node's version response.
stateRootInHeader bool
// nativeHashes maps a native contract's manifest name to its script hash.
// The map is allocated by initClient and populated by Init.
nativeHashes map[string]util.Uint160
}
// New creates a Client bound to the given endpoint and configured with opts.
// The returned client is ready for use, but Init should be called first if
// GetBlock* methods are going to be used: it sets up the stateroot setting
// of the network the client operates on.
//
// An error is returned when the client can't be initialized (for example,
// when endpoint is not a parsable URL); no Client is returned in that case.
func New(ctx context.Context, endpoint string, opts Options) (*Client, error) {
	client := &Client{}
	if err := initClient(ctx, client, endpoint, opts); err != nil {
		return nil, err
	}
	return client, nil
}
// initClient fills in cl so that it talks to the RPC node at endpoint.
//
// It parses endpoint, substitutes the default timeouts for non-positive
// durations in opts, builds the HTTP client, derives a cancellable context
// from ctx and wires the request ID generator, the request function and the
// reader Invoker. The only error it returns is the endpoint parsing one; cl
// is left untouched in that case.
func initClient(ctx context.Context, cl *Client, endpoint string, opts Options) error {
	parsed, err := url.Parse(endpoint)
	if err != nil {
		return err
	}

	// Non-positive durations select the package defaults.
	if opts.DialTimeout <= 0 {
		opts.DialTimeout = defaultDialTimeout
	}
	if opts.RequestTimeout <= 0 {
		opts.RequestTimeout = defaultRequestTimeout
	}

	dialer := &net.Dialer{Timeout: opts.DialTimeout}
	transport := &http.Transport{
		DialContext:     dialer.DialContext,
		MaxConnsPerHost: opts.MaxConnsPerHost,
	}

	// TODO(@antdm): Enable SSL.
	// if opts.Cert != "" && opts.Key != "" {
	// }

	cl.ctx, cl.ctxCancel = context.WithCancel(ctx)
	cl.cli = &http.Client{
		Transport: transport,
		Timeout:   opts.RequestTimeout,
	}
	cl.endpoint = parsed
	cl.opts = opts
	cl.cache = cache{
		nativeHashes: make(map[string]util.Uint160),
	}

	// Request IDs start from a zeroed counter; the generator is a field so
	// that it can be replaced.
	cl.latestReqID = atomic.Uint64{}
	cl.getNextRequestID = cl.getRequestID

	cl.requestF = cl.makeHTTPRequest
	cl.reader = invoker.New(cl, nil)
	return nil
}
// getRequestID atomically advances the request counter and returns its new
// value. initClient installs it as the getNextRequestID implementation.
func (c *Client) getRequestID() uint64 {
	id := c.latestReqID.Add(1)
	return id
}
// Init queries the node for its version and native contracts and caches the
// network magic, the stateRootInHeader setting and the hashes of native
// contracts keyed by their manifest names. Call it before any header- or
// block-related request, these values are needed to deserialize responses
// properly.
//
// Both requests are made before the cache lock is taken; the cache is only
// modified when both of them succeed.
func (c *Client) Init() error {
	ver, err := c.GetVersion()
	if err != nil {
		return fmt.Errorf("failed to get network magic: %w", err)
	}
	nativeContracts, err := c.GetNativeContracts()
	if err != nil {
		return fmt.Errorf("failed to get native contracts: %w", err)
	}

	c.cacheLock.Lock()
	defer c.cacheLock.Unlock()

	for _, contract := range nativeContracts {
		c.cache.nativeHashes[contract.Manifest.Name] = contract.Hash
	}
	c.cache.stateRootInHeader = ver.Protocol.StateRootInHeader
	c.cache.network = ver.Protocol.Network
	c.cache.initDone = true
	return nil
}
// Close cancels the client context (signalling everyone who uses Context())
// and then closes idle connections of the underlying HTTP client.
func (c *Client) Close() {
	cancel := c.ctxCancel
	cancel()
	c.cli.CloseIdleConnections()
}
// performRequest sends a JSON-RPC request with the given method and
// parameters through requestF and unmarshals the result into v.
//
// The outcome is picked in this order: an error carried by the response,
// an error of the exchange itself, a missing result, and finally whatever
// decoding the result into v returns.
func (c *Client) performRequest(method string, p []any, v any) error {
	params := p
	if params == nil {
		params = []any{} // neo-project/neo-modules#742
	}

	req := &neorpc.Request{
		JSONRPC: neorpc.JSONRPCVersion,
		Method:  method,
		Params:  params,
		ID:      c.getNextRequestID(),
	}
	resp, err := c.requestF(req)

	switch {
	case resp != nil && resp.Error != nil:
		// An error reported in the response wins over the exchange error.
		return resp.Error
	case err != nil:
		return err
	case resp == nil || resp.Result == nil:
		return errors.New("no result returned")
	}
	return json.Unmarshal(resp.Result, v)
}
// makeHTTPRequest encodes r as JSON, POSTs it to the client endpoint and
// decodes the reply into a neorpc.Response.
//
// The request is bound to the client context, so cancelling the context
// given to New (or calling Close) aborts a call in flight and makes
// subsequent calls fail with the context error. The body is declared as
// "application/json".
//
// A reply that parses as JSON is returned whatever the HTTP status code is.
// When it doesn't parse, the error is "HTTP <code>/<text>" for a non-200
// status and a wrapped decoding error otherwise.
func (c *Client) makeHTTPRequest(r *neorpc.Request) (*neorpc.Response, error) {
	var body bytes.Buffer
	if err := json.NewEncoder(&body).Encode(r); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(c.ctx, http.MethodPost, c.endpoint.String(), &body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.cli.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// The node might send us a proper JSON anyway, so look there first and if
	// it parses, it has more relevant data than HTTP error code.
	raw := new(neorpc.Response)
	if err := json.NewDecoder(resp.Body).Decode(raw); err != nil {
		if resp.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("HTTP %d/%s", resp.StatusCode, http.StatusText(resp.StatusCode))
		}
		return nil, fmt.Errorf("JSON decoding: %w", err)
	}
	return raw, nil
}
// Ping attempts to create a TCP connection to the endpoint and returns an
// error if there is any. No RPC request is sent, the connection is closed
// right after it's established.
//
// The dial is limited by the configured Options.DialTimeout (the default
// one is used if it's not positive). If the endpoint has no explicit port,
// the scheme's default one is dialed: 80 for http/ws and 443 for https/wss;
// for any other scheme the host is dialed as is.
func (c *Client) Ping() error {
	timeout := c.opts.DialTimeout
	if timeout <= 0 {
		timeout = defaultDialTimeout
	}

	addr := c.endpoint.Host
	if c.endpoint.Port() == "" {
		// net.DialTimeout requires host:port, while URLs may omit the port.
		switch c.endpoint.Scheme {
		case "http", "ws":
			addr = net.JoinHostPort(c.endpoint.Hostname(), "80")
		case "https", "wss":
			addr = net.JoinHostPort(c.endpoint.Hostname(), "443")
		}
	}

	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return err
	}
	// Only reachability matters here, so a close error is deliberately ignored.
	_ = conn.Close()
	return nil
}
// Context returns the context of this client instance. It is the cancellable
// context derived from the one passed to New, and Close cancels it.
func (c *Client) Context() context.Context {
	ctx := c.ctx
	return ctx
}
// Endpoint returns the string form of the URL the client sends requests to.
func (c *Client) Endpoint() string {
	target := c.endpoint
	return target.String()
}