Bidirectional Postgres sync tool. Reads tables from Postgres and writes Parquet/Iceberg files to local disk or S3, or ingests Parquet/CSV files from local disk or S3 back into Postgres. Supports incremental sync via watermark tracking and upsert-based ingestion.
```bash
pipx install rustream
# or
pip install rustream
```

Or build from source:

```bash
git clone https://github.com/kraftaa/rustream.git
cd rustream
cargo build --release
# binary is at target/release/rustream
```

Or build and install the Python package with maturin:

```bash
pip install maturin
maturin develop --release
# now `rustream` is on your PATH
```

Sync (Postgres → Parquet):

```bash
# Copy and edit the example config
cp config.example.yaml config.yaml

# Preview what will be synced (no files written)
rustream sync --config config.yaml --dry-run

# Run sync
rustream sync --config config.yaml
```

Ingest (Parquet/CSV → Postgres):

```bash
# Preview what would be ingested
rustream ingest --config ingest_config.yaml --dry-run

# Run ingest
rustream ingest --config ingest_config.yaml
```

Enable debug logging with `RUST_LOG`:

```bash
RUST_LOG=rustream=debug rustream sync --config config.yaml
RUST_LOG=rustream=debug rustream ingest --config ingest_config.yaml
```

A sync config example:

```yaml
postgres:
  host: localhost
  database: mydb
  user: postgres
  password: secret

output:
  type: local
  path: ./output

tables:
  - name: users
    incremental_column: updated_at
    incremental_tiebreaker_column: id
    columns: # optional: pick specific columns
      - id
      - email
      - created_at
      - updated_at
  - name: orders
    incremental_column: updated_at
    incremental_tiebreaker_column: id
  - name: products # no incremental_column = full sync every run
  - name: events # append-only example (no updated_at)
    incremental_column: id
    incremental_column_is_unique: true
```

Omit `tables` to sync every table in the schema. Use `exclude` to skip some:
```yaml
postgres:
  host: localhost
  database: mydb
  user: postgres

output:
  type: local
  path: ./output

# schema: public # default
exclude:
  - schema_migrations
  - ar_internal_metadata
```

To write to S3 instead of local disk:

```yaml
output:
  type: s3
  bucket: my-data-lake
  prefix: raw/postgres
  region: us-east-1
```

AWS credentials come from environment variables, `~/.aws/credentials`, or an IAM role.
To write Iceberg instead of plain Parquet:

```yaml
output:
  type: s3
  bucket: my-data-lake
  prefix: warehouse
  region: us-east-1

format: iceberg
warehouse: s3://my-data-lake/warehouse
catalog:
  type: filesystem # or "glue" (requires --features glue)
  # glue_database: my_db # required when type=glue
```

An ingest config example (S3 → Postgres):

```yaml
postgres:
  host: localhost
  database: mydb
  user: postgres
  password: secret

ingest:
  input:
    type: s3
    bucket: my-data-lake
    prefix: raw/postgres/
    region: us-east-1
    pattern: "**/*.parquet"
  file_format: parquet # "parquet" or "csv"
  write_mode: upsert # "insert" | "upsert" | "truncate_insert"
  batch_size: 5000
  target_schema: public
  tables:
    - file_pattern: "users/*.parquet"
      target_table: users
      key_columns: [id]
      create_if_missing: true
    - file_pattern: "orders/*.parquet"
      target_table: orders
      key_columns: [id]
```

Or read from local disk:

```yaml
ingest:
  input:
    type: local
    path: ./parquet_files
    pattern: "**/*.parquet"
  file_format: parquet
  write_mode: insert
  batch_size: 5000
```

If no tables are listed, the target table name is inferred from the parent directory or filename.
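That inference can be sketched roughly like this (an illustrative Python helper, not the tool's actual implementation; the real rules live in the Rust source):

```python
from pathlib import Path

def infer_target_table(file_path: str) -> str:
    """Guess a target table name from a file path: prefer the parent
    directory name, falling back to the file stem for top-level files."""
    p = Path(file_path)
    parent = p.parent.name
    # Files nested under a directory take the directory name,
    # e.g. "users/part-0001.parquet" -> "users"
    if parent not in ("", "."):
        return parent
    # Top-level files fall back to the filename without extension,
    # e.g. "orders.parquet" -> "orders"
    return p.stem
```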
| Field | Description |
|---|---|
| `postgres.host` | Postgres host |
| `postgres.port` | Postgres port (default: 5432) |
| `postgres.database` | Database name |
| `postgres.user` | Database user |
| `postgres.password` | Database password (optional) |
| `output.type` | `local` or `s3` |
| `output.path` | Local directory for Parquet files (when `type=local`) |
| `output.bucket` | S3 bucket (when `type=s3`) |
| `output.prefix` | S3 key prefix (when `type=s3`) |
| `output.region` | AWS region (when `type=s3`, optional) |
| `batch_size` | Rows per Parquet file (default: 10000) |
| `state_dir` | Directory for SQLite watermark state (default: `.rustream_state`) |
| `schema` | Schema to discover tables from (default: `public`) |
| `exclude` | List of table names to skip when using auto-discovery |
| `tables[].name` | Table name |
| `tables[].schema` | Schema name (default: `public`) |
| `tables[].columns` | Columns to sync (default: all) |
| `tables[].incremental_column` | Column for watermark-based incremental sync |
| `tables[].incremental_tiebreaker_column` | Stable cursor column for duplicate-safe incremental paging (required when `incremental_column` is set; recommended: primary key) |
| `tables[].incremental_column_is_unique` | Allow watermark-only incremental mode when the incremental column is strictly unique/monotonic (e.g. an append-only `id`) |
| `tables[].partition_by` | Partition output files: `date`, `month`, or `year` |
| `format` | Output format: `parquet` (default) or `iceberg` |
| `warehouse` | Warehouse path for Iceberg (required when `format=iceberg`) |
| `catalog.type` | Iceberg catalog: `filesystem` (default) or `glue` |
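For example, combining fields from the table above, a table partitioned by month might look like this (a sketch; the exact output file layout is determined by the implementation):

```yaml
tables:
  - name: orders
    incremental_column: updated_at
    incremental_tiebreaker_column: id
    partition_by: month
```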
| Field | Description |
|---|---|
| `ingest.input.type` | `local` or `s3` |
| `ingest.input.path` | Local directory (when `type=local`) |
| `ingest.input.bucket` | S3 bucket (when `type=s3`) |
| `ingest.input.prefix` | S3 key prefix (when `type=s3`) |
| `ingest.input.region` | AWS region (when `type=s3`, optional) |
| `ingest.input.pattern` | Glob pattern for file matching (default: `**/*.parquet`) |
| `ingest.file_format` | `parquet` (default) or `csv` |
| `ingest.write_mode` | `insert` (default), `upsert`, or `truncate_insert` |
| `ingest.batch_size` | Rows per INSERT statement (default: 5000) |
| `ingest.target_schema` | Postgres schema for target tables (default: `public`) |
| `ingest.tables[].file_pattern` | Glob pattern to match files to this table |
| `ingest.tables[].target_table` | Postgres table to write to |
| `ingest.tables[].key_columns` | Primary key columns (required for `upsert` mode) |
| `ingest.tables[].create_if_missing` | Auto-`CREATE TABLE` from file schema (default: `false`) |
Some DB-backed tests are optional and run only when `RUSTREAM_IT_DB_URL` is set. Without this env var, those tests return early as no-ops.

```bash
export RUSTREAM_IT_DB_URL="host=localhost port=5432 dbname=mydb user=postgres password=secret"
cargo test
```

How sync works:

- Connects to Postgres and introspects each table's schema via `information_schema`
- Maps Postgres column types to Arrow types automatically
- Reads rows in batches, converting them to Arrow `RecordBatch`es
- Writes each batch as a Snappy-compressed Parquet file
- Tracks the high watermark (the max value of `incremental_column`) and an optional cursor in local SQLite
- Checkpoints incremental progress after each successfully written batch
- On the next run, reads only rows after the saved `(watermark, cursor)` position

Tables without `incremental_column` do a full sync every run.
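The incremental read described above amounts to keyset pagination over `(watermark, cursor)`. A rough sketch of the query it implies (illustrative only; table and column names are placeholders, and the real query is built in the Rust code):

```python
def incremental_predicate(inc_col: str, tiebreak_col: str) -> str:
    """WHERE clause for resuming after a saved (watermark, cursor) position:
    strictly-newer rows, plus same-watermark rows past the cursor."""
    return (
        f"({inc_col} > $1) "
        f"OR ({inc_col} = $1 AND {tiebreak_col} > $2)"
    )

def incremental_query(table: str, inc_col: str, tiebreak_col: str, limit: int) -> str:
    """One resumable batch, ordered by (watermark, cursor) so the last row
    read becomes the next checkpoint."""
    return (
        f"SELECT * FROM {table} "
        f"WHERE {incremental_predicate(inc_col, tiebreak_col)} "
        f"ORDER BY {inc_col}, {tiebreak_col} "
        f"LIMIT {limit}"
    )
```

Ordering by the watermark column alone would be ambiguous when many rows share a timestamp; the tiebreaker column makes the cursor position unique, which is why it is required alongside `incremental_column`.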
How ingest works:

- Discovers files matching the glob pattern on local disk or S3
- Skips files already ingested (tracked in local SQLite)
- Reads each file into Arrow `RecordBatch`es (Parquet, or CSV with schema inference)
- Creates the target table if `create_if_missing: true` (DDL derived from the Arrow schema)
- Writes rows via multi-row parameterized `INSERT`, or `INSERT ... ON CONFLICT` for upserts
- Marks each file as ingested in SQLite to avoid reprocessing on the next run
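The upsert path in the list above boils down to standard Postgres `INSERT ... ON CONFLICT DO UPDATE`. A minimal sketch of how such a statement can be assembled (a hypothetical helper for a single row, not the tool's actual multi-row batching code):

```python
def build_upsert(table: str, columns: list[str], key_columns: list[str]) -> str:
    """Build a single-row INSERT ... ON CONFLICT DO UPDATE statement
    with numbered parameter placeholders ($1, $2, ...)."""
    cols = ", ".join(columns)
    placeholders = ", ".join(f"${i + 1}" for i in range(len(columns)))
    keys = ", ".join(key_columns)
    # Non-key columns take the incoming row's values on conflict
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in key_columns
    )
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({keys}) DO UPDATE SET {updates}"
    )
```

This is why `key_columns` is required for `write_mode: upsert` — the `ON CONFLICT` target needs a unique or primary-key constraint to match against.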
| Postgres | Arrow |
|---|---|
| `boolean` | `Boolean` |
| `smallint` | `Int16` |
| `integer`, `serial` | `Int32` |
| `bigint`, `bigserial` | `Int64` |
| `real` | `Float32` |
| `double precision` | `Float64` |
| `numeric` / `decimal` | `Utf8` (preserves precision) |
| `text`, `varchar`, `char` | `Utf8` |
| `bytea` | `Binary` |
| `date` | `Date32` |
| `timestamp` | `Timestamp(Microsecond)` |
| `timestamptz` | `Timestamp(Microsecond, UTC)` |
| `uuid` | `Utf8` |
| `json`, `jsonb` | `Utf8` |
| arrays | `Utf8` (JSON-serialized) |
The project uses maturin to package the Rust binary as a Python wheel (the same approach as ruff, uv, etc.). The CI workflow in `.github/workflows/release.yml` builds wheels for Linux, macOS, and Windows, then publishes them to PyPI on tagged releases.

To publish manually:

```bash
# Build wheels for the current platform
maturin build --release

# Upload to PyPI (needs PYPI_API_TOKEN)
maturin publish
```

MIT