/
server_builder.go
153 lines (131 loc) · 5.01 KB
/
server_builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package proxy
import (
"context"
"net/url"
"time"
"golang.org/x/exp/slices"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/pkg/relayer/config"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
)
// supplierStakeWaitTime is the number of seconds to wait between attempts to
// retrieve the supplier's on-chain record while the supplier is not found.
// It is an untyped constant: it is logged as a plain number of seconds and
// multiplied by time.Second where the wait is actually performed.
// This is useful for testing and development purposes, where the supplier
// may not be staked before the relay miner starts.
const supplierStakeWaitTime = 5
// BuildProvidedServices prepares the relay servers backing the services that
// the supplier advertises on-chain.
//
// It resolves the supplier address from the keyring, waits for the supplier's
// on-chain record to be available, verifies that the hostname of every
// advertised endpoint is publicly exposed by at least one configured server,
// and finally stores the supplier address and the servers returned by
// initializeProxyServers on the relayerProxy.
//
// It returns the first error encountered: a keyring or address lookup error,
// a supplier query error, an endpoint URL parse error,
// ErrRelayerProxyServiceEndpointNotHandled when an advertised endpoint is not
// covered by any server config, or the error from initializeProxyServers.
func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error {
	// Resolve the supplier address from the signing key held in the keyring.
	signingKey, err := rp.keyring.Key(rp.signingKeyName)
	if err != nil {
		return err
	}

	address, err := signingKey.GetAddress()
	if err != nil {
		return err
	}

	// Block until the supplier's on-chain record can be retrieved so that the
	// RelayMiner does not stop when its supplier is not staked yet.
	supplier, err := rp.waitForSupplierToStake(ctx, address.String())
	if err != nil {
		return err
	}

	// Every endpoint of every advertised service must be exposed by a server
	// config; otherwise the relay miner would advertise something it cannot serve.
	for _, service := range supplier.Services {
	nextEndpoint:
		for _, endpoint := range service.Endpoints {
			parsedURL, parseErr := url.Parse(endpoint.Url)
			if parseErr != nil {
				return parseErr
			}

			// Look for a server config whose supplier entry for this service
			// lists the endpoint's hostname among its PubliclyExposedEndpoints.
			for _, serverConfig := range rp.serverConfigs {
				supplierConfig, ok := serverConfig.SupplierConfigsMap[service.Service.Id]
				if !ok {
					continue
				}
				if slices.Contains(supplierConfig.PubliclyExposedEndpoints, parsedURL.Hostname()) {
					// Endpoint is handled; move on to the next one.
					continue nextEndpoint
				}
			}

			// No server config matched this endpoint.
			return ErrRelayerProxyServiceEndpointNotHandled.Wrapf(
				"service endpoint %s not handled by the relay miner",
				endpoint.Url,
			)
		}
	}

	rp.supplierAddress = supplier.Address

	// rp.servers is assigned even when initialization fails, in which case the
	// error is reported to the caller.
	rp.servers, err = rp.initializeProxyServers(supplier.Services)
	return err
}
// initializeProxyServers creates one RelayServer per entry in rp.serverConfigs.
//
// supplierServices are the supplier's advertised service configs; they are
// indexed by service id and handed to every server that gets created.
//
// It returns a map of listen address -> RelayServer, or
// ErrRelayerProxyUnsupportedTransportType (with a nil map) as soon as a server
// config declares a server type other than config.RelayMinerServerTypeHTTP.
func (rp *relayerProxy) initializeProxyServers(
	supplierServices []*sharedtypes.SupplierServiceConfig,
) (proxyServerMap map[string]relayer.RelayServer, err error) {
	// Index the supplier's advertised services by their service id.
	servicesById := make(map[string]*sharedtypes.Service)
	for i := range supplierServices {
		advertised := supplierServices[i].Service
		servicesById[advertised.Id] = advertised
	}

	// Create a RelayServer, keyed by listen address, for each configured server.
	proxyServerMap = make(map[string]relayer.RelayServer)
	for _, serverConfig := range rp.serverConfigs {
		listenAddress := serverConfig.ListenAddress
		rp.logger.Info().Str("server host", listenAddress).Msg("starting relay proxy server")

		// TODO(@h5law): Implement a switch that handles all synchronous
		// RPC types in one server type and asynchronous RPC types in another
		// to create the appropriate RelayServer.
		// The server implementation is selected by the configured server type.
		switch serverConfig.ServerType {
		case config.RelayMinerServerTypeHTTP:
			proxyServerMap[listenAddress] = NewSynchronousServer(
				rp.logger,
				serverConfig,
				servicesById,
				rp.servedRelaysPublishCh,
				rp,
			)
		default:
			return nil, ErrRelayerProxyUnsupportedTransportType
		}
	}

	return proxyServerMap, nil
}
// waitForSupplierToStake polls the supplier querier until it gets the on-chain
// supplier's information back.
//
// While the query fails with suppliertypes.ErrSupplierNotFound, it logs a
// message, waits supplierStakeWaitTime seconds and tries again. Any other
// query error is returned immediately together with a zero-value Supplier.
// If ctx is done while waiting between attempts, the wait is abandoned and
// ctx.Err() is returned together with a zero-value Supplier.
//
// This is useful for testing and development purposes, in production the supplier
// is most likely staked before the relay miner starts.
func (rp *relayerProxy) waitForSupplierToStake(
	ctx context.Context,
	supplierAddress string,
) (supplier sharedtypes.Supplier, err error) {
	for {
		// Get the supplier's on-chain record.
		supplier, err = rp.supplierQuerier.GetSupplier(ctx, supplierAddress)
		if err == nil {
			// The supplier is found, stop waiting.
			return supplier, nil
		}

		// Waiting only helps when the supplier is not staked yet; any other
		// error is returned to the caller.
		if !suppliertypes.ErrSupplierNotFound.Is(err) {
			return sharedtypes.Supplier{}, err
		}

		rp.logger.Info().Msgf(
			"Waiting %d seconds for the supplier with address %s to stake",
			supplierStakeWaitTime,
			supplierAddress,
		)

		// Wait before the next attempt, but stop as soon as the context is
		// done so that cancellation is not delayed by the retry interval.
		retryTimer := time.NewTimer(supplierStakeWaitTime * time.Second)
		select {
		case <-ctx.Done():
			retryTimer.Stop()
			return sharedtypes.Supplier{}, ctx.Err()
		case <-retryTimer.C:
		}
	}
}