From 968ed8d0356e31f616c08e2ddafe2c4096706cdb Mon Sep 17 00:00:00 2001 From: Rob De Feo Date: Sat, 26 Sep 2020 14:30:25 +0100 Subject: [PATCH] feat: indexer-retry substrate node connection --- cmd/indexer/commands/substrate.go | 32 +++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/cmd/indexer/commands/substrate.go b/cmd/indexer/commands/substrate.go index 802b8a68c..ac2cbe07a 100644 --- a/cmd/indexer/commands/substrate.go +++ b/cmd/indexer/commands/substrate.go @@ -2,7 +2,9 @@ package commands import ( "context" + "fmt" + "github.com/cenkalti/backoff" "github.com/jmoiron/sqlx" "github.com/mailchain/mailchain/cmd/indexer/internal/processor" sub "github.com/mailchain/mailchain/cmd/indexer/internal/substrate" @@ -22,7 +24,6 @@ func substrateCmd() *cobra.Command { TraverseChildren: true, RunE: func(cmd *cobra.Command, args []string) error { network := viper.GetString("network") - println(network) protocol := viper.GetString("protocol") blockNumber := viper.GetString("start_block") maxRetries := viper.GetUint64("max_retries") @@ -50,7 +51,23 @@ func substrateCmd() *cobra.Command { defer connPublicKey.Close() defer connEnvelope.Close() - seqProcessor, err := createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope, blockNumber, protocol, network, addressRPC) + var subClient *sub.BlockClient + operation := func() error { + var err error + subClient, err = sub.NewRPC(addressRPC) + if err != nil { + fmt.Fprintf(cmd.ErrOrStderr(), "%+v\n", err) + } + + return err + } + + if err := backoff.Retry(operation, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil { + fmt.Fprintf(cmd.ErrOrStderr(), "Number of retries has reached to %d. Exiting.\\n", 3) + return err + } + + seqProcessor, err := createSubstrateProcessor(subClient, connIndexer, connPublicKey, connEnvelope, blockNumber, protocol, network) if err != nil { return err } @@ -73,15 +90,10 @@ func substrateCmd() *cobra.Command { return cmd } -func createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope *sqlx.DB, blockNumber, protocol, network, addressRPC string) (*processor.Sequential, error) { +func createSubstrateProcessor(client *sub.BlockClient, connIndexer, connPublicKey, connEnvelope *sqlx.DB, blockNumber, protocol, network string) (*processor.Sequential, error) { ctx := context.Background() - subClient, err := sub.NewRPC(addressRPC) - if err != nil { - return nil, err - } - - blockNo, err := getBlockNumber(blockNumber, subClient) + blockNo, err := getBlockNumber(blockNumber, client) if err != nil { return nil, err } @@ -117,6 +129,6 @@ func createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope *sqlx.DB, network, syncStore, sub.NewBlockProcessor(processorTransaction), - subClient, + client, ), nil }