Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport the psql indexer into v0.34.x #6906

Merged
merged 19 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,12 @@ type TxIndexConfig struct {
// 1) "null"
// 2) "kv" (default) - the simplest possible indexer,
// backed by key-value storage (defaults to levelDB; see DBBackend).
// 3) "psql" - the indexer services backed by PostgreSQL.
Indexer string `mapstructure:"indexer"`

// The PostgreSQL connection configuration, the connection format:
// postgresql://<user>:<password>@<host>:<port>/<db>?<opts>
PsqlConn string `mapstructure:"psql-conn"`
}

// DefaultTxIndexConfig returns a default configuration for the transaction indexer.
Expand Down
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,30 @@ module github.com/tendermint/tendermint
go 1.15

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
creachadair marked this conversation as resolved.
Show resolved Hide resolved
github.com/BurntSushi/toml v0.3.1
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/Workiva/go-datastructures v1.0.52
github.com/adlio/schema v1.1.13
github.com/btcsuite/btcd v0.21.0-beta
github.com/btcsuite/btcutil v1.0.2
github.com/confio/ics23/go v0.6.3
github.com/containerd/continuity v0.1.0 // indirect
github.com/cosmos/iavl v0.15.3
github.com/fortytw2/leaktest v1.3.0
github.com/go-kit/kit v0.10.0
github.com/go-logfmt/logfmt v0.5.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.4.3
github.com/golang/protobuf v1.5.0
github.com/google/orderedcode v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/gtank/merlin v0.1.1
github.com/lib/pq v1.2.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/minio/highwayhash v1.0.1
github.com/opencontainers/runc v1.0.2 // indirect
github.com/ory/dockertest v3.3.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.8.0
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
Expand All @@ -31,6 +38,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/tendermint/tm-db v0.6.4
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f
golang.org/x/sys v0.0.0-20210903071746-97244b99971b // indirect
google.golang.org/grpc v1.37.0
)
91 changes: 84 additions & 7 deletions go.sum

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/tendermint/tendermint/state/indexer"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
"github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/state/txindex/null"
Expand Down Expand Up @@ -259,6 +260,7 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {

func createAndStartIndexerService(
config *cfg.Config,
chainID string,
dbProvider DBProvider,
eventBus *types.EventBus,
logger log.Logger,
Expand All @@ -278,6 +280,18 @@ func createAndStartIndexerService(

txIndexer = kv.NewTxIndex(store)
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))

case "psql":
if config.TxIndex.PsqlConn == "" {
return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`)
}
es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err)
}
txIndexer = es.TxIndexer()
blockIndexer = es.BlockIndexer()

default:
txIndexer = &null.TxIndex{}
blockIndexer = &blockidxnull.BlockerIndexer{}
Expand Down Expand Up @@ -681,7 +695,7 @@ func NewNode(config *cfg.Config,
return nil, err
}

indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger)
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, genDoc.ChainID, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}
Expand Down
78 changes: 78 additions & 0 deletions state/indexer/sink/psql/backport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package psql

// This file adds code to the psql package that is needed for integration with
// v0.34, but which is not part of the original implementation.

import (
"context"
"errors"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)

const (
eventTypeBeginBlock = "begin_block"
eventTypeEndBlock = "end_block"
)

// TxIndexer returns a bridge from es to the Tendermint v0.34 transaction indexer.
func (es *EventSink) TxIndexer() BackportTxIndexer {
creachadair marked this conversation as resolved.
Show resolved Hide resolved
return BackportTxIndexer{psql: es}
}

// BackportTxIndexer implements the txindex.TxIndexer interface by delegating
// indexing operations to an underlying PostgreSQL event sink.
type BackportTxIndexer struct{ psql *EventSink }

// AddBatch indexes a batch of transactions in Postgres, as part of TxIndexer.
func (b BackportTxIndexer) AddBatch(batch *txindex.Batch) error {
return b.psql.IndexTxEvents(batch.Ops)
}

// Index indexes a single transaction result in Postgres, as part of TxIndexer.
func (b BackportTxIndexer) Index(txr *abci.TxResult) error {
return b.psql.IndexTxEvents([]*abci.TxResult{txr})
}

// Get is implemented to satisfy the TxIndexer interface, but is not supported
// by the psql event sink and reports an error for all inputs.
func (BackportTxIndexer) Get([]byte) (*abci.TxResult, error) {
return nil, errors.New("the TxIndexer.Get method is not supported")
}

// Search is implemented to satisfy the TxIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportTxIndexer) Search(context.Context, *query.Query) ([]*abci.TxResult, error) {
return nil, errors.New("the TxIndexer.Search method is not supported")
}

// BlockIndexer returns a bridge that implements the Tendermint v0.34 block
// indexer interface, using the Postgres event sink as a backing store.
func (es *EventSink) BlockIndexer() BackportBlockIndexer {
return BackportBlockIndexer{psql: es}
}

// BackportBlockIndexer implements the indexer.BlockIndexer interface by
// delegating indexing operations to an underlying PostgreSQL event sink.
type BackportBlockIndexer struct{ psql *EventSink }

// Has is implemented to satisfy the BlockIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportBlockIndexer) Has(height int64) (bool, error) {
return false, errors.New("the BlockIndexer.Has method is not supported")
}

// Index indexes block begin and end events for the specified block. It is
// part of the BlockIndexer interface.
func (b BackportBlockIndexer) Index(block types.EventDataNewBlockHeader) error {
return b.psql.IndexBlockEvents(block)
}

// Search is implemented to satisfy the BlockIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportBlockIndexer) Search(context.Context, *query.Query) ([]int64, error) {
return nil, errors.New("the BlockIndexer.Search method is not supported")
}
11 changes: 11 additions & 0 deletions state/indexer/sink/psql/backport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package psql

import (
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/txindex"
)

var (
_ indexer.BlockIndexer = BackportBlockIndexer{}
_ txindex.TxIndexer = BackportTxIndexer{}
tychoish marked this conversation as resolved.
Show resolved Hide resolved
)