Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ go test -v ./...
go test ./pkg/processor/...
```

Note: The codebase currently has no test files (*_test.go). Testing relies on Docker-based integration testing and manual configuration testing.

## Architecture

### Core Components
Expand Down Expand Up @@ -109,8 +111,18 @@ Source -> Processor(s) -> Consumer(s)
- Base configurations in `config/base/`
- Secret configurations (credentials) use `.secret.yaml` extension
- Template configurations available for common use cases
- Environment variables can override config values (e.g., AWS_ACCESS_KEY_ID, GOOGLE_APPLICATION_CREDENTIALS)

### Docker Development
- Development workflow uses `dockerfile.dev` with debugging tools
- Production builds use multi-stage `dockerfile` with minimal runtime
- `run-local.sh` script simplifies local Docker development
- `run-local.sh` script simplifies local Docker development
- Go version: 1.23 (production), 1.22.5 (development)
- Required system libraries: libzmq3-dev, libczmq-dev, libsodium-dev

### Common Pipeline Types
- **Ledger processing**: Process all ledger data from blockchain
- **Payment filtering**: Extract specific payment operations
- **Account monitoring**: Track account changes and transactions
- **Contract events**: Monitor Soroban smart contract events
- **Market data**: Process DEX trades and market metrics
72 changes: 68 additions & 4 deletions consumer/consumer_save_contract_invocations_to_postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func initializeContractInvocationsSchema(db *sql.DB) error {
contract_id TEXT NOT NULL,
invoking_account TEXT NOT NULL,
function_name TEXT,
arguments JSONB,
arguments_decoded JSONB,
successful BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

Expand All @@ -92,6 +94,34 @@ func initializeContractInvocationsSchema(db *sql.DB) error {
CREATE INDEX IF NOT EXISTS idx_contract_invocations_ledger_sequence ON contract_invocations(ledger_sequence);
CREATE INDEX IF NOT EXISTS idx_contract_invocations_transaction_hash ON contract_invocations(transaction_hash);
CREATE INDEX IF NOT EXISTS idx_contract_invocations_function_name ON contract_invocations(function_name);
CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_gin ON contract_invocations USING gin(arguments);
Copy link

Copilot AI Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The GIN index creation for arguments appears both in the initial schema setup and again in the migration block. Consider consolidating or removing the redundancy to simplify the initialization logic.

Copilot uses AI. Check for mistakes.
CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_decoded_gin ON contract_invocations USING gin(arguments_decoded);
`)

if err != nil {
return fmt.Errorf("failed to create contract_invocations table: %w", err)
}

// Add migration for existing columns if they don't exist
_, err = db.Exec(`
DO $$
BEGIN
-- Add arguments column if it doesn't exist
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'contract_invocations' AND column_name = 'arguments') THEN
ALTER TABLE contract_invocations ADD COLUMN arguments JSONB;
END IF;

-- Add arguments_decoded column if it doesn't exist
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'contract_invocations' AND column_name = 'arguments_decoded') THEN
ALTER TABLE contract_invocations ADD COLUMN arguments_decoded JSONB;
END IF;
END $$;

-- Create GIN indexes for JSONB columns if they don't exist
CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_gin ON contract_invocations USING gin(arguments);
CREATE INDEX IF NOT EXISTS idx_contract_invocations_arguments_decoded_gin ON contract_invocations USING gin(arguments_decoded);
`)

if err != nil {
Expand Down Expand Up @@ -227,20 +257,45 @@ func (p *SaveContractInvocationsToPostgreSQL) Process(ctx context.Context, msg p
}
}()

// Prepare arguments for database insertion
var argumentsJSON, argumentsDecodedJSON interface{}

// Marshal arguments if present
if len(invocation.Arguments) > 0 {
argsBytes, err := json.Marshal(invocation.Arguments)
if err != nil {
log.Printf("Warning: failed to marshal arguments for invocation %s: %v", invocation.TransactionHash, err)
} else {
argumentsJSON = argsBytes
}
}

// Marshal decoded arguments if present
if len(invocation.ArgumentsDecoded) > 0 {
argsDecodedBytes, err := json.Marshal(invocation.ArgumentsDecoded)
if err != nil {
log.Printf("Warning: failed to marshal decoded arguments for invocation %s: %v", invocation.TransactionHash, err)
} else {
argumentsDecodedJSON = argsDecodedBytes
}
}

// Insert contract invocation
var invocationID int64
err = tx.QueryRowContext(
ctx,
`INSERT INTO contract_invocations (
timestamp, ledger_sequence, transaction_hash, contract_id,
invoking_account, function_name, successful
) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id`,
invoking_account, function_name, arguments, arguments_decoded, successful
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id`,
invocation.Timestamp,
invocation.LedgerSequence,
invocation.TransactionHash,
invocation.ContractID,
invocation.InvokingAccount,
invocation.FunctionName,
argumentsJSON,
argumentsDecodedJSON,
invocation.Successful,
).Scan(&invocationID)

Expand Down Expand Up @@ -365,8 +420,17 @@ func (p *SaveContractInvocationsToPostgreSQL) Process(ctx context.Context, msg p
return fmt.Errorf("failed to commit transaction: %w", err)
}

log.Printf("Saved contract invocation: %s (contract: %s, function: %s)",
invocation.TransactionHash, invocation.ContractID, invocation.FunctionName)
// Enhanced logging with argument information
argCount := len(invocation.Arguments)
decodedArgCount := len(invocation.ArgumentsDecoded)

if argCount > 0 || decodedArgCount > 0 {
log.Printf("Saved contract invocation: %s (contract: %s, function: %s, args: %d, decoded: %d)",
invocation.TransactionHash, invocation.ContractID, invocation.FunctionName, argCount, decodedArgCount)
} else {
log.Printf("Saved contract invocation: %s (contract: %s, function: %s)",
invocation.TransactionHash, invocation.ContractID, invocation.FunctionName)
}

// Forward to next processor if any
for _, proc := range p.processors {
Expand Down