Skip to content

Commit

Permalink
feat: indexer-retry substrate node connection
Browse files Browse the repository at this point in the history
  • Loading branch information
robdefeo committed Sep 26, 2020
1 parent 241500d commit 968ed8d
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions cmd/indexer/commands/substrate.go
Expand Up @@ -2,7 +2,9 @@ package commands

import (
"context"
"fmt"

"github.com/cenkalti/backoff"
"github.com/jmoiron/sqlx"
"github.com/mailchain/mailchain/cmd/indexer/internal/processor"
sub "github.com/mailchain/mailchain/cmd/indexer/internal/substrate"
Expand All @@ -22,7 +24,6 @@ func substrateCmd() *cobra.Command {
TraverseChildren: true,
RunE: func(cmd *cobra.Command, args []string) error {
network := viper.GetString("network")
println(network)
protocol := viper.GetString("protocol")
blockNumber := viper.GetString("start_block")
maxRetries := viper.GetUint64("max_retries")
Expand Down Expand Up @@ -50,7 +51,23 @@ func substrateCmd() *cobra.Command {
defer connPublicKey.Close()
defer connEnvelope.Close()

seqProcessor, err := createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope, blockNumber, protocol, network, addressRPC)
var subClient *sub.BlockClient
operation := func() error {
var err error
subClient, err = sub.NewRPC(addressRPC)
if err != nil {
fmt.Fprintf(cmd.ErrOrStderr(), "%+v\n", err)
}

return err
}

if err := backoff.Retry(operation, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil {
fmt.Fprintf(cmd.ErrOrStderr(), "Number of retries has reached to %d. Exiting.\\n", 3)
return err
}

seqProcessor, err := createSubstrateProcessor(subClient, connIndexer, connPublicKey, connEnvelope, blockNumber, protocol, network)
if err != nil {
return err
}
Expand All @@ -73,15 +90,10 @@ func substrateCmd() *cobra.Command {
return cmd
}

func createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope *sqlx.DB, blockNumber, protocol, network, addressRPC string) (*processor.Sequential, error) {
func createSubstrateProcessor(client *sub.BlockClient, connIndexer, connPublicKey, connEnvelope *sqlx.DB, blockNumber, protocol, network string) (*processor.Sequential, error) {
ctx := context.Background()

subClient, err := sub.NewRPC(addressRPC)
if err != nil {
return nil, err
}

blockNo, err := getBlockNumber(blockNumber, subClient)
blockNo, err := getBlockNumber(blockNumber, client)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,6 +129,6 @@ func createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope *sqlx.DB,
network,
syncStore,
sub.NewBlockProcessor(processorTransaction),
subClient,
client,
), nil
}

0 comments on commit 968ed8d

Please sign in to comment.