/
service.go
258 lines (214 loc) · 9.57 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// package service provides functions and methods
// for creating and running the api of the proxy service
package service
import (
"context"
"fmt"
"net/http"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/kava-labs/kava-proxy-service/clients/cache"
"github.com/kava-labs/kava-proxy-service/clients/database"
"github.com/kava-labs/kava-proxy-service/clients/database/migrations"
"github.com/kava-labs/kava-proxy-service/config"
"github.com/kava-labs/kava-proxy-service/logging"
"github.com/kava-labs/kava-proxy-service/service/batchmdw"
"github.com/kava-labs/kava-proxy-service/service/cachemdw"
)
// ProxyService represents an instance of the proxy service API
type ProxyService struct {
Database *database.PostgresClient
Cache *cachemdw.ServiceCache
httpProxy *http.Server
evmClient *ethclient.Client
*logging.ServiceLogger
}
// New returns a new ProxyService with the specified config and error (if any)
func New(ctx context.Context, config config.Config, serviceLogger *logging.ServiceLogger) (ProxyService, error) {
service := ProxyService{}
// create database client
db, err := createDatabaseClient(ctx, config, serviceLogger)
if err != nil {
return ProxyService{}, err
}
// create evm api client
evmClient, err := ethclient.Dial(config.EvmQueryServiceURL)
if err != nil {
return ProxyService{}, err
}
// create cache client
serviceCache, err := createServiceCache(ctx, config, serviceLogger, evmClient)
if err != nil {
return ProxyService{}, err
}
// create an http router for registering handlers for a given route
mux := http.NewServeMux()
// AfterProxyFinalizer will run after the proxy middleware handler and is
// the final function called after all other middleware
// allowing it to access values added to the request context
// to do things like metric the response and cache the response
afterProxyFinalizer := createAfterProxyFinalizer(&service, config)
// set up before and after request interceptors (a.k.a. raptors 🦖🦖)
// CachingMiddleware caches request in case of:
// - request isn't already cached
// - request is cacheable
// - response is present in context
cacheAfterProxyMiddleware := serviceCache.CachingMiddleware(afterProxyFinalizer)
// ProxyRequestMiddleware responds to the client with
// - cached data if present in the context
// - a forwarded request to the appropriate backend
// Backend is decided by the Proxies configuration for a particular host.
proxyMiddleware := createProxyRequestMiddleware(cacheAfterProxyMiddleware, config, serviceLogger, []RequestInterceptor{}, []RequestInterceptor{})
// IsCachedMiddleware works in the following way:
// - tries to get response from the cache
// - if present sets cached response in context, marks as cached in context and forwards to next middleware
// - if not present marks as uncached in context and forwards to next middleware
cacheMiddleware := serviceCache.IsCachedMiddleware(proxyMiddleware)
// BatchProcessingMiddleware separates a batch into multiple requests and routes each one
// through the single request middleware sequence.
// This allows the sub-requests of a batch to leverage the cache & metric recording.
// Expects non-zero length batch to be in the context.
batchMdwConfig := batchmdw.BatchMiddlewareConfig{
ServiceLogger: serviceLogger,
ContextKeyDecodedRequestBatch: DecodedBatchRequestContextKey,
ContextKeyDecodedRequestSingle: DecodedRequestContextKey,
MaximumBatchSize: config.ProxyMaximumBatchSize,
}
batchProcessingMiddleware := batchmdw.CreateBatchProcessingMiddleware(cacheMiddleware, &batchMdwConfig)
// DecodeRequestMiddleware captures the request start time & attempts to decode the request body.
// If successful, the decoded request is put into the request context:
// - if decoded as a single EVM request: it forwards it to the single request middleware sequence
// - if decoded as a batch EVM request: it forwards it to the batchProcessingMiddleware
// - if fails to decode: it passes to single request middleware sequence which will proxy the request
// When requests fail to decode, no context value is set.
decodeRequestMiddleware := createDecodeRequestMiddleware(cacheMiddleware, batchProcessingMiddleware, serviceLogger)
// register healthcheck handler that can be used during deployment and operations
// to determine if the service is ready to receive requests
mux.HandleFunc("/healthcheck", createHealthcheckHandler(&service))
// register healthcheck handler that can be used during deployment and operations
// to determine if the service is ready to receive requests
mux.HandleFunc("/servicecheck", createServicecheckHandler(&service))
// register middleware chain as the default handler for any request to the proxy service
mux.HandleFunc("/", decodeRequestMiddleware)
// create an http server for the caller to start on demand with a call to ProxyService.Run()
server := &http.Server{
Addr: fmt.Sprintf(":%s", config.ProxyServicePort),
Handler: mux,
WriteTimeout: time.Duration(config.HTTPWriteTimeoutSeconds) * time.Second,
ReadTimeout: time.Duration(config.HTTPReadTimeoutSeconds) * time.Second,
}
// register database status handler
// for responding to requests for the status
// of database related operations such as
// proxied request metrics compaction and
// partitioning
mux.HandleFunc("/status/database", createDatabaseStatusHandler(&service, db))
service = ProxyService{
httpProxy: server,
ServiceLogger: serviceLogger,
Database: db,
Cache: serviceCache,
evmClient: evmClient,
}
return service, nil
}
// createDatabaseClient creates a connection to the database
// using the specified config and runs migrations async
// (only if migration flag in config is true) returning the
// returning the database connection and error (if any)
func createDatabaseClient(ctx context.Context, config config.Config, logger *logging.ServiceLogger) (*database.PostgresClient, error) {
databaseConfig := database.PostgresDatabaseConfig{
DatabaseName: config.DatabaseName,
DatabaseEndpointURL: config.DatabaseEndpointURL,
DatabaseUsername: config.DatabaseUserName,
DatabasePassword: config.DatabasePassword,
SSLEnabled: config.DatabaseSSLEnabled,
QueryLoggingEnabled: config.DatabaseQueryLoggingEnabled,
ReadTimeoutSeconds: config.DatabaseReadTimeoutSeconds,
WriteTimeousSeconds: config.DatabaseWriteTimeoutSeconds,
DatabaseMaxIdleConnections: config.DatabaseMaxIdleConnections,
DatabaseConnectionMaxIdleSeconds: config.DatabaseConnectionMaxIdleSeconds,
DatabaseMaxOpenConnections: config.DatabaseMaxOpenConnections,
Logger: logger,
RunDatabaseMigrations: config.RunDatabaseMigrations,
}
serviceDatabase, err := database.NewPostgresClient(databaseConfig)
if err != nil {
logger.Error().Msg(fmt.Sprintf("error %s creating database using config %+v", err, databaseConfig))
return &database.PostgresClient{}, err
}
if !databaseConfig.RunDatabaseMigrations {
logger.Debug().Msg("skipping attempting to run migrations on database since RUN_DATABASE_MIGRATIONS was false")
return &serviceDatabase, nil
}
// run migrations async so waiting for the database to
// be reachable doesn't block the ability of the proxy service
// to degrade gracefully and continue to proxy requests even
// without it's database
go func() {
// wait for database to be reachable
var databaseOnline bool
for !databaseOnline {
err = serviceDatabase.HealthCheck()
if err != nil {
logger.Debug().Msg("unable to connect to database, will retry in 1 second")
time.Sleep(1 * time.Second)
continue
}
logger.Debug().Msg("connected to database")
databaseOnline = true
}
logger.Debug().Msg("running migrations on database")
migrations, err := database.Migrate(ctx, serviceDatabase.DB, *migrations.Migrations, logger)
if err != nil {
// TODO: retry based on config
logger.Error().Msg(fmt.Sprintf("error %s running migrations on database", err))
}
logger.Debug().Msg(fmt.Sprintf("run migrations %+v \n last group %+v \n unapplied %+v", migrations.Applied(), migrations.LastGroup(), migrations.Unapplied()))
}()
return &serviceDatabase, err
}
func createServiceCache(
ctx context.Context,
config config.Config,
logger *logging.ServiceLogger,
evmclient *ethclient.Client,
) (*cachemdw.ServiceCache, error) {
cfg := cache.RedisConfig{
Address: config.RedisEndpointURL,
Password: config.RedisPassword,
DB: 0,
}
redisCache, err := cache.NewRedisCache(
&cfg,
logger,
)
if err != nil {
logger.Error().Msg(fmt.Sprintf("error %s creating cache using endpoint %+v", err, config.RedisEndpointURL))
return nil, err
}
cacheConfig := cachemdw.Config{
CacheMethodHasBlockNumberParamTTL: config.CacheMethodHasBlockNumberParamTTL,
CacheMethodHasBlockHashParamTTL: config.CacheMethodHasBlockHashParamTTL,
CacheStaticMethodTTL: config.CacheStaticMethodTTL,
CacheMethodHasTxHashParamTTL: config.CacheMethodHasTxHashParamTTL,
}
serviceCache := cachemdw.NewServiceCache(
redisCache,
evmclient,
DecodedRequestContextKey,
config.CachePrefix,
config.CacheEnabled,
config.WhitelistedHeaders,
config.DefaultAccessControlAllowOriginValue,
config.HostnameToAccessControlAllowOriginValueMap,
&cacheConfig,
logger,
)
return serviceCache, nil
}
// Run runs the proxy service, returning error (if any) in the event
// the proxy service stops
func (p *ProxyService) Run() error {
return p.httpProxy.ListenAndServe()
}