139 changes: 105 additions & 34 deletions docs/features/sharding/resharding/hash.md
@@ -4,55 +4,126 @@ icon: material/database-export-outline

# Move data

If you're using the `HASH` sharding function, adding a new node to the cluster will change the modulo number by 1. The number returned by the hash function is uniformly distributed across the entire integer range, which makes it considerably larger than the modulo. Therefore, changing it will more often than not result in most rows being remapped to different shard numbers.
Moving data from the source to the destination database is done using logical replication. This is an online operation, and doesn't require a maintenance window or pausing query traffic.

You can visualize this phenomenon with a bit of Python:
The underlying mechanism is very similar to Postgres [subscriptions](https://www.postgresql.org/docs/current/sql-createsubscription.html), with some improvements, and happens in two steps:

=== "2 shards"
1. Copy data in the [publication](schema.md#publication) to the destination database
2. Stream row changes in real-time

```python
>>> list(map(lambda x: x % 2, [1000, 1001, 1002, 1003, 1004]))
[0, 1, 0, 1, 0]
```
Once the replication stream synchronizes the two database clusters, the data on the destination cluster will be identical to the source cluster, typically lagging by only a few milliseconds.

=== "3 shards"
```python
>>> list(map(lambda x: x % 3, [1000, 1001, 1002, 1003, 1004]))
[1, 2, 0, 1, 2]
```
## CLI

PgDog has a command-line interface you can invoke by running the binary directly:

```bash
pgdog data-sync \
--from-database <name> \
--from-user <name> \
--to-database <name> \
--to-user <name> \
--publication <publication>
```

Since most rows will have to be moved, resharding a cluster in-place would put a lot of load on an already overextended system.
Required (*) and optional parameters for this command are as follows:

PgDog's strategy for resharding is to **move data** from an existing cluster to a completely new one, while rehashing the rows in-flight. This process is parallelizable and fast, and since most of the work is done by the new cluster, production databases are not affected.
| Option | Description |
|-|-|
| `--from-database`* | Name of the source database cluster. |
| `--from-user`* | Name of the user configured in `users.toml` for the source database cluster. |
| `--to-database`* | Name of the destination database cluster. |
| `--to-user`* | Name of the user configured in `users.toml` for the destination database cluster. |
| `--publication`* | Name of the Postgres [publication](schema.md#publication) for tables to be copied and sharded. It should exist on the **source** database. |

## Data sync
## How it works

Moving data online is a 2-step process:
When data sync starts, PgDog first creates a replication slot on each primary database in the source cluster. This prevents Postgres from removing WAL segments while data for each table is copied to the destination.

1. Copy data from tables using Postgres `COPY`
2. Stream real-time changes using logical replication
Next, each table is copied to the destination database in parallel, using [sharded COPY](../copy.md). Once that's done, table changes are synchronized in real time with logical replication from the replication slot created earlier.

To make sure no rows are lost in the process, PgDog follows a similar strategy used by Postgres in logical replication subscriptions, with some improvements.
The whole process happens entirely online and doesn't require database restarts or pausing writes to the source database.

### Copying tables
### Replication slot

Copying table data from the source database cluster is done using Postgres `COPY` and logical replication slots. This is implemented in the `data-sync` command:
PostgreSQL replication is built around slots: markers in the Write-Ahead Log which prevent Postgres from recycling WAL segments and discarding the history of changes made to the database.

```bash
pgdog data-sync --help
<center>
<img src="/images/resharding-slot-2.png" width="75%" alt="Cross-shard queries" />
</center>

With logical replication, any client that speaks the protocol (like PgDog) can connect to the server and stream changes made to the database, starting at the position marked by the slot.

Before copying table data, we create a slot to mark a consistent starting point for our replication process. The slot is **permanent**, so even if resharding is interrupted, Postgres won't lose any of the WAL segments we need to resume it.
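
As a rough sketch, this is equivalent to creating a logical slot by hand on each source primary — the slot name below is hypothetical, and `pgoutput` is Postgres' built-in logical decoding plugin:

```postgresql
-- Hypothetical slot name; PgDog creates a permanent logical slot like this
SELECT pg_create_logical_replication_slot('pgdog_resharding', 'pgoutput');
```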

!!! note "Unused replication slots"
Since permanent replication slots are not automatically removed by Postgres, if you choose to abort the resharding process, make sure to manually drop the replication slot to prevent excessive WAL accumulation on the source database.
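
If you do abort, a hedged example of finding and removing the slot on the source primary — the slot name is hypothetical, so look it up in `pg_replication_slots` first:

```postgresql
-- See how much WAL each slot is holding back
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Drop the resharding slot ('pgdog_resharding' is a hypothetical name)
SELECT pg_drop_replication_slot('pgdog_resharding');
```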

Once the slot is created, PgDog starts copying data from all tables in the [publication](schema.md#publication), and resharding it in-flight.

### Copying data

Tables are copied from source to destination database using standard PostgreSQL `COPY` commands, with a few improvements.

#### Parallelization

If you are running PostgreSQL 16 or later and have configured replicas on the source database, PgDog can copy multiple tables in parallel, dramatically accelerating this process.

<center>
<img src="/images/resharding-16x.png" width="75%" alt="Cross-shard queries" />
</center>

To set this up, make sure to add your read replicas to [`pgdog.toml`](../../../configuration/pgdog.toml/databases.md), for example:

```toml
[[databases]]
name = "source"
host = "10.0.0.1"
role = "replica"

[[databases]]
name = "source"
host = "10.0.0.2"
role = "replica"
```

| Option | Description | Example |
|-|-|-|
| `--from-database` | Name of the **source** database cluster. | `prod` |
| `--from-user` | Name of the user configured in `users.toml` for the **source** database cluster. | `postgres` |
| `--to-database` | Name of the **destination** database cluster. | `prod-sharded` |
| `--to-user` | Name of the user configured in `users.toml` for the **destination** database cluster. | `postgres` |
| `--publication` | Name of the Postgres [publication](https://www.postgresql.org/docs/current/sql-createpublication.html) for tables to be copied and sharded. It should exist on the **source** database. | `all_tables` |
PgDog will distribute the table copy load evenly between all replicas in the configuration. The more replicas are available for resharding, the faster it will complete.

!!! note "Dedicated replicas"
To prevent the resharding process from impacting production queries, you can create a separate set of replicas just for resharding.

Managed clouds (e.g., AWS RDS) make this especially easy, but new replicas need a warm-up period to fetch all the data from the backup snapshot before they can read at the full speed of their storage volumes.

#### Binary `COPY`

PgDog uses the binary `COPY` format to send and receive data, which has been shown to be consistently faster than text, because it avoids the (de)serialization overhead of sending tuples in text form. PgDog decodes tuples in-flight and splits them evenly between destination shards, using the [sharded COPY](../copy.md) implementation.
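
For illustration, a minimal sketch of what a binary table copy looks like at the SQL level — the table name is hypothetical, and PgDog drives this over the wire protocol rather than through `psql`:

```postgresql
-- On the source database: export rows in binary format
COPY public.users TO STDOUT (FORMAT binary);

-- On each destination shard: load only the rows routed to that shard
COPY public.users FROM STDIN (FORMAT binary);
```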

!!! note "Binary compatibility"
While the data format used by PostgreSQL databases hasn't changed in decades, binary `COPY` sends rows exactly as they are stored on disk.

Therefore, sending binary data between two PostgreSQL databases running different versions of Postgres could, however unlikely, result in incompatibilities. We recommend _not_ changing the major version of the server while resharding.

Once all tables are copied and resharded on the destination database cluster, PgDog will begin streaming real-time row updates from the [replication slot](#replication-slot).

### Streaming updates

Once tables are copied over to the destination database, PgDog will stream any changes made to those tables from the [replication slot](#replication-slot) created previously. If it took a while to copy tables and the source database received a large volume of writes, this process could take some time.

You can check on the streaming process and estimate its ETA by querying the `pg_replication_slots` view on the __source__ database:

=== "Source database"
```postgresql
SELECT confirmed_flush_lsn, pg_current_wal_lsn() FROM pg_replication_slots;
```

| Column | Description |
|-|-|
| `confirmed_flush_lsn` | The last position in the Write-Ahead Log (LSN) that the destination database cluster has confirmed receiving. |
| `pg_current_wal_lsn()` | Current position in the Write-Ahead Log for this database. |

All databases and users must be configured in `pgdog.toml` and `users.toml`.
The replication delay between the two database clusters is measured in bytes of WAL. When that number reaches zero, the destination cluster has caught up with the source, and traffic can be [cut over](cutover.md) to the destination database.
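
As a sketch, the same delta can be computed in bytes directly with `pg_wal_lsn_diff()` on the source database:

```postgresql
-- Bytes of WAL the destination still has to consume, per slot
SELECT slot_name,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;
```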

### Real time changes
## Next steps

After data sync is complete, changes for all tables in the publication will be streamed in real-time. Keep this connection
open until you are ready to cut traffic over to the new database cluster.
- [Traffic cutover](cutover.md)
24 changes: 14 additions & 10 deletions docs/features/sharding/resharding/schema.md
@@ -3,12 +3,12 @@ icon: material/database-edit-outline
---
# Schema sync

PgDog can copy tables, indices and other entities from your production database to the new, sharded database automatically. This is faster than using `pg_dump`, because we separate this process into two parts:
PgDog can copy tables, indexes and other entities from your production database to the new, sharded database automatically. This is faster than using `pg_dump`, because we separate this process into two parts:

1. [Create tables](#tables-and-primary-keys), primary key indices, and sequences
2. Create [secondary indices](#secondary-indices)
1. [Create tables](#tables-and-primary-keys), primary key indexes, and sequences
2. Create [secondary indexes](#secondary-indexes)

The first step needs to be performed first, before [copying data](hash.md). The second step is performed once the data sync is almost complete.
The create-tables step must be performed before [copying data](hash.md). The second step is performed once the data sync is almost complete.

## CLI

@@ -30,11 +30,11 @@ Required (*) and optional parameters for this command are as follows:
| `--publication`* | The name of the Postgres table [publication](#publication) with the tables you want to sync. |
| `--dry-run` | Print the SQL statements that will be executed on the destination database and exit. |
| `--ignore-errors` | Execute SQL statements and ignore any errors. |
| `--data-sync-complete` | Run the second step to create secondary indices and sequences. |
| `--data-sync-complete` | Run the second step to create secondary indexes and sequences. |

## Tables and primary keys

The first step in the schema sync copies over tables and their primary key indices from the source database to the new, resharded cluster. This has to be done separately, because Postgres's logical replication only copies data and doesn't manage table schemas.
The first step in the schema sync copies over tables and their primary key indexes from the source database to the new, resharded cluster. This has to be done separately, because Postgres's logical replication only copies data and doesn't manage table schemas.

### Primary keys

@@ -60,7 +60,7 @@ This will make sure _all_ tables in your database will be copied and resharded in the next step.
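
If you haven't created the publication yet, a minimal sketch on the source database — the publication name is hypothetical:

```postgresql
-- Publish every table so schema and data sync pick them all up
CREATE PUBLICATION resharding FOR ALL TABLES;
```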

## Schema admin

Schema sync creates tables, indices, and other entities on the destination database. To make sure that's done with a user with sufficient privileges (e.g., `CREATE` permission on the database), you need to add it to [`users.toml`](../../../configuration/users.toml/users.md) and mark it as the schema administrator:
Schema sync creates tables, indexes, and other entities on the destination database. To make sure that's done with a user with sufficient privileges (e.g., `CREATE` permission on the database), you need to add it to [`users.toml`](../../../configuration/users.toml/users.md) and mark it as the schema administrator:

```toml
[[users]]
@@ -74,7 +74,7 @@ PgDog will use that user to connect to the source and destination databases, so

### `pg_dump` version

PgDog is using `pg_dump` under the hood to export schema definitions. Postgres requires the version of `pg_dump` and the server to be identical. Our [Docker image](../../../installation.md) comes with `pg_dump` for PostgreSQL 16, but your database server may run a different version.
PgDog uses `pg_dump` under the hood to export schema definitions. Postgres requires the `pg_dump` version to match the server version. Our [Docker image](../../../installation.md) comes with `pg_dump` for PostgreSQL 16, but your database server may run a different version.
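
If you're not sure which version your source server is running, a quick check against the source database is:

```postgresql
-- Reports the PostgreSQL server version, e.g. 16.4
SHOW server_version;
```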

Before proceeding, make sure to install the correct version of `pg_dump` for your source database. If you have multiple versions of `pg_dump` installed on the same host, you can specify the path to the right one in `pgdog.toml`:

@@ -83,6 +83,10 @@ Before proceeding, make sure to install the correct version of `pg_dump` for you
pg_dump_path = "/path/to/pg_dump"
```

## Secondary indices
## Secondary indexes

This step is performed after [data sync](hash.md) is complete. Running this step will create secondary indices on all your tables, which will take some time.
This step is performed after [data sync](hash.md) is complete. Running this step will create secondary indexes on all your tables, which will take some time, depending on the number of indexes in your schema.

## Next steps

- [Move data](hash.md)
Binary file added docs/images/resharding-16x.png
Binary file added docs/images/resharding-slot-2.png
2 changes: 1 addition & 1 deletion docs/roadmap.md
@@ -87,7 +87,7 @@ Support for sorting rows in [cross-shard](features/sharding/cross-shard.md) quer

| Feature | Status | Notes |
|-|-|-|
| [Data sync](features/sharding/resharding/hash.md#data-sync) | :material-wrench: | Sync table data with logical replication. Not benchmarked yet. |
| [Data sync](features/sharding/resharding/hash.md) | :material-wrench: | Sync table data with logical replication. Not benchmarked yet. |
| [Schema sync](features/sharding/resharding/schema.md) | :material-wrench: | Sync table, index and constraint definitions. Not benchmarked yet. |
| Online rebalancing | :material-calendar-check: | Not automated yet, requires manual orchestration. |
