Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
9974e25
wip
abhi-airspace-intelligence Aug 26, 2025
67657e8
Actually get working
abhi-airspace-intelligence Aug 26, 2025
e2f8e0c
Get stuff working
abhi-airspace-intelligence Aug 26, 2025
8141bf6
Get integration tests to pass
abhi-airspace-intelligence Aug 26, 2025
f7e466c
Trigger CI
abhi-airspace-intelligence Aug 26, 2025
56bb6d1
Pass storage_options directly instead of reading from env
abhi-airspace-intelligence Aug 27, 2025
a650584
Clippy
abhi-airspace-intelligence Aug 27, 2025
db87503
Implement proper support for datatypes
abhi-airspace-intelligence Aug 27, 2025
af4a628
Clippy
abhi-airspace-intelligence Aug 28, 2025
59135d6
sigh
abhi-airspace-intelligence Aug 28, 2025
8145044
Bump deltalake and delta_kernel to latest
abhi-airspace-intelligence Aug 28, 2025
61d6291
Fix deprecated usage
abhi-airspace-intelligence Aug 28, 2025
079804b
Disable coveralls debug mode
abhi-airspace-intelligence Aug 28, 2025
cd94766
Correctly parse decimal/numeric types
abhi-airspace-intelligence Aug 28, 2025
15d7713
Vibecode some stuff to clean up tomorrow
abhi-airspace-intelligence Aug 28, 2025
7babad3
blah
abhi-airspace-intelligence Aug 28, 2025
3022994
Merge conflicts + remove explict delta_kernel dependency
abhi-airspace-intelligence Sep 2, 2025
025d545
Disable test temporarily
abhi-airspace-intelligence Sep 2, 2025
51854ca
rename `delta` -> `deltalake`
abhi-airspace-intelligence Sep 2, 2025
c8952ab
Temporarily disable table truncation
abhi-airspace-intelligence Sep 3, 2025
46351d7
Clean up dependencies
abhi-airspace-intelligence Sep 3, 2025
205c963
Feature flag deltalake
abhi-airspace-intelligence Sep 4, 2025
ab7f08e
rebase conflict
abhi-airspace-intelligence Sep 5, 2025
87b18a5
Clippy
abhi-airspace-intelligence Sep 10, 2025
56aeea4
fmt
abhi-airspace-intelligence Sep 10, 2025
86e6b6f
wip
abhi-airspace-intelligence Sep 13, 2025
4191a75
more wip
abhi-airspace-intelligence Sep 14, 2025
2143c3b
wip
abhi-airspace-intelligence Sep 14, 2025
f873ef6
reduce clone frequency
abhi-airspace-intelligence Sep 14, 2025
3b9b420
more cleanup
abhi-airspace-intelligence Sep 14, 2025
7865987
more wip
abhi-airspace-intelligence Sep 14, 2025
10692d3
clippy/rename
abhi-airspace-intelligence Sep 14, 2025
20b126e
Clean up dependencies
abhi-airspace-intelligence Sep 14, 2025
8c3e565
Fix schema mapping
abhi-airspace-intelligence Sep 14, 2025
7e011ca
Make merges work
abhi-airspace-intelligence Sep 14, 2025
8e2ec2b
Enable rustls feature
abhi-airspace-intelligence Sep 14, 2025
1262555
Fix merges and switch to snapshot testing
abhi-airspace-intelligence Sep 15, 2025
9f071f2
merge conflict
abhi-airspace-intelligence Sep 15, 2025
8b9aa8b
another merge conflict
abhi-airspace-intelligence Sep 15, 2025
659e647
Move arrow encoding to a separate module
abhi-airspace-intelligence Sep 15, 2025
a1e9a46
add back parquet features that were removed
abhi-airspace-intelligence Sep 15, 2025
f0d6837
fmt
abhi-airspace-intelligence Sep 15, 2025
83430b2
Implement delete from table
abhi-airspace-intelligence Sep 15, 2025
29dde11
Add tests for appends
abhi-airspace-intelligence Sep 15, 2025
09bf24b
fmt
abhi-airspace-intelligence Sep 15, 2025
201381f
wip add delta benchmark
abhi-airspace-intelligence Sep 15, 2025
fc13149
Add tpch seeder image
abhi-airspace-intelligence Sep 16, 2025
db8b797
Remove more unncessary clones
abhi-airspace-intelligence Sep 16, 2025
4f39731
cleanup integration tests a bit
abhi-airspace-intelligence Sep 17, 2025
373f3e0
Add zorder/compact support
abhi-airspace-intelligence Sep 17, 2025
d1fc642
Bump to git deltalake
abhi-airspace-intelligence Sep 22, 2025
1175b18
fmt
abhi-airspace-intelligence Sep 22, 2025
8cfb9f0
Merge conflicts
abhi-airspace-intelligence Sep 22, 2025
711d08a
Increase disk size for CI/CD
abhi-airspace-intelligence Sep 22, 2025
d9fbf37
Rework maintenance tasks
abhi-airspace-intelligence Sep 24, 2025
e38846a
Update delta-rs version
abhi-airspace-intelligence Sep 24, 2025
273ab47
Remove maximize build space job
abhi-airspace-intelligence Sep 24, 2025
87b2f68
Fix imports
abhi-airspace-intelligence Sep 24, 2025
c08885e
Remove arrow dependency
abhi-airspace-intelligence Sep 24, 2025
2fdf2f5
wip refactor arrow schema
abhi-airspace-intelligence Sep 24, 2025
3795682
Continue refactor
abhi-airspace-intelligence Sep 24, 2025
1b2db55
Mostly finish the refactor
abhi-airspace-intelligence Sep 25, 2025
9de4f17
Fix Decimal128 mapping
abhi-airspace-intelligence Sep 25, 2025
e3c3383
Improve data mapping integration test
abhi-airspace-intelligence Sep 25, 2025
7f85c76
Add compaction test
abhi-airspace-intelligence Sep 25, 2025
f1521ae
Ensure keys are qualified
abhi-airspace-intelligence Sep 25, 2025
89ab186
Add more tracing and instrumentation
abhi-airspace-intelligence Sep 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,3 @@ jobs:
fail-on-error: false
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: lcov.info
debug: true
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ pyvenv.cfg

# Log files
*.log

lcov.info
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ actix-web = { version = "4.11.0", default-features = false }
actix-web-httpauth = { version = "0.8.2", default-features = false }
actix-web-metrics = { version = "0.3.0", default-features = false }
anyhow = { version = "1.0.98", default-features = false }
arrow = { version = "55.0", default-features = false }
arrow = { version = "56.2.0", default-features = false }
async-trait = { version = "0.1.88" }
aws-lc-rs = { version = "1.13.3", default-features = false }
base64 = { version = "0.22.1", default-features = false }
Expand All @@ -43,6 +43,8 @@ clap = { version = "4.5.42", default-features = false }
config = { version = "0.14", default-features = false }
const-oid = { version = "0.9.6", default-features = false }
constant_time_eq = { version = "0.4.2" }
dashmap = { version = "6.1.0", default-features = false }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "d30b11f673b0111dbb0f904bf89d5b917ea652ed", default-features = false }
fail = { version = "0.5.1", default-features = false }
futures = { version = "0.3.31", default-features = false }
gcp-bigquery-client = { version = "0.27.0", default-features = false }
Expand All @@ -53,7 +55,7 @@ k8s-openapi = { version = "0.25.0", default-features = false }
kube = { version = "1.1.0", default-features = false }
metrics = { version = "0.24.2", default-features = false }
metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
parquet = { version = "55.0", default-features = false }
parquet = { version = "55.0.0", default-features = false }
pg_escape = { version = "0.1.1", default-features = false }
pin-project-lite = { version = "0.2.16", default-features = false }
postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
Expand Down Expand Up @@ -84,3 +86,7 @@ x509-cert = { version = "0.2.2", default-features = false }

[profile.bench]
debug = true

[profile.dev.package]
insta.opt-level = 3
similar.opt-level = 3
81 changes: 81 additions & 0 deletions etl-api/src/configs/destination.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use etl_config::SerializableSecretString;
use etl_config::shared::DestinationConfig;
use secrecy::ExposeSecret;
Expand All @@ -14,6 +16,7 @@ const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 8;

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum FullApiDestinationConfig {
Memory,
BigQuery {
Expand All @@ -30,6 +33,16 @@ pub enum FullApiDestinationConfig {
#[serde(skip_serializing_if = "Option::is_none")]
max_concurrent_streams: Option<usize>,
},
DeltaLake {
#[schema(example = "s3://my-bucket/my-path")]
base_uri: String,
#[schema(example = "{\"aws_access_key_id\": \"https://my-endpoint.com\"}")]
storage_options: Option<HashMap<String, String>>,
#[schema(example = "{\"my_table\": [\"date\"]}")]
partition_columns: Option<HashMap<String, Vec<String>>>,
#[schema(example = 100)]
optimize_after_commits: Option<u64>,
},
}

impl From<StoredDestinationConfig> for FullApiDestinationConfig {
Expand All @@ -49,6 +62,17 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
max_staleness_mins,
max_concurrent_streams: Some(max_concurrent_streams),
},
StoredDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
},
}
}
}
Expand All @@ -64,6 +88,12 @@ pub enum StoredDestinationConfig {
max_staleness_mins: Option<u16>,
max_concurrent_streams: usize,
},
DeltaLake {
base_uri: String,
storage_options: Option<HashMap<String, String>>,
partition_columns: Option<HashMap<String, Vec<String>>>,
optimize_after_commits: Option<u64>,
},
}

impl StoredDestinationConfig {
Expand All @@ -83,6 +113,17 @@ impl StoredDestinationConfig {
max_staleness_mins,
max_concurrent_streams,
},
Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => DestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
},
}
}
}
Expand All @@ -105,6 +146,17 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
max_concurrent_streams: max_concurrent_streams
.unwrap_or(DEFAULT_MAX_CONCURRENT_STREAMS),
},
FullApiDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
},
}
}
}
Expand Down Expand Up @@ -136,12 +188,24 @@ impl Encrypt<EncryptedStoredDestinationConfig> for StoredDestinationConfig {
max_concurrent_streams,
})
}
Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Ok(EncryptedStoredDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
}),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum EncryptedStoredDestinationConfig {
Memory,
BigQuery {
Expand All @@ -151,6 +215,12 @@ pub enum EncryptedStoredDestinationConfig {
max_staleness_mins: Option<u16>,
max_concurrent_streams: usize,
},
DeltaLake {
base_uri: String,
storage_options: Option<HashMap<String, String>>,
partition_columns: Option<HashMap<String, Vec<String>>>,
optimize_after_commits: Option<u64>,
},
}

impl Store for EncryptedStoredDestinationConfig {}
Expand Down Expand Up @@ -182,6 +252,17 @@ impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
max_concurrent_streams,
})
}
Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Ok(StoredDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
}),
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion etl-benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ rust-version.workspace = true
repository.workspace = true
homepage.workspace = true

[features]
default = ["bigquery", "deltalake"]
bigquery = ["etl-destinations/bigquery"]
deltalake = ["etl-destinations/deltalake"]

[dev-dependencies]
etl = { workspace = true, features = ["test-utils"] }
etl-config = { workspace = true }
etl-destinations = { workspace = true, features = ["bigquery"] }
etl-destinations = { workspace = true }
etl-postgres = { workspace = true, features = ["sqlx"] }
etl-telemetry = { workspace = true }

Expand Down
14 changes: 14 additions & 0 deletions etl-benchmarks/Dockerfile.tpch-seeder
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Builds an image that bundles go-tpc along with psql client tools for loading TPC-H data.
FROM golang:1.22 AS builder

ARG GO_TPC_VERSION=latest

RUN apt-get update \
&& apt-get install -y --no-install-recommends postgresql-client \
&& rm -rf /var/lib/apt/lists/*

RUN go install github.com/pingcap/go-tpc/cmd/go-tpc@${GO_TPC_VERSION}

ENV PATH="/go/bin:${PATH}"

ENTRYPOINT ["/bin/sh", "-c"]
46 changes: 45 additions & 1 deletion etl-benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Before running benchmarks, ensure you have:
- A Postgres database set up
- A publication created with the tables you want to benchmark
- For BigQuery benchmarks: GCP project, dataset, and service account key file
- For Delta Lake benchmarks: Accessible storage URI (e.g., `s3://bucket/path`) and any required object store credentials

## Quick Start

Expand Down Expand Up @@ -55,6 +56,23 @@ cargo bench --bench table_copies --features bigquery -- --log-target terminal ru
--bq-sa-key-file /path/to/service-account-key.json
```

### 4. Run Delta Lake Benchmark

Benchmark against a Delta Lake table store:

```bash
cargo bench --bench table_copies -- --log-target terminal run \
--host localhost --port 5432 --database bench \
--username postgres --password mypass \
--publication-name bench_pub \
--table-ids 1,2,3 \
--destination delta-lake \
--delta-base-uri s3://my-bucket/my-warehouse \
--delta-storage-option endpoint=http://localhost:9010 \
--delta-storage-option access_key_id=minio \
--delta-storage-option secret_access_key=minio-secret
```

## Command Reference

### Common Parameters
Expand All @@ -68,7 +86,7 @@ cargo bench --bench table_copies --features bigquery -- --log-target terminal ru
| `--password` | Postgres password | (optional) |
| `--publication-name` | Publication to replicate from | `bench_pub` |
| `--table-ids` | Comma-separated table IDs to replicate | (required) |
| `--destination` | Destination type (`null` or `big-query`) | `null` |
| `--destination` | Destination type (`null`, `big-query`, or `delta-lake`) | `null` |

### Performance Tuning Parameters

Expand All @@ -87,6 +105,13 @@ cargo bench --bench table_copies --features bigquery -- --log-target terminal ru
| `--bq-sa-key-file` | Service account key file path | Yes |
| `--bq-max-staleness-mins` | Max staleness in minutes | No |

### Delta Lake Parameters

| Parameter | Description | Required for Delta Lake |
| ------------------------- | ------------------------------------------------ | ----------------------- |
| `--delta-base-uri` | Base URI for Delta tables (e.g., `s3://bucket`) | Yes |
| `--delta-storage-option` | Extra storage option in `key=value` form. Repeat per option. | No |

### Logging Options

| Parameter | Description |
Expand Down Expand Up @@ -130,3 +155,22 @@ cargo bench --bench table_copies --features bigquery -- --log-target terminal ru
```

The benchmark will measure the time it takes to complete the initial table copy phase for all specified tables.

## Local Docker Environment

Start a ready-to-benchmark Postgres instance seeded with TPC-H data via Docker Compose:

```bash
cd etl-benchmarks
docker compose up postgres tpch-seeder
```

The `tpch-seeder` service builds a lightweight image (see `Dockerfile.tpch-seeder`) that bundles the [`go-tpc`](https://github.com/pingcap/go-tpc) binary and runs the TPC-H loader after Postgres becomes healthy. Adjust credentials, port mapping, scale factor, or the go-tpc version by exporting `POSTGRES_USER`, `POSTGRES_PASSWORD`, `POSTGRES_DB`, `POSTGRES_PORT`, `TPCH_SCALE_FACTOR`, or `GO_TPC_VERSION` before launching Compose. Pass `--build` (or `--pull`) when changing `GO_TPC_VERSION` so Compose rebuilds the seeder image.

To add an S3-compatible target for Delta Lake benchmarking, enable the optional `minio` profile:

```bash
docker compose --profile minio up postgres tpch-seeder minio minio-setup
```

This exposes MinIO on `http://localhost:9010` (console on `http://localhost:9011`) with credentials `minio-admin` / `minio-admin-password` and creates the bucket defined by `MINIO_BUCKET` (default `delta-dev-and-test`).
Loading
Loading