10 changes: 2 additions & 8 deletions Cargo.toml
@@ -1,16 +1,10 @@
[workspace]
resolver = "2"
members = [
"api",
"pg_replicate",
"postgres",
"replicator",
"telemetry"
]
members = ["api", "supabase_etl", "postgres", "replicator", "telemetry"]

[workspace.dependencies]
api = { path = "api" }
pg_replicate = { path = "pg_replicate" }
supabase_etl = { path = "supabase_etl" }
postgres = { path = "postgres" }
replicator = { path = "replicator" }
telemetry = { path = "telemetry" }
126 changes: 63 additions & 63 deletions README.md

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions api/README.md
@@ -1,15 +1,16 @@
# `pg_replicate` API
# `supabase_etl` API

This API service provides a RESTful interface for managing PostgreSQL replication pipelines. It enables you to:

- Create and manage replication pipelines between PostgreSQL sources and sinks
- Create and manage replication pipelines between PostgreSQL sources and destinations
- Handle multi-tenant replication configurations
- Manage publications and tables for replication
- Control pipeline lifecycle (start/stop/status)
- Secure configuration with encryption
- Deploy and manage replicators in Kubernetes

## Table of Contents

- [Local Setup](#local-setup)
- [Database Management](#database-management)
- [Development](#development)
@@ -18,35 +19,41 @@ This API service provides a RESTful interface for managing PostgreSQL replicatio
## Local Setup

### Prerequisites

Before you begin, ensure you have the following installed:

- PostgreSQL client (`psql`)
- SQLx CLI (`cargo install --version='~0.7' sqlx-cli --no-default-features --features rustls,postgres`)
- Rust toolchain

## Database Management

### Initial Setup

To set up and initialize the database, run the following command from the main directory:

```bash
./scripts/init_db.sh
```

This script will:

1. Check for required dependencies (psql and sqlx)
2. Start a PostgreSQL container if one isn't already running
3. Create the database if it doesn't exist
4. Run all migrations

### Environment Variables

You can customize the database setup using these environment variables:

| Variable | Description | Default |
|------------------------|--------------------------------|-----------------|
| ---------------------- | ------------------------------ | --------------- |
| `POSTGRES_DATA_VOLUME` | Data volume path | ./postgres_data |
| `SKIP_DOCKER` | Skip Docker container creation | false |

Example usage:

```bash
POSTGRES_DATA_VOLUME="~/postgres_data" ./scripts/init_db.sh
```
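Similarly, if a PostgreSQL instance is already running locally, `SKIP_DOCKER` can be set so the script skips container creation. A minimal sketch, assuming the script treats any non-empty value as a request to skip Docker:

```bash
# Reuse an already-running PostgreSQL instance; skip the Docker container step
SKIP_DOCKER=true ./scripts/init_db.sh
```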
@@ -56,25 +63,33 @@ POSTGRES_DATA_VOLUME="~/postgres_data" ./scripts/init_db.sh
### Database Migrations

#### Adding a New Migration

To create a new migration file:

```bash
sqlx migrate add <migration-name>
```

#### Running Migrations

To apply all pending migrations:

```bash
sqlx migrate run
```

#### Resetting Database

To reset the database to its initial state:

```bash
sqlx migrate reset
```

#### Updating SQLx Metadata

After making changes to the database schema, update the SQLx metadata:

```bash
cargo sqlx prepare
```
```
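With the prepared metadata checked in, query validation can run without a live database. A minimal sketch of an offline build, assuming SQLx's standard offline mode:

```bash
# Compile using the prepared query metadata instead of connecting to Postgres
SQLX_OFFLINE=true cargo build
```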
2 changes: 1 addition & 1 deletion api/tests/common/database.rs
@@ -8,7 +8,7 @@ use sqlx::PgPool;
/// from the "./migrations" directory after creation. Returns a [`PgPool`]
/// connected to the newly created and migrated database. Panics if database
/// creation or migration fails.
pub async fn create_pg_replicate_api_database(options: &PgDatabaseOptions) -> PgPool {
pub async fn create_supabase_etl_api_database(options: &PgDatabaseOptions) -> PgPool {
let connection_pool = create_pg_database(options).await;

sqlx::migrate!("./migrations")
2 changes: 1 addition & 1 deletion api/tests/common/mod.rs
@@ -1,4 +1,4 @@
//! Common test utilities for pg_replicate API tests.
//! Common test utilities for supabase_etl API tests.
//!
//! This module provides shared functionality used across integration tests:
//!
4 changes: 2 additions & 2 deletions api/tests/common/test_app.rs
@@ -1,4 +1,4 @@
use crate::common::database::create_pg_replicate_api_database;
use crate::common::database::create_supabase_etl_api_database;
use api::{
configuration::{get_settings, Settings},
db::{pipelines::PipelineConfig, sinks::SinkConfig, sources::SourceConfig},
@@ -536,7 +536,7 @@ pub async fn spawn_test_app() -> TestApp {
// We use a random database name.
settings.database.name = Uuid::new_v4().to_string();

let connection_pool = create_pg_replicate_api_database(&settings.database).await;
let connection_pool = create_supabase_etl_api_database(&settings.database).await;

let key = generate_random_key::<32>().expect("failed to generate random key");
let encryption_key = encryption::EncryptionKey { id: 0, key };
2 changes: 1 addition & 1 deletion replicator/Cargo.toml
@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
pg_replicate = { workspace = true, features = ["bigquery"] }
supabase_etl = { workspace = true, features = ["bigquery"] }
postgres = { workspace = true, features = ["tokio"] }
telemetry = { workspace = true }

10 changes: 5 additions & 5 deletions replicator/src/main.rs
@@ -3,16 +3,16 @@ use std::{io::BufReader, time::Duration, vec};
use configuration::{
get_configuration, BatchSettings, Settings, SinkSettings, SourceSettings, TlsSettings,
};
use pg_replicate::{
use postgres::tokio::options::PgDatabaseOptions;
use supabase_etl::{
pipeline::{
batching::{data_pipeline::BatchDataPipeline, BatchConfig},
sinks::bigquery::BigQueryBatchSink,
destinations::bigquery::BigQueryBatchDestination,
sources::postgres::{PostgresSource, TableNamesFrom},
PipelineAction,
},
SslMode,
};
use postgres::tokio::options::PgDatabaseOptions;
use telemetry::init_tracing;
use tracing::{info, instrument};

@@ -131,7 +131,7 @@ async fn start_replication(settings: Settings) -> anyhow::Result<()> {
max_staleness_mins,
} = settings.sink;

let bigquery_sink = BigQueryBatchSink::new_with_key(
let bigquery_destination = BigQueryBatchDestination::new_with_key(
project_id,
dataset_id,
&service_account_key,
@@ -147,7 +147,7 @@ async fn start_replication(settings: Settings) -> anyhow::Result<()> {
let batch_config = BatchConfig::new(max_size, Duration::from_secs(max_fill_secs));
let mut pipeline = BatchDataPipeline::new(
postgres_source,
bigquery_sink,
bigquery_destination,
PipelineAction::Both,
batch_config,
);
2 changes: 1 addition & 1 deletion scripts/init_db.sh
@@ -3,7 +3,7 @@ set -eo pipefail

if [ ! -d "api/migrations" ]; then
echo >&2 "❌ Error: 'api/migrations' folder not found."
echo >&2 "Please run this script from the 'pg_replicate' directory."
echo >&2 "Please run this script from the 'supabase_etl' directory."
exit 1
fi

8 changes: 4 additions & 4 deletions pg_replicate/Cargo.toml → supabase_etl/Cargo.toml
@@ -1,5 +1,5 @@
[package]
name = "pg_replicate"
name = "supabase_etl"
version = "0.1.0"
edition = "2021"

@@ -18,7 +18,7 @@ name = "stdout"
required-features = ["stdout"]

[dependencies]
postgres = { workspace = true, features = ["tokio"]}
postgres = { workspace = true, features = ["tokio"] }

async-trait = { workspace = true }
bigdecimal = { workspace = true, features = ["std"] }
@@ -52,7 +52,7 @@ tracing = { workspace = true, default-features = true }
uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
postgres = { workspace = true, features = ["test-utils", "tokio"]}
postgres = { workspace = true, features = ["test-utils", "tokio"] }

clap = { workspace = true, default-features = true, features = [
"std",
@@ -69,4 +69,4 @@ duckdb = ["dep:duckdb"]
stdout = []
# When enabled converts unknown types to bytes
unknown_types_to_bytes = []
default = ["unknown_types_to_bytes"]
default = ["unknown_types_to_bytes"]
@@ -1,17 +1,17 @@
use std::{error::Error, time::Duration};

use clap::{Args, Parser, Subcommand};
use pg_replicate::{
use postgres::schema::TableName;
use postgres::tokio::options::PgDatabaseOptions;
use supabase_etl::{
pipeline::{
batching::{data_pipeline::BatchDataPipeline, BatchConfig},
sinks::bigquery::BigQueryBatchSink,
destinations::bigquery::BigQueryBatchDestination,
sources::postgres::{PostgresSource, TableNamesFrom},
PipelineAction,
},
SslMode,
};
use postgres::schema::TableName;
use postgres::tokio::options::PgDatabaseOptions;
use tracing::error;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

@@ -155,7 +155,7 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
}
};

let bigquery_sink = BigQueryBatchSink::new_with_key_path(
let bigquery_destination = BigQueryBatchDestination::new_with_key_path(
bq_args.bq_project_id,
bq_args.bq_dataset_id,
&bq_args.bq_sa_key_file,
@@ -167,7 +167,8 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
bq_args.max_batch_size,
Duration::from_secs(bq_args.max_batch_fill_duration_secs),
);
let mut pipeline = BatchDataPipeline::new(postgres_source, bigquery_sink, action, batch_config);
let mut pipeline =
BatchDataPipeline::new(postgres_source, bigquery_destination, action, batch_config);

pipeline.start().await?;

@@ -1,17 +1,17 @@
use std::{error::Error, time::Duration};

use clap::{Args, Parser, Subcommand};
use pg_replicate::{
use postgres::schema::TableName;
use postgres::tokio::options::PgDatabaseOptions;
use supabase_etl::{
pipeline::{
batching::{data_pipeline::BatchDataPipeline, BatchConfig},
sinks::duckdb::DuckDbSink,
destinations::duckdb::DuckDbDestination,
sources::postgres::{PostgresSource, TableNamesFrom},
PipelineAction,
},
SslMode,
};
use postgres::schema::TableName;
use postgres::tokio::options::PgDatabaseOptions;
use tracing::error;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

@@ -149,22 +149,23 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
}
};

let duckdb_sink = match (
let duckdb_destination = match (
db_args.duckdb.duckdb_file,
db_args.duckdb.motherduck_access_token,
db_args.duckdb.motherduck_db_name,
) {
(Some(duckdb_file), None, None) => DuckDbSink::file(duckdb_file).await?,
(Some(duckdb_file), None, None) => DuckDbDestination::file(duckdb_file).await?,
(None, Some(access_token), Some(db_name)) => {
DuckDbSink::mother_duck(&access_token, &db_name).await?
DuckDbDestination::mother_duck(&access_token, &db_name).await?
}
_ => {
unreachable!()
}
};

let batch_config = BatchConfig::new(1000, Duration::from_secs(10));
let mut pipeline = BatchDataPipeline::new(postgres_source, duckdb_sink, action, batch_config);
let mut pipeline =
BatchDataPipeline::new(postgres_source, duckdb_destination, action, batch_config);

pipeline.start().await?;

@@ -1,17 +1,17 @@
use std::{error::Error, time::Duration};

use clap::{Args, Parser, Subcommand};
use pg_replicate::{
use postgres::schema::TableName;
use postgres::tokio::options::PgDatabaseOptions;
use supabase_etl::{
pipeline::{
batching::{data_pipeline::BatchDataPipeline, BatchConfig},
sinks::stdout::StdoutSink,
destinations::stdout::StdoutDestination,
sources::postgres::{PostgresSource, TableNamesFrom},
PipelineAction,
},
SslMode,
};
use postgres::schema::TableName;
use postgres::tokio::options::PgDatabaseOptions;
use tracing::error;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

@@ -125,10 +125,11 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
}
};

let stdout_sink = StdoutSink;
let stdout_destination = StdoutDestination;

let batch_config = BatchConfig::new(1000, Duration::from_secs(10));
let mut pipeline = BatchDataPipeline::new(postgres_source, stdout_sink, action, batch_config);
let mut pipeline =
BatchDataPipeline::new(postgres_source, stdout_destination, action, batch_config);

pipeline.start().await?;

@@ -345,7 +345,7 @@ impl BigQueryClient {
loop {
let (rows, num_processed_rows) =
StorageApi::create_rows(table_descriptor, table_rows, MAX_SIZE_BYTES);
let trace_id = "pg_replicate bigquery client".to_string();
let trace_id = "supabase_etl bigquery client".to_string();
let mut response_stream = self
.client
.storage_mut()
@@ -991,7 +991,7 @@ impl ArrayCell {
/// Converts a [`TableSchema`] to [`TableDescriptor`].
///
/// This function is defined here and doesn't use the [`From`] trait because it's not possible since
/// [`TableSchema`] is in another crate and we don't want to pollute the `postgres` crate with sink
/// [`TableSchema`] is in another crate and we don't want to pollute the `postgres` crate with destination
/// specific internals.
pub fn table_schema_to_descriptor(table_schema: &TableSchema) -> TableDescriptor {
let mut field_descriptors = Vec::with_capacity(table_schema.column_schemas.len());