diff --git a/README.md b/README.md index b4e99413e..c036f681c 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@

- - - Supabase Logo + + + ETL by Supabase @@ -19,162 +19,85 @@

-**ETL** is a Rust framework by [Supabase](https://supabase.com) that enables you to build high-performance, real-time data replication applications for PostgreSQL. Whether you're creating ETL pipelines, implementing CDC (Change Data Capture), or building custom data synchronization solutions, ETL provides the building blocks you need. +ETL is a Rust framework by [Supabase](https://supabase.com) for building high‑performance, real‑time data replication apps on PostgreSQL. It sits on top of Postgres [logical replication](https://www.postgresql.org/docs/current/protocol-logical-replication.html) and gives you a clean, Rust‑native API for streaming changes to your own destinations. -Built on top of PostgreSQL's [logical streaming replication protocol](https://www.postgresql.org/docs/current/protocol-logical-replication.html), ETL handles the low-level complexities of database replication while providing a clean, Rust-native API that guides you towards the pit of success. +## Highlights -## Table of Contents +- 🚀 Real‑time replication: stream changes as they happen +- ⚡ High performance: batching and parallel workers +- 🛡️ Fault tolerant: retries and recovery built in +- 🔧 Extensible: implement custom stores and destinations +- 🧭 Typed, ergonomic Rust API -- [Features](#features) -- [Installation](#installation) -- [Quickstart](#quickstart) -- [Database Setup](#database-setup) -- [Running Tests](#running-tests) -- [Docker](#docker) -- [Architecture](#architecture) -- [Troubleshooting](#troubleshooting) -- [License](#license) +## Get Started -## Features - -**Core Capabilities:** - -- 🚀 **Real-time replication**: Stream changes from PostgreSQL as they happen -- 🔄 **Multiple destinations**: Support for various data warehouses and databases (coming soon) -- 🛡️ **Fault tolerance**: Built-in error handling, retries, and recovery mechanisms -- ⚡ **High performance**: Efficient batching and parallel processing -- 🔧 **Extensible**: Plugin architecture for custom destinations - -**Supported Destinations:** - -- [x] **BigQuery** - Google Cloud's data warehouse -- [ ] **Apache Iceberg** (planned) - Open table format for analytics -- [ ] **DuckDB** (planned) - In-process analytical database - -## Installation - -Add ETL to your Rust project via git dependencies in `Cargo.toml`: +Install via Git while we prepare for a crates.io release: ```toml [dependencies] etl = { git = "https://github.com/supabase/etl" } ``` -> **Note**: ETL is currently distributed via Git while we prepare for the initial crates.io release. 
- -## Quickstart - -Get up and running with ETL in minutes using the built-in memory destination: +Quick example using the in‑memory destination: ```rust -use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}; -use etl::pipeline::Pipeline; -use etl::destination::memory::MemoryDestination; -use etl::store::both::memory::MemoryStore; +use etl::{ + config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}, + destination::memory::MemoryDestination, + pipeline::Pipeline, + store::both::memory::MemoryStore, +}; #[tokio::main] async fn main() -> Result<(), Box> { - // Configure PostgreSQL connection - let pg_connection_config = PgConnectionConfig { - host: "localhost".to_string(), + let pg = PgConnectionConfig { + host: "localhost".into(), port: 5432, - name: "mydb".to_string(), - username: "postgres".to_string(), + name: "mydb".into(), + username: "postgres".into(), password: Some("password".into()), - tls: TlsConfig { - trusted_root_certs: String::new(), - enabled: false, - }, + tls: TlsConfig { enabled: false, trusted_root_certs: String::new() }, }; - // Configure the pipeline - let pipeline_config = PipelineConfig { + let store = MemoryStore::new(); + let destination = MemoryDestination::new(); + + let config = PipelineConfig { id: 1, - publication_name: "my_publication".to_string(), - pg_connection: pg_connection_config, - batch: BatchConfig { - max_size: 1000, - max_fill_ms: 5000, - }, - table_error_retry_delay_ms: 10000, + publication_name: "my_publication".into(), + pg_connection: pg, + batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 }, + table_error_retry_delay_ms: 10_000, max_table_sync_workers: 4, }; - // Create in-memory store and destination for testing - let store = MemoryStore::new(); - let destination = MemoryDestination::new(); - - // Create and start the pipeline - let mut pipeline = Pipeline::new(1, pipeline_config, store, destination); + let mut pipeline = Pipeline::new(config, store, destination); pipeline.start().await?; + // pipeline.wait().await?; // Optional: block until completion Ok(()) } ``` -**Need production destinations?** Add the `etl-destinations` crate with specific features: +For tutorials and deeper guidance, see the [Documentation](https://supabase.github.io/etl) or jump into the [examples](etl-examples/README.md). -```toml -[dependencies] -etl = { git = "https://github.com/supabase/etl" } -etl-destinations = { git = "https://github.com/supabase/etl", features = ["bigquery"] } -``` - -For comprehensive examples and tutorials, visit the [etl-examples](etl-examples/README.md) crate and our [documentation](https://supabase.github.io/etl). +## Destinations -## Database Setup +ETL is designed to be extensible. You can implement your own destinations to send data to any destination you like, however it comes with a few built in destinations: -Before running the examples, tests, or the API and replicator components, you'll need to set up a PostgreSQL database. -We provide a convenient script to help you with this setup. For detailed instructions on how to use the database setup script, please refer to our [Database Setup Guide](docs/guides/database-setup.md). +- BigQuery -## Running Tests +Out-of-the-box destinations are available in the `etl-destinations` crate: -To run the test suite: - -```bash -cargo test --all-features -``` - -## Docker - -The repository includes Docker support for both the `replicator` and `api` components: - -```bash -# Build replicator image -docker build -f ./etl-replicator/Dockerfile . 
- -# Build api image -docker build -f ./etl-api/Dockerfile . -``` - -## Architecture - -For a detailed explanation of the ETL architecture and design decisions, please refer to our [Design Document](docs/design/etl-crate-design.md). - -## Troubleshooting - -### Too Many Open Files Error - -If you see the following error when running tests on macOS: - -``` -called `Result::unwrap()` on an `Err` value: Os { code: 24, kind: Uncategorized, message: "Too many open files" } -``` - -Raise the limit of open files per process with: - -```bash -ulimit -n 10000 +```toml +[dependencies] +etl = { git = "https://github.com/supabase/etl" } +etl-destinations = { git = "https://github.com/supabase/etl", features = ["bigquery"] } ``` -### Performance Considerations - -Currently, the system parallelizes the copying of different tables, but each individual table is still copied in sequential batches. -This limits performance for large tables. We plan to address this once the ETL system reaches greater stability. - ## License -Distributed under the Apache-2.0 License. See `LICENSE` for more information. +Apache‑2.0. See `LICENSE` for details. --- diff --git a/docs/explanation/architecture.md b/docs/explanation/architecture.md index b62abfa82..d5ec59f3c 100644 --- a/docs/explanation/architecture.md +++ b/docs/explanation/architecture.md @@ -1,4 +1,238 @@ -# ETL Architecture -!!! info "Coming Soon" - This page is under development. \ No newline at end of file +# ETL Architecture Overview + +**Understanding how ETL components work together to replicate data from PostgreSQL** + +ETL's architecture centers around four core abstractions that work together to provide reliable, high-performance data replication: `Pipeline`, `Destination`, `SchemaStore`, and `StateStore`. This document explains how these components interact and coordinate data flow from PostgreSQL logical replication to target systems. + +A diagram of the overall architecture is shown below: + +```mermaid +flowchart LR + subgraph PostgreSQL + A["WAL Stream
Publications
Replication Slots"] + end + + subgraph ETL_Pipeline[ETL Pipeline] + subgraph ApplyWorker[Apply Worker] + B1["CDC Events Processing and Tables Synchronization"] + end + + subgraph TableSyncWorkers[Table Sync Workers] + B2["Table 1 Sync + CDC"] + B3["Table 2 Sync + CDC"] + B4["Table N Sync + CDC"] + end + end + + subgraph Destination[Destination] + Dest["BigQuery
Custom API
Memory"] + end + + subgraph Store[Store] + subgraph StateStore[State Store] + D1["Memory
PostgreSQL"] + end + + subgraph SchemaStore[Schema Store] + D2["Memory
PostgreSQL"] + end + end + + A --> ApplyWorker + ApplyWorker --> TableSyncWorkers + + ApplyWorker --> Destination + TableSyncWorkers --> Destination + + ApplyWorker --> Store + TableSyncWorkers --> Store +``` + +## Core Abstractions + +### Pipeline + +The Pipeline is ETL's central component that orchestrates all replication activity. It manages worker lifecycles, coordinates data flow, and handles error recovery. + +**Key responsibilities:** + +- Initializes the state of the pipeline +- Spawns the apply worker and table sync workers pool +- Tracks workers handles to wait for their termination +- Exposes the shutdown mechanism for gracefully terminating the pipeline + +### Destination + +The `Destination` trait defines how replicated data is delivered to target systems: + +```rust +pub trait Destination { + fn truncate_table(&self, table_id: TableId) -> impl Future> + Send; + + fn write_table_rows( + &self, + table_id: TableId, + table_rows: Vec, + ) -> impl Future> + Send; + + fn write_events(&self, events: Vec) -> impl Future> + Send; +} +``` + +The trait provides three operations: + +- `truncate_table`: clears destination tables before bulk loading. +- `write_table_rows`: handles bulk data insertion during initial synchronization. +- `write_events`: processes streaming replication changes. + +### SchemaStore + +The `SchemaStore` trait manages table schema information: + +```rust +pub trait SchemaStore { + fn get_table_schema( + &self, + table_id: &TableId, + ) -> impl Future>>> + Send; + + fn get_table_schemas(&self) -> impl Future>>> + Send; + + fn load_table_schemas(&self) -> impl Future> + Send; + + fn store_table_schema( + &self, + table_schema: TableSchema, + ) -> impl Future> + Send; +} +``` + +The store follows a cache-first pattern: `load_table_schemas` populates an in-memory cache at startup, while `get_table_schemas` methods read only from cache for performance. `store_table_schema` implements dual-write to both persistent storage and cache. + +### StateStore + +The `StateStore` trait manages replication state and table mappings: + +```rust +pub trait StateStore { + fn get_table_replication_state( + &self, + table_id: TableId, + ) -> impl Future>> + Send; + + fn get_table_replication_states( + &self, + ) -> impl Future>> + Send; + + fn load_table_replication_states(&self) -> impl Future> + Send; + + fn update_table_replication_state( + &self, + table_id: TableId, + state: TableReplicationPhase, + ) -> impl Future> + Send; + + fn rollback_table_replication_state( + &self, + table_id: TableId, + ) -> impl Future> + Send; + + fn get_table_mapping( + &self, + source_table_id: &TableId, + ) -> impl Future>> + Send; + + fn get_table_mappings( + &self, + ) -> impl Future>> + Send; + + fn load_table_mappings(&self) -> impl Future> + Send; + + fn store_table_mapping( + &self, + source_table_id: TableId, + destination_table_id: String, + ) -> impl Future> + Send; +} +``` + +Like `SchemaStore`, `StateStore` uses cache-first reads with `load_*` methods for startup population and dual-write patterns for updates. + +The store tracks both replication progress through `TableReplicationPhase` and source-to-destination table name mappings. + +## Data Flow Architecture + +### Worker Coordination + +ETL's data flow is orchestrated through two types of workers. 
+ +#### Apply Worker + +- Processes the PostgreSQL logical replication stream +- Spawns table sync workers when new tables are discovered +- Coordinates with table sync workers through shared state +- Handles final event processing for tables in `Ready` state + +#### Table Sync Workers + +- Perform bulk copying of existing table data +- Coordinate handoff to the apply worker when synchronization completes +- Multiple table sync workers run in parallel, limited by a configured semaphore to bound the number of connections + +### Worker Startup Sequence + +The Pipeline follows this startup sequence: + +1. **Pipeline Initialization**: Establishes PostgreSQL connection and loads cached state +2. **Apply Worker Launch**: Creates and starts the primary apply worker first +3. **Table Discovery**: Apply worker identifies tables requiring synchronization +4. **Table Sync Spawning**: Apply worker spawns table sync workers for tables in `Init` state +5. **Coordination**: Workers communicate through shared state store +6. **Streaming**: Apply worker starts streaming replication events for tables in `Ready` state and at every commit point + checks for new tables to synchronize + +_The apply worker always starts first because it coordinates the overall replication process and spawns table sync workers on demand._ + +### Table Replication Phases + +Each table progresses through distinct phases during replication: + +```rust +pub enum TableReplicationPhase { + Init, + DataSync, + FinishedCopy, + SyncWait, + Catchup { lsn: PgLsn }, + SyncDone { lsn: PgLsn }, + Ready, + Errored { reason: String, solution: Option, retry_policy: RetryPolicy }, +} +``` + +**Phase Ownership and Transitions:** + +- **Init**: The table is discovered and ready to be copied +- **DataSync**: The table copy has started and is in progress +- **FinishedCopy**: The table has been fully copied and is ready to start CDC streaming +- **SyncWait**: The table is ready to start CDC streaming and is waiting for the apply worker to tell it which LSN to catch up to +- **Catchup**: The table is catching up to the LSN specified by the apply worker +- **SyncDone**: The table has caught up to the LSN specified by the apply worker +- **Ready**: The table is fully copied and caught up with the apply worker; from this point on, all events for this table are processed by the apply worker +- **Errored**: The table has encountered an error and is excluded from replication until a rollback is performed + +## Next Steps + +Now that you understand ETL's architecture: + +- **Build your first pipeline** → [First Pipeline Tutorial](../tutorials/first-pipeline.md) +- **Implement custom components** → [Custom Stores and Destinations](../tutorials/custom-implementations.md) +- **Configure PostgreSQL properly** → [Configure PostgreSQL for Replication](../how-to/configure-postgres.md) + +## See Also + +- [Build Your First ETL Pipeline](../tutorials/first-pipeline.md) - Hands-on tutorial using these components +- [Custom Stores and Destinations](../tutorials/custom-implementations.md) - Implement your own stores and destinations +- [API Reference](../reference/index.md) - Complete trait documentation +- [Configure PostgreSQL for Replication](../how-to/configure-postgres.md) - Set up the source database \ No newline at end of file diff --git a/docs/explanation/crate-structure.md b/docs/explanation/crate-structure.md deleted file mode 100644 index 4450f2242..000000000 --- a/docs/explanation/crate-structure.md +++ /dev/null @@ -1,4 +0,0 @@ -# Crate Structure - -!!!
info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/explanation/design.md b/docs/explanation/design.md deleted file mode 100644 index 1e0d3261a..000000000 --- a/docs/explanation/design.md +++ /dev/null @@ -1,4 +0,0 @@ -# Design Philosophy - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/explanation/index.md b/docs/explanation/index.md index 445116398..705e6bcb7 100644 --- a/docs/explanation/index.md +++ b/docs/explanation/index.md @@ -1,4 +1,36 @@ -# Explanation -!!! info "Coming Soon" - This page is under development. \ No newline at end of file +# Explanations + +**Deep dives into ETL concepts, architecture, and design decisions** + +Explanations help you build mental models of how ETL works and why it's designed the way it is. These topics provide background knowledge, compare alternatives, and explore the reasoning behind key architectural choices. + +## Core Concepts + +### [ETL Architecture Overview](architecture.md) +**The big picture of how ETL components work together** + +Understand the relationship between pipelines, destinations, stores, and the PostgreSQL replication protocol. Learn how data flows through the system and where extension points exist. + +*Topics covered:* Component architecture, data flow, extension patterns, scalability considerations. + +## Reading Guide + +**New to ETL?** Start with the [ETL Architecture](architecture.md) to understand how the system works. + +**Planning a production deployment?** Read [Architecture](architecture.md) to understand system behavior. + +**Building extensions?** Check out the [Custom Implementations Tutorial](../tutorials/custom-implementations.md). + +## Next Steps + +After building a conceptual understanding: + +- **Start building** → [Tutorials](../tutorials/index.md) +- **Solve specific problems** → [How-To Guides](../how-to/index.md) +- **Look up technical details** → [Reference](../reference/index.md) + +## Contributing to Explanations + +Found gaps in these explanations? See something that could be clearer? +[Open an issue](https://github.com/supabase/etl/issues) or contribute improvements to help other users build better mental models of ETL. \ No newline at end of file diff --git a/docs/explanation/performance.md b/docs/explanation/performance.md deleted file mode 100644 index c963daad9..000000000 --- a/docs/explanation/performance.md +++ /dev/null @@ -1,4 +0,0 @@ -# Performance Model - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/explanation/replication.md b/docs/explanation/replication.md deleted file mode 100644 index 04613fb6e..000000000 --- a/docs/explanation/replication.md +++ /dev/null @@ -1,4 +0,0 @@ -# Replication Protocol - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/getting-started/first-pipeline.md b/docs/getting-started/first-pipeline.md deleted file mode 100644 index 002d829a3..000000000 --- a/docs/getting-started/first-pipeline.md +++ /dev/null @@ -1,4 +0,0 @@ -# Your First Pipeline - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md deleted file mode 100644 index a02e348a4..000000000 --- a/docs/getting-started/installation.md +++ /dev/null @@ -1,4 +0,0 @@ -# Installation - -!!! info "Coming Soon" - This page is under development. 
\ No newline at end of file diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md deleted file mode 100644 index 56ac12024..000000000 --- a/docs/getting-started/quickstart.md +++ /dev/null @@ -1,4 +0,0 @@ -# Quick Start - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/how-to/configure-postgres.md b/docs/how-to/configure-postgres.md index b208e601d..3d0d0c263 100644 --- a/docs/how-to/configure-postgres.md +++ b/docs/how-to/configure-postgres.md @@ -1,4 +1,177 @@ -# Configure PostgreSQL +# Configure PostgreSQL for Replication -!!! info "Coming Soon" - This page is under development. \ No newline at end of file +**Set up PostgreSQL with the correct permissions and settings for ETL logical replication** + +This guide covers the essential PostgreSQL concepts and configuration needed for logical replication with ETL. + +## Prerequisites + +- PostgreSQL 10 or later +- Superuser access to the PostgreSQL server +- Ability to restart PostgreSQL server (for configuration changes) + +## Understanding WAL Logical + +PostgreSQL's Write-Ahead Log (WAL) is the foundation of logical replication. When `wal_level = logical`, PostgreSQL: + +- Records detailed information about data changes (not just physical changes) +- Includes enough metadata to reconstruct logical changes +- Allows external tools to decode and stream these changes + +```ini +# Enable logical replication in postgresql.conf +wal_level = logical +``` + +**Restart PostgreSQL** after changing this setting: +```bash +sudo systemctl restart postgresql +``` + +## Replication Slots + +Replication slots ensure that PostgreSQL retains WAL data for logical replication consumers, even if they disconnect temporarily. + +### What are Replication Slots? + +- **Persistent markers** in PostgreSQL that track replication progress +- **Prevent WAL cleanup** until the consumer catches up +- **Guarantee data consistency** across disconnections + +### Creating Replication Slots + +```sql +-- Create a logical replication slot +SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput'); +``` + +### Viewing Replication Slots + +```sql +-- See all replication slots +SELECT slot_name, slot_type, active, restart_lsn +FROM pg_replication_slots; +``` + +### Deleting Replication Slots + +```sql +-- Drop a replication slot when no longer needed +SELECT pg_drop_replication_slot('my_slot'); +``` + +**Warning:** Only delete slots when you're sure they're not in use. Deleting an active slot can cause data loss. + +## Max Replication Slots + +Controls how many replication slots PostgreSQL can maintain simultaneously. + +```ini +# Increase max replication slots (default is 10) +max_replication_slots = 20 +``` + +ETL uses a **single replication slot** for its main apply worker. However, additional slots may be created for parallel table +copies when the pipeline is initialized or when a new table is added to the publication. The `max_table_sync_workers` parameter +controls the number of these parallel copies, ensuring that the total replication slots used by ETL never exceed `max_table_sync_workers + 1`. + +**When to increase:** + +- Running multiple ETL pipelines +- Development/testing with frequent slot creation + +## WAL Keep Size + +Determines how much WAL data to retain on disk, providing a safety buffer for replication consumers. 
+ +```ini +# Keep 1GB of WAL data (PostgreSQL 13+) +wal_keep_size = 1GB + +# For PostgreSQL 12 and earlier, use: +# wal_keep_segments = 256 # Each segment is typically 16MB +``` + +**Purpose:** + +- Prevents WAL deletion when replication consumers fall behind +- Provides recovery time if ETL pipelines temporarily disconnect +- Balances disk usage with replication reliability + +## Publications + +Publications define which tables and operations to replicate. + +### Creating Publications + +```sql +-- Create publication for specific tables +CREATE PUBLICATION my_publication FOR TABLE users, orders; + +-- Create publication for all tables (use with caution) +CREATE PUBLICATION all_tables FOR ALL TABLES; + +-- Include only specific operations +CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert'); +``` + +### Managing Publications + +```sql +-- View existing publications +SELECT * FROM pg_publication; + +-- See which tables are in a publication +SELECT * FROM pg_publication_tables WHERE pubname = 'my_publication'; + +-- Add tables to existing publication +ALTER PUBLICATION my_publication ADD TABLE products; + +-- Remove tables from publication +ALTER PUBLICATION my_publication DROP TABLE products; + +-- Drop publication +DROP PUBLICATION my_publication; +``` + +## Complete Configuration Example + +Here's a minimal `postgresql.conf` setup: + +```ini +# Enable logical replication +wal_level = logical + +# Increase replication capacity +max_replication_slots = 20 +max_wal_senders = 20 + +# Keep WAL data for safety +wal_keep_size = 1GB # PostgreSQL 13+ +# wal_keep_segments = 64 # PostgreSQL 12 and earlier +``` + +After editing the configuration: + +1. **Restart PostgreSQL** +2. **Create your publication**: + ```sql + CREATE PUBLICATION etl_publication FOR TABLE your_table; + ``` +3. **Verify the setup**: + ```sql + SHOW wal_level; + SHOW max_replication_slots; + SELECT * FROM pg_publication WHERE pubname = 'etl_publication'; + ``` + +## Next Steps + +- **Build your first pipeline** → [Build Your First ETL Pipeline](../tutorials/first-pipeline.md) +- **Build custom implementations** → [Custom Stores and Destinations](../tutorials/custom-implementations.md) + +## See Also + +- [Build Your First ETL Pipeline](../tutorials/first-pipeline.md) - Hands-on tutorial using these settings +- [ETL Architecture](../explanation/architecture.md) - Understanding how ETL uses these settings +- [API Reference](../reference/index.md) - All available connection options \ No newline at end of file diff --git a/docs/how-to/custom-destinations.md b/docs/how-to/custom-destinations.md deleted file mode 100644 index 2a4a34346..000000000 --- a/docs/how-to/custom-destinations.md +++ /dev/null @@ -1,4 +0,0 @@ -# Implement Custom Destinations - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/how-to/debugging.md b/docs/how-to/debugging.md deleted file mode 100644 index 199c490ef..000000000 --- a/docs/how-to/debugging.md +++ /dev/null @@ -1,4 +0,0 @@ -# Debug Replication Issues - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/how-to/index.md b/docs/how-to/index.md index ffd78f6d2..17e31e7d6 100644 --- a/docs/how-to/index.md +++ b/docs/how-to/index.md @@ -1,4 +1,28 @@ -# How-to Guides -!!! info "Coming Soon" - This page is under development. 
\ No newline at end of file +# How-To Guides + +**Practical solutions for common ETL tasks** + +How-to guides provide step-by-step instructions for accomplishing specific goals when working with ETL. Each guide assumes you're already familiar with ETL basics and focuses on the task at hand. + +## Database Configuration + +### [Configure PostgreSQL for Replication](configure-postgres.md) +Set up PostgreSQL with the correct settings, and publications for ETL pipelines. + +**When to use:** Setting up a new PostgreSQL source for replication. + +## Next Steps + +After solving your immediate problem: + +- **Learn more concepts** → [Explanations](../explanation/index.md) +- **Look up technical details** → [Reference](../reference/index.md) +- **Build foundational knowledge** → [Tutorials](../tutorials/index.md) + +## Need Help? + +If these guides don't cover your specific situation: +1. Check the [PostgreSQL configuration guide](configure-postgres.md) +2. Search existing [GitHub issues](https://github.com/supabase/etl/issues) +3. [Open a new issue](https://github.com/supabase/etl/issues/new) with details about your use case \ No newline at end of file diff --git a/docs/how-to/performance.md b/docs/how-to/performance.md deleted file mode 100644 index 7826051e4..000000000 --- a/docs/how-to/performance.md +++ /dev/null @@ -1,4 +0,0 @@ -# Optimize Performance - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/how-to/schema-changes.md b/docs/how-to/schema-changes.md deleted file mode 100644 index aabf712e3..000000000 --- a/docs/how-to/schema-changes.md +++ /dev/null @@ -1,4 +0,0 @@ -# Handle Schema Changes - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/how-to/testing.md b/docs/how-to/testing.md deleted file mode 100644 index 48234cc70..000000000 --- a/docs/how-to/testing.md +++ /dev/null @@ -1,4 +0,0 @@ -# Set Up Tests - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 160c6859d..d89ff6993 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,60 +1,92 @@ ---- -hide: - - navigation ---- -# ETL +# ETL Documentation -!!! info "Coming Soon" - ETL docs are coming soon! +**Build real-time Postgres replication applications in Rust** -Welcome to the ETL project, a Rust-based collection of tooling designed to build efficient and reliable Postgres replication applications. This documentation page provides an overview of the ETL project, the benefits of using ETL, the advantages of implementing it in Rust, and an introduction to Postgres logical replication. It also outlines the resources available in this documentation to help you get started. +ETL is a Rust framework by [Supabase](https://supabase.com) that enables you to build high-performance, real-time data replication applications for PostgreSQL. Whether you're creating ETL pipelines, implementing CDC (Change Data Capture), or building custom data synchronization solutions, ETL provides the building blocks you need. -## What is ETL +## Getting Started -ETL is a collection of Rust crates which can be used to build replication data pipelines on top of [Postgres's logical replication protocol](https://www.postgresql.org/docs/current/protocol-logical-replication.html). 
It provides a high-level API to work with Postgres logical replication, allowing developers to focus on building their applications without worrying about the low-level details of the replication protocol. The ETL crate abstracts away the complexities of managing replication slots, publications, and subscriptions, enabling you to create robust data pipelines that can continually copy data from Postgres to various destinations like BigQuery and other OLAP databases. +Choose your path based on your needs: -## What is Postgres Logical Replication? +### New to ETL? +Start with our **[Tutorials](tutorials/index.md)** to learn ETL through hands-on examples: -Postgres logical replication is a method for replicating data between PostgreSQL databases at the logical (table or row) level, rather than the physical (block-level) level. It allows selective replication of specific tables or data subsets, making it ideal for scenarios like data warehousing, real-time analytics, or cross-database synchronization. +- [Build your first ETL pipeline](tutorials/first-pipeline.md) - Complete beginner's guide (15 minutes) +- [Build custom stores and destinations](tutorials/custom-implementations.md) - Advanced patterns (30 minutes) -Logical replication uses a publish/subscribe model, where a source database (publisher) sends changes to a replication slot, and a destination system (subscriber) applies those changes to its own tables. This approach supports selective data replication and is compatible with different PostgreSQL versions or even external systems. +### Ready to solve specific problems? +Jump to our **[How-To Guides](how-to/index.md)** for practical solutions: -### How Does Postgres Logical Replication Work? +- [Configure PostgreSQL for replication](how-to/configure-postgres.md) +- More guides coming soon -Postgres logical replication operates through the following steps: +### Want to understand the bigger picture? +Read our **[Explanations](explanation/index.md)** for deeper insights: -**Publication Creation**: A publication is created in the source database, specifying which tables or data to replicate. For example: +- [ETL architecture overview](explanation/architecture.md) +- More explanations coming soon -```sql -create publication my_publication for table orders, customers; -``` +## Core Concepts -**Replication Slot**: A logical replication slot is created on the source database to track changes (inserts, updates, deletes) for the published tables. The slot ensures that changes are preserved until they are consumed by a subscriber. +**Postgres Logical Replication** streams data changes from PostgreSQL databases in real-time using the Write-Ahead Log (WAL). ETL builds on this foundation to provide: -**Subscription Setup**: The destination system (subscriber) creates a subscription that connects to the publication, specifying the source database and replication slot. 
For example: +- 🚀 **Real-time replication** - Stream changes as they happen +- 🔄 **Multiple destinations** - BigQuery and more coming soon +- 🛡️ **Fault tolerance** - Built-in error handling and recovery +- ⚡ **High performance** - Efficient batching and parallel processing +- 🔧 **Extensible** - Plugin architecture for custom destinations -```sql -create subscription my_subscription -connection 'host=localhost port=5432 dbname=postgres user=postgres password=password' -publication my_publication; -``` +## Quick Example -**Change Data Capture (CDC)**: The source database streams changes (via the Write-Ahead Log, or WAL) to the replication slot. The subscriber receives these changes and applies them to its tables, maintaining data consistency. +```rust +use etl::{ + config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}, + destination::memory::MemoryDestination, + pipeline::Pipeline, + store::both::memory::MemoryStore, +}; -This process enables real-time data synchronization with minimal overhead, making it suitable for ETL workflows where data needs to be transformed and loaded into destinations like data warehouses or analytical databases. +#[tokio::main] +async fn main() -> Result<(), Box> { + // Configure PostgreSQL connection + let pg_config = PgConnectionConfig { + host: "localhost".to_string(), + port: 5432, + name: "mydb".to_string(), + username: "postgres".to_string(), + password: Some("password".to_string().into()), + tls: TlsConfig { enabled: false, trusted_root_certs: String::new() }, + }; -## Why Use ETL + // Create memory-based store and destination for testing + let store = MemoryStore::new(); + let destination = MemoryDestination::new(); -ETL provides a set of building blocks to construct data pipelines which can continually copy data from Postgres to other systems. It abstracts away the low-level details of the logical replication protocol and provides a high-level API to work with. This allows developers to focus on building their applications without worrying about the intricacies of the replication protocol. + // Configure the pipeline + let config = PipelineConfig { + id: 1, + publication_name: "my_publication".to_string(), + pg_connection: pg_config, + batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 }, + table_error_retry_delay_ms: 10000, + max_table_sync_workers: 4, + }; -### Why is ETL Written in Rust? + // Create and start the pipeline + let mut pipeline = Pipeline::new(config, store, destination); + pipeline.start().await?; -The ETL crate is written in Rust to leverage the language's unique strengths, making it an ideal choice for building robust data pipelines: + // Pipeline will run until stopped + pipeline.wait().await?; + + Ok(()) +} +``` -- **Performance**: Rust's zero-cost abstractions and low-level control enable high-performance data processing, critical for handling large-scale ETL workloads. -- **Safety**: Rust's strong type system and memory safety guarantees minimize bugs and ensure reliable data handling, reducing the risk of data corruption or crashes. -- **Concurrency**: Rust’s ownership model and async capabilities allow efficient parallel processing, ideal for managing complex, high-throughput ETL pipelines. -- **Ecosystem Integration**: Rust’s growing ecosystem and compatibility with modern cloud and database technologies make it a natural fit for Postgres-focused infrastructure. +## Next Steps -By using Rust, the ETL crate provides a fast, safe, and scalable solution for building Postgres replication applications. 
+- **First time using ETL?** → Start with [Build your first pipeline](tutorials/first-pipeline.md) +- **Need PostgreSQL setup help?** → Check [Configure PostgreSQL for Replication](how-to/configure-postgres.md) +- **Need technical details?** → Check the [Reference](reference/index.md) +- **Want to understand the architecture?** → Read [ETL Architecture](explanation/architecture.md) diff --git a/docs/reference/index.md b/docs/reference/index.md index 1d8074836..df2c3c1b4 100644 --- a/docs/reference/index.md +++ b/docs/reference/index.md @@ -1,4 +1,14 @@ + # Reference -!!! info "Coming Soon" - This page is under development. \ No newline at end of file +Complete API documentation is available through Rust's built-in documentation system. We will publish comprehensive rustdoc documentation that covers all public APIs, traits, and configuration structures. +Right now the docs are accessible via the code or by running: +```shell +cargo doc --workspace --all-features --no-deps --open +``` + +## See Also + +- [How-to guides](../how-to/index.md) - Task-oriented instructions +- [Tutorials](../tutorials/index.md) - Learning-oriented lessons +- [Explanations](../explanation/index.md) - Understanding-oriented discussions \ No newline at end of file diff --git a/docs/tutorials/custom-implementations.md b/docs/tutorials/custom-implementations.md new file mode 100644 index 000000000..a0ef212dd --- /dev/null +++ b/docs/tutorials/custom-implementations.md @@ -0,0 +1,692 @@ + +# Build Custom Stores and Destinations in 30 minutes + +**Learn ETL's extension patterns by implementing working custom components** + +## What You'll Build + +By the end of this tutorial, you'll have: + +- A **working custom in-memory store** that logs all operations for debugging +- A **custom HTTP destination** that sends data with automatic retries +- A **complete pipeline** using your custom components that processes real data + +**Time required:** 30 minutes +**Prerequisites:** Advanced Rust knowledge, running PostgreSQL, basic HTTP knowledge + +## Step 1: Create Project Structure + +Create a new Rust project for your custom ETL components: + +```bash +cargo new etl-custom --lib +cd etl-custom +``` + +**Result:** You should see `Created library 'etl-custom' package` output. + +## Step 2: Add Dependencies + +Replace your `Cargo.toml` with the required dependencies: + +```toml +[package] +name = "etl-custom" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "main" +path = "src/main.rs" + +[dependencies] +etl = { git = "https://github.com/supabase/etl" } +tokio = { version = "1.0", features = ["full"] } +reqwest = { version = "0.11", features = ["json"] } +serde_json = "1.0" +tracing = "0.1" +tracing-subscriber = "0.3" +``` + +**Result:** Running `cargo check` should download dependencies without errors. 
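The `Cargo.toml` above declares a binary target at `src/main.rs`, but that file is only written in Step 5, so `cargo check` can fail until then because the declared binary has no source file yet. A minimal placeholder keeps the intermediate checks in Steps 2–4 green; this is an optional convenience, not part of the ETL API, and it is replaced wholesale in Step 5:

```rust
// src/main.rs — temporary placeholder so `cargo check` succeeds before Step 5.
// Step 5 replaces this file with the full pipeline example.
fn main() {
    println!("etl-custom: placeholder binary; see Step 5 for the real pipeline");
}
```

If you also want the files from Steps 3 and 4 compiled as soon as you write them, you can temporarily add `mod custom_store;` and `mod http_destination;` here after creating each file.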
+ +## Step 3: Create Custom Store Implementation + +Create `src/custom_store.rs` with a dual-storage implementation: + +```rust +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::info; + +use etl::error::EtlResult; +use etl::state::table::TableReplicationPhase; +use etl::store::schema::SchemaStore; +use etl::store::state::StateStore; +use etl::types::{TableId, TableSchema}; + +// Represents data stored in our in-memory cache for fast access +#[derive(Debug, Clone)] +struct CachedEntry { + schema: Option>, // Table structure info + state: Option, // Current replication progress + mapping: Option, // Source -> destination table mapping +} + +// Represents data as it would be stored persistently (e.g., in files/database) +#[derive(Debug, Clone)] +struct PersistentEntry { + schema: Option, // Not Arc-wrapped in "persistent" storage + state: Option, + mapping: Option, +} + +#[derive(Debug, Clone)] +pub struct CustomStore { + // Fast in-memory cache for all read operations - this is what ETL queries + cache: Arc>>, + // Simulated persistent storage - in real implementation this might be files/database + persistent: Arc>>, +} + +impl CustomStore { + pub fn new() -> Self { + info!("Creating custom store with dual-layer architecture (cache + persistent)"); + Self { + cache: Arc::new(Mutex::new(HashMap::new())), + persistent: Arc::new(Mutex::new(HashMap::new())), + } + } + + // Helper to ensure we have a cache entry to work with - creates if missing + fn ensure_cache_slot<'a>( + cache: &'a mut HashMap, + id: TableId, + ) -> &'a mut CachedEntry { + cache.entry(id).or_insert_with(|| { + // Initialize empty entry if this table hasn't been seen before + CachedEntry { + schema: None, + state: None, + mapping: None + } + }) + } + + // Helper to ensure we have a persistent entry to work with - creates if missing + fn ensure_persistent_slot<'a>( + persistent: &'a mut HashMap, + id: TableId, + ) -> &'a mut PersistentEntry { + persistent.entry(id).or_insert_with(|| { + // Initialize empty persistent entry if this table hasn't been seen before + PersistentEntry { + schema: None, + state: None, + mapping: None + } + }) + } +} + +// Implementation of ETL's SchemaStore trait - handles table structure information +impl SchemaStore for CustomStore { + // ETL calls this frequently during data processing - must be fast (cache-only) + async fn get_table_schema(&self, table_id: &TableId) -> EtlResult>> { + let cache = self.cache.lock().await; + let result = cache.get(table_id).and_then(|e| e.schema.clone()); + info!("Schema cache read for table {}: found={}", table_id.0, result.is_some()); + Ok(result) + } + + // Return all cached schemas - used by ETL for bulk operations + async fn get_table_schemas(&self) -> EtlResult>> { + let cache = self.cache.lock().await; + let schemas: Vec<_> = cache.values() + .filter_map(|e| e.schema.clone()) // Only include entries that have schemas + .collect(); + info!("Retrieved {} schemas from cache", schemas.len()); + Ok(schemas) + } + + // Called at startup - load persistent data into cache for fast access + async fn load_table_schemas(&self) -> EtlResult { + info!("Loading schemas from persistent storage into cache (startup phase)"); + let persistent = self.persistent.lock().await; + let mut cache = self.cache.lock().await; + + let mut loaded = 0; + for (id, pentry) in persistent.iter() { + if let Some(schema) = &pentry.schema { + // Move schema from persistent storage to cache, wrapping in Arc for sharing + let centry = 
Self::ensure_cache_slot(&mut cache, *id); + centry.schema = Some(Arc::new(schema.clone())); + loaded += 1; + } + } + info!("Loaded {} schemas into cache from persistent storage", loaded); + Ok(loaded) + } + + // Store new schema - implements dual-write pattern (persistent first, then cache) + async fn store_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> { + let id = table_schema.id; + info!("Storing schema for table {} using dual-write pattern", id.0); + + // First write to persistent storage (this would be a file/database in reality) + { + let mut persistent = self.persistent.lock().await; + let p = Self::ensure_persistent_slot(&mut persistent, id); + p.schema = Some(table_schema.clone()); + } + // Then update cache for immediate availability + { + let mut cache = self.cache.lock().await; + let c = Self::ensure_cache_slot(&mut cache, id); + c.schema = Some(Arc::new(table_schema)); + } + Ok(()) + } +} + +// Implementation of ETL's StateStore trait - handles replication progress tracking +impl StateStore for CustomStore { + // Get current replication state for a table - cache-only for speed + async fn get_table_replication_state( + &self, + table_id: TableId, + ) -> EtlResult> { + let cache = self.cache.lock().await; + let result = cache.get(&table_id).and_then(|e| e.state.clone()); + info!("State cache read for table {}: {:?}", table_id.0, result); + Ok(result) + } + + // Get all replication states - used by ETL to understand overall progress + async fn get_table_replication_states( + &self, + ) -> EtlResult> { + let cache = self.cache.lock().await; + let states: HashMap<_, _> = cache.iter() + .filter_map(|(id, e)| e.state.clone().map(|s| (*id, s))) // Only include tables with state + .collect(); + info!("Retrieved {} table states from cache", states.len()); + Ok(states) + } + + // Load persistent states into cache at startup + async fn load_table_replication_states(&self) -> EtlResult { + info!("Loading replication states from persistent storage into cache"); + let persistent = self.persistent.lock().await; + let mut cache = self.cache.lock().await; + + let mut loaded = 0; + for (id, pentry) in persistent.iter() { + if let Some(state) = pentry.state.clone() { + // Move state from persistent to cache + let centry = Self::ensure_cache_slot(&mut cache, *id); + centry.state = Some(state); + loaded += 1; + } + } + info!("Loaded {} replication states into cache", loaded); + Ok(loaded) + } + + // Update replication state - critical for tracking progress, uses dual-write + async fn update_table_replication_state( + &self, + table_id: TableId, + state: TableReplicationPhase, + ) -> EtlResult<()> { + info!("Updating replication state for table {} to {:?} (dual-write)", table_id.0, state); + + // First persist the state (ensures durability) + { + let mut persistent = self.persistent.lock().await; + let p = Self::ensure_persistent_slot(&mut persistent, table_id); + p.state = Some(state.clone()); + } + // Then update cache (ensures immediate availability) + { + let mut cache = self.cache.lock().await; + let c = Self::ensure_cache_slot(&mut cache, table_id); + c.state = Some(state); + } + Ok(()) + } + + // Rollback state to previous version - not implemented in this simple example + async fn rollback_table_replication_state( + &self, + _table_id: TableId, + ) -> EtlResult { + // In a real implementation, you'd maintain state history and rollback to previous version + todo!("Implement state history tracking for rollback") + } + + // Get table name mapping from source to destination + 
async fn get_table_mapping(&self, source_table_id: &TableId) -> EtlResult> { + let cache = self.cache.lock().await; + let mapping = cache.get(source_table_id).and_then(|e| e.mapping.clone()); + info!("Mapping lookup for table {}: {:?}", source_table_id.0, mapping); + Ok(mapping) + } + + // Get all table mappings - used when ETL needs to understand all table relationships + async fn get_table_mappings(&self) -> EtlResult> { + let cache = self.cache.lock().await; + let mappings: HashMap<_, _> = cache.iter() + .filter_map(|(id, e)| e.mapping.clone().map(|m| (*id, m))) // Only include mapped tables + .collect(); + info!("Retrieved {} table mappings from cache", mappings.len()); + Ok(mappings) + } + + // Load persistent mappings into cache at startup + async fn load_table_mappings(&self) -> EtlResult { + info!("Loading table mappings from persistent storage into cache"); + let persistent = self.persistent.lock().await; + let mut cache = self.cache.lock().await; + + let mut loaded = 0; + for (id, pentry) in persistent.iter() { + if let Some(m) = &pentry.mapping { + // Load mapping into cache + let centry = Self::ensure_cache_slot(&mut cache, *id); + centry.mapping = Some(m.clone()); + loaded += 1; + } + } + info!("Loaded {} table mappings into cache", loaded); + Ok(loaded) + } + + // Store a new table mapping (source table -> destination table name) + async fn store_table_mapping( + &self, + source_table_id: TableId, + destination_table_id: String, + ) -> EtlResult<()> { + info!( + "Storing table mapping: {} -> {} (dual-write)", + source_table_id.0, destination_table_id + ); + + // First persist the mapping + { + let mut persistent = self.persistent.lock().await; + let p = Self::ensure_persistent_slot(&mut persistent, source_table_id); + p.mapping = Some(destination_table_id.clone()); + } + // Then update cache + { + let mut cache = self.cache.lock().await; + let c = Self::ensure_cache_slot(&mut cache, source_table_id); + c.mapping = Some(destination_table_id); + } + Ok(()) + } +} +``` + +**Result:** Your file should compile without errors when you run `cargo check`. 
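Before wiring the store into a pipeline, you can exercise the dual-write path in isolation. The test below is a sketch you could append to `src/custom_store.rs`; it assumes `TableId` can be constructed as a tuple struct over a numeric id (the implementation above reads `table_id.0`), so adjust the constructor if your ETL version exposes a different API:

```rust
#[cfg(test)]
mod tests {
    use super::CustomStore;
    use etl::store::state::StateStore;
    use etl::types::TableId;

    #[tokio::test]
    async fn mapping_is_readable_after_store() {
        let store = CustomStore::new();

        // Hypothetical table id and destination name, used only for this test.
        store
            .store_table_mapping(TableId(1), "analytics_users".to_string())
            .await
            .expect("dual-write should succeed");

        // Reads are served from the cache, which the dual-write just populated.
        let mapping = store
            .get_table_mapping(&TableId(1))
            .await
            .expect("cache read should succeed");
        assert_eq!(mapping.as_deref(), Some("analytics_users"));
    }
}
```

Run it with `cargo test`: the assertion holds because `store_table_mapping` writes to the simulated persistent layer first and then to the cache that `get_table_mapping` reads.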
+ +## Step 4: Create HTTP Destination Implementation + +Create `src/http_destination.rs` with retry logic and proper error handling: + +```rust +use reqwest::{Client, Method}; +use serde_json::{Value, json}; +use std::time::Duration; +use tracing::{info, warn}; + +use etl::destination::Destination; +use etl::error::{ErrorKind, EtlError, EtlResult}; +use etl::types::{Event, TableId, TableRow}; +use etl::{bail, etl_error}; + +// Configuration constants for retry behavior +const MAX_RETRIES: usize = 3; // Try up to 3 times before giving up +const BASE_BACKOFF_MS: u64 = 500; // Start with 500ms delay, then exponential backoff + +pub struct HttpDestination { + client: Client, // HTTP client for making requests + base_url: String, // Base URL for the destination API (e.g., "https://api.example.com") +} + +impl HttpDestination { + /// Create a new HTTP destination that will send data to the specified base URL + pub fn new(base_url: String) -> EtlResult { + // Configure HTTP client with reasonable timeout + let client = Client::builder() + .timeout(Duration::from_secs(10)) // 10 second timeout for each request + .build() + .map_err(|e| etl_error!(ErrorKind::Unknown, "Failed to create HTTP client", e))?; + + info!("Created HTTP destination pointing to: {}", base_url); + Ok(Self { client, base_url }) + } + + /// Helper to construct full URLs by combining base URL with endpoint paths + fn url(&self, path: &str) -> String { + format!( + "{}/{}", + self.base_url.trim_end_matches('/'), // Remove trailing slash from base + path.trim_start_matches('/') // Remove leading slash from path + ) + } + + /// Generic HTTP sender with automatic retry logic and exponential backoff + /// This handles all the complex retry logic so individual methods can focus on data formatting + async fn send_json(&self, method: Method, path: &str, body: Option<&Value>) -> EtlResult<()> { + let url = self.url(path); + info!("Attempting HTTP {} to {}", method, url); + + // Retry loop with exponential backoff + for attempt in 0..MAX_RETRIES { + // Build the request + let mut req = self.client.request(method.clone(), &url); + if let Some(b) = body { + req = req.json(b); // Add JSON body if provided + } + + // Send the request and handle response + match req.send().await { + // Success case - 2xx status codes + Ok(resp) if resp.status().is_success() => { + info!( + "HTTP {} {} succeeded on attempt {}/{}", + method, url, attempt + 1, MAX_RETRIES + ); + return Ok(()); + } + // HTTP error response (4xx/5xx) + Ok(resp) => { + let status = resp.status(); + warn!( + "HTTP {} {} returned status {}, attempt {}/{}", + method, url, status, attempt + 1, MAX_RETRIES + ); + + // Don't retry client errors (4xx) - they won't succeed on retry + if status.is_client_error() { + bail!( + ErrorKind::Unknown, + "HTTP client error - not retrying", + format!("Status: {}", status) + ); + } + // Server errors (5xx) will be retried + } + // Network/connection errors - these are worth retrying + Err(e) => { + warn!( + "HTTP {} {} network error on attempt {}/{}: {}", + method, url, attempt + 1, MAX_RETRIES, e + ); + } + } + + // If this wasn't the last attempt, wait before retrying + if attempt + 1 < MAX_RETRIES { + // Exponential backoff: 500ms, 1s, 2s (attempt 0, 1, 2) + let delay = Duration::from_millis(BASE_BACKOFF_MS * 2u64.pow(attempt as u32)); + info!("Waiting {:?} before retry", delay); + tokio::time::sleep(delay).await; + } + } + + // All retries failed + bail!( + ErrorKind::Unknown, + "HTTP request failed after all retries", + format!("Exhausted {} 
attempts to {}", MAX_RETRIES, url) + ) + } +} + +// Implementation of ETL's Destination trait - this is what ETL calls to send data +impl Destination for HttpDestination { + /// Called when ETL needs to clear all data from a table (e.g., during full refresh) + async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> { + info!("Truncating destination table: {}", table_id.0); + + // Send DELETE request to truncate endpoint + self.send_json( + Method::DELETE, + &format!("tables/{}/truncate", table_id.0), // e.g., "tables/users/truncate" + None, // No body needed for truncate + ).await + } + + /// Called when ETL has a batch of rows to send to the destination + /// This is the main data flow method - gets called frequently during replication + async fn write_table_rows( + &self, + table_id: TableId, + table_rows: Vec, + ) -> EtlResult<()> { + // Skip empty batches - no work to do + if table_rows.is_empty() { + info!("Skipping empty batch for table {}", table_id.0); + return Ok(()); + } + + info!( + "Sending {} rows to destination table {}", + table_rows.len(), + table_id.0 + ); + + // Convert ETL's internal row format to JSON that our API expects + // In a real implementation, you'd format this according to your destination's schema + let rows_json: Vec<_> = table_rows + .iter() + .map(|row| { + json!({ + "values": row.values.iter() + .map(|v| format!("{:?}", v)) // Simple string conversion for demo + .collect::>() + }) + }) + .collect(); + + // Create the JSON payload our API expects + let payload = json!({ + "table_id": table_id.0, + "rows": rows_json + }); + + // Send POST request with the row data + self.send_json( + Method::POST, + &format!("tables/{}/rows", table_id.0), // e.g., "tables/users/rows" + Some(&payload), + ).await + } + + /// Called when ETL has replication events to send (e.g., transaction markers) + /// These are metadata events about the replication process itself + async fn write_events(&self, events: Vec) -> EtlResult<()> { + // Skip if no events to process + if events.is_empty() { + return Ok(()); + } + + info!("Sending {} replication events to destination", events.len()); + + // Convert events to JSON format + let events_json: Vec<_> = events + .iter() + .map(|event| { + json!({ + "event_type": format!("{:?}", event), // Convert event to string for demo + }) + }) + .collect(); + + let payload = json!({ "events": events_json }); + + // Send events to generic events endpoint + self.send_json(Method::POST, "events", Some(&payload)).await + } +} +``` + +**Result:** Run `cargo check` again - it should compile successfully with both your store and destination implementations. 
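One detail worth internalizing from `send_json` is the actual retry schedule: the loop sleeps only between attempts, so with `MAX_RETRIES = 3` and `BASE_BACKOFF_MS = 500` a persistently failing request waits 500 ms after the first attempt and 1 s after the second, then gives up. The standalone sketch below (not part of the tutorial files) just prints that schedule so you can sanity-check changes to the constants:

```rust
use std::time::Duration;

// Mirror the constants used by `HttpDestination` above.
const MAX_RETRIES: usize = 3;
const BASE_BACKOFF_MS: u64 = 500;

fn main() {
    for attempt in 0..MAX_RETRIES {
        if attempt + 1 < MAX_RETRIES {
            // Exponential backoff: the delay doubles after each failed attempt.
            let delay = Duration::from_millis(BASE_BACKOFF_MS * 2u64.pow(attempt as u32));
            println!("attempt {} failed -> wait {:?} before retrying", attempt + 1, delay);
        } else {
            println!("attempt {} failed -> give up and return an error", attempt + 1);
        }
    }
}
```

Remember that client errors (4xx) short-circuit this schedule entirely, since retrying them would not change the outcome.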
+ +## Step 5: Create Working Pipeline Example + +Create `src/main.rs` that demonstrates your custom components in action: + +```rust +mod custom_store; +mod http_destination; + +use custom_store::CustomStore; +use http_destination::HttpDestination; +use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}; +use etl::pipeline::Pipeline; +use tracing::{info, Level}; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging so we can see what our custom components are doing + tracing_subscriber::fmt() + .with_max_level(Level::INFO) + .init(); + + info!("=== Starting ETL Pipeline with Custom Components ==="); + + // Step 1: Create our custom store + // This will handle both schema storage and replication state tracking + info!("Creating custom dual-layer store (cache + persistent simulation)"); + let store = CustomStore::new(); + + // Step 2: Create our custom HTTP destination + // Using httpbin.org which echoes back what we send - perfect for testing + info!("Creating HTTP destination with retry logic"); + let destination = HttpDestination::new( + "https://httpbin.org/post".to_string() // This endpoint will show us what we sent + )?; + + // Step 3: Configure the PostgreSQL connection + // Update these values to match your local PostgreSQL setup + let pipeline_config = PipelineConfig { + id: 1, // Unique pipeline identifier + publication_name: "etl_demo_pub".to_string(), // PostgreSQL publication name + + // PostgreSQL connection details - CHANGE THESE to match your setup + pg_connection: PgConnectionConfig { + host: "localhost".to_string(), + port: 5432, + name: "postgres".to_string(), // Database name + username: "postgres".to_string(), // Database user + password: Some("postgres".to_string().into()), // Update with your password + tls: TlsConfig { + enabled: false, // Disable TLS for local development + trusted_root_certs: String::new() + }, + }, + + // Batching configuration - controls how ETL groups data for efficiency + batch: BatchConfig { + max_size: 100, // Send data when we have 100 rows + max_fill_ms: 5000 // Or send data every 5 seconds, whichever comes first + }, + + // Error handling configuration + table_error_retry_delay_ms: 10000, // Wait 10s before retrying failed tables + max_table_sync_workers: 2, // Use 2 workers for parallel table syncing + }; + + // Step 4: Create the pipeline with our custom components + // This combines your custom store and destination with ETL's core replication logic + info!("Creating ETL pipeline with custom store and HTTP destination"); + let mut pipeline = Pipeline::new(pipeline_config, store, destination); + + // Step 5: Start the pipeline + // This will: + // 1. Load any existing state from your custom store + // 2. Connect to PostgreSQL and start listening for changes + // 3. Begin replicating data through your custom destination + info!("Starting pipeline - this will connect to PostgreSQL and begin replication"); + pipeline.start().await?; + + // For demo purposes, let it run for 30 seconds then gracefully shut down + info!("Pipeline running! 
Watch the logs to see your custom components in action."); + info!("Will run for 30 seconds then shut down gracefully..."); + + tokio::time::sleep(Duration::from_secs(30)).await; + + info!("Shutting down pipeline gracefully..."); + // pipeline.shutdown().await?; // Uncomment if available in your ETL version + + // In production, you'd typically call: + // pipeline.wait().await?; // This blocks forever until manual shutdown + + info!("=== ETL Pipeline Demo Complete ==="); + Ok(()) +} +``` + +**Result:** Running `cargo run` should now start your pipeline and show detailed logs from your custom components. + +## Step 6: Test Your Implementation + +Verify your custom components work correctly: + +```bash +# Check that everything compiles +cargo check +``` + +**Result:** Should see "Finished dev [unoptimized + debuginfo] target(s)" + +```bash +# Run the pipeline (will fail without PostgreSQL setup, but shows component initialization) +cargo run +``` + +**Result:** You should see logs from your custom store being created and HTTP destination being configured. + +## Checkpoint: What You've Built + +You now have working custom ETL components: + +✅ **Custom Store**: Implements dual-layer caching with detailed logging +✅ **HTTP Destination**: Sends data via HTTP with automatic retry logic +✅ **Complete Pipeline**: Integrates both components with ETL's core engine +✅ **Proper Error Handling**: Follows ETL's error patterns and logging + +## Key Patterns You've Mastered + +**Store Architecture:** + +- Cache-first reads for performance +- Dual-write pattern for data consistency +- Startup loading from persistent storage +- Thread-safe concurrent access with Arc/Mutex + +**Destination Patterns:** + +- Exponential backoff retry logic +- Smart error classification (retry 5xx, fail 4xx) +- Efficient batching and empty batch handling +- Clean data transformation from ETL to API formats + +## Next Steps + +- **Connect to real PostgreSQL** → [Configure PostgreSQL for Replication](../how-to/configure-postgres.md) +- **Understand the architecture** → [ETL Architecture](../explanation/architecture.md) +- **Contribute to ETL** → [Open an issue](https://github.com/supabase/etl/issues) with your custom implementations + +## See Also + +- [ETL Architecture](../explanation/architecture.md) - Understanding the system design +- [API Reference](../reference/index.md) - Complete trait documentation +- [Build your first pipeline](first-pipeline.md) - Start with the basics if you haven't yet \ No newline at end of file diff --git a/docs/tutorials/first-pipeline.md b/docs/tutorials/first-pipeline.md new file mode 100644 index 000000000..0a902b08d --- /dev/null +++ b/docs/tutorials/first-pipeline.md @@ -0,0 +1,245 @@ + +# Build Your First ETL Pipeline + +**Learn the fundamentals by building a working pipeline in 15 minutes** + +By the end of this tutorial, you'll have a complete ETL pipeline that streams data changes from PostgreSQL to a memory destination in real-time. You'll see how to set up publications, configure pipelines, and handle live data replication. 
+
+## What You'll Build
+
+A real-time data pipeline that:
+- Monitors a PostgreSQL table for changes
+- Streams INSERT, UPDATE, and DELETE operations
+- Stores replicated data in memory for immediate access
+
+## Who This Tutorial Is For
+
+- Rust developers new to ETL
+- Anyone interested in PostgreSQL logical replication
+- Developers building data synchronization tools
+
+**Time required:** 15 minutes
+**Difficulty:** Beginner
+
+## Safety Note
+
+This tutorial uses an isolated test database. To clean up, simply drop the test database when finished. No production data is affected.
+
+## Step 1: Set Up Your Environment
+
+Create a new Rust project for this tutorial:
+
+```bash
+cargo new etl-tutorial
+cd etl-tutorial
+```
+
+Add ETL to your dependencies in `Cargo.toml`:
+
+```toml
+[dependencies]
+etl = { git = "https://github.com/supabase/etl" }
+tokio = { version = "1.0", features = ["full"] }
+```
+
+**Checkpoint:** Run `cargo check` - it should compile successfully.
+
+## Step 2: Prepare PostgreSQL
+
+Connect to your PostgreSQL server and create a test database:
+
+```sql
+CREATE DATABASE etl_tutorial;
+\c etl_tutorial
+
+-- Create a sample table
+CREATE TABLE users (
+    id SERIAL PRIMARY KEY,
+    name TEXT NOT NULL,
+    email TEXT UNIQUE NOT NULL,
+    created_at TIMESTAMP DEFAULT NOW()
+);
+
+-- Insert sample data
+INSERT INTO users (name, email) VALUES
+    ('Alice Johnson', 'alice@example.com'),
+    ('Bob Smith', 'bob@example.com');
+```
+
+Create a publication for replication:
+
+```sql
+CREATE PUBLICATION my_publication FOR TABLE users;
+```
+
+**Checkpoint:** Verify the publication exists:
+```sql
+SELECT * FROM pg_publication WHERE pubname = 'my_publication';
+```
+You should see one row returned.
+
+## Step 3: Configure Your Pipeline
+
+Replace the contents of `src/main.rs`:
+
+```rust
+use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig};
+use etl::pipeline::Pipeline;
+use etl::destination::memory::MemoryDestination;
+use etl::store::both::memory::MemoryStore;
+use std::error::Error;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    // Configure PostgreSQL connection.
+    let pg_connection_config = PgConnectionConfig {
+        host: "localhost".to_string(),
+        port: 5432,
+        name: "postgres".to_string(),
+        username: "postgres".to_string(),
+        password: Some("your_password".to_string().into()),
+        tls: TlsConfig {
+            trusted_root_certs: String::new(),
+            enabled: false,
+        },
+    };
+
+    // Configure pipeline behavior.
+    let pipeline_config = PipelineConfig {
+        id: 1,
+        publication_name: "my_publication".to_string(),
+        pg_connection: pg_connection_config,
+        batch: BatchConfig {
+            max_size: 1000,
+            max_fill_ms: 5000,
+        },
+        table_error_retry_delay_ms: 10000,
+        max_table_sync_workers: 4,
+    };
+
+    // Create stores and destination.
+    let store = MemoryStore::new();
+    let destination = MemoryDestination::new();
+
+    // Spawn a task that periodically prints the contents of the destination.
+    let destination_clone = destination.clone();
+    tokio::spawn(async move {
+        loop {
+            println!("Destination Contents At This Time\n");
+
+            // Table rows are the initial rows copied from each table.
+            for (table_id, table_rows) in destination_clone.table_rows().await {
+                println!("Table ({:?}): {:?}", table_id, table_rows);
+            }
+
+            // Events are realtime events sent by Postgres after the table has been copied.
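+            // These include, for example, transaction markers and insert/update/delete
+            // changes; the exact event types depend on your version of ETL.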
+            for event in destination_clone.events().await {
+                println!("Event: {:?}", event);
+            }
+
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+
+            print!("\n\n");
+        }
+    });
+
+    println!("Starting ETL pipeline...");
+
+    // Create and start the pipeline.
+    let mut pipeline = Pipeline::new(pipeline_config, store, destination);
+    pipeline.start().await?;
+
+    println!("Waiting for pipeline to finish...");
+
+    // Wait for the pipeline to finish; without a shutdown signal it will keep
+    // running until the connection is closed.
+    pipeline.wait().await?;
+
+    Ok(())
+}
+```
+
+**Important:** Replace `"your_password"` with your PostgreSQL password.
+
+## Step 4: Start Your Pipeline
+
+Run your pipeline:
+
+```bash
+cargo run
+```
+
+You should see output like:
+```
+Starting ETL pipeline...
+Waiting for pipeline to finish...
+
+Destination Contents At This Time
+
+Destination Contents At This Time
+
+Table (TableId(32341)): [TableRow { values: [I32(1), String("Alice Johnson"), String("alice@example.com"), TimeStampTz(2025-08-05T11:14:54.400235Z)] }, TableRow { values: [I32(2), String("Bob Smith"), String("bob@example.com"), TimeStampTz(2025-08-05T11:14:54.400235Z)] }]
+```
+
+The exact table id and timestamps will differ on your machine.
+
+**Checkpoint:** Your pipeline is now running and has completed initial synchronization of the `users` table.
+
+## Step 5: Test Real-Time Replication
+
+With your pipeline running, open a new terminal and connect to PostgreSQL:
+
+```bash
+psql -d etl_tutorial
+```
+
+Make some changes to test replication:
+
+```sql
+-- Insert a new user
+INSERT INTO users (name, email) VALUES ('Charlie Brown', 'charlie@example.com');
+
+-- Update an existing user
+UPDATE users SET name = 'Alice Cooper' WHERE email = 'alice@example.com';
+
+-- Delete a user
+DELETE FROM users WHERE email = 'bob@example.com';
+```
+
+**Checkpoint:** In your pipeline terminal, you should see the corresponding insert, update, and delete events printed by the monitoring task.
+
+## Step 6: Verify Data Replication
+
+The data is now replicated in your memory destination. While this tutorial uses the in-memory destination (perfect for testing), the same pattern works with BigQuery or your own custom destinations.
+
+**Checkpoint:** You've successfully built and tested a complete ETL pipeline!
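+
+If you prefer a programmatic check over reading the printed output, the same accessors used by the printing task can back a quick assertion. This is a minimal sketch, not part of the tutorial code: it assumes you keep an extra clone of the destination before handing it to `Pipeline::new`, and that you run it from `main` (for example between `pipeline.start()` and `pipeline.wait()`, after a short `tokio::time::sleep`) so the initial sync has had a moment to complete.
+
+```rust
+// Taken before the destination is moved into the pipeline:
+// let check = destination.clone();
+
+// Count what has reached the in-memory destination so far.
+let mut total_rows = 0;
+for (_table_id, rows) in check.table_rows().await {
+    total_rows += rows.len();
+}
+let total_events = check.events().await.into_iter().count();
+
+println!("copied {total_rows} initial rows, {total_events} realtime events so far");
+assert!(total_rows >= 2, "expected at least the two seed users to be copied");
+```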
+
+## What You've Learned
+
+You've covered the core ETL concepts:
+
+- **Publications** define which tables to replicate
+- **Pipeline configuration** controls behavior and performance
+- **Memory destinations** provide fast, local testing
+- **Real-time replication** captures all data changes automatically
+
+## Cleanup
+
+Stop the pipeline, connect to a different database (for example `postgres`), and remove the test database:
+
+```sql
+DROP DATABASE etl_tutorial;
+```
+
+## Next Steps
+
+Now that you understand the basics:
+
+- **Build custom implementations** → [Custom Stores and Destinations](custom-implementations.md)
+- **Configure PostgreSQL properly** → [Configure PostgreSQL for Replication](../how-to/configure-postgres.md)
+- **Understand the architecture** → [ETL Architecture](../explanation/architecture.md)
+
+## See Also
+
+- [Custom Implementation Tutorial](custom-implementations.md) - Advanced patterns
+- [API Reference](../reference/index.md) - Complete configuration options
+- [ETL Architecture](../explanation/architecture.md) - Understand the design
\ No newline at end of file
diff --git a/docs/tutorials/index.md b/docs/tutorials/index.md
index 9cc1257f1..9c19c52ce 100644
--- a/docs/tutorials/index.md
+++ b/docs/tutorials/index.md
@@ -1,4 +1,55 @@
+
 # Tutorials
 
-!!! info "Coming Soon"
-    This page is under development.
\ No newline at end of file
+**Learn ETL through guided, hands-on experiences**
+
+Tutorials provide step-by-step learning paths that take you from zero knowledge to working applications. Each tutorial can be completed by following the exact steps provided.
+
+## Getting Started
+
+### [Build Your First ETL Pipeline](first-pipeline.md)
+**15 minutes** • **Beginner**
+
+Create a complete ETL pipeline that replicates data from PostgreSQL to a memory destination. You'll learn the core concepts of publications, replication slots, and pipeline configuration.
+
+*What you'll build:* A working pipeline that streams changes from a sample PostgreSQL table to an in-memory destination.
+
+## Advanced Topics
+
+### [Build Custom Stores and Destinations](custom-implementations.md)
+**45 minutes** • **Advanced**
+
+Implement custom stores and destinations. Learn ETL's design patterns, build a dual-layer in-memory store that simulates persistent storage, and create an HTTP-based destination with retry logic.
+
+*What you'll build:* A custom in-memory store for state and schema storage, plus an HTTP destination with retry logic.
+
+## Before You Start
+
+**Prerequisites for all tutorials:**
+
+- Rust installed (1.75 or later)
+- PostgreSQL server (local or remote)
+- Basic familiarity with Rust and SQL
+
+**What you'll need:**
+
+- A terminal/command line
+- Your favorite text editor
+- About an hour in total (15 + 45 minutes)
+
+## Next Steps
+
+After completing the tutorials:
+
+- **Solve specific problems** → [How-To Guides](../how-to/index.md)
+- **Understand the architecture** → [ETL Architecture](../explanation/architecture.md)
+
+## Need Help?
+
+If you get stuck:
+
+1. Double-check the prerequisites
+2. Ensure your PostgreSQL setup matches the requirements
+3. Check the [PostgreSQL configuration guide](../how-to/configure-postgres.md)
+4. [Open an issue](https://github.com/supabase/etl/issues) with your specific problem
\ No newline at end of file
diff --git a/docs/tutorials/memory-destination.md b/docs/tutorials/memory-destination.md
deleted file mode 100644
index e97cc0889..000000000
--- a/docs/tutorials/memory-destination.md
+++ /dev/null
@@ -1,4 +0,0 @@
-# Memory Destination
-
-!!! info "Coming Soon"
-    This page is under development.
\ No newline at end of file diff --git a/docs/tutorials/testing-pipelines.md b/docs/tutorials/testing-pipelines.md deleted file mode 100644 index 44bd431da..000000000 --- a/docs/tutorials/testing-pipelines.md +++ /dev/null @@ -1,4 +0,0 @@ -# Testing Pipelines - -!!! info "Coming Soon" - This page is under development. \ No newline at end of file diff --git a/etl-api/README.md b/etl-api/README.md index 7286aa49f..721e6e5ae 100644 --- a/etl-api/README.md +++ b/etl-api/README.md @@ -1,4 +1,4 @@ -# `etl` API +# `etl` - API This API service provides a RESTful interface for managing PostgreSQL replication pipelines. It enables you to: @@ -26,7 +26,24 @@ This API service provides a RESTful interface for managing PostgreSQL replicatio ## Prerequisites -Before you begin, please refer to our [Database Setup Guide](../docs/guides/database-setup.md). +Before running the API, you must have: + +- A running PostgreSQL instance reachable via `DATABASE_URL`. +- The `etl-api` database schema applied (SQLx migrations). + +Quickest way: use the setup script to start Postgres (via Docker) and run migrations automatically. + +```bash +# Starts a local Postgres (if needed) and applies etl-api migrations +./scripts/init_db.sh +``` + +Alternative: if you already have a Postgres database, set `DATABASE_URL` and apply migrations manually: + +```bash +export DATABASE_URL=postgres://USER:PASSWORD@HOST:PORT/DB +sqlx migrate run --source etl-api/migrations +``` ## Development @@ -45,7 +62,7 @@ sqlx migrate add To apply all pending migrations: ```bash -sqlx migrate run +sqlx migrate run --source etl-api/migrations ``` #### Resetting Database diff --git a/etl-benchmarks/README.md b/etl-benchmarks/README.md index 59d1e122a..1fe11781f 100644 --- a/etl-benchmarks/README.md +++ b/etl-benchmarks/README.md @@ -9,7 +9,7 @@ Performance benchmarks for the ETL system to measure and track replication perfo ## Prerequisites Before running benchmarks, ensure you have: -- A PostgreSQL database set up (see [Database Setup Guide](../docs/guides/database-setup.md)) +- A PostgreSQL database set up - A publication created with the tables you want to benchmark - For BigQuery benchmarks: GCP project, dataset, and service account key file diff --git a/etl-examples/README.md b/etl-examples/README.md index 11c32885c..4f498a193 100644 --- a/etl-examples/README.md +++ b/etl-examples/README.md @@ -6,7 +6,7 @@ This crate contains practical examples demonstrating how to use the ETL system f - **BigQuery Integration**: Demonstrates replicating PostgreSQL data to Google BigQuery -## Quickstart +## Quick Start To quickly try out `etl`, you can run the BigQuery example. First, create a publication in Postgres which includes the tables you want to replicate: @@ -34,7 +34,7 @@ In the above example, `etl` connects to a Postgres database named `postgres` run ## Prerequisites -Before running the examples, you'll need to set up a PostgreSQL database. For detailed instructions on how to use the database setup script, please refer to our [Database Setup Guide](../docs/guides/database-setup.md). +Before running the examples, you'll need to set up a PostgreSQL database with logical replication enabled. ## BigQuery Setup @@ -44,4 +44,4 @@ To run the BigQuery example, you'll need: 2. A service account key file with BigQuery permissions 3. A BigQuery dataset created in your project -The example will automatically create tables in the specified dataset based on your PostgreSQL schema. 
\ No newline at end of file +The example will automatically create tables in the specified dataset based on your PostgreSQL schema. diff --git a/mkdocs.yaml b/mkdocs.yaml index 204444230..bb3d6145a 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -10,28 +10,16 @@ nav: - Home: index.md - Tutorials: - Overview: tutorials/index.md - - Installation: getting-started/installation.md - - Quick Start: getting-started/quickstart.md - - Your First Pipeline: getting-started/first-pipeline.md - - Working with Destinations: tutorials/memory-destination.md - - Testing Pipelines: tutorials/testing-pipelines.md + - Your First Pipeline: tutorials/first-pipeline.md + - Custom Stores and Destinations: tutorials/custom-implementations.md - How-to Guides: - Overview: how-to/index.md - Configure PostgreSQL: how-to/configure-postgres.md - - Handle Schema Changes: how-to/schema-changes.md - - Create Custom Destinations: how-to/custom-destinations.md - - Debug Replication Issues: how-to/debugging.md - - Optimize Performance: how-to/performance.md - - Test Your Pipelines: how-to/testing.md - Reference: - Overview: reference/index.md - Explanation: - Overview: explanation/index.md - Architecture: explanation/architecture.md - - Replication Protocol: explanation/replication.md - - Design Philosophy: explanation/design.md - - Performance Characteristics: explanation/performance.md - - Crate Structure: explanation/crate-structure.md theme: name: "material" @@ -73,6 +61,9 @@ theme: extra_css: - stylesheets/extra.css +extra_javascript: + - https://unpkg.com/mermaid@10.6.1/dist/mermaid.min.js + extra: social: - icon: fontawesome/brands/x-twitter @@ -91,7 +82,11 @@ markdown_extensions: guess_lang: false use_pygments: true pygments_style: default - - pymdownx.superfences + - pymdownx.superfences: + custom_fences: + - name: mermaid + class: mermaid + format: !!python/name:pymdownx.superfences.fence_code_format - pymdownx.tabbed: alternate_style: true - pymdownx.snippets