diff --git a/indexer b/indexer new file mode 100755 index 0000000..c27725c Binary files /dev/null and b/indexer differ diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index c3f4fbb..d914ced 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -467,69 +467,11 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter, fields ...string) (Query } func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) { - tableName := c.getTableName(qf.ChainId, table) // Build the SELECT clause with aggregates selectColumns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ") - query := fmt.Sprintf("SELECT %s FROM %s.%s", selectColumns, c.cfg.Database, tableName) - if qf.ForceConsistentData { - query += " FINAL" - } - - whereClauses := []string{} - // Apply filters - if qf.ChainId != nil && qf.ChainId.Sign() > 0 { - whereClauses = append(whereClauses, createFilterClause("chain_id", qf.ChainId.String())) - } - blockNumbersClause := createBlockNumbersClause(qf.BlockNumbers) - if blockNumbersClause != "" { - whereClauses = append(whereClauses, blockNumbersClause) - } - contractAddressClause := createContractAddressClause(table, qf.ContractAddress) - if contractAddressClause != "" { - whereClauses = append(whereClauses, contractAddressClause) - } - walletAddressClause := createWalletAddressClause(table, qf.WalletAddress) - if walletAddressClause != "" { - whereClauses = append(whereClauses, walletAddressClause) - } - fromAddressClause := createFromAddressClause(table, qf.FromAddress) - if fromAddressClause != "" { - whereClauses = append(whereClauses, fromAddressClause) - } - signatureClause := createSignatureClause(table, qf.Signature) - if signatureClause != "" { - whereClauses = append(whereClauses, signatureClause) - } - for key, value := range qf.FilterParams { - whereClauses = append(whereClauses, createFilterClause(key, strings.ToLower(value))) - } - // Add WHERE clause to query if there are any conditions - if len(whereClauses) > 0 { - query += " WHERE " + strings.Join(whereClauses, " AND ") - } - - if len(qf.GroupBy) > 0 { - groupByColumns := strings.Join(qf.GroupBy, ", ") - query += fmt.Sprintf(" GROUP BY %s", groupByColumns) - } - - // Add ORDER BY clause - if qf.SortBy != "" { - query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) - } - - // Add limit clause - if qf.Page >= 0 && qf.Limit > 0 { - offset := qf.Page * qf.Limit - query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset) - } else if qf.Limit > 0 { - query += fmt.Sprintf(" LIMIT %d", qf.Limit) - } - - if c.cfg.MaxQueryTime > 0 { - query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime) - } + // Use the new query building logic + query := c.buildQuery(table, selectColumns, qf) if err := common.ValidateQuery(query); err != nil { return QueryResult[interface{}]{}, err @@ -611,64 +553,177 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query } func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) string { + var query string + + // Check if we need to handle wallet address with UNION for transactions + if table == "transactions" && qf.WalletAddress != "" { + query = c.buildUnionQuery(table, columns, qf) + } else { + query = c.buildStandardQuery(table, columns, qf) + } + + // Apply post-query clauses to ALL queries + query = c.addPostQueryClauses(query, qf) + + return query +} + +func (c *ClickHouseConnector) buildStandardQuery(table, columns string, qf QueryFilter) string { tableName := c.getTableName(qf.ChainId, table) query := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName) if qf.ForceConsistentData { query += " FINAL" } + whereClauses := c.buildWhereClauses(table, qf) + + // Add WHERE clause to query if there are any conditions + if len(whereClauses) > 0 { + query += " WHERE " + strings.Join(whereClauses, " AND ") + } + + return query +} + +func (c *ClickHouseConnector) buildUnionQuery(table, columns string, qf QueryFilter) string { + tableName := c.getTableName(qf.ChainId, table) + + // Build base where clauses (excluding wallet address) + baseWhereClauses := c.buildWhereClauses(table, qf) + + // Create two separate queries for from_address and to_address + fromQuery := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName) + if qf.ForceConsistentData { + fromQuery += " FINAL" + } + + toQuery := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName) + if qf.ForceConsistentData { + toQuery += " FINAL" + } + + // Add base where clauses to both queries + if len(baseWhereClauses) > 0 { + baseWhereClause := strings.Join(baseWhereClauses, " AND ") + fromQuery += " WHERE " + baseWhereClause + " AND from_address = '" + strings.ToLower(qf.WalletAddress) + "'" + toQuery += " WHERE " + baseWhereClause + " AND to_address = '" + strings.ToLower(qf.WalletAddress) + "'" + } else { + fromQuery += " WHERE from_address = '" + strings.ToLower(qf.WalletAddress) + "'" + toQuery += " WHERE to_address = '" + strings.ToLower(qf.WalletAddress) + "'" + } + + // Apply ORDER BY to both queries for consistent results + if qf.SortBy != "" { + fromQuery += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) + toQuery += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) + } + + // Apply LIMIT to each individual query to avoid loading too much data + // We use a higher limit to ensure we get enough results after UNION + individualLimit := qf.Limit * 2 // Double the limit to account for potential duplicates + if qf.Page >= 0 && qf.Limit > 0 { + offset := qf.Page * qf.Limit + fromQuery += fmt.Sprintf(" LIMIT %d OFFSET %d", individualLimit, offset) + toQuery += fmt.Sprintf(" LIMIT %d OFFSET %d", individualLimit, offset) + } else if qf.Limit > 0 { + fromQuery += fmt.Sprintf(" LIMIT %d", individualLimit) + toQuery += fmt.Sprintf(" LIMIT %d", individualLimit) + } + + // Combine with UNION + unionQuery := fmt.Sprintf("(%s) UNION ALL (%s)", fromQuery, toQuery) + + return unionQuery +} + +func (c *ClickHouseConnector) addPostQueryClauses(query string, qf QueryFilter) string { + // Add GROUP BY clause if needed (for aggregations) + if len(qf.GroupBy) > 0 { + groupByClause := fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", ")) + // For UNION queries, we need to wrap the entire query in a subquery to apply GROUP BY + if strings.Contains(query, "UNION ALL") { + query = fmt.Sprintf("SELECT * FROM (%s) %s", query, groupByClause) + } else { + // For standard queries, just append GROUP BY + query += groupByClause + } + } + + // For UNION queries, ORDER BY and LIMIT are already applied to individual queries + // For standard queries, apply ORDER BY and LIMIT + if !strings.Contains(query, "UNION ALL") { + // Add ORDER BY clause + if qf.SortBy != "" { + query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) + } + + // Add limit clause + if qf.Page >= 0 && qf.Limit > 0 { + offset := qf.Page * qf.Limit + query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset) + } else if qf.Limit > 0 { + query += fmt.Sprintf(" LIMIT %d", qf.Limit) + } + } else { + // For UNION queries, we need to apply final LIMIT after the UNION + // This ensures we get exactly the requested number of results + if qf.Page >= 0 && qf.Limit > 0 { + offset := qf.Page * qf.Limit + query = fmt.Sprintf("SELECT * FROM (%s) LIMIT %d OFFSET %d", query, qf.Limit, offset) + } else if qf.Limit > 0 { + query = fmt.Sprintf("SELECT * FROM (%s) LIMIT %d", query, qf.Limit) + } + } + + // Add settings at the very end + if c.cfg.MaxQueryTime > 0 { + query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime) + } + + return query +} + +func (c *ClickHouseConnector) buildWhereClauses(table string, qf QueryFilter) []string { whereClauses := []string{} + if qf.ChainId != nil && qf.ChainId.Sign() > 0 { whereClauses = append(whereClauses, createFilterClause("chain_id", qf.ChainId.String())) } + blockNumbersClause := createBlockNumbersClause(qf.BlockNumbers) if blockNumbersClause != "" { whereClauses = append(whereClauses, blockNumbersClause) } + contractAddressClause := createContractAddressClause(table, qf.ContractAddress) if contractAddressClause != "" { whereClauses = append(whereClauses, contractAddressClause) } - walletAddressClause := createWalletAddressClause(table, qf.WalletAddress) - if walletAddressClause != "" { - whereClauses = append(whereClauses, walletAddressClause) + + // Skip wallet address clause for UNION queries as it's handled separately + if table != "transactions" && qf.WalletAddress != "" { + walletAddressClause := createWalletAddressClause(table, qf.WalletAddress) + if walletAddressClause != "" { + whereClauses = append(whereClauses, walletAddressClause) + } } + fromAddressClause := createFromAddressClause(table, qf.FromAddress) if fromAddressClause != "" { whereClauses = append(whereClauses, fromAddressClause) } + signatureClause := createSignatureClause(table, qf.Signature) if signatureClause != "" { whereClauses = append(whereClauses, signatureClause) } + // Add filter params for key, value := range qf.FilterParams { whereClauses = append(whereClauses, createFilterClause(key, strings.ToLower(value))) } - // Add WHERE clause to query if there are any conditions - if len(whereClauses) > 0 { - query += " WHERE " + strings.Join(whereClauses, " AND ") - } - - // Add ORDER BY clause - if qf.SortBy != "" { - query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) - } - - // Add limit clause - if qf.Page >= 0 && qf.Limit > 0 { - offset := qf.Page * qf.Limit - query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset) - } else if qf.Limit > 0 { - query += fmt.Sprintf(" LIMIT %d", qf.Limit) - } - - if c.cfg.MaxQueryTime > 0 { - query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime) - } - - return query + return whereClauses } func createFilterClause(key, value string) string { @@ -2064,3 +2119,8 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers [] return blockData, nil } + +// Helper function to test query generation +func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string { + return c.buildQuery(table, columns, qf) +} diff --git a/internal/storage/clickhouse_connector_test.go b/internal/storage/clickhouse_connector_test.go index cad416a..fc17ced 100644 --- a/internal/storage/clickhouse_connector_test.go +++ b/internal/storage/clickhouse_connector_test.go @@ -3,8 +3,11 @@ package storage import ( "math/big" "reflect" + "strings" "testing" "time" + + config "github.com/thirdweb-dev/indexer/configs" ) // TestMapClickHouseTypeToGoType tests the mapClickHouseTypeToGoType function @@ -132,3 +135,143 @@ func TestMapClickHouseTypeToGoType(t *testing.T) { }) } } + +// TestUnionQueryLogic tests the UNION query logic for wallet addresses in transactions +func TestUnionQueryLogic(t *testing.T) { + // Create a mock config with valid connection details + cfg := &config.ClickhouseConfig{ + Database: "default", + Host: "localhost", + Port: 9000, + Username: "default", + Password: "", + MaxQueryTime: 30, + } + + // Create connector + connector, err := NewClickHouseConnector(cfg) + if err != nil { + // Skip test if we can't connect to ClickHouse (likely in CI environment) + t.Skipf("Skipping test - cannot connect to ClickHouse: %v", err) + } + + // Test case 1: Standard query without wallet address (should not use UNION) + t.Run("Standard query without wallet address", func(t *testing.T) { + qf := QueryFilter{ + ChainId: big.NewInt(8453), + Limit: 5, + SortBy: "block_number", + SortOrder: "DESC", + } + + query := connector.TestQueryGeneration("transactions", "*", qf) + + // Should not contain UNION ALL + if strings.Contains(query, "UNION ALL") { + t.Errorf("Standard query should not contain UNION ALL: %s", query) + } + + // Should contain standard WHERE clause + if !strings.Contains(query, "WHERE") { + t.Errorf("Query should contain WHERE clause: %s", query) + } + }) + + // Test case 2: UNION query with wallet address + t.Run("UNION query with wallet address", func(t *testing.T) { + qf := QueryFilter{ + ChainId: big.NewInt(8453), + WalletAddress: "0x0b230949b38fa651aefffcfa5e664554df8ae900", + Limit: 5, + SortBy: "block_number", + SortOrder: "DESC", + } + + query := connector.TestQueryGeneration("transactions", "*", qf) + + // Should contain UNION ALL + if !strings.Contains(query, "UNION ALL") { + t.Errorf("Query should contain UNION ALL: %s", query) + } + + // Should contain from_address and to_address conditions + if !strings.Contains(query, "from_address = '0x0b230949b38fa651aefffcfa5e664554df8ae900'") { + t.Errorf("Query should contain from_address condition: %s", query) + } + + if !strings.Contains(query, "to_address = '0x0b230949b38fa651aefffcfa5e664554df8ae900'") { + t.Errorf("Query should contain to_address condition: %s", query) + } + + // Should have proper ORDER BY and LIMIT at the end + if !strings.Contains(query, "ORDER BY block_number DESC") { + t.Errorf("Query should contain ORDER BY clause: %s", query) + } + + if !strings.Contains(query, "LIMIT 5") { + t.Errorf("Query should contain LIMIT clause: %s", query) + } + + // Should have SETTINGS at the very end + if !strings.Contains(query, "SETTINGS max_execution_time = 30") { + t.Errorf("Query should contain SETTINGS clause: %s", query) + } + }) + + // Test case 3: Standard query for logs table (should not use UNION) + t.Run("Standard query for logs table", func(t *testing.T) { + qf := QueryFilter{ + ChainId: big.NewInt(8453), + WalletAddress: "0x0b230949b38fa651aefffcfa5e664554df8ae900", + Limit: 5, + SortBy: "block_number", + SortOrder: "DESC", + } + + query := connector.TestQueryGeneration("logs", "*", qf) + + // Should not contain UNION ALL (logs table doesn't use UNION) + if strings.Contains(query, "UNION ALL") { + t.Errorf("Logs query should not contain UNION ALL: %s", query) + } + + // Logs table doesn't have wallet address clauses since it doesn't have from_address/to_address fields + // So it should just have the basic WHERE clause without wallet address + if !strings.Contains(query, "WHERE") { + t.Errorf("Logs query should contain WHERE clause: %s", query) + } + + // Should not contain wallet address conditions since logs don't have those fields + if strings.Contains(query, "from_address") || strings.Contains(query, "to_address") { + t.Errorf("Logs query should not contain address conditions: %s", query) + } + }) + + // Test case 4: UNION query with GROUP BY + t.Run("UNION query with GROUP BY", func(t *testing.T) { + qf := QueryFilter{ + ChainId: big.NewInt(8453), + WalletAddress: "0x0b230949b38fa651aefffcfa5e664554df8ae900", + GroupBy: []string{"block_number"}, + Limit: 5, + SortBy: "block_number", + SortOrder: "DESC", + } + + query := connector.TestQueryGeneration("transactions", "block_number, COUNT(*) as count", qf) + + // Should contain UNION ALL + if !strings.Contains(query, "UNION ALL") { + t.Errorf("Query should contain UNION ALL: %s", query) + } + + // Should contain GROUP BY wrapped in subquery + if !strings.Contains(query, "SELECT * FROM (") { + t.Errorf("Query should wrap UNION in subquery for GROUP BY: %s", query) + } + + if !strings.Contains(query, "GROUP BY block_number") { + t.Errorf("Query should contain GROUP BY clause: %s", query) + } + }) +}