-
Notifications
You must be signed in to change notification settings - Fork 6
/
relayerproxy.go
419 lines (357 loc) · 13.5 KB
/
relayerproxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
package testproxy
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"testing"
"cosmossdk.io/depinject"
ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1"
ringtypes "github.com/athanorlabs/go-dleq/types"
keyringtypes "github.com/cosmos/cosmos-sdk/crypto/keyring"
secp256k1 "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
"github.com/cosmos/cosmos-sdk/types/bech32"
"github.com/noot/ring-go"
"github.com/stretchr/testify/require"
"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/polylog"
"github.com/pokt-network/poktroll/pkg/relayer/config"
"github.com/pokt-network/poktroll/pkg/signer"
"github.com/pokt-network/poktroll/testutil/testclient/testblock"
"github.com/pokt-network/poktroll/testutil/testclient/testdelegation"
testkeyring "github.com/pokt-network/poktroll/testutil/testclient/testkeyring"
"github.com/pokt-network/poktroll/testutil/testclient/testqueryclients"
testrings "github.com/pokt-network/poktroll/testutil/testcrypto/rings"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)
// TestBehavior is a struct that holds the test context and mocks
// for the relayer proxy tests.
// It is used to provide the context needed by the instrumentation functions
// in order to isolate specific execution paths of the subject under test.
type TestBehavior struct {
ctx context.Context
t *testing.T
// Deps is exported so it can be used by the dependency injection framework
// from the pkg/relayer/proxy/proxy_test.go
Deps depinject.Config
// proxyServersMap is a map from ServiceId to the actual Server that handles
// processing of incoming RPC requests.
proxyServersMap map[string]*http.Server
}
// blockHeight is the default block height used in the tests.
const blockHeight = 1
// blockHashBz is the []byte representation of the block hash used in the tests.
var blockHashBz []byte
func init() {
var err error
if blockHashBz, err = hex.DecodeString("1B1051B7BF236FEA13EFA65B6BE678514FA5B6EA0AE9A7A4B68D45F95E4F18E0"); err != nil {
panic(fmt.Errorf("error while trying to decode block hash: %w", err))
}
}
// NewRelayerProxyTestBehavior creates a TestBehavior with the provided set of
// behavior function that are used to instrument the tested subject's dependencies
// and isolate specific execution pathways.
func NewRelayerProxyTestBehavior(
ctx context.Context,
t *testing.T,
behaviors ...func(*TestBehavior),
) *TestBehavior {
test := &TestBehavior{
ctx: ctx,
t: t,
proxyServersMap: make(map[string]*http.Server),
}
for _, behavior := range behaviors {
behavior(test)
}
return test
}
// WithRelayerProxyDependenciesForBlockHeight creates the dependencies for the relayer proxy
// from the TestBehavior.mocks so they have the right interface and can be
// used by the dependency injection framework.
// blockHeight being the block height that will be returned by the block client's
// LastNBlock method
func WithRelayerProxyDependenciesForBlockHeight(
keyName string,
blockHeight int64,
) func(*TestBehavior) {
return func(test *TestBehavior) {
logger := polylog.Ctx(test.ctx)
accountQueryClient := testqueryclients.NewTestAccountQueryClient(test.t)
applicationQueryClient := testqueryclients.NewTestApplicationQueryClient(test.t)
sessionQueryClient := testqueryclients.NewTestSessionQueryClient(test.t)
supplierQueryClient := testqueryclients.NewTestSupplierQueryClient(test.t)
blockClient := testblock.NewAnyTimeLastBlockBlockClient(test.t, []byte{}, blockHeight)
keyring, _ := testkeyring.NewTestKeyringWithKey(test.t, keyName)
redelegationObs, _ := channel.NewReplayObservable[client.Redelegation](test.ctx, 1)
delegationClient := testdelegation.NewAnyTimesRedelegationsSequence(test.ctx, test.t, "", redelegationObs)
ringCache := testrings.NewRingCacheWithMockDependencies(test.ctx, test.t, accountQueryClient, applicationQueryClient, delegationClient)
deps := depinject.Supply(
logger,
accountQueryClient,
ringCache,
blockClient,
sessionQueryClient,
supplierQueryClient,
keyring,
)
test.Deps = deps
}
}
// WithServicesConfigMap creates the services that the relayer proxy will
// proxy requests to.
// It creates an HTTP server for each service and starts listening on the
// provided host.
func WithServicesConfigMap(
servicesConfigMap map[string]*config.RelayMinerServerConfig,
) func(*TestBehavior) {
return func(test *TestBehavior) {
for _, serviceConfig := range servicesConfigMap {
for serviceId, supplierConfig := range serviceConfig.SupplierConfigsMap {
server := &http.Server{Addr: supplierConfig.ServiceConfig.BackendUrl.Host}
server.Handler = http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write(prepareJsonRPCResponsePayload())
})
go func() { server.ListenAndServe() }()
go func() {
<-test.ctx.Done()
server.Shutdown(test.ctx)
}()
test.proxyServersMap[serviceId] = server
}
}
}
}
// WithDefaultSupplier creates the default staked supplier for the test
func WithDefaultSupplier(
supplierKeyName string,
supplierEndpoints map[string][]*sharedtypes.SupplierEndpoint,
) func(*TestBehavior) {
return func(test *TestBehavior) {
var keyring keyringtypes.Keyring
err := depinject.Inject(test.Deps, &keyring)
require.NoError(test.t, err)
supplierAccount, err := keyring.Key(supplierKeyName)
require.NoError(test.t, err)
supplierAccAddress, err := supplierAccount.GetAddress()
require.NoError(test.t, err)
supplierAddress := supplierAccAddress.String()
for serviceId, endpoints := range supplierEndpoints {
testqueryclients.AddSuppliersWithServiceEndpoints(
test.t,
supplierAddress,
serviceId,
endpoints,
)
}
}
}
// WithDefaultApplication creates the default staked application actor for the test
func WithDefaultApplication(appPrivateKey *secp256k1.PrivKey) func(*TestBehavior) {
return func(test *TestBehavior) {
appPubKey := appPrivateKey.PubKey()
appAddress := GetAddressFromPrivateKey(test, appPrivateKey)
delegateeAccounts := map[string]cryptotypes.PubKey{}
testqueryclients.AddAddressToApplicationMap(
test.t,
appAddress,
appPubKey,
delegateeAccounts,
)
}
}
// WithDefaultSessionSupplier adds the default staked supplier to the
// application's current session
// If the supplierKeyName is empty, the supplier will not be staked so we can
// test the case where the supplier is not in the application's session's supplier list.
func WithDefaultSessionSupplier(
supplierKeyName string,
serviceId string,
appPrivateKey *secp256k1.PrivKey,
) func(*TestBehavior) {
return func(test *TestBehavior) {
if supplierKeyName == "" {
return
}
appAddress := GetAddressFromPrivateKey(test, appPrivateKey)
sessionSuppliers := []string{}
var keyring keyringtypes.Keyring
err := depinject.Inject(test.Deps, &keyring)
require.NoError(test.t, err)
supplierAccount, err := keyring.Key(supplierKeyName)
require.NoError(test.t, err)
supplierAccAddress, err := supplierAccount.GetAddress()
require.NoError(test.t, err)
supplierAddress := supplierAccAddress.String()
sessionSuppliers = append(sessionSuppliers, supplierAddress)
testqueryclients.AddToExistingSessions(
test.t,
appAddress,
serviceId,
blockHeight,
sessionSuppliers,
)
}
}
// WithSuccessiveSessions creates sessions with SessionNumber 0 through SessionCount -1
// and adds all of them to the sessionMap.
// Each session is configured for the same serviceId and application provided.
func WithSuccessiveSessions(
supplierKeyName string,
serviceId string,
appPrivateKey *secp256k1.PrivKey,
sessionsCount int,
) func(*TestBehavior) {
return func(test *TestBehavior) {
appAddress := GetAddressFromPrivateKey(test, appPrivateKey)
sessionSuppliers := []string{}
var keyring keyringtypes.Keyring
err := depinject.Inject(test.Deps, &keyring)
require.NoError(test.t, err)
supplierAccount, err := keyring.Key(supplierKeyName)
require.NoError(test.t, err)
supplierAccAddress, err := supplierAccount.GetAddress()
require.NoError(test.t, err)
supplierAddress := supplierAccAddress.String()
sessionSuppliers = append(sessionSuppliers, supplierAddress)
// Adding `sessionCount` sessions to the sessionsMap to make them available
// to the MockSessionQueryClient.
for i := 0; i < sessionsCount; i++ {
testqueryclients.AddToExistingSessions(
test.t,
appAddress,
serviceId,
sessionkeeper.NumBlocksPerSession*int64(i),
sessionSuppliers,
)
}
}
}
// TODO_TECHDEBT(@red-0ne): This function only supports JSON-RPC requests and
// needs to have its http.Request "Content-Type" header passed-in as a parameter
// and take out the GetRelayResponseError function which parses JSON-RPC responses
// to make it RPC-type agnostic.
// MarshalAndSend marshals the request and sends it to the provided service.
func MarshalAndSend(
test *TestBehavior,
servicesConfigMap map[string]*config.RelayMinerServerConfig,
serviceEndpoint string,
serviceId string,
request *servicetypes.RelayRequest,
) (errCode int32, errorMessage string) {
var scheme string
switch servicesConfigMap[serviceEndpoint].ServerType {
case config.RelayMinerServerTypeHTTP:
scheme = "http"
default:
require.FailNow(test.t, "unsupported server type")
}
// originHost is the endpoint that the client will retrieve from the on-chain supplier record.
// The supplier may have multiple endpoints (e.g. for load geo-balancing, host failover, etc.).
// In the current test setup, we only have one endpoint per supplier, which is why we are accessing `[0]`.
// In a real-world scenario, the publicly exposed endpoint would reach a load balancer
// or a reverse proxy that would route the request to the address specified by ListenAddress.
originHost := servicesConfigMap[serviceEndpoint].SupplierConfigsMap[serviceId].PubliclyExposedEndpoints[0]
reqBz, err := request.Marshal()
require.NoError(test.t, err)
reader := io.NopCloser(bytes.NewReader(reqBz))
req := &http.Request{
Method: http.MethodPost,
Header: http.Header{
"Content-Type": []string{"application/json"},
},
URL: &url.URL{Scheme: scheme, Host: servicesConfigMap[serviceEndpoint].ListenAddress},
Host: originHost,
Body: reader,
}
res, err := http.DefaultClient.Do(req)
require.NoError(test.t, err)
require.NotNil(test.t, res)
return GetRelayResponseError(test.t, res)
}
// GetRelayResponseError returns the error code and message from the relay response.
// If the response is not an error, it returns `0, ""`.
func GetRelayResponseError(t *testing.T, res *http.Response) (errCode int32, errMsg string) {
responseBody, err := io.ReadAll(res.Body)
require.NoError(t, err)
relayResponse := &servicetypes.RelayResponse{}
err = relayResponse.Unmarshal(responseBody)
if err != nil {
return 0, "cannot unmarshal request body"
}
var payload JSONRpcErrorReply
err = json.Unmarshal(relayResponse.Payload, &payload)
if err != nil {
return 0, "cannot unmarshal request payload"
}
if payload.Error == nil {
return 0, ""
}
return payload.Error.Code, payload.Error.Message
}
// GetApplicationRingSignature crafts a ring signer for test purposes and uses
// it to sign the relay request
func GetApplicationRingSignature(
t *testing.T,
req *servicetypes.RelayRequest,
appPrivateKey *secp256k1.PrivKey,
) []byte {
publicKey := appPrivateKey.PubKey()
curve := ring_secp256k1.NewCurve()
point, err := curve.DecodeToPoint(publicKey.Bytes())
require.NoError(t, err)
// At least two points are required to create a ring signer so we are reusing
// the same key for it
points := []ringtypes.Point{point, point}
pointsRing, err := ring.NewFixedKeyRingFromPublicKeys(curve, points)
require.NoError(t, err)
scalar, err := curve.DecodeToScalar(appPrivateKey.Bytes())
require.NoError(t, err)
signer := signer.NewRingSigner(pointsRing, scalar)
signableBz, err := req.GetSignableBytesHash()
require.NoError(t, err)
signature, err := signer.Sign(signableBz)
require.NoError(t, err)
return signature
}
// GetAddressFromPrivateKey returns the address of the provided private key
func GetAddressFromPrivateKey(test *TestBehavior, privKey *secp256k1.PrivKey) string {
addressBz := privKey.PubKey().Address()
address, err := bech32.ConvertAndEncode("pokt", addressBz)
require.NoError(test.t, err)
return address
}
// GenerateRelayRequest generates a relay request with the provided parameters
func GenerateRelayRequest(
test *TestBehavior,
privKey *secp256k1.PrivKey,
serviceId string,
blockHeight int64,
payload []byte,
) *servicetypes.RelayRequest {
appAddress := GetAddressFromPrivateKey(test, privKey)
sessionId, _ := sessionkeeper.GetSessionId(appAddress, serviceId, blockHashBz, blockHeight)
return &servicetypes.RelayRequest{
Meta: servicetypes.RelayRequestMetadata{
SessionHeader: &sessiontypes.SessionHeader{
ApplicationAddress: appAddress,
SessionId: string(sessionId[:]),
Service: &sharedtypes.Service{Id: serviceId},
SessionStartBlockHeight: sessionkeeper.GetSessionStartBlockHeight(blockHeight),
SessionEndBlockHeight: sessionkeeper.GetSessionEndBlockHeight(blockHeight),
},
// The returned relay is unsigned and must be signed elsewhere for functionality
Signature: []byte(""),
},
Payload: payload,
}
}