Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added indexer
Binary file not shown.
232 changes: 146 additions & 86 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,69 +467,11 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter, fields ...string) (Query
}

func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) {
tableName := c.getTableName(qf.ChainId, table)
// Build the SELECT clause with aggregates
selectColumns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ")
query := fmt.Sprintf("SELECT %s FROM %s.%s", selectColumns, c.cfg.Database, tableName)
if qf.ForceConsistentData {
query += " FINAL"
}

whereClauses := []string{}
// Apply filters
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
whereClauses = append(whereClauses, createFilterClause("chain_id", qf.ChainId.String()))
}
blockNumbersClause := createBlockNumbersClause(qf.BlockNumbers)
if blockNumbersClause != "" {
whereClauses = append(whereClauses, blockNumbersClause)
}
contractAddressClause := createContractAddressClause(table, qf.ContractAddress)
if contractAddressClause != "" {
whereClauses = append(whereClauses, contractAddressClause)
}
walletAddressClause := createWalletAddressClause(table, qf.WalletAddress)
if walletAddressClause != "" {
whereClauses = append(whereClauses, walletAddressClause)
}
fromAddressClause := createFromAddressClause(table, qf.FromAddress)
if fromAddressClause != "" {
whereClauses = append(whereClauses, fromAddressClause)
}
signatureClause := createSignatureClause(table, qf.Signature)
if signatureClause != "" {
whereClauses = append(whereClauses, signatureClause)
}
for key, value := range qf.FilterParams {
whereClauses = append(whereClauses, createFilterClause(key, strings.ToLower(value)))
}

// Add WHERE clause to query if there are any conditions
if len(whereClauses) > 0 {
query += " WHERE " + strings.Join(whereClauses, " AND ")
}

if len(qf.GroupBy) > 0 {
groupByColumns := strings.Join(qf.GroupBy, ", ")
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
}

// Add ORDER BY clause
if qf.SortBy != "" {
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
}

// Add limit clause
if qf.Page >= 0 && qf.Limit > 0 {
offset := qf.Page * qf.Limit
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
} else if qf.Limit > 0 {
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
}

if c.cfg.MaxQueryTime > 0 {
query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime)
}
// Use the new query building logic
query := c.buildQuery(table, selectColumns, qf)

if err := common.ValidateQuery(query); err != nil {
return QueryResult[interface{}]{}, err
Expand Down Expand Up @@ -611,64 +553,177 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
}

func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) string {
var query string

// Check if we need to handle wallet address with UNION for transactions
if table == "transactions" && qf.WalletAddress != "" {
query = c.buildUnionQuery(table, columns, qf)
} else {
query = c.buildStandardQuery(table, columns, qf)
}

// Apply post-query clauses to ALL queries
query = c.addPostQueryClauses(query, qf)

return query
}

func (c *ClickHouseConnector) buildStandardQuery(table, columns string, qf QueryFilter) string {
tableName := c.getTableName(qf.ChainId, table)
query := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName)
if qf.ForceConsistentData {
query += " FINAL"
}

whereClauses := c.buildWhereClauses(table, qf)

// Add WHERE clause to query if there are any conditions
if len(whereClauses) > 0 {
query += " WHERE " + strings.Join(whereClauses, " AND ")
}

return query
}

func (c *ClickHouseConnector) buildUnionQuery(table, columns string, qf QueryFilter) string {
tableName := c.getTableName(qf.ChainId, table)

// Build base where clauses (excluding wallet address)
baseWhereClauses := c.buildWhereClauses(table, qf)

// Create two separate queries for from_address and to_address
fromQuery := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName)
if qf.ForceConsistentData {
fromQuery += " FINAL"
}

toQuery := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName)
if qf.ForceConsistentData {
toQuery += " FINAL"
}

// Add base where clauses to both queries
if len(baseWhereClauses) > 0 {
baseWhereClause := strings.Join(baseWhereClauses, " AND ")
fromQuery += " WHERE " + baseWhereClause + " AND from_address = '" + strings.ToLower(qf.WalletAddress) + "'"
toQuery += " WHERE " + baseWhereClause + " AND to_address = '" + strings.ToLower(qf.WalletAddress) + "'"
} else {
fromQuery += " WHERE from_address = '" + strings.ToLower(qf.WalletAddress) + "'"
toQuery += " WHERE to_address = '" + strings.ToLower(qf.WalletAddress) + "'"
}

// Apply ORDER BY to both queries for consistent results
if qf.SortBy != "" {
fromQuery += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
toQuery += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
}

// Apply LIMIT to each individual query to avoid loading too much data
// We use a higher limit to ensure we get enough results after UNION
individualLimit := qf.Limit * 2 // Double the limit to account for potential duplicates
if qf.Page >= 0 && qf.Limit > 0 {
offset := qf.Page * qf.Limit
fromQuery += fmt.Sprintf(" LIMIT %d OFFSET %d", individualLimit, offset)
toQuery += fmt.Sprintf(" LIMIT %d OFFSET %d", individualLimit, offset)
} else if qf.Limit > 0 {
fromQuery += fmt.Sprintf(" LIMIT %d", individualLimit)
toQuery += fmt.Sprintf(" LIMIT %d", individualLimit)
}

// Combine with UNION
unionQuery := fmt.Sprintf("(%s) UNION ALL (%s)", fromQuery, toQuery)

return unionQuery
}

func (c *ClickHouseConnector) addPostQueryClauses(query string, qf QueryFilter) string {
// Add GROUP BY clause if needed (for aggregations)
if len(qf.GroupBy) > 0 {
groupByClause := fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
// For UNION queries, we need to wrap the entire query in a subquery to apply GROUP BY
if strings.Contains(query, "UNION ALL") {
query = fmt.Sprintf("SELECT * FROM (%s) %s", query, groupByClause)
} else {
// For standard queries, just append GROUP BY
query += groupByClause
}
}

// For UNION queries, ORDER BY and LIMIT are already applied to individual queries
// For standard queries, apply ORDER BY and LIMIT
if !strings.Contains(query, "UNION ALL") {
// Add ORDER BY clause
if qf.SortBy != "" {
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
}

// Add limit clause
if qf.Page >= 0 && qf.Limit > 0 {
offset := qf.Page * qf.Limit
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
} else if qf.Limit > 0 {
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
}
} else {
// For UNION queries, we need to apply final LIMIT after the UNION
// This ensures we get exactly the requested number of results
if qf.Page >= 0 && qf.Limit > 0 {
offset := qf.Page * qf.Limit
query = fmt.Sprintf("SELECT * FROM (%s) LIMIT %d OFFSET %d", query, qf.Limit, offset)
} else if qf.Limit > 0 {
query = fmt.Sprintf("SELECT * FROM (%s) LIMIT %d", query, qf.Limit)
}
}

// Add settings at the very end
if c.cfg.MaxQueryTime > 0 {
query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime)
}

return query
}

func (c *ClickHouseConnector) buildWhereClauses(table string, qf QueryFilter) []string {
whereClauses := []string{}

if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
whereClauses = append(whereClauses, createFilterClause("chain_id", qf.ChainId.String()))
}

blockNumbersClause := createBlockNumbersClause(qf.BlockNumbers)
if blockNumbersClause != "" {
whereClauses = append(whereClauses, blockNumbersClause)
}

contractAddressClause := createContractAddressClause(table, qf.ContractAddress)
if contractAddressClause != "" {
whereClauses = append(whereClauses, contractAddressClause)
}
walletAddressClause := createWalletAddressClause(table, qf.WalletAddress)
if walletAddressClause != "" {
whereClauses = append(whereClauses, walletAddressClause)

// Skip wallet address clause for UNION queries as it's handled separately
if table != "transactions" && qf.WalletAddress != "" {
walletAddressClause := createWalletAddressClause(table, qf.WalletAddress)
if walletAddressClause != "" {
whereClauses = append(whereClauses, walletAddressClause)
}
}

fromAddressClause := createFromAddressClause(table, qf.FromAddress)
if fromAddressClause != "" {
whereClauses = append(whereClauses, fromAddressClause)
}

signatureClause := createSignatureClause(table, qf.Signature)
if signatureClause != "" {
whereClauses = append(whereClauses, signatureClause)
}

// Add filter params
for key, value := range qf.FilterParams {
whereClauses = append(whereClauses, createFilterClause(key, strings.ToLower(value)))
}

// Add WHERE clause to query if there are any conditions
if len(whereClauses) > 0 {
query += " WHERE " + strings.Join(whereClauses, " AND ")
}

// Add ORDER BY clause
if qf.SortBy != "" {
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
}

// Add limit clause
if qf.Page >= 0 && qf.Limit > 0 {
offset := qf.Page * qf.Limit
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
} else if qf.Limit > 0 {
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
}

if c.cfg.MaxQueryTime > 0 {
query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime)
}

return query
return whereClauses
}

func createFilterClause(key, value string) string {
Expand Down Expand Up @@ -2064,3 +2119,8 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []

return blockData, nil
}

// Helper function to test query generation
func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string {
return c.buildQuery(table, columns, qf)
}
Loading