| 1 | +--- |
| 2 | +title: "Data Pipelines" |
| 3 | +sidebarTitle: "Data Pipelines" |
| 4 | +description: "Use custom write checkpoints to handle asynchronous data uploads, as in chained data pipelines." |
| 5 | +--- |
| 6 | + |
| 7 | +<Info> |
| 8 | +**Availability** |
| 9 | +Custom write checkpoints are available for customers on our [Team and Enterprise](https://www.powersync.com/pricing) plans. |
| 10 | +</Info> |
| 11 | + |
| 12 | +To ensure [consistency](/architecture/consistency), PowerSync relies on write checkpoints. These checkpoints ensure that clients have synced their own changes to the server before applying downloaded data from the server locally. The essential requirement is that the client must get a write checkpoint after the last write/upload. Then, when syncing data from the server, the client checks whether the write checkpoint is part of the largest [sync checkpoint](https://github.com/powersync-ja/powersync-service/blob/main/docs/sync-protocol.md). If it is, the client applies the server-side changes locally. |
| 13 | + |
| 14 | +The default write checkpoints implementation relies on uploads being acknowledged synchronously, i.e. data persists in the source database before the `uploadData` call completes. |
| 15 | + |
| 16 | +Problems occur if the upload is asynchronous. If the client's upload is meant to mutate the source database (and eventually does), but this process is delayed, the client will see its changes reverted and then applied again later. |
| 17 | + |
| 18 | +Chained *data pipelines* are a common example of asynchronous uploads -- e.g. data uploads are first written to a different database, or a separate queue for processing, and finally replicated to the source database. |
| 19 | + |
| 20 | +For example, consider the following data pipeline: |
| 21 | + |
| 22 | +1. The client makes a change locally and the local database is updated. |
| 23 | +2. The client uploads this change to the server. |
| 24 | +3. The server resolves the request and writes the change into an intermediate database (not the source database yet). |
| 25 | +4. The client thinks the upload is complete. It requests a Write Checkpoint from the PowerSync Service. |
| 26 | +5. The PowerSync Service increments the replication HEAD in the source database, and creates a Write Checkpoint for the user. The Write Checkpoint number is returned and recorded in the client. |
| 27 | +6. The PowerSync Service replicates past the previous replication HEAD (but the changes are still not present in the source database). |
| 28 | +7. The client sees that its Write Checkpoint has been reached and concludes that it is safe to apply the server state to the local database. However, the server state does not yet include the uploaded changes, which looks the same as if the server had never applied them. As a result, the client reverts its local changes. |
| 29 | +8. Eventually the change is written to the source database and increments the replication HEAD. |
| 30 | +9. The PowerSync Service replicates this change and sends it to the client. The client then reapplies the changes. |
| 31 | + |
| 32 | +In the above case, the client may see the write checkpoint before the data has been replicated. This causes the client to revert its changes, then apply them again later once they have actually been replicated, causing data to “flicker” in the app. |
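The flicker in the steps above can be reproduced with a small simulation. This is a simplified sketch, not PowerSync SDK code: the `Row` and `SyncCheckpoint` types and the `applySync` and `simulateFlicker` functions are hypothetical names used only for illustration, and the model assumes the client replaces its local rows with the server rows once its write checkpoint is covered.

```typescript
// Simplified, hypothetical model of the client-side check; not PowerSync SDK code.
type Row = { id: string; description: string };

type SyncCheckpoint = {
  // Highest write checkpoint of this client that the server state accounts for
  writeCheckpoint: number;
  // Server-side rows as replicated from the source database
  rows: Row[];
};

// Apply the server state only once it covers the client's last write checkpoint.
function applySync(local: Row[], lastWriteCheckpoint: number, sync: SyncCheckpoint): Row[] {
  return sync.writeCheckpoint >= lastWriteCheckpoint ? sync.rows : local;
}

// Returns the value the user sees after each stage of the pipeline.
function simulateFlicker(): string[] {
  // Steps 1-5: local edit, upload to the intermediate database, write checkpoint 1 issued.
  let local: Row[] = [{ id: 'todo-1', description: 'edited' }];
  const lastWriteCheckpoint = 1;
  const seen: string[] = [local[0].description];

  // Steps 6-7: the write checkpoint is reached, but the source database still holds the old row.
  local = applySync(local, lastWriteCheckpoint, {
    writeCheckpoint: 1,
    rows: [{ id: 'todo-1', description: 'original' }]
  });
  seen.push(local[0].description);

  // Steps 8-9: the pipeline writes to the source database and the change is replicated.
  local = applySync(local, lastWriteCheckpoint, {
    writeCheckpoint: 1,
    rows: [{ id: 'todo-1', description: 'edited' }]
  });
  seen.push(local[0].description);

  return seen;
}

console.log(simulateFlicker().join(' -> ')); // edited -> original -> edited
```

The middle value is the revert the user briefly sees. It happens because the write checkpoint was issued before the change reached the source database.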
| 33 | + |
| 34 | +For these use cases, custom write checkpoints should be implemented. |
| 35 | + |
| 36 | +## Custom Write Checkpoints |
| 37 | + |
| 38 | +Custom write checkpoints allow the developer to define write checkpoints and insert them into the replication stream directly, instead of relying on the PowerSync Service to create and return them. An example of this is having the backend persist write checkpoints to a dedicated table which is processed as part of the replication stream. |
| 39 | + |
| 40 | +The PowerSync Service then needs to process the (ordered) replication events and correlate the checkpoint table changes to write checkpoint events. |
| 41 | + |
| 42 | +## Example Implementation |
| 43 | + |
| 44 | +A self-hosted Node.js demo with Postgres is available here: |
| 45 | + |
| 46 | +<Card title="Custom Write Checkpoints (NodeJS + Postgres)" icon="github" href="https://github.com/powersync-ja/self-host-demo/tree/main/demos/nodejs-custom-checkpoints/README.md" horizontal> |
| 47 | +</Card> |
| 48 | + |
| 49 | +## Implementation Details |
| 50 | + |
| 51 | +This section outlines what a custom write checkpoints implementation entails. |
| 52 | + |
| 53 | +### Custom write checkpoint table |
| 54 | + |
| 55 | +Create a dedicated `checkpoints` table, which should contain the following checkpoint payload information in some form: |
| 56 | + |
| 57 | +```TypeScript |
| 58 | +export type CheckpointPayload = { |
| 59 | + /** |
| 60 | + * The user account id |
| 61 | + */ |
| 62 | + user_id: string; |
| 63 | + /** |
| 64 | + * The client id relating to the user account. |
| 65 | + * A single user can have multiple clients. |
| 66 | + * A client is analogous to a device session. |
| 67 | + * Checkpoints are tracked separately for each `user_id` + `client_id`. |
| 68 | + */ |
| 69 | + client_id: string; |
| 70 | + /** |
| 71 | + * A strictly increasing write checkpoint identifier. |
| 72 | + * This number is generated by the application backend. |
| 73 | + */ |
| 74 | + checkpoint: bigint; |
| 75 | +} |
| 76 | +``` |
| 77 | + |
| 78 | +### Replication Requirements |
| 79 | + |
| 80 | +Replication events for the custom write checkpoint table (`checkpoints` in this example) need to be enabled. |
| 81 | + |
| 82 | +For Postgres, this involves adding the table to the [PowerSync publication](/usage/installation/database-setup), for example: |
| 83 | + |
| 84 | +```SQL |
| 85 | +create publication powersync for table public.lists, public.todos, public.checkpoints; |
| 86 | +``` |
| 87 | + |
| 88 | +### Sync Rules Requirements |
| 89 | + |
| 90 | +You need to enable the `write_checkpoints` sync event in your sync rules configuration. This event should map the rows from the `checkpoints` table to the `CheckpointPayload` payload. |
| 91 | + |
| 92 | +```YAML |
| 93 | +# sync-rules.yaml |
| 94 | + |
| 95 | +# Register the custom write_checkpoints event |
| 96 | +event_definitions: |
| 97 | + write_checkpoints: |
| 98 | + payloads: |
| 99 | + # This defines where the replicated custom write checkpoints should be extracted from |
| 100 | + - SELECT user_id, checkpoint, client_id FROM checkpoints |
| 101 | + |
| 102 | +# Define sync rules as usual |
| 103 | +bucket_definitions: |
| 104 | + global: |
| 105 | + data: |
| 106 | + ... |
| 107 | +``` |
| 108 | + |
| 109 | +### Application |
| 110 | + |
| 111 | +Your application should handle custom write checkpoints on both the frontend and backend. |
| 112 | + |
| 113 | +#### Frontend |
| 114 | + |
| 115 | +Your client backend connector should call the application backend to create a custom write checkpoint record after uploading items in the `uploadData` method. The write checkpoint number should be supplied to the CRUD transaction's `complete` method. |
| 116 | + |
| 117 | +```TypeScript |
| 118 | + async function uploadData(database: AbstractPowerSyncDatabase): Promise<void> { |
| 119 | + const transaction = await database.getNextCrudTransaction(); |
| 120 | + if (!transaction) return; // Nothing is queued for upload |
| 121 | + // Get the unique client ID from the PowerSync Database SQLite storage |
| 122 | + const clientId = await database.getClientId(); |
| 123 | + for (const operation of transaction.crud) { |
| 124 | + // Upload the items to application backend |
| 125 | + // .... |
| 126 | + } |
| 127 | + |
| 128 | + await transaction.complete(await getCheckpoint(clientId)); |
| 129 | + } |
| 130 | + |
| 131 | + async function getCheckpoint(clientId: string): Promise<string> { |
| 132 | + /** |
| 133 | + * Should perform a request to the application backend which should create the write |
| 134 | + * checkpoint record and return the corresponding checkpoint number. |
| 135 | + */ |
| 136 | + return "the Write Checkpoint number from the request"; |
| 137 | + } |
| 138 | +``` |
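As a rough sketch, `getCheckpoint` could call a backend route such as the `PUT /checkpoint` example shown for Postgres on this page. The `backendUrl` and `userId` parameters, the injected `fetchImpl`, and the `FetchLike` type are assumptions made for illustration and are not part of the PowerSync SDK. In a real app the user would normally be derived from the session on the backend.

```typescript
// Minimal structural type for a fetch-like function, so a stub can be injected in tests.
// Hypothetical helper type; the global `fetch` satisfies it in browsers and Node.js 18+.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string }
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

async function getCheckpoint(
  clientId: string,
  userId: string, // assumption: sent in the body, as the example route expects
  backendUrl: string, // assumption: base URL of the application backend
  fetchImpl: FetchLike
): Promise<string> {
  const response = await fetchImpl(`${backendUrl}/checkpoint`, {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ user_id: userId, client_id: clientId })
  });

  if (!response.ok) {
    // Fail loudly so that transaction.complete() is never called without a checkpoint
    throw new Error(`Checkpoint request failed with status ${response.status}`);
  }

  // The example route responds with { checkpoint: <number> }
  const body = (await response.json()) as { checkpoint: number | string };
  return String(body.checkpoint);
}
```

Inside `uploadData`, this would be used as `await transaction.complete(await getCheckpoint(clientId, userId, backendUrl, fetch))`. The result is converted to a string to match the `complete` call in the example above.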
| 139 | + |
| 140 | +#### Backend |
| 141 | + |
| 142 | +The backend should create a write checkpoint record when the client requests it. The record should automatically increment the write checkpoint number for the associated `user_id` and `client_id`. |
| 143 | + |
| 144 | +#### Postgres Example |
| 145 | + |
| 146 | +With the following table defined in the database |
| 147 | + |
| 148 | +```SQL |
| 149 | +CREATE TABLE checkpoints ( |
| 150 | + user_id VARCHAR(255), |
| 151 | + client_id VARCHAR(255), |
| 152 | + checkpoint INTEGER, |
| 153 | + PRIMARY KEY (user_id, client_id) |
| 154 | +); |
| 155 | +``` |
| 156 | + |
| 157 | +the backend should have a route which creates `checkpoints` records. |
| 158 | + |
| 159 | +```TypeScript |
| 160 | +router.put('/checkpoint', async (req, res) => { |
| 161 | + if (!req.body) { |
| 162 | + res.status(400).send({ |
| 163 | + message: 'Invalid body provided' |
| 164 | + }); |
| 165 | + return; |
| 166 | + } |
| 167 | + |
| 168 | + // Assuming `pool` is a node-postgres Pool: pool.query checks out a client, runs the query and releases the client, even if the query fails |
| 169 | + |
| 170 | + // These could be obtained from the session |
| 171 | + const { user_id = 'UserID', client_id = '1' } = req.body; |
| 172 | + |
| 173 | + const response = await pool.query( |
| 174 | + ` |
| 175 | + INSERT |
| 176 | + INTO |
| 177 | + checkpoints |
| 178 | + (user_id, client_id, checkpoint) |
| 179 | + VALUES |
| 180 | + ($1, $2, 1) |
| 181 | + ON |
| 182 | + CONFLICT (user_id, client_id) |
| 183 | + DO |
| 184 | + UPDATE |
| 185 | + SET checkpoint = checkpoints.checkpoint + 1 |
| 186 | + RETURNING checkpoint; |
| 187 | + `, |
| 188 | + [user_id, client_id] |
| 189 | + ); |
| 191 | + |
| 192 | + // Return the Write Checkpoint number |
| 193 | + res.status(200).send({ |
| 194 | + checkpoint: response.rows[0].checkpoint |
| 195 | + }); |
| 196 | +}); |
| 197 | + |
| 198 | +``` |