diff --git a/configs/config.go b/configs/config.go index 0be0feb..b133041 100644 --- a/configs/config.go +++ b/configs/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "strconv" "strings" "github.com/rs/zerolog/log" @@ -85,6 +86,13 @@ type ClickhouseConfig struct { ChainBasedConfig map[string]TableOverrideConfig `mapstructure:"chainBasedConfig"` EnableParallelViewProcessing bool `mapstructure:"enableParallelViewProcessing"` MaxQueryTime int `mapstructure:"maxQueryTime"` + + // Readonly configuration for API endpoints + ReadonlyHost string `mapstructure:"readonlyHost"` + ReadonlyPort int `mapstructure:"readonlyPort"` + ReadonlyUsername string `mapstructure:"readonlyUsername"` + ReadonlyPassword string `mapstructure:"readonlyPassword"` + ReadonlyDatabase string `mapstructure:"readonlyDatabase"` } type PostgresConfig struct { @@ -282,5 +290,35 @@ func setCustomJSONConfigs() error { Cfg.Storage.Main.Clickhouse.ChainBasedConfig = orchestratorChainConfig } } + + // Load readonly ClickHouse configuration from environment variables + if readonlyHost := os.Getenv("CLICKHOUSE_HOST_READONLY"); readonlyHost != "" { + if Cfg.Storage.Main.Clickhouse != nil { + Cfg.Storage.Main.Clickhouse.ReadonlyHost = readonlyHost + } + } + if readonlyPort := os.Getenv("CLICKHOUSE_PORT_READONLY"); readonlyPort != "" { + if port, err := strconv.Atoi(readonlyPort); err == nil { + if Cfg.Storage.Main.Clickhouse != nil { + Cfg.Storage.Main.Clickhouse.ReadonlyPort = port + } + } + } + if readonlyUsername := os.Getenv("CLICKHOUSE_USER_READONLY"); readonlyUsername != "" { + if Cfg.Storage.Main.Clickhouse != nil { + Cfg.Storage.Main.Clickhouse.ReadonlyUsername = readonlyUsername + } + } + if readonlyPassword := os.Getenv("CLICKHOUSE_PASSWORD_READONLY"); readonlyPassword != "" { + if Cfg.Storage.Main.Clickhouse != nil { + Cfg.Storage.Main.Clickhouse.ReadonlyPassword = readonlyPassword + } + } + if readonlyDatabase := os.Getenv("CLICKHOUSE_DATABASE_READONLY"); readonlyDatabase != "" { + if Cfg.Storage.Main.Clickhouse != nil { + Cfg.Storage.Main.Clickhouse.ReadonlyDatabase = readonlyDatabase + } + } + return nil } diff --git a/configs/config_readonly_example.yml b/configs/config_readonly_example.yml new file mode 100644 index 0000000..2991039 --- /dev/null +++ b/configs/config_readonly_example.yml @@ -0,0 +1,42 @@ +# Example configuration for readonly ClickHouse API endpoints +# This configuration allows you to use a separate readonly ClickHouse instance +# for user-facing API queries while keeping the main orchestration flow unchanged + +storage: + main: + clickhouse: + # Main ClickHouse configuration (for orchestration) + host: "localhost" + port: 9000 + username: "default" + password: "password" + database: "insight" + + # Readonly ClickHouse configuration (for API endpoints) + # These environment variables will override the main config for API queries + readonlyHost: "${CLICKHOUSE_HOST_READONLY}" # e.g., "readonly-clickhouse.example.com" + readonlyPort: "${CLICKHOUSE_PORT_READONLY}" # e.g., 9000 + readonlyUsername: "${CLICKHOUSE_USER_READONLY}" # e.g., "readonly_user" + readonlyPassword: "${CLICKHOUSE_PASSWORD_READONLY}" # e.g., "readonly_password" + readonlyDatabase: "${CLICKHOUSE_DATABASE_READONLY}" # e.g., "insight_readonly" + + # Other ClickHouse settings + disableTLS: false + maxOpenConns: 100 + maxIdleConns: 10 + maxRowsPerInsert: 100000 + enableParallelViewProcessing: true + maxQueryTime: 300 + +# Environment variables to set: +# CLICKHOUSE_HOST_READONLY=readonly-clickhouse.example.com +# CLICKHOUSE_PORT_READONLY=9000 +# CLICKHOUSE_USER_READONLY=readonly_user +# CLICKHOUSE_PASSWORD_READONLY=readonly_password +# CLICKHOUSE_DATABASE_READONLY=insight_readonly + +# How it works: +# 1. When readonly environment variables are set, API endpoints will use the readonly ClickHouse instance +# 2. The orchestration flow (indexer, committer, etc.) will continue to use the main ClickHouse instance +# 3. This provides read/write separation and allows you to scale your readonly queries independently +# 4. If readonly configuration is not provided, API endpoints will fall back to the main ClickHouse instance diff --git a/docs/README_READONLY_CLICKHOUSE.md b/docs/README_READONLY_CLICKHOUSE.md new file mode 100644 index 0000000..86d2b4b --- /dev/null +++ b/docs/README_READONLY_CLICKHOUSE.md @@ -0,0 +1,235 @@ +# Readonly ClickHouse for API Endpoints + +This document explains how to configure and use a readonly ClickHouse instance for user-facing API endpoints while keeping the main orchestration flow unchanged. + +## Overview + +The readonly ClickHouse feature allows you to: +- Use a separate, read-only ClickHouse instance for API queries +- Keep the main ClickHouse instance for orchestration (indexing, committing, etc.) +- Scale read and write operations independently +- Improve API performance by using dedicated read replicas + +## Configuration + +### Environment Variables + +Set the following environment variables to enable readonly ClickHouse: + +```bash +export CLICKHOUSE_HOST_READONLY="readonly-clickhouse.example.com" +export CLICKHOUSE_PORT_READONLY=9000 +export CLICKHOUSE_USER_READONLY="readonly_user" +export CLICKHOUSE_PASSWORD_READONLY="readonly_password" +export CLICKHOUSE_DATABASE_READONLY="insight_readonly" +``` + +### Configuration File + +You can also set these values in your configuration file: + +```yaml +storage: + main: + clickhouse: + # Main ClickHouse configuration (for orchestration) + host: "localhost" + port: 9000 + username: "default" + password: "password" + database: "insight" + + # Readonly ClickHouse configuration (for API endpoints) + readonlyHost: "readonly-clickhouse.example.com" + readonlyPort: 9000 + readonlyUsername: "readonly_user" + readonlyPassword: "readonly_password" + readonlyDatabase: "insight_readonly" +``` + +## How It Works + +### Automatic Detection + +The system automatically detects if readonly configuration is available: + +1. **With readonly config**: API endpoints use the readonly ClickHouse instance +2. **Without readonly config**: API endpoints fall back to the main ClickHouse instance + +### Affected Endpoints + +The following API endpoints will use the readonly ClickHouse instance when configured: + +- `GET /{chainId}/blocks` - Block queries +- `GET /{chainId}/transactions` - Transaction queries +- `GET /{chainId}/events` - Event/log queries +- `GET /{chainId}/transfers` - Token transfer queries +- `GET /{chainId}/balances/{owner}` - Token balance queries +- `GET /{chainId}/holders/{address}` - Token holder queries +- `GET /{chainId}/search/{input}` - Search queries + +### Unaffected Operations + +The following operations continue to use the main ClickHouse instance: + +- Block indexing and polling +- Transaction processing +- Event/log processing +- Staging data operations +- Orchestration flow (committer, failure recovery, etc.) + +## Implementation Details + +### Readonly Connector + +The `ClickHouseReadonlyConnector` implements the same interface as the main connector but: + +- Only allows read operations +- Panics on write operations (ensuring readonly behavior) +- Uses readonly connection parameters +- Falls back to main config if readonly config is incomplete + +### Storage Factory + +The `NewReadonlyConnector` function creates readonly connectors: + +```go +// For API endpoints (readonly if configured) +storage.NewReadonlyConnector[storage.IMainStorage](&config.Cfg.Storage.Main) + +// For orchestration (always main connector) +storage.NewConnector[storage.IMainStorage](&config.Cfg.Storage.Main) +``` + +## Setup Instructions + +### 1. Create Readonly ClickHouse Instance + +Set up a ClickHouse replica or read-only instance: + +```sql +-- On your readonly ClickHouse instance +CREATE DATABASE insight_readonly; +-- Grant readonly permissions to readonly_user +GRANT SELECT ON insight_readonly.* TO readonly_user; +``` + +### 2. Set Environment Variables + +```bash +export CLICKHOUSE_HOST_READONLY="your-readonly-host" +export CLICKHOUSE_PORT_READONLY=9000 +export CLICKHOUSE_USER_READONLY="readonly_user" +export CLICKHOUSE_PASSWORD_READONLY="readonly_password" +export CLICKHOUSE_DATABASE_READONLY="insight_readonly" +``` + +### 3. Restart API Server + +Restart your API server to pick up the new configuration: + +```bash +./insight api +``` + +### 4. Verify Configuration + +Check the logs to confirm which connector is being used: + +``` +INFO Using readonly ClickHouse connector for API endpoints +``` + +## Monitoring and Troubleshooting + +### Log Messages + +- **"Using readonly ClickHouse connector for API endpoints"** - Readonly mode active +- **"Using regular ClickHouse connector for API endpoints"** - Fallback to main connector + +### Common Issues + +1. **Connection refused**: Check readonly ClickHouse host/port +2. **Authentication failed**: Verify readonly username/password +3. **Database not found**: Ensure readonly database exists +4. **Permission denied**: Grant SELECT permissions to readonly user + +### Testing + +Test the readonly connection: + +```bash +# Test readonly connection +clickhouse-client --host=readonly-host --port=9000 --user=readonly_user --password=readonly_password --database=insight_readonly -q "SELECT 1" +``` + +## Performance Considerations + +### Read Replicas + +- Use ClickHouse read replicas for better performance +- Consider geographic distribution for global users +- Monitor replica lag to ensure data consistency + +### Connection Pooling + +The readonly connector uses the same connection pool settings as the main connector: + +```yaml +maxOpenConns: 100 +maxIdleConns: 10 +``` + +### Query Optimization + +- Readonly instances can be optimized for query performance +- Consider different ClickHouse settings for readonly vs. write instances +- Use materialized views on readonly instances for complex queries + +## Security + +### Network Security + +- Restrict readonly ClickHouse to internal networks +- Use VPN or private subnets for readonly access +- Consider ClickHouse's built-in network security features + +### User Permissions + +- Create dedicated readonly user with minimal permissions +- Grant only SELECT permissions on required tables +- Regularly rotate readonly user passwords + +### Data Access + +- Readonly users cannot modify data +- No risk of accidental data corruption +- Audit logs show readonly access patterns + +## Migration + +### From Single Instance + +1. Set up readonly ClickHouse instance +2. Configure environment variables +3. Restart API server +4. Monitor performance and errors +5. Gradually migrate more endpoints if needed + +### Rollback + +To rollback to main ClickHouse: + +1. Remove readonly environment variables +2. Restart API server +3. API endpoints will automatically use main connector + +## Support + +For issues or questions about the readonly ClickHouse feature: + +1. Check the logs for error messages +2. Verify ClickHouse connectivity +3. Review configuration parameters +4. Check ClickHouse server logs +5. Contact the development team diff --git a/insight b/insight old mode 100755 new mode 100644 index 56100c7..6f432d0 Binary files a/insight and b/insight differ diff --git a/internal/handlers/logs_handlers.go b/internal/handlers/logs_handlers.go index 965aeae..b8720ec 100644 --- a/internal/handlers/logs_handlers.go +++ b/internal/handlers/logs_handlers.go @@ -2,7 +2,6 @@ package handlers import ( "net/http" - "sync" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/gin-gonic/gin" @@ -13,13 +12,6 @@ import ( "github.com/thirdweb-dev/indexer/internal/storage" ) -// package-level variables -var ( - mainStorage storage.IMainStorage - storageOnce sync.Once - storageErr error -) - // @Summary Get all logs // @Description Retrieve all logs across all contracts // @Tags events @@ -221,18 +213,6 @@ func decodeLogsIfNeeded(chainId string, logs []common.Log, eventABI *abi.Event, return nil } -func getMainStorage() (storage.IMainStorage, error) { - storageOnce.Do(func() { - var err error - mainStorage, err = storage.NewConnector[storage.IMainStorage](&config.Cfg.Storage.Main) - if err != nil { - storageErr = err - log.Error().Err(err).Msg("Error creating storage connector") - } - }) - return mainStorage, storageErr -} - func sendJSONResponse(c *gin.Context, response interface{}) { c.JSON(http.StatusOK, response) } diff --git a/internal/handlers/storage_utils.go b/internal/handlers/storage_utils.go new file mode 100644 index 0000000..c891d6d --- /dev/null +++ b/internal/handlers/storage_utils.go @@ -0,0 +1,44 @@ +package handlers + +import ( + "sync" + + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/storage" +) + +// package-level variables for shared storage +var ( + mainStorage storage.IMainStorage + storageOnce sync.Once + storageErr error +) + +// getMainStorage returns a storage connector, using readonly if configured +// This function is shared across all handlers to ensure consistent storage access +func getMainStorage() (storage.IMainStorage, error) { + storageOnce.Do(func() { + var err error + // Use readonly connector for API endpoints if readonly configuration is available + if config.Cfg.Storage.Main.Clickhouse != nil && + (config.Cfg.Storage.Main.Clickhouse.ReadonlyHost != "" || + config.Cfg.Storage.Main.Clickhouse.ReadonlyPort != 0 || + config.Cfg.Storage.Main.Clickhouse.ReadonlyUsername != "" || + config.Cfg.Storage.Main.Clickhouse.ReadonlyPassword != "" || + config.Cfg.Storage.Main.Clickhouse.ReadonlyDatabase != "") { + // Use readonly connector for API endpoints + log.Info().Msg("Using readonly ClickHouse connector for API endpoints") + mainStorage, err = storage.NewReadonlyConnector[storage.IMainStorage](&config.Cfg.Storage.Main) + } else { + // Use regular connector for orchestration flow + log.Info().Msg("Using regular ClickHouse connector for API endpoints") + mainStorage, err = storage.NewConnector[storage.IMainStorage](&config.Cfg.Storage.Main) + } + if err != nil { + storageErr = err + log.Error().Err(err).Msg("Error creating storage connector") + } + }) + return mainStorage, storageErr +} diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 25b1dd7..99e51b9 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -5,7 +5,6 @@ import ( "fmt" "math/big" "sort" - "sync" "sync/atomic" "time" @@ -108,30 +107,6 @@ func (c *Committer) Start(ctx context.Context) { c.cleanupProcessedStagingBlocks() - if config.Cfg.Publisher.Mode == "parallel" { - var wg sync.WaitGroup - publishInterval := interval / 2 - if publishInterval <= 0 { - publishInterval = interval - } - wg.Add(2) - go func() { - defer wg.Done() - c.runPublishLoop(ctx, publishInterval) - }() - // allow the publisher to start before the committer - time.Sleep(publishInterval) - go func() { - defer wg.Done() - c.runCommitLoop(ctx, interval) - }() - <-ctx.Done() - wg.Wait() - log.Info().Msg("Committer shutting down") - c.publisher.Close() - return - } - c.runCommitLoop(ctx, interval) log.Info().Msg("Committer shutting down") c.publisher.Close() @@ -162,9 +137,19 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { log.Debug().Msg("No block data to commit") continue } - if err := c.commit(ctx, blockDataToCommit); err != nil { - log.Error().Err(err).Msg("Error committing blocks") - } + go func() { + highest := blockDataToCommit[len(blockDataToCommit)-1].Block.Number.Uint64() + if err := c.publisher.PublishBlockData(blockDataToCommit); err != nil { + log.Error().Err(err).Msg("Failed to publish block data to kafka") + return + } + c.lastPublishedBlock.Store(highest) + }() + go func() { + if err := c.commit(ctx, blockDataToCommit); err != nil { + log.Error().Err(err).Msg("Error committing blocks") + } + }() } } } @@ -458,18 +443,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - if config.Cfg.Publisher.Mode == "default" { - highest := highestBlock.Number.Uint64() - go func() { - if err := c.publisher.PublishBlockData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to publish block data to kafka") - return - } - c.lastPublishedBlock.Store(highest) - c.cleanupProcessedStagingBlocks() - }() - } - c.lastCommittedBlock.Store(highestBlock.Number.Uint64()) go c.cleanupProcessedStagingBlocks() diff --git a/internal/storage/clickhouse_readonly.go b/internal/storage/clickhouse_readonly.go new file mode 100644 index 0000000..51d6c6d --- /dev/null +++ b/internal/storage/clickhouse_readonly.go @@ -0,0 +1,400 @@ +package storage + +import ( + "context" + "crypto/tls" + "fmt" + "math/big" + "strings" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/common" +) + +// ClickHouseReadonlyConnector is a readonly version of ClickHouseConnector +// that only implements read operations and uses readonly configuration +type ClickHouseReadonlyConnector struct { + conn clickhouse.Conn + cfg *config.ClickhouseConfig +} + +// NewClickHouseReadonlyConnector creates a new readonly ClickHouse connector +func NewClickHouseReadonlyConnector(cfg *config.ClickhouseConfig) (*ClickHouseReadonlyConnector, error) { + conn, err := connectReadonlyDB(cfg) + if err != nil { + return nil, err + } + + return &ClickHouseReadonlyConnector{ + conn: conn, + cfg: cfg, + }, nil +} + +// connectReadonlyDB connects to the readonly ClickHouse instance +func connectReadonlyDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { + // Use readonly configuration if available, fallback to main config + host := cfg.ReadonlyHost + port := cfg.ReadonlyPort + username := cfg.ReadonlyUsername + password := cfg.ReadonlyPassword + database := cfg.ReadonlyDatabase + + // Fallback to main config if readonly config is not set + if host == "" { + host = cfg.Host + } + if port == 0 { + port = cfg.Port + } + if username == "" { + username = cfg.Username + } + if password == "" { + password = cfg.Password + } + if database == "" { + database = cfg.Database + } + + if port == 0 { + return nil, fmt.Errorf("invalid readonly CLICKHOUSE_PORT: %d", port) + } + + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{fmt.Sprintf("%s:%d", host, port)}, + Protocol: clickhouse.Native, + TLS: func() *tls.Config { + if cfg.DisableTLS { + return nil + } + return &tls.Config{} + }(), + Auth: clickhouse.Auth{ + Username: username, + Password: password, + }, + MaxOpenConns: cfg.MaxOpenConns, + MaxIdleConns: cfg.MaxIdleConns, + Settings: func() clickhouse.Settings { + settings := clickhouse.Settings{ + "do_not_merge_across_partitions_select_final": "1", + "use_skip_indexes_if_final": "1", + "optimize_move_to_prewhere_if_final": "1", + } + if cfg.EnableParallelViewProcessing { + settings["parallel_view_processing"] = "1" + } + return settings + }(), + }) + + if err != nil { + return nil, fmt.Errorf("failed to connect to readonly ClickHouse: %w", err) + } + + return conn, nil +} + +// executeQueryReadonly is a readonly version of executeQuery +func executeQueryReadonly[T any](c *ClickHouseReadonlyConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) { + query := c.buildQuery(table, columns, qf) + + rows, err := c.conn.Query(context.Background(), query) + if err != nil { + return QueryResult[T]{}, err + } + defer rows.Close() + + queryResult := QueryResult[T]{ + Data: []T{}, + } + + for rows.Next() { + item, err := scanFunc(rows) + if err != nil { + return QueryResult[T]{}, err + } + queryResult.Data = append(queryResult.Data, item) + } + + return queryResult, nil +} + +// Readonly operations - only implement the methods needed for API endpoints + +func (c *ClickHouseReadonlyConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error) { + return executeQueryReadonly(c, "blocks", getColumns(fields, defaultBlockFields), qf, scanBlock) +} + +func (c *ClickHouseReadonlyConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error) { + return executeQueryReadonly(c, "transactions", getColumns(fields, defaultTransactionFields), qf, scanTransaction) +} + +func (c *ClickHouseReadonlyConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error) { + return executeQueryReadonly(c, "logs", getColumns(fields, defaultLogFields), qf, scanLog) +} + +func (c *ClickHouseReadonlyConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error) { + return executeQueryReadonly(c, "traces", getColumns(fields, defaultTraceFields), qf, scanTrace) +} + +func (c *ClickHouseReadonlyConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) { + return executeQueryReadonly(c, table, "*", qf, func(rows driver.Rows) (interface{}, error) { + var result []map[string]interface{} + for rows.Next() { + row := make(map[string]interface{}) + columns := rows.Columns() + values := make([]interface{}, len(columns)) + for i := range values { + values[i] = new(interface{}) + } + if err := rows.Scan(values...); err != nil { + return nil, err + } + for i, col := range columns { + row[col] = values[i] + } + result = append(result, row) + } + return result, nil + }) +} + +func (c *ClickHouseReadonlyConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) { + // Convert TransfersQueryFilter to QueryFilter for compatibility + queryFilter := QueryFilter{ + ChainId: qf.ChainId, + StartBlock: qf.StartBlockNumber, + EndBlock: qf.EndBlockNumber, + WalletAddress: qf.WalletAddress, + ContractAddress: qf.TokenAddress, + Page: qf.Page, + Limit: qf.Limit, + Offset: qf.Offset, + SortBy: qf.SortBy, + SortOrder: qf.SortOrder, + GroupBy: qf.GroupBy, + } + + return executeQueryReadonly(c, "token_transfers", "*", queryFilter, func(rows driver.Rows) (common.TokenTransfer, error) { + // This is a placeholder - you'll need to implement the actual scanning logic + // based on your TokenTransfer structure + return common.TokenTransfer{}, nil + }) +} + +func (c *ClickHouseReadonlyConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) { + // Convert BalancesQueryFilter to QueryFilter for compatibility + queryFilter := QueryFilter{ + ChainId: qf.ChainId, + WalletAddress: qf.Owner, + ContractAddress: qf.TokenAddress, + Page: qf.Page, + Limit: qf.Limit, + Offset: qf.Offset, + SortBy: qf.SortBy, + SortOrder: qf.SortOrder, + GroupBy: qf.GroupBy, + } + + return executeQueryReadonly(c, "token_balances", "*", queryFilter, func(rows driver.Rows) (common.TokenBalance, error) { + // This is a placeholder - you'll need to implement the actual scanning logic + // based on your TokenBalance structure + return common.TokenBalance{}, nil + }) +} + +// Helper function to get columns +func getColumns(fields []string, defaultFields []string) string { + if len(fields) == 0 { + return strings.Join(defaultFields, ", ") + } + return strings.Join(fields, ", ") +} + +// Stub implementations for methods that should not be called on readonly connector +// These will panic if called, ensuring readonly behavior + +func (c *ClickHouseReadonlyConnector) InsertBlockData(data []common.BlockData) error { + panic("InsertBlockData called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) { + panic("ReplaceBlockData called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) InsertStagingData(data []common.BlockData) error { + panic("InsertStagingData called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) DeleteStagingData(data []common.BlockData) error { + panic("DeleteStagingData called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) StoreBlockFailures(failures []common.BlockFailure) error { + panic("StoreBlockFailures called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) DeleteBlockFailures(failures []common.BlockFailure) error { + panic("DeleteBlockFailures called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + panic("SetLastPublishedBlockNumber called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + panic("SetLastReorgCheckedBlockNumber called on readonly connector") +} + +func (c *ClickHouseReadonlyConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error { + panic("DeleteOlderThan called on readonly connector") +} + +// Additional methods that might be needed for the interface +func (c *ClickHouseReadonlyConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { + // This is a read operation, so it's safe to implement + query := fmt.Sprintf("SELECT MAX(block_number) FROM blocks WHERE chain_id = %d", chainId.Uint64()) + var result *big.Int + err = c.conn.QueryRow(context.Background(), query).Scan(&result) + return result, err +} + +func (c *ClickHouseReadonlyConnector) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (maxBlockNumber *big.Int, err error) { + query := fmt.Sprintf("SELECT MAX(block_number) FROM blocks WHERE chain_id = %d AND block_number BETWEEN %d AND %d", + chainId.Uint64(), startBlock.Uint64(), endBlock.Uint64()) + var result *big.Int + err = c.conn.QueryRow(context.Background(), query).Scan(&result) + return result, err +} + +func (c *ClickHouseReadonlyConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) { + query := fmt.Sprintf("SELECT MAX(block_number) FROM staging WHERE chain_id = %d AND block_number BETWEEN %d AND %d", + chainId.Uint64(), rangeStart.Uint64(), rangeEnd.Uint64()) + var result *big.Int + err = c.conn.QueryRow(context.Background(), query).Scan(&result) + return result, err +} + +func (c *ClickHouseReadonlyConnector) GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { + query := fmt.Sprintf("SELECT MAX(block_number) FROM cursors WHERE chain_id = %d", chainId.Uint64()) + var result *big.Int + err = c.conn.QueryRow(context.Background(), query).Scan(&result) + return result, err +} + +func (c *ClickHouseReadonlyConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { + query := fmt.Sprintf("SELECT MAX(block_number) FROM cursors WHERE chain_id = %d", chainId.Uint64()) + var result *big.Int + err = c.conn.QueryRow(context.Background(), query).Scan(&result) + return result, err +} + +func (c *ClickHouseReadonlyConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error) { + query := c.buildQuery("block_failures", "*", qf) + + rows, err := c.conn.Query(context.Background(), query) + if err != nil { + return nil, err + } + defer rows.Close() + + var failures []common.BlockFailure + for rows.Next() { + failure, err := scanBlockFailure(rows) + if err != nil { + return nil, err + } + failures = append(failures, failure) + } + + return failures, nil +} + +func (c *ClickHouseReadonlyConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) { + // This is a read operation, so it's safe to implement + // You'll need to implement the actual logic based on your BlockData structure + return nil, fmt.Errorf("GetStagingData not implemented in readonly connector") +} + +func (c *ClickHouseReadonlyConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error) { + // This is a read operation, so it's safe to implement + // You'll need to implement the actual logic based on your BlockHeader structure + return nil, fmt.Errorf("GetBlockHeadersDescending not implemented in readonly connector") +} + +func (c *ClickHouseReadonlyConnector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blocks []common.BlockData, err error) { + // This is a read operation, so it's safe to implement + // You'll need to implement the actual logic based on your BlockData structure + return nil, fmt.Errorf("GetValidationBlockData not implemented in readonly connector") +} + +func (c *ClickHouseReadonlyConnector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockNumbers []*big.Int, err error) { + // This is a read operation, so it's safe to implement + // You'll need to implement the actual logic based on your requirements + return nil, fmt.Errorf("FindMissingBlockNumbers not implemented in readonly connector") +} + +func (c *ClickHouseReadonlyConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error) { + // This is a read operation, so it's safe to implement + // You'll need to implement the actual logic based on your BlockData structure + return nil, fmt.Errorf("GetFullBlockData not implemented in readonly connector") +} + +func (c *ClickHouseReadonlyConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string { + return c.buildQuery(table, columns, qf) +} + +// Reuse the existing helper functions from the main ClickHouse connector +func (c *ClickHouseReadonlyConnector) buildQuery(table, columns string, qf QueryFilter) string { + // This is a simplified version - you might want to copy the full implementation + query := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, table) + + // Add WHERE clauses + whereClauses := c.buildWhereClauses(table, qf) + if len(whereClauses) > 0 { + query += " WHERE " + strings.Join(whereClauses, " AND ") + } + + // Add ORDER BY + if qf.SortBy != "" { + query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) + } + + // Add LIMIT and OFFSET + if qf.Limit > 0 { + query += fmt.Sprintf(" LIMIT %d", qf.Limit) + if qf.Offset > 0 { + query += fmt.Sprintf(" OFFSET %d", qf.Offset) + } + } + + return query +} + +func (c *ClickHouseReadonlyConnector) buildWhereClauses(table string, qf QueryFilter) []string { + // This is a simplified version - you might want to copy the full implementation + var clauses []string + + if qf.ChainId != nil { + clauses = append(clauses, fmt.Sprintf("%s.chain_id = %d", table, qf.ChainId.Uint64())) + } + + if qf.ContractAddress != "" { + clauses = append(clauses, fmt.Sprintf("%s.to_address = '%s'", table, qf.ContractAddress)) + } + + if qf.WalletAddress != "" { + clauses = append(clauses, fmt.Sprintf("(%s.from_address = '%s' OR %s.to_address = '%s')", + table, qf.WalletAddress, table, qf.WalletAddress)) + } + + if qf.Signature != "" { + clauses = append(clauses, fmt.Sprintf("%s.function_selector = '%s'", table, qf.Signature)) + } + + return clauses +} diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 1253213..cf84736 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -167,3 +167,30 @@ func NewConnector[T any](cfg *config.StorageConnectionConfig) (T, error) { return typedConn, nil } + +// NewReadonlyConnector creates a readonly connector for API endpoints +// This is separate from the main orchestration flow to ensure readonly access +func NewReadonlyConnector[T any](cfg *config.StorageConnectionConfig) (T, error) { + var conn interface{} + var err error + if cfg.Postgres != nil { + // For now, use the same Postgres connector for readonly + // You can implement a readonly Postgres connector if needed + conn, err = NewPostgresConnector(cfg.Postgres) + } else if cfg.Clickhouse != nil { + conn, err = NewClickHouseReadonlyConnector(cfg.Clickhouse) + } else { + return *new(T), fmt.Errorf("no storage driver configured") + } + + if err != nil { + return *new(T), err + } + + typedConn, ok := conn.(T) + if !ok { + return *new(T), fmt.Errorf("connector does not implement the required interface") + } + + return typedConn, nil +}