Skip to content

Commit

Permalink
Retrieve sender address for each transaction (#408)
Browse files Browse the repository at this point in the history
### Summary

Tracking issue: #404

The WormholeScan UI needs to display the sender address for each token bridge VAA. This pull request modifies the `tx-tracker` service to obtain that information for Solana and eight EVM chains.

The transaction sender will become accessible through the following endpoints:
* `GET /api/v1/global-tx/{chain}/{emitter}/{seq}`: field `originTx.from`.
* `GET /api/v1/transactions`: field `originAddress`.
In both cases, the field is nullable (i.e.: sometimes it may not be available due to eventual consistency or internal errors)
  • Loading branch information
agodnic committed Jun 14, 2023
1 parent a0475ab commit 1974332
Show file tree
Hide file tree
Showing 20 changed files with 177 additions and 123 deletions.
1 change: 1 addition & 0 deletions api/handlers/transactions/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type OriginTx struct {
ChainID sdk.ChainID `bson:"chainId" json:"chainId"`
TxHash string `bson:"nativeTxHash" json:"txHash"`
Timestamp *time.Time `bson:"timestamp" json:"timestamp"`
From string `bson:"from" json:"from"`
Status string `bson:"status" json:"status"`
}

Expand Down
7 changes: 7 additions & 0 deletions api/routes/wormscan/transactions/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,13 @@ func (c *Controller) ListTransactions(ctx *fiber.Ctx) error {
tx.Status = TxStatusOngoing
}

// Set the origin address, if available
if len(queryResult.Transactions[i].GlobalTransations) == 1 &&
queryResult.Transactions[i].GlobalTransations[0].OriginTx != nil {

tx.OriginAddress = queryResult.Transactions[i].GlobalTransations[0].OriginTx.From
}

response.Transactions = append(response.Transactions, tx)
}

Expand Down
1 change: 1 addition & 0 deletions api/routes/wormscan/transactions/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type TransactionOverview struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
TxHash string `json:"txHash,omitempty"`
OriginAddress string `json:"originAddress,omitempty"`
OriginChain sdk.ChainID `json:"originChain"`
DestinationAddress string `json:"destinationAddress,omitempty"`
DestinationChain sdk.ChainID `json:"destinationChain,omitempty"`
Expand Down
2 changes: 0 additions & 2 deletions deploy/tx-tracker-backfiller/env/production.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,5 @@ RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan
VAA_PAYLOAD_PARSER_TIMEOUT=10
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=6
2 changes: 0 additions & 2 deletions deploy/tx-tracker-backfiller/env/staging.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ RESOURCES_LIMITS_MEMORY=128Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=64Mi
RESOURCES_REQUESTS_CPU=250m
VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan
VAA_PAYLOAD_PARSER_TIMEOUT=10

SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=6
Expand Down
2 changes: 0 additions & 2 deletions deploy/tx-tracker-backfiller/env/test.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,5 @@ RESOURCES_LIMITS_MEMORY=128Mi
RESOURCES_LIMITS_CPU=200m
RESOURCES_REQUESTS_MEMORY=64Mi
RESOURCES_REQUESTS_CPU=100m
VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan-testnet
VAA_PAYLOAD_PARSER_TIMEOUT=10
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=6
36 changes: 32 additions & 4 deletions deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,38 @@ spec:
configMapKeyRef:
name: config
key: mongo-database
- name: VAA_PAYLOAD_PARSER_URL
value: {{ .VAA_PAYLOAD_PARSER_URL }}
- name: VAA_PAYLOAD_PARSER_TIMEOUT
value: "{{ .VAA_PAYLOAD_PARSER_TIMEOUT }}"
- name: ARBITRUM_BASE_URL
value: {{ .ARBITRUM_BASE_URL }}
- name: ARBITRUM_REQUESTS_PER_MINUTE
value: "{{ .ARBITRUM_REQUESTS_PER_MINUTE }}"
- name: AVALANCHE_BASE_URL
value: {{ .AVALANCHE_BASE_URL }}
- name: AVALANCHE_REQUESTS_PER_MINUTE
value: "{{ .AVALANCHE_REQUESTS_PER_MINUTE }}"
- name: BSC_BASE_URL
value: {{ .BSC_BASE_URL }}
- name: BSC_REQUESTS_PER_MINUTE
value: "{{ .BSC_REQUESTS_PER_MINUTE }}"
- name: CELO_BASE_URL
value: {{ .CELO_BASE_URL }}
- name: CELO_REQUESTS_PER_MINUTE
value: "{{ .CELO_REQUESTS_PER_MINUTE }}"
- name: ETHEREUM_BASE_URL
value: {{ .ETHEREUM_BASE_URL }}
- name: ETHEREUM_REQUESTS_PER_MINUTE
value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}"
- name: FANTOM_BASE_URL
value: {{ .FANTOM_BASE_URL }}
- name: FANTOM_REQUESTS_PER_MINUTE
value: "{{ .FANTOM_REQUESTS_PER_MINUTE }}"
- name: OPTIMISM_BASE_URL
value: {{ .OPTIMISM_BASE_URL }}
- name: OPTIMISM_REQUESTS_PER_MINUTE
value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}"
- name: POLYGON_BASE_URL
value: {{ .POLYGON_BASE_URL }}
- name: POLYGON_REQUESTS_PER_MINUTE
value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}"
- name: SOLANA_BASE_URL
value: {{ .SOLANA_BASE_URL }}
- name: SOLANA_REQUESTS_PER_MINUTE
Expand Down
2 changes: 0 additions & 2 deletions deploy/tx-tracker/env/production.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
SQS_URL=
SQS_AWS_REGION=
VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan
VAA_PAYLOAD_PARSER_TIMEOUT=10
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=6
AWS_IAM_ROLE=
3 changes: 0 additions & 3 deletions deploy/tx-tracker/env/staging.env
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ RESOURCES_REQUESTS_CPU=40m
SQS_URL=
SQS_AWS_REGION=

VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan
VAA_PAYLOAD_PARSER_TIMEOUT=10

SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=6
AWS_IAM_ROLE=
2 changes: 0 additions & 2 deletions deploy/tx-tracker/env/test.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
SQS_URL=
SQS_AWS_REGION=
VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan-testnet
VAA_PAYLOAD_PARSER_TIMEOUT=10
SOLANA_BASE_URL=https://api.devnet.solana.com
SOLANA_REQUESTS_PER_MINUTE=6
AWS_IAM_ROLE=
36 changes: 32 additions & 4 deletions deploy/tx-tracker/tx-tracker-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,38 @@ spec:
value: {{ .SQS_URL }}
- name: AWS_REGION
value: {{ .SQS_AWS_REGION }}
- name: VAA_PAYLOAD_PARSER_URL
value: {{ .VAA_PAYLOAD_PARSER_URL }}
- name: VAA_PAYLOAD_PARSER_TIMEOUT
value: "{{ .VAA_PAYLOAD_PARSER_TIMEOUT }}"
- name: ARBITRUM_BASE_URL
value: {{ .ARBITRUM_BASE_URL }}
- name: ARBITRUM_REQUESTS_PER_MINUTE
value: "{{ .ARBITRUM_REQUESTS_PER_MINUTE }}"
- name: AVALANCHE_BASE_URL
value: {{ .AVALANCHE_BASE_URL }}
- name: AVALANCHE_REQUESTS_PER_MINUTE
value: "{{ .AVALANCHE_REQUESTS_PER_MINUTE }}"
- name: BSC_BASE_URL
value: {{ .BSC_BASE_URL }}
- name: BSC_REQUESTS_PER_MINUTE
value: "{{ .BSC_REQUESTS_PER_MINUTE }}"
- name: CELO_BASE_URL
value: {{ .CELO_BASE_URL }}
- name: CELO_REQUESTS_PER_MINUTE
value: "{{ .CELO_REQUESTS_PER_MINUTE }}"
- name: ETHEREUM_BASE_URL
value: {{ .ETHEREUM_BASE_URL }}
- name: ETHEREUM_REQUESTS_PER_MINUTE
value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}"
- name: FANTOM_BASE_URL
value: {{ .FANTOM_BASE_URL }}
- name: FANTOM_REQUESTS_PER_MINUTE
value: "{{ .FANTOM_REQUESTS_PER_MINUTE }}"
- name: OPTIMISM_BASE_URL
value: {{ .OPTIMISM_BASE_URL }}
- name: OPTIMISM_REQUESTS_PER_MINUTE
value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}"
- name: POLYGON_BASE_URL
value: {{ .POLYGON_BASE_URL }}
- name: POLYGON_REQUESTS_PER_MINUTE
value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}"
- name: SOLANA_BASE_URL
value: {{ .SOLANA_BASE_URL }}
- name: SOLANA_REQUESTS_PER_MINUTE
Expand Down
11 changes: 2 additions & 9 deletions tx-tracker/chains/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,10 @@ func fetchEthTx(
ctx context.Context,
txHash string,
baseUrl string,
apiKey string,
) (*TxDetail, error) {

// build RPC URL
url := baseUrl
if apiKey != "" {
url += "/" + apiKey
}

// initialize RPC client
client, err := rpc.DialContext(ctx, url)
client, err := rpc.DialContext(ctx, baseUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize RPC client: %w", err)
}
Expand Down Expand Up @@ -69,7 +62,7 @@ func fetchEthTx(

// build results and return
txDetail := &TxDetail{
Signer: strings.ToLower(txReply.From),
From: strings.ToLower(txReply.From),
Timestamp: timestamp,
NativeTxHash: fmt.Sprintf("0x%s", strings.ToLower(txHash)),
}
Expand Down
4 changes: 2 additions & 2 deletions tx-tracker/chains/solanaRpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ func fetchSolanaTx(
// set sender/receiver
for i := range response.Transaction.Message.AccountKeys {
if response.Transaction.Message.AccountKeys[i].Signer {
txDetail.Signer = response.Transaction.Message.AccountKeys[i].Pubkey
txDetail.From = response.Transaction.Message.AccountKeys[i].Pubkey
}
}
if txDetail.Signer == "" {
if txDetail.From == "" {
return nil, fmt.Errorf("failed to find source account")
}

Expand Down
64 changes: 60 additions & 4 deletions tx-tracker/chains/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@ var (
)

type TxDetail struct {
// Signer is the address that signed the transaction, encoded in the chain's native format.
Signer string
// From is the address that signed the transaction, encoded in the chain's native format.
From string
// Timestamp indicates the time at which the transaction was confirmed.
Timestamp time.Time
// NativeTxHash contains the transaction hash, encoded in the chain's native format.
NativeTxHash string
}

var tickers = struct {
solana *time.Ticker
arbitrum *time.Ticker
avalanche *time.Ticker
bsc *time.Ticker
celo *time.Ticker
ethereum *time.Ticker
fantom *time.Ticker
optimism *time.Ticker
polygon *time.Ticker
solana *time.Ticker
}{}

func Initialize(cfg *config.RpcProviderSettings) {
Expand All @@ -44,7 +52,15 @@ func Initialize(cfg *config.RpcProviderSettings) {
return time.Duration(roundedUp)
}

// this adapter sends 2 requests per txHash
// these adapters send 2 requests per txHash
tickers.arbitrum = time.NewTicker(f(cfg.ArbitrumRequestsPerMinute / 2))
tickers.avalanche = time.NewTicker(f(cfg.AvalancheRequestsPerMinute / 2))
tickers.bsc = time.NewTicker(f(cfg.BscRequestsPerMinute / 2))
tickers.celo = time.NewTicker(f(cfg.CeloRequestsPerMinute / 2))
tickers.ethereum = time.NewTicker(f(cfg.EthereumRequestsPerMinute / 2))
tickers.fantom = time.NewTicker(f(cfg.FantomRequestsPerMinute / 2))
tickers.optimism = time.NewTicker(f(cfg.OptimismRequestsPerMinute / 2))
tickers.polygon = time.NewTicker(f(cfg.PolygonRequestsPerMinute / 2))
tickers.solana = time.NewTicker(f(cfg.SolanaRequestsPerMinute / 2))
}

Expand All @@ -63,6 +79,46 @@ func FetchTx(
case vaa.ChainIDSolana:
fetchFunc = fetchSolanaTx
rateLimiter = *tickers.solana
case vaa.ChainIDCelo:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.CeloBaseUrl)
}
rateLimiter = *tickers.celo
case vaa.ChainIDEthereum:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.EthereumBaseUrl)
}
rateLimiter = *tickers.ethereum
case vaa.ChainIDBSC:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.BscBaseUrl)
}
rateLimiter = *tickers.bsc
case vaa.ChainIDPolygon:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.PolygonBaseUrl)
}
rateLimiter = *tickers.polygon
case vaa.ChainIDFantom:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.FantomBaseUrl)
}
rateLimiter = *tickers.fantom
case vaa.ChainIDArbitrum:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.ArbitrumBaseUrl)
}
rateLimiter = *tickers.arbitrum
case vaa.ChainIDOptimism:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.OptimismBaseUrl)
}
rateLimiter = *tickers.optimism
case vaa.ChainIDAvalanche:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.AvalancheBaseUrl)
}
rateLimiter = *tickers.avalanche
default:
return nil, ErrChainNotSupported
}
Expand Down
40 changes: 16 additions & 24 deletions tx-tracker/cmd/backfiller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,13 @@ func main() {
for i := uint(0); i < cfg.NumWorkers; i++ {
name := fmt.Sprintf("worker-%d", i)
p := consumerParams{
logger: makeLogger(rootLogger, name),
vaaPayloadParserSettings: &cfg.VaaPayloadParserSettings,
rpcProviderSettings: &cfg.RpcProviderSettings,
repository: repository,
queueRx: queue,
wg: &wg,
totalDocuments: totalDocuments,
processedDocuments: &processedDocuments,
logger: makeLogger(rootLogger, name),
rpcProviderSettings: &cfg.RpcProviderSettings,
repository: repository,
queueRx: queue,
wg: &wg,
totalDocuments: totalDocuments,
processedDocuments: &processedDocuments,
}
go consume(rootCtx, &p)
}
Expand Down Expand Up @@ -233,14 +232,13 @@ func produce(ctx context.Context, params *producerParams) {

// consumerParams contains the parameters for the consumer goroutine.
type consumerParams struct {
logger *zap.Logger
vaaPayloadParserSettings *config.VaaPayloadParserSettings
rpcProviderSettings *config.RpcProviderSettings
repository *consumer.Repository
queueRx <-chan consumer.GlobalTransaction
wg *sync.WaitGroup
totalDocuments uint64
processedDocuments *atomic.Uint64
logger *zap.Logger
rpcProviderSettings *config.RpcProviderSettings
repository *consumer.Repository
queueRx <-chan consumer.GlobalTransaction
wg *sync.WaitGroup
totalDocuments uint64
processedDocuments *atomic.Uint64
}

// consume reads VAA IDs from a channel, processes them, and updates the database accordingly.
Expand All @@ -252,18 +250,12 @@ type consumerParams struct {
func consume(ctx context.Context, params *consumerParams) {

// Initialize the client, which processes source Txs.
client, err := consumer.New(
client := consumer.New(
nil,
params.vaaPayloadParserSettings,
params.rpcProviderSettings,
params.logger,
params.repository,
)
if err != nil {
params.logger.Error("Failed to initialize consumer", zap.Error(err))
params.wg.Done()
return
}

// Main loop: fetch global txs and process them
for {
Expand Down Expand Up @@ -314,7 +306,7 @@ func consume(ctx context.Context, params *consumerParams) {
Sequence: v.Sequence,
TxHash: *v.TxHash,
}
err = client.ProcessSourceTx(ctx, &p)
err := client.ProcessSourceTx(ctx, &p)
if err != nil {
params.logger.Error("Failed to track source tx",
zap.String("vaaId", globalTx.Id),
Expand Down
2 changes: 1 addition & 1 deletion tx-tracker/cmd/fetchone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ func main() {

// print tx details
log.Printf("tx detail: sender=%s nativeTxHash=%s timestamp=%s",
txDetail.Signer, txDetail.NativeTxHash, txDetail.Timestamp)
txDetail.From, txDetail.NativeTxHash, txDetail.Timestamp)
}
5 changes: 1 addition & 4 deletions tx-tracker/cmd/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ func main() {
// create and start a consumer.
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, logger)
repository := consumer.NewRepository(logger, db)
consumer, err := consumer.New(vaaConsumeFunc, &cfg.VaaPayloadParserSettings, &cfg.RpcProviderSettings, logger, repository)
if err != nil {
logger.Fatal("Failed to create VAA consumer", zap.Error(err))
}
consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, logger, repository)
consumer.Start(rootCtx)

logger.Info("Started wormhole-explorer-tx-tracker")
Expand Down
Loading

0 comments on commit 1974332

Please sign in to comment.