Skip to content

Commit

Permalink
Add 24h volume to scorecards (#274)
Browse files Browse the repository at this point in the history
### Summary

This pull request adds volume metrics to influxdb. Also, it adds the 24h volume metric to `GET /api/v1/scorecards`.

Tracking issues: #221, #280

### Changes:
* The `parser` service no longer generates metrics for influxdb. All metrics-related code was removed from that service and moved to the analytics service instead.
* New volume metrics were added to the analytics service.
* The notional cache was modified to use token names (i.e.: ticker symbols) as keys instead of chain IDs.
* The notional cache reader was moved to the `common/client/cache` package.
* A little bit of duplicated code between the cache reader and writer was removed.
* A 24h volume metric was added to `GET /api/v1/scorecards`.
* A dictionary that stores token metadata was added under `common/domain/tokenbridge.go`. More tokens will be added to it in the near future.
  • Loading branch information
agodnic committed May 4, 2023
1 parent fd51f0a commit d9d49ec
Show file tree
Hide file tree
Showing 21 changed files with 448 additions and 255 deletions.
4 changes: 3 additions & 1 deletion analytic/.gitignore
@@ -1 +1,3 @@
__debug_bin
__debug_bin
.env
analytic
5 changes: 4 additions & 1 deletion analytic/cmd/main.go
Expand Up @@ -59,7 +59,10 @@ func main() {
}

// create a metrics instance
metric := metric.New(influxCli, config.InfluxOrganization, config.InfluxBucket, logger)
metric, err := metric.New(rootCtx, influxCli, config, logger)
if err != nil {
logger.Fatal("failed to create metrics instance", zap.Error(err))
}

// create and start a consumer.
vaaConsumeFunc := newVAAConsume(rootCtx, config, logger)
Expand Down
2 changes: 2 additions & 0 deletions analytic/config/config.go
Expand Up @@ -24,6 +24,8 @@ type Configuration struct {
InfluxBucket string `env:"INFLUX_BUCKET"`
PprofEnabled bool `env:"PPROF_ENABLED,default=false"`
P2pNetwork string `env:"P2P_NETWORK,required"`
CacheURL string `env:"CACHE_URL"`
CacheChannel string `env:"CACHE_CHANNEL"`
}

// New creates a configuration with the values from .env file and environment variables.
Expand Down
193 changes: 179 additions & 14 deletions analytic/metric/metric.go
Expand Up @@ -2,51 +2,108 @@ package metric

import (
"context"
"fmt"
"math"
"math/big"
"strconv"
"time"

"github.com/go-redis/redis/v8"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"github.com/wormhole-foundation/wormhole-explorer/analytic/config"
wormscanCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)

// Metric definition.
type Metric struct {
influxCli influxdb2.Client
writeApi api.WriteAPIBlocking
logger *zap.Logger
influxCli influxdb2.Client
writeApi api.WriteAPIBlocking
logger *zap.Logger
notionalCache wormscanNotionalCache.NotionalLocalCacheReadable
}

// New create a new *Metric.
func New(influxCli influxdb2.Client, organization, bucket string, logger *zap.Logger) *Metric {
writeAPI := influxCli.WriteAPIBlocking(organization, bucket)
return &Metric{influxCli: influxCli, writeApi: writeAPI, logger: logger}
func New(
ctx context.Context,
influxCli influxdb2.Client,
cfg *config.Configuration,
logger *zap.Logger,
) (*Metric, error) {

writeAPI := influxCli.WriteAPIBlocking(cfg.InfluxOrganization, cfg.InfluxBucket)

_, notionalCache, err := newCache(ctx, cfg, logger)
if err != nil {
return nil, err
}

m := Metric{
influxCli: influxCli,
writeApi: writeAPI,
logger: logger,
notionalCache: notionalCache,
}
return &m, nil
}

// newCache creates the distributed cache client and the notional cache
// client used by the metrics service.
func newCache(
	ctx context.Context,
	cfg *config.Configuration,
	logger *zap.Logger,
) (wormscanCache.CacheReadable, wormscanNotionalCache.NotionalLocalCacheReadable, error) {

	// Both caches are backed by the same redis instance. The notional cache
	// additionally uses a pub/sub channel to keep its local copy in sync.
	client := redis.NewClient(&redis.Options{Addr: cfg.CacheURL})

	// Create the distributed cache client.
	cache, err := wormscanCache.NewCacheClient(client, true /*enabled*/, logger)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create cache client: %w", err)
	}

	// Create the notional cache client, then perform the initial load into
	// the local cache.
	//
	// NOTE(review): the result of Init is discarded here — confirm whether it
	// can fail and whether that failure should abort startup.
	notional, err := wormscanNotionalCache.NewNotionalCache(ctx, client, cfg.CacheChannel, logger)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create notional cache client: %w", err)
	}
	notional.Init(ctx)

	return cache, notional, nil
}

// Push implement MetricPushFunc definition.
func (m *Metric) Push(ctx context.Context, vaa *vaa.VAA) error {
return m.vaaCountMeasurement(ctx, vaa)
func (m *Metric) Push(ctx context.Context, vaa *sdk.VAA) error {

err1 := m.vaaCountMeasurement(ctx, vaa)
err2 := m.volumeMeasurement(ctx, vaa)

//TODO if we had go 1.20, we could just use `errors.Join(err1, err2)` here.
return fmt.Errorf("err1=%w, err2=%w", err1, err2)
}

// Close closes the underlying influxdb client.
//
// It should be called when the Metric instance is no longer needed.
func (m *Metric) Close() {
	m.influxCli.Close()
}

// vaaCountMeasurement handle the push of metric point for measurement vaa_count.
func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *vaa.VAA) error {
// vaaCountMeasurement creates a new point for the `vaa_count` measurement.
func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *sdk.VAA) error {

measurement := "vaa_count"
const measurement = "vaa_count"

// Create a new point for the `vaa_count` measurement.
// Create a new point
point := influxdb2.
NewPointWithMeasurement(measurement).
AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))).
AddField("count", 1).
SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(vaa.Sequence)))

// Write the point to influx.
// Write the point to influx
err := m.writeApi.WritePoint(ctx, point)
if err != nil {
m.logger.Error("failed to write metric",
Expand All @@ -59,3 +116,111 @@ func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *vaa.VAA) error {

return nil
}

// volumeMeasurement creates a new point for the `vaa_volume` measurement.
//
// Only VAAs that carry a portal token bridge transfer payload produce a
// point; all other VAAs (and tokens without metadata or notional data) are
// skipped silently by returning nil.
func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA) error {

	const measurement = "vaa_volume"

	// Decode the VAA payload
	//
	// If the VAA didn't come from the portal token bridge, we just skip it.
	payload, err := sdk.DecodeTransferPayloadHdr(vaa.Payload)
	if err != nil {
		return nil
	}

	// Get the token metadata
	//
	// This is complementary data about the token that is not present in the VAA itself.
	tokenMeta, ok := domain.GetTokenMetadata(payload.OriginChain, "0x"+payload.OriginAddress.String())
	if !ok {
		m.logger.Warn("found no token metadata for VAA",
			zap.String("vaaId", vaa.MessageID()),
			zap.String("tokenAddress", payload.OriginAddress.String()),
			zap.Uint16("tokenChain", uint16(payload.OriginChain)),
		)
		return nil
	}

	// Normalize the amount to 8 decimals
	//
	// NOTE(review): only tokens with fewer than 8 decimals are scaled up;
	// this assumes payload amounts are already capped at 8 decimals for
	// tokens with more — confirm against the token bridge spec.
	amount := payload.Amount
	if tokenMeta.Decimals < 8 {

		// factor = 10 ^ (8 - tokenMeta.Decimals)
		var factor big.Int
		factor.Exp(big.NewInt(10), big.NewInt(int64(8-tokenMeta.Decimals)), nil)

		amount = amount.Mul(amount, &factor)
	}

	// Try to obtain the token notional value from the cache
	notional, err := m.notionalCache.Get(tokenMeta.UnderlyingSymbol)
	if err != nil {
		m.logger.Warn("failed to obtain notional for this token",
			zap.String("vaaId", vaa.MessageID()),
			zap.String("tokenAddress", payload.OriginAddress.String()),
			zap.Uint16("tokenChain", uint16(payload.OriginChain)),
			zap.Any("tokenMetadata", tokenMeta),
			zap.Error(err),
		)
		return nil
	}

	// Convert the notional value to an integer with an implicit precision of 8 decimals
	//
	// NOTE(review): conversion failures are dropped without logging —
	// consider at least a warn-level log here.
	notionalBigInt, err := floatToBigInt(notional.NotionalUsd)
	if err != nil {
		return nil
	}

	// Calculate the volume, with an implicit precision of 8 decimals
	// (amount and notional each carry 8 implicit decimals, so the product
	// carries 16; dividing by 1e8 brings it back to 8).
	var volume big.Int
	volume.Mul(amount, notionalBigInt)
	volume.Div(&volume, big.NewInt(1e8))

	m.logger.Info("Pushing volume metrics",
		zap.String("vaaId", vaa.MessageID()),
		zap.String("amount", amount.String()),
		zap.String("notional", notionalBigInt.String()),
		zap.String("volume", volume.String()),
	)

	// Create a data point with volume-related fields
	//
	// We're converting big integers to int64 because influxdb doesn't support bigint/numeric types.
	point := influxdb2.NewPointWithMeasurement(measurement).
		AddTag("chain_source_id", fmt.Sprintf("%d", payload.OriginChain)).
		AddTag("chain_destination_id", fmt.Sprintf("%d", payload.TargetChain)).
		AddTag("app_id", domain.AppIdPortalTokenBridge).
		AddField("amount", amount.Int64()).
		AddField("notional", notionalBigInt.Int64()).
		AddField("volume", volume.Int64()).
		SetTime(vaa.Timestamp)

	// Write the point to influx
	err = m.writeApi.WritePoint(ctx, point)
	if err != nil {
		return err
	}

	return nil
}

// toInt converts a float64 into a big.Int with 8 decimals of implicit precision.
//
// If we ever upgrade the notional cache to store prices as big integers,
// this gnarly function won't be needed anymore.
func floatToBigInt(f float64) (*big.Int, error) {

integral, frac := math.Modf(f)

strIntegral := strconv.FormatFloat(integral, 'f', 0, 64)
strFrac := fmt.Sprintf("%.8f", frac)[2:]

i, err := strconv.ParseInt(strIntegral+strFrac, 10, 64)
if err != nil {
return nil, err
}

return big.NewInt(i), nil
}
3 changes: 3 additions & 0 deletions api/handlers/transactions/model.go
Expand Up @@ -13,6 +13,9 @@ type Scorecards struct {

// Number of VAAs emitted in the last 24 hours (does not include Pyth messages).
TxCount24h string

// Volume transferred through the token bridge in the last 24 hours, in USD.
Volume24h string
}

type GlobalTransactionDoc struct {
Expand Down
68 changes: 62 additions & 6 deletions api/handlers/transactions/repository.go
Expand Up @@ -60,6 +60,16 @@ from(bucket: "%s")
|> count()
`

// queryTemplateVolume24h is a flux query template that computes the total
// token bridge volume over the last 24 hours.
//
// It sums the `volume` field of the `vaa_volume` measurement, dropping the
// tag columns so all rows aggregate into a single sum, and converts the
// result to a string. The template takes one parameter: the bucket name.
const queryTemplateVolume24h = `
from(bucket: "%s")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "vaa_volume")
  |> filter(fn:(r) => r._field == "volume")
  |> drop(columns: ["_measurement", "app_id", "chain_destination_id", "chain_source_id", "symbol"])
  |> sum(column: "_value")
  |> toString()
`

type Repository struct {
influxCli influxdb2.Client
queryAPI api.QueryAPI
Expand Down Expand Up @@ -119,20 +129,28 @@ func (r *Repository) buildFindVolumeQuery(q *ChainActivityQuery) string {

func (r *Repository) GetScorecards(ctx context.Context) (*Scorecards, error) {

totalTxCount, err := r.getTotalTxCount(ctx)
if err != nil {
r.logger.Error("failed to query total transaction count", zap.Error(err))
}
//TODO the underlying query in this code is not using pre-summarized data.
// We should fix that before re-enabling the metric.
//totalTxCount, err := r.getTotalTxCount(ctx)
//if err != nil {
// return nil, fmt.Errorf("failed to query all-time tx count")
//}

txCount24h, err := r.getTxCount24h(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query 24h transactions: %w", err)
}

volume24h, err := r.getVolume24h(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query 24h volume: %w", err)
}

// build the result and return
scorecards := Scorecards{
TotalTxCount: totalTxCount,
TxCount24h: txCount24h,
//TotalTxCount: totalTxCount,
TxCount24h: txCount24h,
Volume24h: volume24h,
}

return &scorecards, nil
Expand Down Expand Up @@ -194,6 +212,44 @@ func (r *Repository) getTxCount24h(ctx context.Context) (string, error) {
return fmt.Sprint(row.Value), nil
}

// getVolume24h returns the total volume (in USD) transferred through the
// token bridge in the last 24 hours, as a decimal string with 8 digits
// after the decimal point.
func (r *Repository) getVolume24h(ctx context.Context) (string, error) {

	// query 24h volume
	query := fmt.Sprintf(queryTemplateVolume24h, r.bucket)
	result, err := r.queryAPI.Query(ctx, query)
	if err != nil {
		r.logger.Error("failed to query 24h volume", zap.Error(err))
		return "", err
	}
	if result.Err() != nil {
		// Fix: the original logged `err`, which is always nil on this path;
		// log the query result's error instead.
		r.logger.Error("24h volume query result has errors", zap.Error(result.Err()))
		return "", result.Err()
	}
	if !result.Next() {
		return "", errors.New("expected at least one record in 24h volume query result")
	}

	// deserialize the row returned
	row := struct {
		Value string `mapstructure:"_value"`
	}{}
	if err := mapstructure.Decode(result.Record().Values(), &row); err != nil {
		return "", fmt.Errorf("failed to decode 24h volume count query response: %w", err)
	}

	// If there is less than 1 USD in volume, round it down to 0 to make math simpler in the next step
	l := len(row.Value)
	if l < 9 {
		return "0.00000000", nil
	}

	// Turn the integer amount into a decimal.
	// The number always has 8 decimals, so we just need to insert a dot 8 digits from the end.
	volume := row.Value[:l-8] + "." + row.Value[l-8:]

	return volume, nil
}

// GetTransactionCount get the last transactions.
func (r *Repository) GetTransactionCount(ctx context.Context, q *TransactionCountQuery) ([]TransactionCountResult, error) {
query := r.buildLastTrxQuery(q)
Expand Down

0 comments on commit d9d49ec

Please sign in to comment.