An Apache Airflow pipeline for automated, full-refresh migrations from a PostgreSQL source to a PostgreSQL target. Built with the Astronomer framework for reliable orchestration and easy deployment.
- Schema Discovery: Automatically extract table structures, columns, indexes, and foreign keys from source PostgreSQL
- Streaming Data Transfer: Move data efficiently using server-side cursors, keyset pagination, and PostgreSQL's COPY protocol
- Validation: Standalone validation DAG verifies migration success through row count comparisons
- Parallelization: Transfer multiple tables concurrently using Airflow's dynamic task mapping
- Large Table Partitioning: Automatically partitions tables with more than 1M rows into 8 parallel chunks by primary key range (see the sketch after this list)
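For the last point, chunk boundaries can be derived from the primary key's min/max values. Below is a minimal sketch of that idea using a hypothetical helper (it assumes a single integer primary key; the repository's own partitioning code may differ):

```python
# Hypothetical helper (not part of the repository API): split an integer
# primary key into roughly equal contiguous ranges for parallel transfer.
import psycopg2

def pk_ranges(conn, table, pk_col, num_chunks=8):
    """Return (lower, upper) PK bounds covering the table, inclusive."""
    with conn.cursor() as cur:
        # Identifiers are assumed trusted here; real code should quote them.
        cur.execute(f"SELECT MIN({pk_col}), MAX({pk_col}) FROM {table}")
        lo, hi = cur.fetchone()
    if lo is None:
        return []  # empty table -> nothing to partition
    step = max(1, -(-(hi - lo + 1) // num_chunks))  # ceiling division
    bounds, start = [], lo
    while start <= hi:
        end = min(start + step - 1, hi)
        bounds.append((start, end))
        start = end + 1
    return bounds

# Example (placeholder DSN and table):
# conn = psycopg2.connect("host=localhost port=5434 dbname=source_db user=postgres")
# for lo, hi in pk_ranges(conn, "public.orders", "order_id"):
#     print(f"WHERE order_id BETWEEN {lo} AND {hi}")
```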
- Docker Desktop (4GB+ RAM recommended)
- Astronomer CLI
```
docker-compose up -d
```
This starts:
- postgres-source: Source database on port 5434 (database: source_db)
- postgres-target: Target database on port 5435 (database: target_db)
```
astro dev start
```
Access the Airflow UI at http://localhost:8080
Connections are auto-configured via .env file (environment variables).
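Airflow can resolve connection IDs from environment variables named AIRFLOW_CONN_<CONN_ID> (for example AIRFLOW_CONN_POSTGRES_SOURCE), which is presumably the mechanism the .env file relies on here. A quick, hedged way to confirm both connections resolve, run from a Python shell inside an Airflow container:

```python
# Minimal connectivity check; assumes it runs inside an Airflow container where
# the AIRFLOW_CONN_POSTGRES_SOURCE / AIRFLOW_CONN_POSTGRES_TARGET variables
# from .env are present.
from airflow.providers.postgres.hooks.postgres import PostgresHook

for conn_id in ("postgres_source", "postgres_target"):
    hook = PostgresHook(postgres_conn_id=conn_id)
    version = hook.get_first("SELECT version()")[0]  # simple round-trip query
    print(f"{conn_id}: {version}")
```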
```
./scripts/connect-databases.sh
```
Option A: Using pg_dump (recommended for simple migrations)
```
docker exec postgres-source pg_dump -U postgres -d source_db --schema=public --no-owner --no-acl | \
  docker exec -i postgres-target psql -U postgres -d target_db
```
Option B: Using Airflow DAG (recommended for large datasets)
```
SCHEDULER=$(docker ps --format '{{.Names}}' | grep scheduler)
docker exec $SCHEDULER airflow dags trigger postgres_to_postgres_migration
```
See docs/SETUP_GUIDE.md for detailed setup instructions, including test data creation.
The pipeline executes as a single Airflow DAG with the following stages:
Extract Schema -> Create Target Schema -> Create Tables -> Transfer Data (parallel) -> Create Foreign Keys -> Validate -> Report
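Stripped down, that flow maps onto Airflow's TaskFlow API roughly as in the sketch below. Task names and bodies are placeholders rather than the repository's actual code, and the validate/report stages (handled by the standalone validation DAG) are omitted:

```python
# Illustrative TaskFlow skeleton only; the real DAG in
# dags/postgres_to_postgres_migration.py carries the actual logic and params.
from airflow.decorators import dag, task

@dag(schedule=None, catchup=False)
def migration_flow_sketch():
    @task
    def extract_schema() -> list[dict]:
        # Placeholder for querying the source system catalogs.
        return [{"schema": "public", "table": "customers"},
                {"schema": "public", "table": "orders"}]

    @task
    def create_tables(tables: list[dict]) -> list[dict]:
        # Placeholder for target schema/table creation; pass metadata through.
        return tables

    @task
    def transfer_table(table: dict) -> str:
        # Placeholder for streaming one table (or one PK-range partition).
        return f"{table['schema']}.{table['table']}"

    @task
    def create_foreign_keys(transferred):
        # FKs are added only after every mapped transfer task has finished.
        print(f"adding FKs after {len(list(transferred))} transfers")

    tables = create_tables(extract_schema())
    done = transfer_table.expand(table=tables)  # dynamic task mapping: one task per table
    create_foreign_keys(done)

migration_flow_sketch()
```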
- Schema Extraction: Queries PostgreSQL system catalogs to discover all tables, columns, data types, indexes, and constraints
- DDL Generation: Generates PostgreSQL DDL statements for target tables
- Table Creation: Creates target tables (drops existing tables first)
- Data Transfer: Streams data using keyset pagination with PostgreSQL COPY protocol
- Foreign Key Creation: Adds foreign key constraints after all data is loaded
- Validation: Triggers standalone validation DAG that compares source and target row counts
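The Schema Extraction step, for example, boils down to catalog queries like the simplified one below; the real include/mssql_pg_migration/schema_extractor.py also has to cover indexes, primary keys, and constraints, and may be structured quite differently:

```python
# Simplified catalog-based column discovery; the real schema_extractor.py also
# covers indexes, primary keys, and foreign keys.
import psycopg2

COLUMNS_SQL = """
    SELECT table_name, column_name, data_type, is_nullable, column_default
    FROM information_schema.columns
    WHERE table_schema = %s
    ORDER BY table_name, ordinal_position
"""

def discover_columns(dsn, schema="public"):
    """Group column metadata by table name."""
    tables = {}
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(COLUMNS_SQL, (schema,))
        for table, column, dtype, nullable, default in cur.fetchall():
            tables.setdefault(table, []).append({
                "column": column,
                "type": dtype,
                "nullable": nullable == "YES",
                "default": default,
            })
    return tables
```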
The pipeline uses a streaming architecture optimized for large datasets:
- Keyset Pagination: Uses primary key ordering instead of OFFSET/FETCH for efficient chunking
- PostgreSQL COPY Protocol: Bulk loads data for maximum throughput
- Server-Side Cursors: Streams rows without loading entire result sets into memory
- Parallel Partitioning: Large tables split into parallel partitions by PK range
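Put together, a single-table transfer loop can look roughly like the sketch below: keyset-paginated chunk bounds on the source, COPY TO STDOUT out of the source, and COPY FROM STDIN into the target. It assumes a single integer primary key and buffers one chunk in memory; the repository's data_transfer.py may differ in these details:

```python
# Simplified keyset-paginated COPY transfer for one table. Identifiers and
# DSNs are placeholders; the repository's implementation may differ.
import io
import psycopg2

def transfer_table(src_dsn, tgt_dsn, table, pk, chunk_size=100_000):
    """Copy `table` from source to target in keyset-paginated chunks."""
    with psycopg2.connect(src_dsn) as src, psycopg2.connect(tgt_dsn) as tgt:
        last = None  # highest PK value copied so far
        while True:
            # Keyset step: find the upper PK bound of the next chunk
            # (an index range walk on the PK, no OFFSET scans).
            lower = f"WHERE {pk} > {last}" if last is not None else ""
            with src.cursor() as cur:
                cur.execute(
                    f"SELECT MAX({pk}) FROM (SELECT {pk} FROM {table} {lower} "
                    f"ORDER BY {pk} LIMIT {chunk_size}) AS chunk"
                )
                upper = cur.fetchone()[0]
            if upper is None:
                break  # table exhausted

            # Stream the chunk out of the source and straight into the target.
            # Buffers one chunk in memory for simplicity; a pipe or server-side
            # cursor avoids that in a production implementation.
            bounds = f"{lower} AND {pk} <= {upper}" if lower else f"WHERE {pk} <= {upper}"
            buf = io.StringIO()
            with src.cursor() as cur:
                cur.copy_expert(f"COPY (SELECT * FROM {table} {bounds}) TO STDOUT", buf)
            buf.seek(0)
            with tgt.cursor() as cur:
                cur.copy_expert(f"COPY {table} FROM STDIN", buf)
            tgt.commit()
            last = upper
```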
A standalone validate_migration_env DAG handles validation separately to avoid XCom serialization issues with large result sets.
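The underlying check is a per-table row-count comparison along these lines (simplified; the validation DAG's actual code may differ):

```python
# Minimal per-table row-count comparison between source and target.
import psycopg2

def compare_row_counts(src_dsn, tgt_dsn, tables, schema="public"):
    """Return (table, source_count, target_count) for every mismatch."""
    mismatches = []
    with psycopg2.connect(src_dsn) as src, psycopg2.connect(tgt_dsn) as tgt:
        with src.cursor() as s, tgt.cursor() as t:
            for table in tables:
                qualified = f'{schema}."{table}"'
                s.execute(f"SELECT COUNT(*) FROM {qualified}")
                t.execute(f"SELECT COUNT(*) FROM {qualified}")
                src_count, tgt_count = s.fetchone()[0], t.fetchone()[0]
                if src_count != tgt_count:
                    mismatches.append((table, src_count, tgt_count))
    return mismatches
```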
| Service | URL/Port | Credentials |
|---|---|---|
| Airflow UI | http://localhost:8080 | No auth required locally |
| PostgreSQL Source | localhost:5434 | postgres / PostgresPassword123 |
| PostgreSQL Target | localhost:5435 | postgres / PostgresPassword123 |
postgres-to-postgres-pipeline/
├── dags/
│ ├── postgres_to_postgres_migration.py # Main migration DAG
│ └── validate_migration_env.py # Standalone validation DAG
├── include/
│ └── mssql_pg_migration/
│ ├── schema_extractor.py # PostgreSQL schema discovery
│ ├── type_mapping.py # Data type mapping (identity for PG-to-PG)
│ ├── ddl_generator.py # PostgreSQL DDL generation
│ ├── data_transfer.py # Streaming data transfer
│ └── validation.py # Migration validation
├── tests/
│ └── dags/
│ └── test_dag_example.py # DAG validation tests
├── docs/
│ └── SETUP_GUIDE.md # Detailed setup instructions
├── docker-compose.yml # Source and target PostgreSQL containers
├── Dockerfile # Astronomer Runtime image
├── requirements.txt # Python dependencies
└── airflow_settings.yaml # Local connections/variables
| Parameter | Default | Description |
|---|---|---|
| source_conn_id | postgres_source | Airflow connection ID for source PostgreSQL |
| target_conn_id | postgres_target | Airflow connection ID for target PostgreSQL |
| source_schema | public | Schema to migrate from source |
| target_schema | public | Target schema in destination |
| chunk_size | 100000 | Rows per batch during transfer |
| exclude_tables | [] | Table patterns to skip |
| use_unlogged_tables | true | Create tables as UNLOGGED for faster bulk inserts |
| drop_existing_tables | false | Drop and recreate existing tables instead of truncating |
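These defaults can be overridden per run through the trigger conf (see the example further below). As a rough sketch of where such defaults live, one way to declare them as DAG params is shown here; this is illustrative only, not necessarily how the repository's DAG declares them:

```python
# Illustrative only: one way the defaults above could be wired up as DAG
# params. The repository's DAG may declare them differently.
from airflow.decorators import dag

@dag(
    schedule=None,
    catchup=False,
    params={
        "source_conn_id": "postgres_source",
        "target_conn_id": "postgres_target",
        "source_schema": "public",
        "target_schema": "public",
        "chunk_size": 100_000,
        "exclude_tables": [],
        "use_unlogged_tables": True,
        "drop_existing_tables": False,
    },
)
def migration_params_sketch():
    """Empty body: only shows where the defaults plug in."""

migration_params_sketch()
```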
By default, the DAG truncates existing tables before loading data. This is faster but assumes the target table schema matches the source.
Set drop_existing_tables: true when:
- The source schema has changed (new/removed/modified columns)
- Switching to a different source database with different table structures
- You encounter errors like `column "X" of relation "Y" does not exist`
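In other words, preparing each target table comes down to a choice like the following (schematic; parameter names are illustrative and the DAG's actual implementation may differ):

```python
# Schematic table preparation; hypothetical helper, not the DAG's actual code.
def prepare_target_table(cursor, qualified_name, create_ddl, drop_existing_tables):
    if drop_existing_tables:
        # Rebuild from freshly generated DDL -- handles schema drift.
        cursor.execute(f"DROP TABLE IF EXISTS {qualified_name} CASCADE")
        cursor.execute(create_ddl)
    else:
        # Faster default: keep the existing table and just empty it.
        cursor.execute(f"TRUNCATE TABLE {qualified_name}")
```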
Example: Trigger with clean target
```
astro dev run dags trigger postgres_to_postgres_migration \
  --conf '{"drop_existing_tables": true}'
```
Common development commands:
```
astro dev parse            # Validate DAG syntax
astro dev pytest tests/    # Run the test suite
astro dev logs -f          # Follow Airflow logs
astro dev stop             # Stop the local Airflow environment
docker-compose down        # Stop the source and target databases
```
Dependencies:
- Astronomer Runtime 3.1
- apache-airflow-providers-postgres >= 5.12.0
- psycopg2-binary
- pg8000 >= 1.30.0
See LICENSE file for details.