Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@
"usage/use-case-examples/full-text-search",
"usage/use-case-examples/infinite-scrolling",
"usage/use-case-examples/offline-only-usage",
"usage/use-case-examples/postgis"
"usage/use-case-examples/postgis",
"usage/use-case-examples/custom-write-checkpoints"
]
},
{
Expand Down
198 changes: 198 additions & 0 deletions usage/use-case-examples/custom-write-checkpoints.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
---
title: "Data Pipelines"
sidebarTitle: "Data Pipelines"
description: "Use custom write checkpoints to handle asynchronous data uploads, as in chained data pipelines."
---

<Info>
**Availability**
Custom write checkpoints are available for customers on our [Team and Enterprise](https://www.powersync.com/pricing) plans.
</Info>

To ensure [consistency](/architecture/consistency), PowerSync relies on write checkpoints. These checkpoints ensure that clients have synced their own changes to the server before applying downloaded data from the server locally. The essential requirement is that the client must get a write checkpoint after the last write/upload. Then, when syncing data from the server, the client checks whether the write checkpoint is part of the largest [sync checkpoint](https://github.com/powersync-ja/powersync-service/blob/main/docs/sync-protocol.md). If it is, the client applies the server-side changes locally.

The default write checkpoints implementation relies on uploads being acknowledged synchronously, i.e. data persists in the source database before the `uploadData` call completes.

Problems occur if the upload is asynchronous. If the client's upload is meant to mutate the source database (and eventually does), but this process is delayed, it will effectively seem as if changes were reverted then applied again thereafter.

Chained *data pipelines* are a common example of asynchronous uploads -- e.g. data uploads are first written to a different database, or a separate queue for processing, and finally replicated to the source database.

For example, consider the following data pipeline:

1. The client makes a change locally and the local database is updated.
2. The client uploads this change to the server.
3. The server resolves the request and writes the change into an intermediate database (not the source database yet).
4. The client thinks the upload is complete. It requests a Write Checkpoint from the PowerSync Service.
5. The PowerSync Service increments the replication HEAD in the source database, and creates a Write Checkpoint for the user. The Write Checkpoint number is returned and recorded in the client.
6. The PowerSync Service replicates past the previous replication HEAD (but the changes are still not present in the source database).
7. It should be fine for the client to apply the state of the server to the local database. The server state does not include the upload's changes. This is the same as if the changes were not applied by the server. This results in the client reverting the changes.
8. Eventually the change is written to the source database and increments the replication HEAD.
9. The PowerSync Service replicates this change and sends it to the client. The client then reapplies the changes.

In the above case, the client may see the write checkpoint before the data has been replicated. This will cause the client to revert its changes, then apply them again later when it has actually replicated, causing data to “flicker” in the app.

For these use cases, custom write checkpoints should be implemented.

## Custom Write Checkpoints

Custom write checkpoints allow the developer to define write checkpoints and insert them into the replication stream directly, instead of relying on the PowerSync Service to create and return them. An example of this is having the backend persist write checkpoints to a dedicated table which is processed as part of the replication stream.

The PowerSync Service then needs to process the (ordered) replication events and correlate the checkpoint table changes to write checkpoint events.

## Example Implementation

A self-hosted Node.js demo with Postgres is available here:

<Card title="Custom Write Checkpoints (NodeJS + Postgres)" icon="github" href="https://github.com/powersync-ja/self-host-demo/tree/main/demos/nodejs-custom-checkpoints/README.md" horizontal>
</Card>

## Implementation Details

This outlines what a custom write checkpoints implementation entails.

### Custom write checkpoint table

Create a dedicated `checkpoints` table, which should contain the following checkpoint payload information in some form:

```TypeScript
export type CheckpointPayload = {
/**
* The user account id
*/
user_id: string;
/**
* The client id relating to the user account.
* A single user can have multiple clients.
* A client is analogous to a device session.
* Checkpoints are tracked separately for each `user_id` + `client_id`.
*/
client_id: string;
/**
* A strictly increasing write checkpoint identifier.
* This number is generated by the application backend.
*/
checkpoint: bigint;
}
```

### Replication Requirements

Replication events for the custom write checkpoint table (`checkpoints` in this example) needs to enabled.

For Postgres this involves adding the table to the [PowerSync publication](/usage/installation/database-setup), for example:

```SQL
create publication powersync for table public.lists, public.todos, public.checkpoints;
```

### Sync Rules Requirements

You need to enable the `write_checkpoints` sync event in your sync rules configuration. This event should map the rows from the `checkpoints` table to the `CheckpointPayload` payload.

```YAML
# sync-rules.yaml

# Register the custom write_checkpoints event
event_definitions:
write_checkpoints:
payloads:
# This defines where the replicated custom write checkpoints should be extracted from
- SELECT user_id, checkpoint, client_id FROM checkpoints

# Define sync rules as usual
bucket_definitions:
global:
data:
...
```

### Application

Your application should handle custom write checkpoints on both the frontend and backend.

#### Frontend

Your client backend connector should make a call to the application backend to create a custom write checkpoint record after uploading items in the `uploadData` method. The write checkpoint number should be supplied to the CRUD transactions's `complete` method.

```TypeScript
async function uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
const transaction = await database.getNextCrudTransaction();
// Get the unique client ID from the PowerSync Database SQLite storage
const clientId = await db.getClientId();

for (const operation of transaction.crud) {
// Upload the items to application backend
// ....
}

await transaction.complete(await getCheckpoint(clientId));
}

async function getCheckpoint(clientId: string): string {
/**
* Should perform a request to the application backend which should create the write
* checkpoint record and return the corresponding checkpoint number.
*/
return "the Write Checkpoint number from the request";
}
```

#### Backend

The backend should create a write checkpoint record when the client requests it. The record should automatically increment the write checkpoint number for the associated `user_id` and `client_id`.

#### Postgres Example

With the following table defined in the database

```SQL
CREATE TABLE checkpoints (
user_id VARCHAR(255),
client_id VARCHAR(255),
checkpoint INTEGER,
PRIMARY KEY (user_id, client_id)
);
```

the backend should have a route which creates `checkpoints` records.

```TypeScript
router.put('/checkpoint', async (req, res) => {
if (!req.body) {
res.status(400).send({
message: 'Invalid body provided'
});
return;
}

const client = await pool.connect();

// These could be obtained from the session
const { user_id = 'UserID', client_id = '1' } = req.body;

const response = await client.query(
`
INSERT
INTO
checkpoints
(user_id, client_id, checkpoint)
VALUES
($1, $2, '1')
ON
CONFLICT (user_id, client_id)
DO
UPDATE
SET checkpoint = checkpoints.checkpoint + 1
RETURNING checkpoint;
`,
[user_id, client_id]
);
client.release();

// Return the Write Checkpoint number
res.status(200).send({
checkpoint: response.rows[0].checkpoint
});
});

```