Skip to content
This repository was archived by the owner on Jan 28, 2026. It is now read-only.

blocking local replication#12

Merged
avichalp merged 21 commits intonextfrom
avichalp/local-replication-blocking
Nov 10, 2023
Merged

blocking local replication#12
avichalp merged 21 commits intonextfrom
avichalp/local-replication-blocking

Conversation

@avichalp
Copy link
Contributor

@avichalp avichalp commented Oct 26, 2023

Summary

  • Changes the streamer to replay the WAL records locally on a duck db instance
  • On replication start command, all the existing db files are uploaded before we start replicating the target database to local duck db.
  • The DBManager Maintains a time window. It is configurable. By default it is set to 3600 seconds. When the window is passed the current database is exported and uploaded to the provider.
  • This implementation is single threaded and blocking at the moment. I think, this is good enough to start with. We can parallelize this implementation when we are dealing with higher data volume. The single threaded, blocking approach greatly simplifies the current implementation.
  • See discussion

@avichalp avichalp force-pushed the avichalp/local-replication-blocking branch from 01f18f0 to ff3dae6 Compare October 31, 2023 12:29
@@ -1,2 +1,2 @@
{"commit_lsn":957393888,"records":[{"action":"I","xid":1055,"lsn":"0/3910AB70","nextlsn":"","timestamp":"2023-08-22 10:56:08.151-03","schema":"public","table":"a","columns":[{"name":"a","type":"integer","value":4}],"pk":[]}]}
{"commit_lsn":957398296,"records":[{"action":"I","xid":1058,"lsn":"0/3910B898","nextlsn":"","timestamp":"2023-08-22 14:44:02.043586-03","schema":"public","table":"t","columns":[{"name":"id","type":"integer","value":200232},{"name":"name","type":"text","value":"100"}],"pk":[{"name":"id","type":"integer"}]}]} No newline at end of file
{"commit_lsn":957398296,"records":[{"action":"I","xid":1058,"lsn":"0/3910B898","nextlsn":"","timestamp":"2023-08-22 14:44:02.043586-03","schema":"public","table":"t","columns":[{"name":"id","type":"integer","value":200232},{"name":"name","type":"text","value":"100"}],"pk":[{"name":"id","type":"integer"}]}]}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed the first example to have the same schema for current test cases

signature, err := b.sign(capnpTx)
if err != nil {
return fmt.Errorf("sign: %s", err)
if err := b.dbMngr.Replay(ctx, tx); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local WAL replay before committing LSN

return nil
}

func (b *BasinStreamer) sign(tx basincapnp.Tx) ([]byte, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wouldn't need sign and push because we can re use the file uploader's functions


// Create sink table in local DB
if err := b.dbMngr.Setup(ctx); err != nil {
return fmt.Errorf("cannot setup db: %s", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creates and sets up a new local db before starting replication

}

// queryFromWAL creates a query for a WAL TX records.
func (dbm *DBManager) queryFromWAL(tx *pgrepl.Tx) string {
Copy link
Contributor Author

@avichalp avichalp Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pg -> ddb type remappings is a todo for a separate PR

WHERE c.table_name = $1;
`, rel,
)
columns, err := inspectTable(ctx, tx, rel)
Copy link
Contributor Author

@avichalp avichalp Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted this logic to get table schema out into a new function

}
defer rows.Close()

type column struct {
Copy link
Contributor Author

@avichalp avichalp Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made the column public and moved to the app package since it needs to be shared with the local db

func (dbm *DBManager) Replay(ctx context.Context, tx *pgrepl.Tx) error {
if dbm.windowPassed() {
slog.Info("replacing current db before replaying further txs")
if err := dbm.replace(ctx); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If export or upload fails, the replicator will exit. the next time it starts it will first upload all existing db files and then start replicating on a fresh db

createdAT time.Time
cols []Column
winSize time.Duration
uploader *BasinUploader
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reuses file uploader to upload the exported files

return strings.Join(queries, "\n")
}

func (dbm *DBManager) replace(ctx context.Context) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is how the db is being replaced when the winSize exceeds

return nil
}

func (bp *basinProviderMock) Upload(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the mock impl, makes a copy of the file that was uploaded. to test, we will import this file into a db and make assertions

@avichalp avichalp force-pushed the avichalp/local-replication-blocking branch from d01db14 to d7d0f6f Compare November 1, 2023 22:30
}

// UploadAll uploads all db dumps in the db dir.
func (dbm *DBManager) UploadAll(ctx context.Context) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UploadAll is called when the replication starts to send any remaining data left.

return fmt.Errorf("upload all: %s", err)
}

basinStreamer := app.NewBasinStreamer(ns, r, dbm)
Copy link
Contributor Author

@avichalp avichalp Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streamer doesn't bind to BasinProvider anymore because going forward, BasinProvider is only used for fetching and updating metadata. Uploader is used to upload files. DBManager is instantiated with an Uploader.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a comment about an alternative more decoupled architecture. No need to change it.

One option would be to not have the Uploader as a dependency of DBManager. You could use a channel to synchronize both components and make them independent. Every time a file is ready to be uploaded, DBManager notifies the channel, Uploader picks up the notification and uploads the file. Notice that I'm not saying to make it async, it would still be blocking, but more decoupled and easier to test the parts independently.

// BasinProvider ...
type BasinProvider interface {
Create(context.Context, string, string, basincapnp.Schema, common.Address) (bool, error)
Push(context.Context, string, string, basincapnp.Tx, []byte) error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing Push from here as it will not be used now. I left it intact in the capnp definition maybe we can clean that up when we remove capnp otherwise we need to regenerate the files in this PR

}

// Push pushes Postgres tx to the server.
func (bp *BasinProvider) Push(ctx context.Context, ns string, rel string, tx basincapnp.Tx, sig []byte) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Push will not be used anymore

@avichalp avichalp marked this pull request as ready for review November 2, 2023 12:43
Copy link
Contributor

@brunocalza brunocalza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! In general, it looks good and correct to me. Left some comments for your considerations. Nothing critical, just minor improvements from my perspective.


func newPublicationCreateCommand() *cli.Command {
var owner, dburi, provider string
var owner, dburi, provider, winSize string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could switch the winSize type from string to int64, in L82 use the &cli.Int64Flag struct, avoin the parseInt in L127

}

func mkDBDir(dir, pub string) error {
if err := os.Mkdir(path.Join(dir, pub), 0o755); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better using MkdirAll so it ignores if the dir already exists?

MkdirAll creates a directory named path, along with any necessary parents, and returns nil, or else returns an error. The permission bits perm (before umask) are used for all directories that MkdirAll creates. If path is already a directory, MkdirAll does nothing and returns nil.


cols := strings.Join(columnNames, ", ")
valsStr := strings.Join(vals, ", ")
query := fmt.Sprintf(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't have to create multiple insert stmts (one for each record), you can do all into one statement, e.g. INSERT INTO %s (%s) values (%s), (%s), (%s), ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! i didn't know that you could write an insert statement like that

}

// Create a new db
db, err := dbm.NewDB()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this pattern is weird

db, err := dbm.NewDB()
...
dbm.db = db

you're returning a db and setting into in the dbm later. Why not simply do the dbm.db = db inside NewDB?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, if every time you call NewDB you have to call Setup, you can make Setup private and be called inside NewDb.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense 👍

return fmt.Errorf("upload all: %s", err)
}

basinStreamer := app.NewBasinStreamer(ns, r, dbm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a comment about an alternative more decoupled architecture. No need to change it.

One option would be to not have the Uploader as a dependency of DBManager. You could use a channel to synchronize both components and make them independent. Every time a file is ready to be uploaded, DBManager notifies the channel, Uploader picks up the notification and uploads the file. Notice that I'm not saying to make it async, it would still be blocking, but more decoupled and easier to test the parts independently.

return err
}

oldDBFname := dbm.dbFname
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: simply do oldDBPath := path.Join(dbm.dbDir, dbm.dbFname) in L99

return fmt.Errorf("cannot stat file: %s", err)
}

progress := progressbar.DefaultBytes(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if the progress bar makes sense in a background routine, this is more useful for a user when executing a command

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can pass in nil to remove it but it looks okay to me since you can tell that the "replication is paused while this file uploads".

may we can change it later based on the user feedback

func (b *BasinStreamer) Run(ctx context.Context) error {
txs, table, err := b.replicator.StartReplication(ctx)
// Open a local DB for replaying txs
db, err := b.dbMngr.NewDB()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@sanderpick sanderpick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome work.

@avichalp avichalp force-pushed the avichalp/local-replication-blocking branch from 1dd5599 to 9ea6305 Compare November 9, 2023 07:22
@avichalp avichalp changed the base branch from main to next November 9, 2023 07:44
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
Signed-off-by: avichalp <hi@avichalp.me>
@avichalp avichalp force-pushed the avichalp/local-replication-blocking branch from 9ea6305 to e1358d9 Compare November 9, 2023 17:17
@avichalp avichalp changed the base branch from next to main November 10, 2023 09:03
@avichalp avichalp changed the base branch from main to next November 10, 2023 09:26
@avichalp avichalp merged commit cf7afc6 into next Nov 10, 2023
@avichalp avichalp deleted the avichalp/local-replication-blocking branch November 10, 2023 09:27
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants