Skip to content

Commit

Permalink
(unfinished): config and client factory changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-wh committed Mar 16, 2023
1 parent 59138cd commit dcc64ff
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 238 deletions.
8 changes: 2 additions & 6 deletions analyzer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ type Analyzer interface {
// ConsensusConfig specifies configuration parameters for
// for processing the consensus layer.
type ConsensusConfig struct {
// ChainID is the chain ID for the underlying network.
ChainID string

// ChainContext is the ChainContext (= chain identifier, based on a hash of the
// genesis file) for the underlying network.
ChainContext string
// ChainName e.g. "mainnet."
ChainName string

// Range is the range of blocks to process.
// If this is set, the analyzer analyzes blocks in the provided range.
Expand Down
38 changes: 14 additions & 24 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
oasisConfig "github.com/oasisprotocol/oasis-sdk/client-sdk/go/config"

"github.com/oasisprotocol/oasis-indexer/analyzer"
"github.com/oasisprotocol/oasis-indexer/analyzer/queries"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/oasisprotocol/oasis-indexer/metrics"
"github.com/oasisprotocol/oasis-indexer/storage"
source "github.com/oasisprotocol/oasis-indexer/storage/oasis"
"github.com/oasisprotocol/oasis-indexer/storage/oasis/nodeapi/history"
)

const (
Expand Down Expand Up @@ -57,22 +57,11 @@ type Main struct {
var _ analyzer.Analyzer = (*Main)(nil)

// NewMain returns a new main analyzer for the consensus layer.
func NewMain(nodeCfg config.NodeConfig, cfg *config.BlockBasedAnalyzerConfig, target storage.TargetStorage, logger *log.Logger) (*Main, error) {
func NewMain(sourceConfig config.SourceConfig, cfg *config.BlockBasedAnalyzerConfig, target storage.TargetStorage, logger *log.Logger) (*Main, error) {
ctx := context.Background()

// Initialize source storage.
networkCfg := oasisConfig.Network{
ChainContext: nodeCfg.ChainContext,
RPC: nodeCfg.RPC,
}
factory, err := source.NewClientFactory(ctx, &networkCfg, nodeCfg.FastStartup)
if err != nil {
logger.Error("error creating client factory",
"err", err.Error(),
)
return nil, err
}
client, err := factory.Consensus()
client, err := source.NewConsensusClient(ctx, &sourceConfig)
if err != nil {
logger.Error("error creating consensus client",
"err", err.Error(),
Expand All @@ -86,10 +75,9 @@ func NewMain(nodeCfg config.NodeConfig, cfg *config.BlockBasedAnalyzerConfig, ta
To: cfg.To,
}
ac := analyzer.ConsensusConfig{
ChainID: nodeCfg.ChainID,
ChainContext: nodeCfg.ChainContext,
Range: blockRange,
Source: client,
ChainName: sourceConfig.ChainName,
Range: blockRange,
Source: client,
}

logger.Info("Starting consensus analyzer", "config", ac)
Expand All @@ -108,15 +96,17 @@ func (m *Main) Start() {
// Get block to be indexed.
var height int64

isGenesisProcessed, err := m.isGenesisProcessed(ctx)
// TODO: Generalize processing multiple genesis documents.
genesisChainContext := history.Networks[m.cfg.ChainName].CurrentRecord().ChainContext
isGenesisProcessed, err := m.isGenesisProcessed(ctx, genesisChainContext)
if err != nil {
m.logger.Error("failed to check if genesis is processed",
"err", err.Error(),
)
return
}
if !isGenesisProcessed {
if err = m.processGenesis(ctx); err != nil {
if err = m.processGenesis(ctx, genesisChainContext); err != nil {
m.logger.Error("failed to process genesis",
"err", err.Error(),
)
Expand Down Expand Up @@ -208,19 +198,19 @@ func (m *Main) latestBlock(ctx context.Context) (int64, error) {
return latest, nil
}

func (m *Main) isGenesisProcessed(ctx context.Context) (bool, error) {
func (m *Main) isGenesisProcessed(ctx context.Context, chainContext string) (bool, error) {
var processed bool
if err := m.target.QueryRow(
ctx,
queries.IsGenesisProcessed,
m.cfg.ChainContext,
chainContext,
).Scan(&processed); err != nil {
return false, err
}
return processed, nil
}

func (m *Main) processGenesis(ctx context.Context) error {
func (m *Main) processGenesis(ctx context.Context, chainContext string) error {
m.logger.Info("fetching genesis document")
genesisDoc, err := m.cfg.Source.GenesisDocument(ctx)
if err != nil {
Expand Down Expand Up @@ -251,7 +241,7 @@ func (m *Main) processGenesis(ctx context.Context) error {
}
batch.Queue(
queries.GenesisIndexingProgress,
m.cfg.ChainContext,
chainContext,
)
if err := m.target.SendBatch(ctx, batch); err != nil {
return err
Expand Down
28 changes: 2 additions & 26 deletions analyzer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/jackc/pgx/v5"
oasisConfig "github.com/oasisprotocol/oasis-sdk/client-sdk/go/config"
"github.com/oasisprotocol/oasis-sdk/client-sdk/go/modules/rewards"
"github.com/oasisprotocol/oasis-sdk/client-sdk/go/types"

Expand Down Expand Up @@ -44,38 +43,15 @@ var _ analyzer.Analyzer = (*Main)(nil)
// NewRuntimeAnalyzer returns a new main analyzer for a runtime.
func NewRuntimeAnalyzer(
runtime analyzer.Runtime,
nodeCfg config.NodeConfig,
sourceConfig *config.SourceConfig,
cfg *config.BlockBasedAnalyzerConfig,
target storage.TargetStorage,
logger *log.Logger,
) (*Main, error) {
ctx := context.Background()

// Initialize source storage.
networkCfg := oasisConfig.Network{
ChainContext: nodeCfg.ChainContext,
RPC: nodeCfg.RPC,
}
factory, err := oasis.NewClientFactory(ctx, &networkCfg, nodeCfg.FastStartup)
if err != nil {
logger.Error("error creating client factory",
"err", err,
)
return nil, err
}

network, err := analyzer.FromChainContext(nodeCfg.ChainContext)
if err != nil {
return nil, err
}

id, err := runtime.ID(network)
if err != nil {
return nil, err
}
logger.Info("Runtime ID determined", "runtime", runtime.String(), "runtime_id", id)

client, err := factory.Runtime(id)
client, err := oasis.NewRuntimeClient(ctx, sourceConfig, runtime)
if err != nil {
logger.Error("error creating runtime client",
"err", err,
Expand Down
6 changes: 4 additions & 2 deletions cmd/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/oasis-indexer/log"
"github.com/oasisprotocol/oasis-indexer/metrics"
storage "github.com/oasisprotocol/oasis-indexer/storage/client"
"github.com/oasisprotocol/oasis-indexer/storage/oasis/nodeapi/history"
)

const (
Expand Down Expand Up @@ -100,14 +101,15 @@ func NewService(cfg *config.ServerConfig) (*Service, error) {
if err != nil {
return nil, err
}
client, err := storage.NewStorageClient(cfg.ChainID, cfg.ChainName, backing, logger)
chainID := history.Networks[cfg.ChainName].CurrentRecord().ChainContext
client, err := storage.NewStorageClient(chainID, cfg.ChainName, backing, logger)
if err != nil {
return nil, err
}

return &Service{
address: cfg.Endpoint,
chainID: cfg.ChainID,
chainID: chainID,
target: client,
logger: logger,
}, nil
Expand Down
40 changes: 19 additions & 21 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (cfg *Config) Validate() error {

// AnalysisConfig is the configuration for chain analyzers.
type AnalysisConfig struct {
// Node is the configuration for accessing oasis-node.
Node NodeConfig `koanf:"node"`
// Source is the configuration for accessing oasis-node(s).
Source SourceConfig `koanf:"source"`

// Analyzers is the analyzer configs.
Analyzers AnalyzersList `koanf:"analyzers"`
Expand All @@ -61,6 +61,9 @@ type AnalysisConfig struct {

// Validate validates the analysis configuration.
func (cfg *AnalysisConfig) Validate() error {
if cfg.Source.ChainName == "" {
return fmt.Errorf("no chain name provided")
}
if cfg.Analyzers.Consensus != nil {
if err := cfg.Analyzers.Consensus.Validate(); err != nil {
return err
Expand Down Expand Up @@ -109,26 +112,27 @@ type AnalyzersList struct {
AggregateStats *AggregateStatsConfig `koanf:"aggregate_stats"`
}

// NodeConfig is the configuration for chain analyzers.
type NodeConfig struct {
// ChainID is the chain ID of the chain this analyzer will process.
// Used to identify the chain in the database.
ChainID string `koanf:"chain_id"`

// RPC is the node endpoint.
RPC string `koanf:"rpc"`
// SourceConfig has some controls about what chain we're analyzing and how to connect.
type SourceConfig struct {
// ChainName is the name of the chain (e.g. mainnet/testnet/local).
ChainName string `koanf:"chain_name"`

// ChainContext is the domain separation context.
// It uniquely identifies the chain; it is derived as a hash of the genesis data.
// Used as safety check to prevent accidental use of the wrong RPC endpoint.
ChainContext string `koanf:"chaincontext"`
// Nodes describe the oasis-node(s) to connect to. Keys are "archive
// names," e.g. "damask."
Nodes map[string]*NodeConfig `koanf:"nodes"`

// If set, the analyzer will skip some initial checks, e.g. that
// `rpc` really serves the chain with `chain_context`.
// `rpc` really serves the chain with the chain context we expect.
// NOT RECOMMENDED in production; intended for faster testing.
FastStartup bool `koanf:"fast_startup"`
}

// NodeConfig is information about one oasis-node to connect to.
type NodeConfig struct {
// RPC is the node endpoint.
RPC string `koanf:"rpc"`
}

type BlockBasedAnalyzerConfig struct {
// From is the (inclusive) starting block for this analyzer.
From int64 `koanf:"from"`
Expand Down Expand Up @@ -186,9 +190,6 @@ func (cfg *AggregateStatsConfig) Validate() error {

// ServerConfig contains the API server configuration.
type ServerConfig struct {
// ChainID is the chain ID (normally latest) for the server API
ChainID string `koanf:"chain_id"`

// ChainName is the name of the chain (e.g. mainnet/testnet/local).
ChainName string `koanf:"chain_name"`

Expand All @@ -206,9 +207,6 @@ func (cfg *ServerConfig) Validate() error {
if cfg.Storage == nil {
return fmt.Errorf("no storage config provided")
}
if cfg.ChainName == "" {
return fmt.Errorf("no chain name provided")
}

return cfg.Storage.Validate(false /* requireMigrations */)
}
Expand Down
10 changes: 5 additions & 5 deletions config/docker-dev.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
analysis:
node:
chain_id: oasis-3
rpc: unix:/tmp/node.sock #unix:/node/data/internal.sock
chaincontext: b11b369e0da5bb230b220127f5e7b242d385ef8c6f54906243f30af63c815535
source:
chain_name: mainnet
nodes:
damask:
rpc: unix:/tmp/node.sock #unix:/node/data/internal.sock
analyzers:
metadata_registry:
interval: 1h
Expand All @@ -20,7 +21,6 @@ analysis:
migrations: file://storage/migrations

server:
chain_id: oasis-3
chain_name: mainnet
endpoint: 0.0.0.0:8008
storage:
Expand Down
10 changes: 5 additions & 5 deletions config/local-dev.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
analysis:
node:
chain_id: oasis-3
rpc: unix:/tmp/node.sock #unix:/node/data/internal.sock
chaincontext: b11b369e0da5bb230b220127f5e7b242d385ef8c6f54906243f30af63c815535
source:
chain_name: mainnet
nodes:
damask:
rpc: unix:/tmp/node.sock #unix:/node/data/internal.sock
analyzers:
metadata_registry:
interval: 1h
Expand All @@ -20,7 +21,6 @@ analysis:
migrations: file://storage/migrations

server:
chain_id: oasis-3
chain_name: mainnet
endpoint: localhost:8008
storage:
Expand Down
Loading

0 comments on commit dcc64ff

Please sign in to comment.