@tmosleyIII
Contributor

This pull request adds new files and updates that improve the documentation, architecture, and functionality of the CDP Pipeline Workflow project. The changes include detailed documentation for the pipeline system, a new directory structure for the monorepo, a new main entry point for pipeline execution, and an example implementation of the Kale processor.

Documentation Enhancements:

  • CLAUDE.md: Added detailed guidance for working with the CDP Pipeline Workflow, including development commands, architecture overview, and configuration system.
  • MONOREPO.md: Documented the monorepo structure, package organization, and key interfaces for processors and consumers.
  • README_NEW.md: Provided an overview of the pipeline system, features, and instructions for getting started, including YAML-based configuration examples.

Codebase Updates:

  • cmd/pipeline/main.go: Introduced a new main entry point for executing pipelines, with support for YAML configuration, processor chaining, and consumer integration.

Example Implementation:

@tmosleyIII requested a review from Copilot on June 12, 2025, 13:30
Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds foundational interfaces and factories for sources, processors, and consumers; introduces a new CLI entry point for pipeline execution; and includes detailed documentation and an example for the Kale processor.

  • Define and register core source/processor/consumer interfaces and factories
  • Implement a new cmd/pipeline/main.go entry point with YAML-driven pipeline setup
  • Provide comprehensive documentation (CLAUDE.md, MONOREPO.md, README_NEW.md) and a Kale example
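For readers unfamiliar with the pattern, processor chaining can be sketched roughly as follows. This is only an illustration: `Message`, `Processor`, `ProcessorFunc`, and `Chain` are hypothetical stand-ins, not the interfaces this PR defines in pkg/common/types, and the real pipeline may wire stages differently (for example through subscriptions).

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message and Processor are hypothetical stand-ins for the project's own types.
type Message struct {
	Payload string
}

type Processor interface {
	Process(ctx context.Context, msg Message) (Message, error)
}

// ProcessorFunc adapts a plain function to the Processor interface.
type ProcessorFunc func(ctx context.Context, msg Message) (Message, error)

func (f ProcessorFunc) Process(ctx context.Context, msg Message) (Message, error) {
	return f(ctx, msg)
}

// Chain runs the stages in order, feeding each stage's output to the next.
// It stops at the first error or when the context is cancelled.
func Chain(stages ...Processor) Processor {
	return ProcessorFunc(func(ctx context.Context, msg Message) (Message, error) {
		for i, stage := range stages {
			if err := ctx.Err(); err != nil {
				return Message{}, err
			}
			out, err := stage.Process(ctx, msg)
			if err != nil {
				return Message{}, fmt.Errorf("stage %d: %w", i, err)
			}
			msg = out
		}
		return msg, nil
	})
}

// runExample chains two trivial stages: trim whitespace, then upper-case.
func runExample() (string, error) {
	trim := ProcessorFunc(func(_ context.Context, m Message) (Message, error) {
		return Message{Payload: strings.TrimSpace(m.Payload)}, nil
	})
	upper := ProcessorFunc(func(_ context.Context, m Message) (Message, error) {
		return Message{Payload: strings.ToUpper(m.Payload)}, nil
	})
	out, err := Chain(trim, upper).Process(context.Background(), Message{Payload: "  ledger  "})
	return out.Payload, err
}

func main() {
	out, err := runExample()
	if err != nil {
		fmt.Println("pipeline failed:", err)
		return
	}
	fmt.Println(out)
}
```

Wrapping the error with the stage index keeps failures traceable to a position in the configured chain.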

Reviewed Changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| pkg/source/base/source.go | Defines SourceAdapter interface and config struct |
| pkg/processor/processors.go | Registers built-in processors (kale, soroswap, LatestLedger) |
| pkg/processor/ledger/latest_ledger.go | Implements LatestLedgerProcessor with extended metrics |
| pkg/processor/contract/soroswap/processor.go | Adds SoroswapProcessor and event handling logic |
| pkg/processor/contract/common/types.go | Introduces shared ContractEvent and ContractInvocation types |
| pkg/processor/base/processor.go | Backward-compatible aliases and helper functions |
| pkg/processor/base/factory.go | Factory and plugin loader for processors |
| pkg/consumer/consumer.go | Re-exports ConsumerConfig |
| pkg/consumer/base/factory.go | Stub for consumer factory |
| pkg/consumer/base/consumer.go | Re-exports Consumer interface |
| pkg/common/types/types.go | Core Message, config types, and utilities |
| examples/kale/kale_processor_example.go | Example showing Kale processor usage |
| cmd/pipeline/main.go | New CLI entry point reading YAML config and wiring pipeline |
| README_NEW.md | User-facing pipeline overview and quickstart guide |
| MONOREPO.md | Monorepo structure and package organization |
| CLAUDE.md | Guidance for Claude AI when working with this repo |
Comments suppressed due to low confidence (1)

pkg/processor/processors.go:24

  • [nitpick] The processor type name "LatestLedger" is capitalized and inconsistent with the lowercase keys used for other processors (e.g., "kale"). Consider renaming it to a consistent lowercase key such as "latest_ledger".
base.RegisterProcessor("LatestLedger", func(config map[string]interface{}) (types.Processor, error) {
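One way to keep keys consistent is to have the registry reject anything that is not lowercase snake_case. The sketch below is hypothetical: the `Processor` interface, the `Factory` type, and a `RegisterProcessor` that returns an error are assumptions for illustration and may not match the signatures in pkg/processor/base.

```go
package main

import "fmt"

// Processor and Factory are hypothetical stand-ins for the project's types.
type Processor interface {
	Name() string
}

type Factory func(config map[string]interface{}) (Processor, error)

// registry maps processor keys to factories. It is not safe for concurrent
// writes; this sketch assumes registration happens once at startup.
var registry = map[string]Factory{}

// validKey reports whether name uses only a-z, 0-9, and underscores.
func validKey(name string) bool {
	if name == "" {
		return false
	}
	for _, r := range name {
		isLower := r >= 'a' && r <= 'z'
		isDigit := r >= '0' && r <= '9'
		if !isLower && !isDigit && r != '_' {
			return false
		}
	}
	return true
}

// RegisterProcessor adds a factory under a validated, unique key.
func RegisterProcessor(name string, f Factory) error {
	if !validKey(name) {
		return fmt.Errorf("processor key %q must be lowercase snake_case", name)
	}
	if _, exists := registry[name]; exists {
		return fmt.Errorf("processor key %q is already registered", name)
	}
	registry[name] = f
	return nil
}

type latestLedger struct{}

func (latestLedger) Name() string { return "latest_ledger" }

func newLatestLedger(map[string]interface{}) (Processor, error) {
	return latestLedger{}, nil
}

func main() {
	// The mixed-case key is rejected; the snake_case key is accepted.
	fmt.Println(RegisterProcessor("LatestLedger", newLatestLedger))
	fmt.Println(RegisterProcessor("latest_ledger", newLatestLedger))
}
```

Renaming a key changes what existing YAML configs must reference, so a rename like this would likely need a matching config update.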

configFile := flag.String("config", "pipeline_config.yaml", "Path to pipeline configuration file")
flag.Parse()

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)

Copilot AI Jun 12, 2025

Using os.Kill in signal.NotifyContext has no effect because SIGKILL cannot be caught. Replace os.Kill with a catchable signal like syscall.SIGTERM.

Copilot uses AI. Check for mistakes.
Comment on lines +52 to +78
// Call the constructor
newFunc, ok := newSym.(func() interface{})
if !ok {
    return nil, fmt.Errorf("processor plugin 'New' is not a function")
}

instance := newFunc()

// Check if the instance has an Initialize method
initializer, ok := instance.(interface {
    Initialize(map[string]interface{}) error
})
if !ok {
    return nil, fmt.Errorf("processor plugin instance does not implement Initialize method")
}

// Initialize the processor
if err := initializer.Initialize(config); err != nil {
    return nil, fmt.Errorf("failed to initialize processor plugin: %w", err)
}

// Check if the instance implements the Processor interface
processor, ok := instance.(types.Processor)
if !ok {
    return nil, fmt.Errorf("processor plugin instance does not implement Processor interface")
}

Copilot AI Jun 12, 2025

The plugin loader assumes a zero-argument constructor but then calls Initialize(config) on the instance. This mismatch may break plugin loading. Consider asserting a constructor signature like func(map[string]interface{}) (Processor, error) directly.

Suggested change
- // Call the constructor
- newFunc, ok := newSym.(func() interface{})
- if !ok {
-     return nil, fmt.Errorf("processor plugin 'New' is not a function")
- }
- instance := newFunc()
- // Check if the instance has an Initialize method
- initializer, ok := instance.(interface {
-     Initialize(map[string]interface{}) error
- })
- if !ok {
-     return nil, fmt.Errorf("processor plugin instance does not implement Initialize method")
- }
- // Initialize the processor
- if err := initializer.Initialize(config); err != nil {
-     return nil, fmt.Errorf("failed to initialize processor plugin: %w", err)
- }
- // Check if the instance implements the Processor interface
- processor, ok := instance.(types.Processor)
- if !ok {
-     return nil, fmt.Errorf("processor plugin instance does not implement Processor interface")
- }
+ // Assert the constructor signature
+ newFunc, ok := newSym.(func(map[string]interface{}) (types.Processor, error))
+ if !ok {
+     return nil, fmt.Errorf("processor plugin 'New' does not match expected signature")
+ }
+ // Call the constructor with the configuration
+ processor, err := newFunc(config)
+ if err != nil {
+     return nil, fmt.Errorf("failed to create processor plugin: %w", err)
+ }
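The signature check can be exercised without building a real plugin, since a symbol returned by `plugin.Lookup` is an interface value that is type-asserted in the same way. The sketch below uses that shortcut; `Processor`, `named`, `newNamed`, `legacyNew`, and `newFromSymbol` are hypothetical names, not code from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// Processor is a hypothetical stand-in for types.Processor.
type Processor interface {
	Name() string
}

// newFromSymbol does what a loader could do with the value returned by
// plugin.Lookup("New"): assert one constructor signature, then call it.
func newFromSymbol(sym interface{}, config map[string]interface{}) (Processor, error) {
	newFunc, ok := sym.(func(map[string]interface{}) (Processor, error))
	if !ok {
		return nil, fmt.Errorf("plugin symbol 'New' has type %T, which does not match the expected constructor signature", sym)
	}
	p, err := newFunc(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create processor plugin: %w", err)
	}
	if p == nil {
		return nil, errors.New("processor plugin returned a nil processor")
	}
	return p, nil
}

type named struct{ name string }

func (n named) Name() string { return n.name }

// newNamed has the expected constructor signature.
func newNamed(config map[string]interface{}) (Processor, error) {
	name, ok := config["name"].(string)
	if !ok || name == "" {
		return nil, errors.New(`config key "name" must be a non-empty string`)
	}
	return named{name: name}, nil
}

// legacyNew has the zero-argument shape the current loader expects.
func legacyNew() interface{} { return named{name: "legacy"} }

func main() {
	p, err := newFromSymbol(newNamed, map[string]interface{}{"name": "kale"})
	fmt.Println(p, err)

	_, err = newFromSymbol(legacyNew, nil)
	fmt.Println(err)
}
```

The assertion targets the unnamed function type on purpose: asserting against a named type such as `type Constructor func(...)` would fail for an ordinary function value, because its dynamic type is the unnamed one.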

Comment on lines +297 to +309
// ContractEvent is defined here for compatibility
type ContractEvent struct {
    Timestamp       time.Time       `json:"timestamp"`
    LedgerSequence  uint32          `json:"ledger_sequence"`
    TransactionHash string          `json:"transaction_hash"`
    ContractID      string          `json:"contract_id"`
    Type            string          `json:"type"`
    Topic           []xdr.ScVal     `json:"topic"`
    Data            json.RawMessage `json:"data"`
    InSuccessfulTx  bool            `json:"in_successful_tx"`
    EventIndex      int             `json:"event_index"`
    OperationIndex  int             `json:"operation_index"`
}

Copilot AI Jun 12, 2025

This local ContractEvent duplicates the one defined in pkg/processor/contract/common/types.go. Import and use the shared type to avoid duplication.

Suggested change
- // ContractEvent is defined here for compatibility
- type ContractEvent struct {
-     Timestamp       time.Time       `json:"timestamp"`
-     LedgerSequence  uint32          `json:"ledger_sequence"`
-     TransactionHash string          `json:"transaction_hash"`
-     ContractID      string          `json:"contract_id"`
-     Type            string          `json:"type"`
-     Topic           []xdr.ScVal     `json:"topic"`
-     Data            json.RawMessage `json:"data"`
-     InSuccessfulTx  bool            `json:"in_successful_tx"`
-     EventIndex      int             `json:"event_index"`
-     OperationIndex  int             `json:"operation_index"`
- }
+ // ContractEvent is imported from the common types package
+ // Use the shared definition to avoid duplication
+ type ContractEvent = types.ContractEvent
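The difference between a type alias and a copied definition can be shown in a single file. In this sketch `sharedEvent` stands in for the shared ContractEvent with only two of its fields, and `Key`, `CopiedEvent`, and `describe` are hypothetical names added for illustration.

```go
package main

import "fmt"

// sharedEvent stands in for the shared ContractEvent (fields trimmed).
type sharedEvent struct {
	ContractID string
	EventIndex int
}

// Key is a hypothetical helper method on the shared type.
func (e sharedEvent) Key() string {
	return fmt.Sprintf("%s/%d", e.ContractID, e.EventIndex)
}

// ContractEvent is an alias: it is the same type as sharedEvent and keeps
// its methods.
type ContractEvent = sharedEvent

// CopiedEvent is a new defined type with the same fields. It is distinct
// from sharedEvent and does not inherit the Key method.
type CopiedEvent sharedEvent

// describe accepts only the shared type.
func describe(e sharedEvent) string {
	return e.Key()
}

func main() {
	aliased := ContractEvent{ContractID: "CABC", EventIndex: 2}
	fmt.Println(describe(aliased)) // no conversion needed

	copied := CopiedEvent{ContractID: "CABC", EventIndex: 2}
	fmt.Println(describe(sharedEvent(copied))) // explicit conversion required
}
```

With the alias, existing callers that refer to the local name keep compiling while all values remain interchangeable with the shared type.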

Comment on lines +176 to +180
switch changeType := change.LedgerEntryChangeType(); changeType {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated, xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
    return *change.Post, changeType, false, nil
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
    return *change.Pre, changeType, true, nil

Copilot AI Jun 12, 2025

The logic in ExtractEntryFromChange duplicates ExtractEntryFromIngestChange. Remove one implementation and consolidate to avoid duplicated code paths.

Suggested change
- switch changeType := change.LedgerEntryChangeType(); changeType {
- case xdr.LedgerEntryChangeTypeLedgerEntryCreated, xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
-     return *change.Post, changeType, false, nil
- case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
-     return *change.Pre, changeType, true, nil
+ return extractEntry(change.LedgerEntryChangeType(), change.Pre, change.Post)
+ }
+ // extractEntry is a shared function to extract the most recent state of an entry and determine if it was deleted
+ func extractEntry(changeType xdr.LedgerEntryChangeType, pre *xdr.LedgerEntry, post *xdr.LedgerEntry) (xdr.LedgerEntry, xdr.LedgerEntryChangeType, bool, error) {
+ switch changeType {
+ case xdr.LedgerEntryChangeTypeLedgerEntryCreated, xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
+     return *post, changeType, false, nil
+ case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
+     return *pre, changeType, true, nil
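The consolidation can be sketched end to end with stand-in types so it runs without the Stellar SDK. Everything here (`ChangeType`, `Entry`, `Change`, `IngestChange`) is a hypothetical simplification rather than the xdr or ingest API, and the nil guards and default case are additions the suggested change does not include.

```go
package main

import (
	"errors"
	"fmt"
)

// ChangeType, Entry, Change, and IngestChange are simplified stand-ins.
type ChangeType int

const (
	Created ChangeType = iota
	Updated
	Removed
	Unknown
)

type Entry struct{ Key string }

// Change carries its type explicitly.
type Change struct {
	Type      ChangeType
	Pre, Post *Entry
}

// IngestChange derives its type from which sides are present.
type IngestChange struct {
	Pre, Post *Entry
}

func (c IngestChange) changeType() ChangeType {
	switch {
	case c.Pre == nil && c.Post != nil:
		return Created
	case c.Pre != nil && c.Post != nil:
		return Updated
	case c.Pre != nil && c.Post == nil:
		return Removed
	default:
		return Unknown
	}
}

// extractEntry returns the most recent state of an entry and whether it was
// deleted. Both public helpers delegate here, so the logic lives in one place.
func extractEntry(t ChangeType, pre, post *Entry) (Entry, ChangeType, bool, error) {
	switch t {
	case Created, Updated:
		if post == nil {
			return Entry{}, t, false, errors.New("created/updated change has no post state")
		}
		return *post, t, false, nil
	case Removed:
		if pre == nil {
			return Entry{}, t, false, errors.New("removed change has no pre state")
		}
		return *pre, t, true, nil
	default:
		return Entry{}, t, false, fmt.Errorf("unsupported change type %d", t)
	}
}

func ExtractEntryFromChange(c Change) (Entry, ChangeType, bool, error) {
	return extractEntry(c.Type, c.Pre, c.Post)
}

func ExtractEntryFromIngestChange(c IngestChange) (Entry, ChangeType, bool, error) {
	return extractEntry(c.changeType(), c.Pre, c.Post)
}

func main() {
	entry, kind, deleted, err := ExtractEntryFromChange(Change{Type: Removed, Pre: &Entry{Key: "a"}})
	fmt.Println(entry.Key, kind, deleted, err)

	entry, kind, deleted, err = ExtractEntryFromIngestChange(IngestChange{Post: &Entry{Key: "b"}})
	fmt.Println(entry.Key, kind, deleted, err)
}
```

Returning an error for a missing pre or post state avoids a nil-pointer panic on malformed input, which the dereferences in the original switch would not catch.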

@tmosleyIII merged commit 09c2812 into main on Jul 10, 2025
@tmosleyIII deleted the chore/tmosley/updates-and-cleanup branch on July 10, 2025, 22:23
@claude bot mentioned this pull request on Nov 19, 2025