Framework-agnostic PostgreSQL backup library with advanced filtering support.
- ✅ Direct Streaming: No temp files, minimal RAM usage (works with TB-sized tables)
- ✅ COPY Format: Fast backup and restore using PostgreSQL COPY
- ✅ Flexible Filtering: Filter data per table with reusable filter classes
- ✅ Multiple Exporters: Local file, S3, or custom destinations
- ✅ Framework-Agnostic: Works with Django, Flask, FastAPI, or pure Python
- ✅ Production Ready: Error handling, logging, validation
- ✅ Multi-Schema Support: Backup from any schema, not just 'public'
- ✅ Automatic SQL Cleaning: Remove schema prefixes and psql meta-commands for clean restores
# Basic installation
pip install psycopg2-binary
# For S3 support
pip install psycopg2-binary boto3
# For Django integration
pip install psycopg2-binary django

from postgres_backup_plugin import PostgresBackupEngine, DatabaseConfig
# Configure database
db_config = DatabaseConfig(
    host='localhost',
    port=5432,
    user='postgres',
    password='secret',
    database='mydb'
)
# Create backup engine
engine = PostgresBackupEngine(db_config)
# Create backup
result = engine.backup('/tmp/backup.sql')
if result.success:
    print(f"Backup created: {result.file_path}")
    print(f"Size: {result.file_size} bytes")
    print(f"Tables: {result.tables_count}")
    print(f"Duration: {result.duration}s")
else:
    print(f"Backup failed: {result.error_message}")

from postgres_backup_plugin import PostgresBackupEngine
# Automatically use Django database settings
engine = PostgresBackupEngine.from_django_settings(
    excluded_tables=['django_session', 'django_admin_log']
)
result = engine.backup('/tmp/backup.sql')

from postgres_backup_plugin import (
    PostgresBackupEngine,
    DateRangeFilter,
    ForeignKeyFilter,
    StatusFilter
)
engine = PostgresBackupEngine.from_django_settings()
# Define filters per table
filters = {
    # Filter orders by customer IDs
    'orders': ForeignKeyFilter('customer_id', [123, 456, 789]),

    # Filter invoices by date range
    'invoices': DateRangeFilter('created_at', '2024-01-01', '2024-12-31'),

    # Filter users by status
    'users': StatusFilter('status', allowed_statuses=['active', 'pending']),

    # Raw SQL also works
    'products': 'SELECT * FROM products WHERE price > 100'
}
result = engine.backup(
    output_path='/tmp/filtered_backup.sql',
    filters=filters,
    schema_name='my_backup',
    metadata={'purpose': 'customer_migration', 'ticket': 'PROJ-123'}
)

from postgres_backup_plugin import PostgresBackupEngine, S3Exporter
engine = PostgresBackupEngine.from_django_settings()
# Create backup
result = engine.backup('/tmp/backup.sql')
if result.success:
    # Export to S3
    exporter = S3Exporter(
        bucket='my-backups',
        prefix='postgres/deliveries/',
        delete_local=True  # Delete local file after upload
    )
    s3_url = exporter.export(result.file_path, metadata=result.metadata)
    print(f"Uploaded to: {s3_url}")

Backup from any schema and automatically remove schema prefixes for clean, portable SQL:
from postgres_backup_plugin import PostgresBackupEngine, BackupConfig
# Configure automatic cleaning
backup_config = BackupConfig(
    clean_output=True,  # Enable automatic SQL cleaning
    target_schema='new_schema'  # Target schema for restore
)
engine = PostgresBackupEngine.from_django_settings(backup_config=backup_config)
# Backup from 'as_59' schema, remove all 'as_59.' prefixes
result = engine.backup(
    output_path='/tmp/backup_cleaned.sql',
    source_schema='as_59',  # Backup FROM this schema
    schema_name='new_schema',  # Restore TO this schema
    metadata={'source': 'as_59', 'target': 'new_schema'}
)

# Backup from one schema and restore to another
result = engine.backup(
    output_path='/tmp/backup_migration.sql',
    source_schema='old_schema',  # Backup FROM this schema
    schema_name='new_schema'  # Restore TO this schema
)

Use Cases:
- Multi-tenant databases with separate schemas per tenant
- Development/staging/production schemas in same database
- Schema-based data isolation
- Cross-schema migrations
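Several of these use cases end with restoring the cleaned file into a target schema, and that step can be scripted with the standard library. A minimal sketch (function names are hypothetical; the `psql` flags mirror the Restore section further below):

```python
import subprocess
from typing import List, Optional

def psql_restore_command(database: str, user: str = 'postgres',
                         host: Optional[str] = None) -> List[str]:
    # Argument list for a stdin restore, equivalent to:
    #   psql -U postgres -d mydb < backup.sql
    cmd = ['psql', '-U', user, '-d', database]
    if host:
        cmd += ['-h', host]
    return cmd

def restore_backup(sql_path: str, database: str, **kwargs) -> bool:
    # Stream the backup file to psql's stdin and report success.
    with open(sql_path) as f:
        return subprocess.run(psql_restore_command(database, **kwargs),
                              stdin=f).returncode == 0
```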
By default, the plugin automatically cleans SQL output to make it portable and easy to restore:
from postgres_backup_plugin import BackupConfig, PostgresBackupEngine
# SQL cleaning is enabled by default
backup_config = BackupConfig(
    clean_output=True,  # Default: True
    target_schema='my_target_schema'  # Optional: specify target schema for restore
)
engine = PostgresBackupEngine(db_config, backup_config)
result = engine.backup('/tmp/clean_backup.sql')

What gets cleaned:
- ✅ Source schema prefixes (e.g., `as_59.users` → `users`, `public.orders` → `orders`)
  - Works with ANY source schema, not just 'public'
  - Removes the prefix from CREATE TABLE, ALTER TABLE, COPY, REFERENCES, etc.
- ✅ psql meta-commands (`\restrict`, `\unrestrict`, `\connect`, etc.)
- ✅ Unnecessary SET commands
- ✅ Empty lines and comments (optional)
⚠️ COPY data blocks are preserved - no data loss!
Benefits:
- Clean, portable SQL files
- Easy to restore to any schema (schema-independent backups)
- No psql-specific commands
- Smaller file size
- Perfect for schema migrations and multi-tenant applications
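The prefix-stripping idea can be pictured in a few lines of standalone code. This is a simplified illustration, not the plugin's actual implementation:

```python
import re

def strip_schema_prefix(sql_line: str, schema: str) -> str:
    # Drop a leading `schema.` qualifier from identifiers, e.g.
    # `CREATE TABLE as_59.users` -> `CREATE TABLE users`.
    # Simplified: the real cleaner must also skip COPY data blocks
    # so row contents are never rewritten.
    return re.sub(rf'\b{re.escape(schema)}\.', '', sql_line)

print(strip_schema_prefix('CREATE TABLE as_59.users (id int);', 'as_59'))
# -> CREATE TABLE users (id int);
```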
Disable cleaning if needed:
backup_config = BackupConfig(
    clean_output=False  # Keep raw SQL output
)

Create reusable filter classes:
from postgres_backup_plugin.filters import FilterQuery
class AgeRangeFilter(FilterQuery):
    def __init__(self, min_age, max_age):
        # Coerce to int so the values interpolated below cannot inject SQL
        self.min_age = int(min_age)
        self.max_age = int(max_age)

    def build(self, table_name, **params):
        return f"""
            SELECT * FROM {table_name}
            WHERE age BETWEEN {self.min_age} AND {self.max_age}
        """
# Use it
filters = {
    'users': AgeRangeFilter(18, 65)
}

Combine multiple filters:
from postgres_backup_plugin.filters import CompositeFilter
date_filter = DateRangeFilter('created_at', '2024-01-01', '2024-12-31')
status_filter = StatusFilter('status', ['active'])
# Combine with AND
composite = CompositeFilter(date_filter, status_filter, operator='AND')
filters = {
    'orders': composite
}

engine = PostgresBackupEngine.from_django_settings()
filters = {
    'orders': 'SELECT * FROM orders WHERE invalid_column = 123'
}
try:
    engine.validate_filters(filters)
except FilterValidationError as e:  # custom exception from the plugin's exceptions module
    print(f"Invalid filter: {e}")

engine = PostgresBackupEngine.from_django_settings()
estimates = engine.estimate_size(filters)
for table, row_count in estimates.items():
    print(f"{table}: {row_count} rows")

from postgres_backup_plugin import DatabaseConfig
db_config = DatabaseConfig(
    host='localhost',
    port=5432,
    user='postgres',
    password='secret',
    database='mydb'
)

from postgres_backup_plugin import BackupConfig
backup_config = BackupConfig(
    # Table exclusion
    excluded_tables=['temp_table', 'cache_table'],

    # Performance
    buffer_size=8192,
    timeout=3600,  # seconds

    # SQL cleaning
    clean_output=True,  # Enable automatic SQL cleaning (default: True)
    target_schema='my_schema',  # Target schema for cleaned output (optional)

    # Encoding
    encoding='utf-8',

    # Restore optimization
    disable_triggers=True,  # Faster restore
    disable_fsync=True,  # Faster restore

    # Output options
    include_header=True,
    verbose_logging=True,

    # COPY format options
    copy_delimiter='\\t',  # Tab delimiter
    copy_null_string='\\N'  # NULL representation
)
engine = PostgresBackupEngine(db_config, backup_config)

Configuration Details:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `excluded_tables` | `List[str]` | `[]` | Tables to skip during backup |
| `clean_output` | `bool` | `True` | Enable automatic SQL cleaning |
| `target_schema` | `Optional[str]` | `None` | Target schema for restore |
| `buffer_size` | `int` | `8192` | Stream buffer size (bytes) |
| `timeout` | `int` | `3600` | Connection timeout (seconds) |
| `encoding` | `str` | `'utf-8'` | File encoding |
| `disable_triggers` | `bool` | `True` | Disable triggers during restore |
| `disable_fsync` | `bool` | `True` | Disable fsync during restore |
| `include_header` | `bool` | `True` | Include header comments in SQL |
| `verbose_logging` | `bool` | `True` | Enable detailed logging |
To restore a backup created by this plugin:
# Simple restore
psql -U postgres -d mydb < backup.sql
# Using -f instead of stdin redirection
psql -U postgres -d mydb -f backup.sql
# Remote database
psql -h remote-host -U postgres -d mydb < backup.sql
# Restore with progress monitoring
pv backup.sql | psql -U postgres -d mydb
# Restore to a different schema (if backup includes schema setup)
psql -U postgres -d mydb < backup_with_schema.sql

Backup each tenant's schema separately:
from postgres_backup_plugin import PostgresBackupEngine
engine = PostgresBackupEngine.from_django_settings()
# List of tenant schemas
tenant_schemas = ['tenant_001', 'tenant_002', 'tenant_003']
for tenant_schema in tenant_schemas:
    output_file = f'/backups/{tenant_schema}_backup.sql'
    result = engine.backup(
        output_path=output_file,
        source_schema=tenant_schema,
        metadata={'tenant': tenant_schema, 'backup_type': 'daily'}
    )
    if result.success:
        print(f"✓ Backed up {tenant_schema}: {result.tables_count} tables, {result.total_rows:,} rows")
    else:
        print(f"✗ Failed to backup {tenant_schema}: {result.error_message}")

Backup from production schema and restore to staging:
# Step 1: Backup from production schema
result = engine.backup(
    output_path='/tmp/production_backup.sql',
    source_schema='production',  # FROM production
    schema_name='staging'  # TO staging (in restore)
)

# Step 2: Restore to staging schema (on same or different database)
# $ psql -U postgres -d mydb < /tmp/production_backup.sql

Backup with filtering and cleaning for safe dev/test usage:
from postgres_backup_plugin.filters import DateRangeFilter, StatusFilter
from datetime import datetime, timedelta
# Get last 30 days of data only
thirty_days_ago = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
today = datetime.now().strftime('%Y-%m-%d')
filters = {
    'orders': DateRangeFilter('created_at', thirty_days_ago, today),
    'users': StatusFilter('status', ['test', 'demo']),  # Only test users
    'payments': 'SELECT * FROM payments WHERE amount < 100'  # Small transactions only
}
backup_config = BackupConfig(
    clean_output=True,
    target_schema='dev_data',
    excluded_tables=['audit_logs', 'sessions', 'cache']
)
engine = PostgresBackupEngine(db_config, backup_config)
result = engine.backup(
    output_path='/tmp/dev_safe_backup.sql',
    source_schema='production',
    schema_name='dev_data',
    filters=filters,
    metadata={'purpose': 'dev_environment', 'sanitized': True}
)

import schedule
import time
from datetime import datetime
from postgres_backup_plugin import PostgresBackupEngine, S3Exporter
def daily_backup():
    engine = PostgresBackupEngine.from_django_settings()

    # Create timestamped backup
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = f'/tmp/backup_{timestamp}.sql'
    result = engine.backup(
        output_path=backup_file,
        metadata={'type': 'daily', 'timestamp': timestamp}
    )

    if result.success:
        # Upload to S3
        exporter = S3Exporter(
            bucket='my-backups',
            prefix=f'daily/{datetime.now().strftime("%Y/%m")}/',
            delete_local=True
        )
        s3_url = exporter.export(backup_file)
        print(f"✓ Daily backup completed: {s3_url}")
    else:
        print(f"✗ Backup failed: {result.error_message}")
        # Send alert notification here

# Schedule daily at 2 AM
schedule.every().day.at("02:00").do(daily_backup)

while True:
    schedule.run_pending()
    time.sleep(60)

| Method | Backup Time | RAM Usage | Disk I/O |
|---|---|---|---|
| This Plugin | ~30-35s | ~50MB | 1GB |
| INSERT statements | ~120s | ~50MB | 1GB |
| pg_dump (custom) | ~40s | ~50MB | 1GB |
| Temp CSV method | ~45s | ~50MB | 2GB |
- Direct streaming from PostgreSQL to file
- COPY format (native PostgreSQL bulk export)
- No intermediate files
- Minimal memory usage
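This streaming pattern maps to psycopg2's `copy_expert`, which writes the server's COPY stream straight into a file object. A hedged sketch of the idea (the engine's actual internals may differ; identifiers are assumed pre-validated):

```python
def copy_statement(table: str, source_schema: str = 'public') -> str:
    # COPY ... TO STDOUT makes the server stream rows to the client.
    # Identifiers are double-quoted; assumes they were validated upstream.
    return f'COPY "{source_schema}"."{table}" TO STDOUT'

def stream_table(conn, table: str, path: str, source_schema: str = 'public') -> None:
    # psycopg2's copy_expert() writes the COPY stream directly to the
    # file object, so memory stays bounded regardless of table size.
    with open(path, 'w') as f, conn.cursor() as cur:
        cur.copy_expert(copy_statement(table, source_schema), f)
```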
postgres_backup_plugin/
├── core/
│ ├── backup_engine.py # Main backup logic
│ ├── stream_wrapper.py # Direct streaming
│ └── query_builder.py # SQL utilities
├── filters/
│ ├── base.py # Filter interface
│ └── common_filters.py # Pre-built filters
├── exporters/
│ ├── base.py # Exporter interface
│ ├── file_exporter.py # Local file
│ └── s3_exporter.py # AWS S3
├── config.py # Configuration classes
└── exceptions.py # Custom exceptions
def backup(
    self,
    output_path: str,
    filters: Optional[Dict[str, Any]] = None,
    schema_name: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    source_schema: Optional[str] = 'public'
) -> BackupResult:

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `output_path` | `str` | Required | Output file path for backup SQL |
| `filters` | `Dict[str, Any]` | `None` | Per-table filters for selective backup |
| `schema_name` | `Optional[str]` | `None` | Target schema name for restore (creates schema in SQL) |
| `metadata` | `Optional[Dict]` | `None` | Custom metadata to include in backup header |
| `source_schema` | `Optional[str]` | `'public'` | Source schema to backup from |
Returns: BackupResult object with:
- `success: bool` - Whether backup succeeded
- `file_path: str` - Path to backup file
- `file_size: int` - Backup file size in bytes
- `tables_count: int` - Number of tables backed up
- `total_rows: int` - Total rows backed up
- `duration: float` - Backup duration in seconds
- `error_message: Optional[str]` - Error message if failed
- `metadata: Dict` - Backup metadata
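As an illustration of consuming these fields, a small helper that formats a one-line report (the function is hypothetical; it relies only on the attributes listed above):

```python
def summarize(result) -> str:
    # Format a one-line report from a BackupResult-like object.
    if not result.success:
        return f"FAILED: {result.error_message}"
    mb = result.file_size / 1_000_000
    return (f"{result.tables_count} tables, {result.total_rows} rows, "
            f"{mb:.1f} MB in {result.duration:.1f}s")
```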
Example:
result = engine.backup(
    output_path='/tmp/backup.sql',
    source_schema='production',  # Backup FROM this schema
    schema_name='staging',  # Restore TO this schema
    filters={'users': 'SELECT * FROM users WHERE active = true'},
    metadata={'purpose': 'migration', 'ticket': 'PROJ-123'}
)

Complete list of configuration options:
BackupConfig(
    # Core options
    excluded_tables: List[str] = [],
    buffer_size: int = 8192,
    timeout: int = 3600,
    encoding: str = 'utf-8',

    # SQL cleaning options
    clean_output: bool = True,
    target_schema: Optional[str] = None,

    # Performance options
    disable_triggers: bool = True,
    disable_fsync: bool = True,

    # Output options
    include_header: bool = True,
    verbose_logging: bool = True,

    # COPY format options
    copy_delimiter: str = '\\t',
    copy_null_string: str = '\\N',
    copy_quote_char: str = '\\b',
    copy_escape_char: str = '\\b'
)

1. Permission Denied Error
PermissionError: [Errno 13] Permission denied: '/path/to/backup.sql'
Solution: Ensure the output directory exists and is writable:
import os
os.makedirs(os.path.dirname(output_path), exist_ok=True)

2. Schema Does Not Exist
psycopg2.errors.InvalidSchemaName: schema "my_schema" does not exist
Solution: Verify the schema exists:
SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'my_schema';

3. Connection Timeout
psycopg2.OperationalError: timeout expired
Solution: Increase timeout in configuration:
backup_config = BackupConfig(timeout=7200)  # 2 hours

4. Large Backup Files
File is too large (>2GB) for processing
Solution: Use compression or split by schema/table:
# Compress on-the-fly
python backup_script.py | gzip > backup.sql.gz
# Or split by schema
for schema in schema1 schema2 schema3; do
    python backup_script.py --schema=$schema
done

- Python 3.7+
- PostgreSQL 9.5+
- psycopg2 or psycopg2-binary
- boto3 (optional, for S3 export)
- schedule (optional, for automated backups)
- ✨ Added multi-schema support with `source_schema` parameter
- ✨ Automatic SQL cleaning (remove schema prefixes, psql commands)
- 🔧 Improved COPY data preservation during cleaning
- 📝 Enhanced documentation with real-world examples
- 🐛 Fixed pg_dump psql meta-command handling
- 🎉 Initial release
- ⚡ Direct streaming with COPY format
- 🎯 Flexible per-table filtering
- 📤 S3 export support
- 🔧 Django integration
MIT License
Contributions welcome! Please open an issue or PR.
# Clone repository
git clone https://github.com/yourusername/postgres_backup_plugin.git
cd postgres_backup_plugin
# Install dependencies
pip install -r requirements.txt
# Run tests
python -m pytest tests/
# Run specific test
python test_source_schema_detailed.py