Skip to content

Commit

Permalink
feat: implement monitoring for API endpoints
Browse files Browse the repository at this point in the history
Emerald Gateway now supports monitoring via Prometheus metrics.
Monitoring can be enabled via config by setting
`gateway.monitoring.host` and `gateway.monitoring.port` endpoints.
Prometheus scraping endpoint will listen on the configured address.
  • Loading branch information
ptrus committed Apr 1, 2022
1 parent a9f4e48 commit 2242476
Show file tree
Hide file tree
Showing 17 changed files with 762 additions and 86 deletions.
33 changes: 33 additions & 0 deletions conf/config.go
Expand Up @@ -112,6 +112,9 @@ type GatewayConfig struct {
// HTTP is the gateway http endpoint config.
HTTP *GatewayHTTPConfig `koanf:"http"`

// Monitoring is the gateway prometheus configuration.
Monitoring *GatewayMonitoringConfig `koanf:"monitoring"`

// WS is the gateway websocket endpoint config.
WS *GatewayWSConfig `koanf:"ws"`

Expand All @@ -122,6 +125,36 @@ type GatewayConfig struct {
MethodLimits *MethodLimits `koanf:"method_limits"`
}

// GatewayMonitoringConfig is the gateway prometheus configuration.
type GatewayMonitoringConfig struct {
// Host is the host interface on which to start the prometheus http server. Disabled if unset.
Host string `koanf:"host"`

// Port is the port number on which to start the prometheus http server.
Port int `koanf:"port"`
}

// Enabled returns true if monitoring is configured.
func (cfg *GatewayMonitoringConfig) Enabled() bool {
if cfg == nil {
return false
}
if cfg.Host == "" {
return false
}
return true
}

// Address returns the prometheus listen address.
//
// Returns empty string if monitoring is not configured.
func (cfg *GatewayMonitoringConfig) Address() string {
if !cfg.Enabled() {
return ""
}
return fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
}

// Validate validates the gateway configuration.
func (cfg *GatewayConfig) Validate() error {
// TODO:
Expand Down
3 changes: 3 additions & 0 deletions conf/server.yml
Expand Up @@ -32,5 +32,8 @@ gateway:
ws:
host: "localhost"
port: 8546
monitoring:
host: "" # Disabled.
port: 9999
method_limits:
get_logs_max_rounds: 100
3 changes: 3 additions & 0 deletions conf/tests.yml
Expand Up @@ -29,5 +29,8 @@ gateway:
ws:
host: "localhost"
port: 8546
monitoring:
host: "localhost"
port: 9999
method_limits:
get_logs_max_rounds: 100
3 changes: 3 additions & 0 deletions docker/emerald-dev/emerald-dev.yml
Expand Up @@ -26,5 +26,8 @@ gateway:
host: "0.0.0.0"
port: 8546
cors: ["*"]
monitoring:
host: "0.0.0.0"
port: 9999
method_limits:
get_logs_max_rounds: 100
25 changes: 20 additions & 5 deletions rpc/apis.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/oasisprotocol/emerald-web3-gateway/indexer"
"github.com/oasisprotocol/emerald-web3-gateway/rpc/eth"
"github.com/oasisprotocol/emerald-web3-gateway/rpc/eth/filters"
ethmetrics "github.com/oasisprotocol/emerald-web3-gateway/rpc/eth/metrics"
"github.com/oasisprotocol/emerald-web3-gateway/rpc/net"
"github.com/oasisprotocol/emerald-web3-gateway/rpc/txpool"
"github.com/oasisprotocol/emerald-web3-gateway/rpc/web3"
Expand All @@ -27,35 +28,49 @@ func GetRPCAPIs(
) []ethRpc.API {
var apis []ethRpc.API

web3Service := web3.NewPublicAPI()
ethService := eth.NewPublicAPI(client, logging.GetLogger("eth_rpc"), config.ChainID, backend, config.MethodLimits)
netService := net.NewPublicAPI(config.ChainID)
txpoolService := txpool.NewPublicAPI()
filtersService := filters.NewPublicAPI(client, logging.GetLogger("eth_filters"), backend, eventSystem)

if config.Monitoring.Enabled() {
web3Service = web3.NewMetricsWrapper(web3Service)
netService = net.NewMetricsWrapper(netService)
ethService = ethmetrics.NewMetricsWrapper(ethService, logging.GetLogger("eth_rpc_metrics"), backend)
txpoolService = txpool.NewMetricsWrapper(txpoolService)
filtersService = filters.NewMetricsWrapper(filtersService)
}

apis = append(apis,
ethRpc.API{
Namespace: "web3",
Version: "1.0",
Service: web3.NewPublicAPI(),
Service: web3Service,
Public: true,
},
ethRpc.API{
Namespace: "net",
Version: "1.0",
Service: net.NewPublicAPI(config.ChainID),
Service: netService,
Public: true,
},
ethRpc.API{
Namespace: "eth",
Version: "1.0",
Service: eth.NewPublicAPI(client, logging.GetLogger("eth_rpc"), config.ChainID, backend, config.MethodLimits),
Service: ethService,
Public: true,
},
ethRpc.API{
Namespace: "txpool",
Version: "1.0",
Service: txpool.NewPublicAPI(),
Service: txpoolService,
Public: true,
},
ethRpc.API{
Namespace: "eth",
Version: "1.0",
Service: filters.NewPublicAPI(client, logging.GetLogger("eth_filters"), backend, eventSystem),
Service: filtersService,
Public: true,
},
)
Expand Down
134 changes: 80 additions & 54 deletions rpc/eth/api.go

Large diffs are not rendered by default.

19 changes: 13 additions & 6 deletions rpc/eth/filters/api.go
Expand Up @@ -15,8 +15,15 @@ import (
"github.com/oasisprotocol/emerald-web3-gateway/indexer"
)

// PublicAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec.
type PublicFilterAPI struct {
// API is the eth_ prefixed set of APIs in the filtering Web3 JSON-RPC spec.
type API interface {
// NewHeads send a notification each time a new (header) block is appended to the chain.
NewHeads(ctx context.Context) (*ethrpc.Subscription, error)
// Logs creates a subscription that fires for all new log that match the given filter criteria.
Logs(ctx context.Context, crit ethfilters.FilterCriteria) (*ethrpc.Subscription, error)
}

type publicFilterAPI struct {
client client.RuntimeClient
backend indexer.Backend
Logger *logging.Logger
Expand All @@ -29,16 +36,16 @@ func NewPublicAPI(
logger *logging.Logger,
backend indexer.Backend,
eventSystem *eventFilters.EventSystem,
) *PublicFilterAPI {
return &PublicFilterAPI{
) API {
return &publicFilterAPI{
client: client,
Logger: logger,
backend: backend,
es: eventSystem,
}
}

func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*ethrpc.Subscription, error) {
func (api *publicFilterAPI) NewHeads(ctx context.Context) (*ethrpc.Subscription, error) {
notifier, supported := ethrpc.NotifierFromContext(ctx)
if !supported {
return &ethrpc.Subscription{}, ethrpc.ErrNotificationsUnsupported
Expand Down Expand Up @@ -71,7 +78,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*ethrpc.Subscription,
return rpcSub, nil
}

func (api *PublicFilterAPI) Logs(ctx context.Context, crit ethfilters.FilterCriteria) (*ethrpc.Subscription, error) {
func (api *publicFilterAPI) Logs(ctx context.Context, crit ethfilters.FilterCriteria) (*ethrpc.Subscription, error) {
notifier, supported := ethrpc.NotifierFromContext(ctx)
if !supported {
return &ethrpc.Subscription{}, ethrpc.ErrNotificationsUnsupported
Expand Down
90 changes: 90 additions & 0 deletions rpc/eth/filters/metrics.go
@@ -0,0 +1,90 @@
package filters

import (
"context"

ethfilters "github.com/ethereum/go-ethereum/eth/filters"
ethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/oasisprotocol/emerald-web3-gateway/rpc/metrics"
)

var (
durations = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "oasis_emerald_web3_gateway_subscription_seconds",
// Buckets ranging from 1 second to 24 hours.
Buckets: []float64{1, 10, 30, 60, 600, 1800, 3600, 7200, 21600, 86400},
Help: "Histogram for the eth subscription API subscriptions duration.",
},
[]string{"method_name"},
)
inflightSubs = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_emerald_web3_gateway_subscription_inflight",
Help: "Number of concurrent eth inflight subscriptions.",
},
[]string{"method_name"},
)
)

type metricsWrapper struct {
api API
}

// Logs implements API.
func (m *metricsWrapper) Logs(ctx context.Context, crit ethfilters.FilterCriteria) (rpcSub *ethrpc.Subscription, err error) {
r, s, f, i, d := metrics.GetAPIMethodMetrics("eth_logs")
defer metrics.InstrumentCaller(r, s, f, i, d, &err)()

inflightSubs := inflightSubs.WithLabelValues("eth_logs")
duration := durations.WithLabelValues("eth_logs")

// Measure subscirpiton duration and concurrent subscriptions.
timer := prometheus.NewTimer(duration)
inflightSubs.Inc()

rpcSub, err = m.api.Logs(ctx, crit)
go func() {
// Wait for subscription to unsubscribe.
<-rpcSub.Err()
timer.ObserveDuration()
// Decrement in-flight.
inflightSubs.Dec()
}()

return rpcSub, err
}

// NewHeads implements API.
func (m *metricsWrapper) NewHeads(ctx context.Context) (rpcSub *ethrpc.Subscription, err error) {
r, s, f, i, d := metrics.GetAPIMethodMetrics("eth_newHeads")
defer metrics.InstrumentCaller(r, s, f, i, d, &err)()

inflightSubs := inflightSubs.WithLabelValues("eth_newHeads")
duration := durations.WithLabelValues("eth_newHeads")

// Measure subscirpiton duration and concurrent subscriptions.
timer := prometheus.NewTimer(duration)
inflightSubs.Inc()

rpcSub, err = m.api.NewHeads(ctx)
go func() {
// Wait for subscription to unsubscribe.
<-rpcSub.Err()
timer.ObserveDuration()
// Decrement in-flight.
inflightSubs.Dec()
}()

return rpcSub, err
}

// NewMetricsWrapper returns an instrumanted API service.
func NewMetricsWrapper(api API) API {
return &metricsWrapper{
api,
}
}

0 comments on commit 2242476

Please sign in to comment.