English | 简体中文
Migrate data between different databases with ease.
- Plugin architecture — Add new databases by implementing interfaces, no core changes needed
- Dynamic plugin loading — Load `.so` plugins at runtime (Go `plugin` package), hot-reload with SIGHUP
- Streaming migration — Batch-based processing, won't OOM on large tables
- Checkpoint & resume — Interrupted migrations can continue from where they left off
- Parallel table migration — Multiple tables migrated concurrently with configurable parallelism
- Lifecycle hooks — Inject custom logic at pipeline/table/batch stages (e.g., TimescaleDB hypertable setup)
- Schema mapping — Automatic type conversion between source and sink databases
- Debug & diagnostics — Optional pprof profile collection for performance analysis
| Database | Source | Sink |
|---|---|---|
| MySQL | ✓ | ✓ |
| PostgreSQL | ✓ | ✓ |
| InfluxDB | ✓ | ✓ |
| ClickHouse | ✓ | ✓ |
| MongoDB | ✓ | ✓ |
| Redis | ✓ | ✓ |
| Kafka | ✓ | ✓ |
```bash
go install github.com/silves-xiang/data-bridge/cmd/databridge@latest
```

```bash
# Run a migration
databridge migrate -c config.yaml

# Validate config without running
databridge validate -c config.yaml

# List available connectors and hooks
databridge list

# Show version
databridge version
```

Create a YAML config file:
```yaml
task:
  name: "my-migration"
  mode: full

source:
  type: mysql
  connection:
    host: "127.0.0.1"
    port: 3306
    user: "root"
    password: "${MYSQL_PASSWORD}"
    database: "source_db"

sink:
  type: postgresql
  connection:
    host: "127.0.0.1"
    port: 5432
    user: "postgres"
    password: "${PG_PASSWORD}"
    database: "target_db"
    ssl_mode: "disable"

tables:
  - source: "users"
    target: "users"

batch_size: 5000
parallelism: 4

checkpoint:
  enabled: true
  dir: "./.databridge/checkpoints"
```

InfluxDB can be used as both a source and a sink. As a sink, you can configure which columns become tags and which column provides the timestamp:
```yaml
source:
  type: influxdb
  connection:
    url: "http://localhost:8086"
    token: "${INFLUXDB_TOKEN}"
    org: "myorg"
    bucket: "mybucket"

sink:
  type: influxdb
  connection:
    url: "http://localhost:8086"
    token: "${INFLUXDB_TOKEN}"
    org: "myorg"
    bucket: "target_bucket"
  params:
    time_column: "created_at"   # source column to use as timestamp
    tag_columns: ["sensor_id"]  # source columns to store as tags
```

See examples/ for full configuration examples.
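The configs above reference environment variables such as `${MYSQL_PASSWORD}`. Assuming those placeholders are expanded from the environment when the config is loaded, a run might look like:

```bash
# Hypothetical values; export whatever your config references as ${VAR}.
export MYSQL_PASSWORD='s3cret'
export PG_PASSWORD='s3cret'
databridge migrate -c config.yaml
```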
```
Source (MySQL) ──ReadBatch──> Pipeline ──WriteBatch──> Sink (PostgreSQL)
                                 │
                           ┌─────┼─────┐
                           │     │     │
                      Checkpoint Hooks Worker Pool
```
- Source — Reads tables and row batches from a source database
- Sink — Creates tables and writes row batches to a target database
- Hook — Lifecycle callbacks: `PipelineHook`, `TableHook`, `BatchHook`
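For orientation, here is a minimal sketch of what these interfaces might look like. The method names and signatures are illustrative assumptions, not the repository's actual definitions; only `ReadBatch`/`WriteBatch` appear in the diagram above.

```go
package connector

import "context"

// Batch is one unit of streamed rows; each row maps column name to value.
// (Illustrative shape only.)
type Batch []map[string]any

// Source reads tables and row batches from an origin database.
type Source interface {
	Tables(ctx context.Context) ([]string, error)
	ReadBatch(ctx context.Context, table string, size int) (Batch, error)
	Close() error
}

// Sink creates tables and writes row batches to a target database.
type Sink interface {
	CreateTable(ctx context.Context, table string) error
	WriteBatch(ctx context.Context, table string, batch Batch) error
	Close() error
}
```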
Compile-time (built-in):

- Implement the `source.Source` and/or `sink.Sink` interfaces
- Implement schema mapping (`SourceTypeMapper` / `TargetTypeMapper`)
- Register in `init()` via `source.Register("name", factory)` / `sink.Register("name", factory)` (see the sketch after this list)
- Import the plugin package in `cmd/databridge/main.go`
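A minimal registration sketch, assuming a factory signature of `func(cfg map[string]any) (source.Source, error)`; only the `source.Register("name", factory)` call shape comes from the steps above:

```go
package mysource

import "github.com/silves-xiang/data-bridge/source"

// MySource is a stub connector; the methods that would satisfy
// source.Source are omitted for brevity.
type MySource struct {
	cfg map[string]any
}

// init runs when cmd/databridge/main.go imports this package,
// making the connector available under the name "mysource".
func init() {
	// The factory signature here is an assumption, not taken from the repo.
	source.Register("mysource", func(cfg map[string]any) (source.Source, error) {
		return &MySource{cfg: cfg}, nil
	})
}
```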
Runtime (`.so` dynamic loading):

Plugins can be compiled as shared objects and loaded at runtime without recompiling the main binary. Each `.so` must export a `Register` function:
```go
package main

// The blank import triggers the plugin package's init() registration
// when the shared object is loaded.
import _ "github.com/silves-xiang/data-bridge/plugins/myplugin"

// Register is the symbol the host looks up after loading the .so.
func Register() {}
```

Build with:
```bash
make plugin-myplugin   # produces plugins/myplugin.so
```

Set `plugin_dir` in your config, and plugins are loaded at startup:
```yaml
plugin_dir: "./plugins"
```

To hot-reload plugins after adding or removing `.so` files:
```bash
kill -SIGHUP $(pgrep databridge)
```

Note: Go's `plugin` package requires the plugin and the main binary to be built with the same Go version, and it is only supported on Linux, FreeBSD, and macOS.
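For context, this is roughly how a host process loads such a shared object using the standard library's `plugin` package. It is a sketch of the mechanism, not data-bridge's actual loader:

```go
package main

import (
	"fmt"
	"plugin"
)

// loadPlugin opens a .so file and invokes its exported Register function.
func loadPlugin(path string) error {
	// plugin.Open runs the shared object's init() functions, including
	// side-effect imports that self-register connectors.
	p, err := plugin.Open(path)
	if err != nil {
		return err
	}
	// Look up the exported Register symbol and call it.
	sym, err := p.Lookup("Register")
	if err != nil {
		return err
	}
	register, ok := sym.(func())
	if !ok {
		return fmt.Errorf("%s: Register has unexpected signature", path)
	}
	register()
	return nil
}
```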
Hooks allow custom logic at migration lifecycle points:
- PipelineHook — `OnPipelineStart` / `OnPipelineEnd`
- TableHook — `OnTableStart` / `OnTableEnd` (e.g., create a TimescaleDB hypertable)
- BatchHook — `OnBatchComplete` (e.g., periodic aggregation)
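As an illustration, a custom TableHook might look like the following sketch. The signatures are assumptions; only the hook and method names come from the list above.

```go
package myhooks

import "context"

// TableHook receives callbacks around each table's migration.
// (Illustrative signatures; the repository's interface may differ.)
type TableHook interface {
	OnTableStart(ctx context.Context, table string) error
	OnTableEnd(ctx context.Context, table string) error
}

// HypertableHook prepares a table before rows are written.
type HypertableHook struct{}

func (HypertableHook) OnTableStart(ctx context.Context, table string) error {
	// Setup work goes here, e.g. converting the target table
	// into a TimescaleDB hypertable before the first batch.
	return nil
}

func (HypertableHook) OnTableEnd(ctx context.Context, table string) error {
	// Teardown or verification after the last batch.
	return nil
}
```

Built-in hooks are enabled and parameterized in the config, for example the timescale hook: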
```yaml
hooks:
  - name: "create-hypertables"
    type: "timescale"
    params:
      partition_column: "created_at"
      hypertable_interval: "7 days"
      enable_compression: true
      compression_after: "30 days"
```

Debug logging and pprof collection are configured separately:

```yaml
debug:
  enabled: true
  verbose_batch: true          # Log every batch timing and row count
  log_memory: true             # Log memory usage per batch
  pprof:
    enabled: true
    dir: "./.databridge/pprof"
    interval: "5m"             # Capture interval
    profiles:
      - "heap"
      - "goroutine"
      - "allocs"
    cpu_duration: "30s"
```

Analyze profiles with:
```bash
go tool pprof -http=:8080 .databridge/pprof/heap_20260101_120000.prof
```

MIT