Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Overview

The CDP Pipeline Workflow is a Stellar blockchain data processing pipeline built in Go. The system processes Stellar ledger data through a modular architecture consisting of Sources, Processors, and Consumers that can be chained together via YAML configuration files.

## Development Commands

### Building the Application
```bash
# Build locally (requires CGO and ZeroMQ libraries)
CGO_ENABLED=1 go build -o cdp-pipeline-workflow

# Build production Docker image
docker build -t obsrvr-flow-pipeline -f dockerfile .

# Build development Docker image
docker build -t cdp-pipeline-dev -f dockerfile.dev .
```

### Running the Application
```bash
# Run with default config
./cdp-pipeline-workflow -config pipeline_config.yaml

# Run with specific config file
./cdp-pipeline-workflow -config config/base/pipeline_config.yaml

# Run locally with Docker
./run-local.sh config/base/pipeline_config.yaml

# Run development ZeroMQ pipeline
./run-dev-zeromq.sh
```

### Testing
```bash
# Run Go tests
go test ./...

# Run tests with verbose output
go test -v ./...

# Test specific package
go test ./pkg/processor/...
```

## Architecture

### Core Components

1. **Sources** (`pkg/source/`, `source_adapter_*.go`): Data ingestion from various sources
- `CaptiveCoreInboundAdapter`: Direct Stellar Core connection
- `BufferedStorageSourceAdapter`: S3/GCS/filesystem storage
- `RPCSourceAdapter`: Stellar RPC endpoints
- `SorobanSourceAdapter`: Soroban smart contract events

2. **Processors** (`pkg/processor/`, `processor/`): Data transformation and filtering
- Payment processors: `FilterPayments`, `TransformToAppPayment`
- Account processors: `AccountData`, `CreateAccount`, `AccountTransaction`
- Contract processors: `ContractInvocation`, `ContractEvent`, `ContractLedgerReader`
- Market processors: `TransformToAppTrade`, `MarketMetricsProcessor`

3. **Consumers** (`pkg/consumer/`, `consumer/`): Data output and storage
- Database: `SaveToPostgreSQL`, `SaveToMongoDB`, `SaveToDuckDB`, `SaveToClickHouse`
- Cache: `SaveToRedis`, `SaveToRedisOrderbook`
- Files: `SaveToExcel`, `SaveToGCS`
- Messaging: `SaveToZeroMQ`, `SaveToWebSocket`

### Data Flow Pattern

```
Source -> Processor(s) -> Consumer(s)
```

- Sources capture blockchain events and emit `Message` objects
- Processors subscribe to sources/other processors and transform data
- Consumers subscribe to processors and persist/forward data
- All components implement the `Processor` interface for uniform chaining

### Configuration System

- Pipeline configurations are defined in YAML files under `config/base/`
- Each pipeline specifies a source, optional processors, and consumers
- Multiple pipelines can be defined in a single config file
- Factory pattern in `main.go` instantiates components based on config type strings

### Key Interfaces

- `types.Processor`: Core processing interface with `Process()` and `Subscribe()` methods
- `types.Message`: Wrapper for data payloads passed between components
- `SourceAdapter`: Interface for data sources with `Run()` and `Subscribe()` methods

## Development Notes

### Dependencies
- Requires CGO for DuckDB and ZeroMQ support
- Uses Stellar Go SDK for blockchain data structures
- Heavy use of YAML configuration with reflection-based component instantiation

### Code Organization
- Legacy code in root directory (`processor/`, `consumer/`, `source_adapter_*.go`)
- New modular code in `pkg/` directory with factory patterns
- Both entry points exist: `main.go` (legacy) and `cmd/pipeline/main.go` (new)

### Configuration Files
- Base configurations in `config/base/`
- Secret configurations (credentials) use `.secret.yaml` extension
- Template configurations available for common use cases

### Docker Development
- Development workflow uses `dockerfile.dev` with debugging tools
- Production builds use multi-stage `dockerfile` with minimal runtime
- `run-local.sh` script simplifies local Docker development
130 changes: 130 additions & 0 deletions MONOREPO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# CDP Pipeline Workflow Monorepo

This repository is structured as a monorepo to facilitate easier development, testing, and deployment of the CDP Pipeline Workflow components.

## Directory Structure

```
cdp-pipeline-workflow/
├── cmd/ # Command-line applications
│ ├── pipeline/ # Main pipeline application
│ └── tools/ # Utility tools
├── internal/ # Private application and library code
│ ├── common/ # Shared code used across the project
│ │ ├── types/ # Common type definitions
│ │ ├── utils/ # Utility functions
│ │ └── config/ # Configuration handling
│ └── platform/ # Platform-specific implementations
│ └── stellar/ # Stellar-specific code
├── pkg/ # Public library code
│ ├── processor/ # Core processor interfaces and implementations
│ │ ├── base/ # Base processor implementations
│ │ ├── contract/ # Contract-specific processors
│ │ │ ├── kale/ # Kale processor
│ │ │ ├── soroswap/ # Soroswap processor
│ │ │ └── common/ # Shared contract processor code
│ │ └── ledger/ # Ledger processors
│ ├── consumer/ # Consumer implementations
│ │ ├── database/ # Database consumers (PostgreSQL, etc.)
│ │ ├── messaging/ # Messaging consumers (ZeroMQ, Kafka, etc.)
│ │ ├── api/ # API consumers (REST, GraphQL, etc.)
│ │ └── websocket/ # WebSocket consumers
│ └── pluginapi/ # Plugin API interfaces and implementations
├── plugins/ # Plugin implementations
│ ├── kale/ # Kale plugin
│ └── soroswap/ # Soroswap plugin
├── examples/ # Example code
│ ├── kale/ # Kale examples
│ └── soroswap/ # Soroswap examples
├── docs/ # Documentation
│ ├── architecture/ # Architecture diagrams and descriptions
│ ├── api/ # API documentation
│ └── guides/ # User and developer guides
├── deployment/ # Deployment configurations
│ ├── kubernetes/ # Kubernetes manifests
│ └── docker/ # Dockerfiles and docker-compose files
├── tests/ # Integration and end-to-end tests
│ ├── integration/ # Integration tests
│ └── e2e/ # End-to-end tests
├── go.mod # Go modules definition
└── README.md # Project overview
```

## Package Structure

### cmd/

Contains the main executable applications of the project. Each subdirectory is a separate application.

### internal/

Contains code that's private to this repository and not meant to be imported by other projects. This is enforced by Go's module system.

### pkg/

Contains code that can be imported and used by other projects. This is where most of the reusable components live.

#### pkg/processor/

Contains all processor implementations, with interfaces defined in the `base` package.

- **base/**: Core interfaces and base implementations
- **contract/**: Processors for specific smart contracts
- **ledger/**: Processors for Stellar ledger data

#### pkg/consumer/

Contains all consumer implementations, with interfaces defined in the `base` package.

- **base/**: Core interfaces and base implementations
- **database/**: Consumers that save data to databases
- **messaging/**: Consumers that publish to message queues
- **api/**: Consumers that expose data via APIs
- **websocket/**: Consumers that send data over WebSockets

### plugins/

Contains plugin implementations that can be loaded dynamically.

### examples/

Contains example code showing how to use the various components.

## Key Interfaces

### Processor

```go
// Processor defines the interface for processing messages.
type Processor interface {
Process(context.Context, Message) error
Subscribe(Processor)
}
```

### Consumer

```go
// Consumer defines the interface for consuming messages.
type Consumer interface {
Process(context.Context, Message) error
Subscribe(Processor)
}
```

## Getting Started

See the examples directory for usage examples.

## Building and Testing

```bash
# Build all applications
go build ./cmd/...

# Run tests
go test ./...

# Run specific example
go run ./examples/kale/kale_processor_example.go
```
102 changes: 102 additions & 0 deletions README_NEW.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# CDP Pipeline Workflow

A flexible pipeline system for processing blockchain data, with a focus on Stellar and Soroban contracts.

## Overview

CDP Pipeline Workflow is a modular data processing system designed to ingest, process, and store blockchain data. The system uses a pipeline architecture with processors and consumers that can be configured to handle different types of data and operations.

## Features

- Modular design with pluggable processors and consumers
- Support for multiple blockchain data sources
- Built-in processors for Stellar and Soroban smart contracts
- Support for various data sinks (PostgreSQL, Redis, WebSockets, etc.)
- Flexible configuration via YAML
- Extensible plugin system

## Getting Started

### Prerequisites

- Go 1.23 or later
- Access to a Stellar/Soroban node or archive (for live data)

### Installation

```bash
# Clone the repository
git clone https://github.com/withObsrvr/cdp-pipeline-workflow.git
cd cdp-pipeline-workflow

# Build the main application
go build -o pipeline ./cmd/pipeline
```

### Basic Usage

1. Configure your pipeline in YAML:

```yaml
pipelines:
kale:
name: "Kale Processor Pipeline"
source:
type: "rpc"
config:
url: "https://your-soroban-rpc-endpoint"
processors:
- type: "contract_filter"
config:
contract_id: "CAKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
- type: "kale"
config:
contract_id: "CAKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
consumers:
- type: "postgresql"
config:
connection_string: "postgres://user:password@localhost:5432/database"
```

2. Run the pipeline:

```bash
./pipeline --config your_config.yaml
```

## Architecture

See [MONOREPO.md](MONOREPO.md) for details on the repository structure and architecture.

## Development

### Adding a New Processor

1. Create a new directory in `pkg/processor/` for your processor category
2. Implement the `base.Processor` interface
3. Register your processor in `pkg/processor/base/factory.go`

### Adding a New Consumer

1. Create a new directory in `pkg/consumer/` for your consumer category
2. Implement the `base.Consumer` interface
3. Register your consumer in `pkg/consumer/base/factory.go`

### Running Tests

```bash
go test ./...
```

## Examples

See the `examples/` directory for usage examples.

## License

[Specify your license here]

## Acknowledgments

- The Stellar Development Foundation
- [Other acknowledgments]
Loading