diff --git a/CLAUDE.md b/CLAUDE.md index 5bfb0b3..1a54a25 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -47,6 +47,8 @@ go test -v ./... go test ./pkg/processor/... ``` +Note: The codebase currently has no test files (*_test.go). Testing relies on Docker-based integration testing and manual configuration testing. + ## Architecture ### Core Components @@ -109,8 +111,18 @@ Source -> Processor(s) -> Consumer(s) - Base configurations in `config/base/` - Secret configurations (credentials) use `.secret.yaml` extension - Template configurations available for common use cases +- Environment variables can override config values (e.g., AWS_ACCESS_KEY_ID, GOOGLE_APPLICATION_CREDENTIALS) ### Docker Development - Development workflow uses `dockerfile.dev` with debugging tools - Production builds use multi-stage `dockerfile` with minimal runtime -- `run-local.sh` script simplifies local Docker development \ No newline at end of file +- `run-local.sh` script simplifies local Docker development +- Go version: 1.23 (production), 1.22.5 (development) +- Required system libraries: libzmq3-dev, libczmq-dev, libsodium-dev + +### Common Pipeline Types +- **Ledger processing**: Process all ledger data from blockchain +- **Payment filtering**: Extract specific payment operations +- **Account monitoring**: Track account changes and transactions +- **Contract events**: Monitor Soroban smart contract events +- **Market data**: Process DEX trades and market metrics \ No newline at end of file diff --git a/consumer/consumer_save_contract_invocations_to_postgresql.go b/consumer/consumer_save_contract_invocations_to_postgresql.go index d616bc0..3da446e 100644 --- a/consumer/consumer_save_contract_invocations_to_postgresql.go +++ b/consumer/consumer_save_contract_invocations_to_postgresql.go @@ -77,6 +77,8 @@ func initializeContractInvocationsSchema(db *sql.DB) error { contract_id TEXT NOT NULL, invoking_account TEXT NOT NULL, function_name TEXT, + arguments JSONB, + arguments_decoded JSONB, successful BOOLEAN NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), @@ -92,6 +94,34 @@ func initializeContractInvocationsSchema(db *sql.DB) error { CREATE INDEX IF NOT EXISTS idx_contract_invocations_ledger_sequence ON contract_invocations(ledger_sequence); CREATE INDEX IF NOT EXISTS idx_contract_invocations_transaction_hash ON contract_invocations(transaction_hash); CREATE INDEX IF NOT EXISTS idx_contract_invocations_function_name ON contract_invocations(function_name); + CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_gin ON contract_invocations USING gin(arguments); + CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_decoded_gin ON contract_invocations USING gin(arguments_decoded); + `) + + if err != nil { + return fmt.Errorf("failed to create contract_invocations table: %w", err) + } + + // Add migration for existing columns if they don't exist + _, err = db.Exec(` + DO $$ + BEGIN + -- Add arguments column if it doesn't exist + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'contract_invocations' AND column_name = 'arguments') THEN + ALTER TABLE contract_invocations ADD COLUMN arguments JSONB; + END IF; + + -- Add arguments_decoded column if it doesn't exist + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'contract_invocations' AND column_name = 'arguments_decoded') THEN + ALTER TABLE contract_invocations ADD COLUMN arguments_decoded JSONB; + END IF; + END $$; + + -- Create GIN indexes for JSONB columns if they don't exist + CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_gin ON contract_invocations USING gin(arguments); + CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_decoded_gin ON contract_invocations USING gin(arguments_decoded); `) if err != nil { @@ -227,20 +257,45 @@ func (p *SaveContractInvocationsToPostgreSQL) Process(ctx context.Context, msg p } }() + // Prepare arguments for database insertion + var argumentsJSON, argumentsDecodedJSON interface{} + + // Marshal arguments if present + if len(invocation.Arguments) > 0 { + argsBytes, err := json.Marshal(invocation.Arguments) + if err != nil { + log.Printf("Warning: failed to marshal arguments for invocation %s: %v", invocation.TransactionHash, err) + } else { + argumentsJSON = argsBytes + } + } + + // Marshal decoded arguments if present + if len(invocation.ArgumentsDecoded) > 0 { + argsDecodedBytes, err := json.Marshal(invocation.ArgumentsDecoded) + if err != nil { + log.Printf("Warning: failed to marshal decoded arguments for invocation %s: %v", invocation.TransactionHash, err) + } else { + argumentsDecodedJSON = argsDecodedBytes + } + } + // Insert contract invocation var invocationID int64 err = tx.QueryRowContext( ctx, `INSERT INTO contract_invocations ( timestamp, ledger_sequence, transaction_hash, contract_id, - invoking_account, function_name, successful - ) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id`, + invoking_account, function_name, arguments, arguments_decoded, successful + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id`, invocation.Timestamp, invocation.LedgerSequence, invocation.TransactionHash, invocation.ContractID, invocation.InvokingAccount, invocation.FunctionName, + argumentsJSON, + argumentsDecodedJSON, invocation.Successful, ).Scan(&invocationID) @@ -365,8 +420,17 @@ func (p *SaveContractInvocationsToPostgreSQL) Process(ctx context.Context, msg p return fmt.Errorf("failed to commit transaction: %w", err) } - log.Printf("Saved contract invocation: %s (contract: %s, function: %s)", - invocation.TransactionHash, invocation.ContractID, invocation.FunctionName) + // Enhanced logging with argument information + argCount := len(invocation.Arguments) + decodedArgCount := len(invocation.ArgumentsDecoded) + + if argCount > 0 || decodedArgCount > 0 { + log.Printf("Saved contract invocation: %s (contract: %s, function: %s, args: %d, decoded: %d)", + invocation.TransactionHash, invocation.ContractID, invocation.FunctionName, argCount, decodedArgCount) + } else { + log.Printf("Saved contract invocation: %s (contract: %s, function: %s)", + invocation.TransactionHash, invocation.ContractID, invocation.FunctionName) + } // Forward to next processor if any for _, proc := range p.processors {