
fix(copy_data): add retries for copy_data command #916

Merged

meskill merged 10 commits into main from fix/copy_data_retry on Apr 22, 2026
Conversation

@meskill (Contributor) commented Apr 20, 2026

fixes #897

Comment on lines 127 to 152
@@ -106,22 +146,19 @@ impl ParallelSyncManager {
self.permit.available_permits() / self.replicas.len(),
);

-        let mut replicas_iter = self.replicas.iter();
-        // Loop through replicas, one at a time.
-        // This works around Rust iterators not having a "rewind" function.
-        let replica = loop {
-            if let Some(replica) = replicas_iter.next() {
-                break replica;
-            } else {
-                replicas_iter = self.replicas.iter();
-            }
-        };
+        // cycle() is the idiomatic "rewind": it restarts the iterator from the
+        // beginning once exhausted, giving round-robin distribution across replicas.
+        let mut replicas_iter = self.replicas.iter().cycle();
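The change above can be illustrated with a minimal standalone sketch (the replica names are illustrative, not the PR's actual types): `cycle()` restarts the iterator from the beginning once it is exhausted, so consumers wrap around the replica list.

```rust
fn main() {
    let replicas = vec!["replica-a", "replica-b"];

    // cycle() repeats the iterator forever; take(5) simulates assigning
    // five copy tasks round-robin across two replicas.
    let assigned: Vec<&str> = replicas.iter().cycle().take(5).cloned().collect();

    println!("{:?}", assigned);
    // ["replica-a", "replica-b", "replica-a", "replica-b", "replica-a"]
}
```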

@meskill (Contributor, Author)
@levkk, could you please check this? Did I get the idea right about the original intention?

Comment thread on pgdog/src/backend/pool/cluster.rs (outdated)
Comment thread on pgdog/src/backend/replication/logical/publisher/parallel_sync.rs (outdated)

tokio::time::sleep(backoff).await;

if let Err(trunc_err) = self.table.truncate_destination(&self.dest).await {
@levkk (Collaborator)

I don't think we should do this for two reasons:

  1. COPY is atomic and transactional: if it fails, none of the rows will be saved in the table; it will be empty when we retry
  2. TRUNCATE is a scary command to run for now. We should add a bunch more tests and conditions that prevent this from accidentally being called on the source DB. For now, let's have the user truncate manually if this retry logic fails for some reason.

There is a chance for a race condition where the table copy completes and we get an error somewhere below, e.g., while running COMMIT, but the odds of that are slim. We should definitely account for this (and truncate), but after we implement a few "this is definitely the destination" checks. I'll write up a separate issue for this.

@meskill (Contributor, Author)
Yes, dropped the truncate; left the comment for the future.
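Since COPY is atomic, a failed attempt leaves the destination table unchanged, so the retry loop can simply back off and try again without truncating. A minimal sketch of that shape (the names `copy_with_retries` and `MAX_RETRIES` and the synchronous sleep are illustrative assumptions, not the PR's actual async implementation):

```rust
use std::thread;
use std::time::Duration;

const MAX_RETRIES: u32 = 3;

/// Retry a copy attempt with exponential backoff. No truncate between
/// attempts: COPY is transactional, so a failure leaves the table empty.
fn copy_with_retries(
    mut copy_attempt: impl FnMut() -> Result<u64, String>,
) -> Result<u64, String> {
    let mut backoff = Duration::from_millis(10);
    let mut last_err = String::new();

    for _ in 0..MAX_RETRIES {
        match copy_attempt() {
            Ok(rows) => return Ok(rows),
            Err(err) => {
                last_err = err;
                thread::sleep(backoff);
                backoff *= 2; // double the wait between attempts
            }
        }
    }
    Err(last_err)
}
```

The real command sleeps with `tokio::time::sleep` inside an async task, as the snippet under review shows.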

@meskill meskill marked this pull request as ready for review April 22, 2026 14:50
Ok((0, 0))
}

async fn flush(&mut self) -> Result<(usize, usize), Error> {
@levkk (Collaborator)
send_one is actually not free! The ParallelConnection gave us I think a 30-40% copy speed boost. Have you benchmarked this? Also curious what made you refactor this one.

@meskill (Contributor, Author)
Moved to a separate PR to verify and test it: #920

Comment thread on pgdog/src/backend/pool/shard/monitor.rs (outdated)

impl ShardMonitor {
async fn spawn(&self) {
if self.shard.comms().lsn_check_interval == Duration::MAX {
@levkk (Collaborator)
We test this code somewhere in CI, right? Just double-checking. This is the replica/primary promoter, so we need to be sure it's tested before tweaking it. At the very least, test it locally (with role = "auto") to make sure it still works. I think this change is fine, although it's probably a no-op, since the code below takes no action unless the lsn_check_interval-controlled loop provides data.

@meskill (Contributor, Author)
Removed this.

/// Prevents accumulated counts from a discarded attempt inflating totals
/// and throughput calculations across retries.
pub(crate) fn reset(&self) {
if let Some(mut state) = TableCopies::get().get_mut(self) {
@levkk (Collaborator)
Not a bad idea to track retries, but can add that as a follow-up.
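The reset() above clears per-table progress so a discarded attempt's counts don't leak into the next one. A self-contained sketch of the idea (the `CopyState` and `TableCopies` fields here are hypothetical, not the actual pgdog structures, which use a shared registry):

```rust
use std::collections::HashMap;

#[derive(Default, Debug, PartialEq)]
struct CopyState {
    rows: u64,
    bytes: u64,
}

struct TableCopies {
    states: HashMap<String, CopyState>,
}

impl TableCopies {
    /// Accumulate progress for a table during a copy attempt.
    fn record(&mut self, table: &str, rows: u64, bytes: u64) {
        let state = self.states.entry(table.to_string()).or_default();
        state.rows += rows;
        state.bytes += bytes;
    }

    /// Zero the counters for one table before retrying its copy, so the
    /// failed attempt's rows don't inflate totals or throughput numbers.
    fn reset(&mut self, table: &str) {
        if let Some(state) = self.states.get_mut(table) {
            *state = CopyState::default();
        }
    }
}
```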

@levkk (Collaborator) commented Apr 22, 2026

The mirror test is flaky. The rest looks good to me!


@levkk (Collaborator) left a review:

Should be good to go!

@meskill meskill merged commit 851d230 into main Apr 22, 2026
10 of 11 checks passed
@meskill meskill deleted the fix/copy_data_retry branch April 22, 2026 20:12


Development

Successfully merging this pull request may close these issues.

- [Resharding] Retry COPY on transient errors
- [Resharding] COPY_DATA: reliability of the data sync stage
