diff --git a/src/main.rs b/src/main.rs index c692684..a6f165b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -700,10 +700,10 @@ async fn main() -> anyhow::Result<()> { tables_to_sync, // Tables from filter sync_interval, // CLI: --sync-interval (default 60s) reconcile_interval, // CLI: --reconcile-interval (default 3600s) - 1000, // Batch size - None, // State file: use default - once, // CLI: --once (run single cycle) - no_reconcile, // CLI: --no-reconcile (disable delete detection) + database_replicator::utils::calculate_optimal_batch_size(), // Auto-detect based on available memory + None, // State file: use default + once, // CLI: --once (run single cycle) + no_reconcile, // CLI: --no-reconcile (disable delete detection) ) .await } diff --git a/src/utils.rs b/src/utils.rs index af418e7..b5b6650 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1318,10 +1318,299 @@ pub fn parse_pg_version_string(version_str: &str) -> Result { bail!("Could not parse PostgreSQL version from: {}", version_str) } +/// Get available system memory in bytes +/// +/// Cross-platform function that works on Linux, macOS, and Windows. +/// Returns the amount of memory available for use by applications. +/// +/// # Platform Details +/// +/// - **Linux**: Reads `MemAvailable` from `/proc/meminfo` +/// - **macOS**: Uses `sysctl hw.memsize` for total memory, estimates available +/// - **Windows**: Uses `GlobalMemoryStatusEx` API +/// +/// # Returns +/// +/// Available memory in bytes, or an error if detection fails. +/// +/// # Examples +/// +/// ```no_run +/// use database_replicator::utils::get_available_memory; +/// +/// let available = get_available_memory().unwrap(); +/// println!("Available memory: {} MB", available / 1024 / 1024); +/// ``` +pub fn get_available_memory() -> Result { + #[cfg(target_os = "linux")] + { + get_available_memory_linux() + } + + #[cfg(target_os = "macos")] + { + get_available_memory_macos() + } + + #[cfg(target_os = "windows")] + { + get_available_memory_windows() + } + + #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] + { + // Fallback: assume 1GB available for unknown platforms + tracing::warn!("Memory detection not supported on this platform, assuming 1GB available"); + Ok(1024 * 1024 * 1024) + } +} + +#[cfg(target_os = "linux")] +fn get_available_memory_linux() -> Result { + use std::fs; + + let meminfo = fs::read_to_string("/proc/meminfo").context("Failed to read /proc/meminfo")?; + + // Try MemAvailable first (more accurate, available since Linux 3.14) + for line in meminfo.lines() { + if line.starts_with("MemAvailable:") { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + let kb: u64 = parts[1] + .parse() + .context("Failed to parse MemAvailable value")?; + return Ok(kb * 1024); // Convert KB to bytes + } + } + } + + // Fallback: MemFree + Buffers + Cached (less accurate but works on older kernels) + let mut mem_free: u64 = 0; + let mut buffers: u64 = 0; + let mut cached: u64 = 0; + + for line in meminfo.lines() { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + let value: u64 = parts[1].parse().unwrap_or(0); + if line.starts_with("MemFree:") { + mem_free = value; + } else if line.starts_with("Buffers:") { + buffers = value; + } else if line.starts_with("Cached:") && !line.starts_with("SwapCached:") { + cached = value; + } + } + } + + Ok((mem_free + buffers + cached) * 1024) // Convert KB to bytes +} + +#[cfg(target_os = "macos")] +fn get_available_memory_macos() -> Result { + use std::process::Command; + + // Get total physical memory using sysctl + let output = Command::new("sysctl") + .args(["-n", "hw.memsize"]) + .output() + .context("Failed to execute sysctl")?; + + let total_str = String::from_utf8_lossy(&output.stdout); + let total_bytes: u64 = total_str + .trim() + .parse() + .context("Failed to parse hw.memsize")?; + + // Get actual page size using sysctl hw.pagesize + // Intel Macs use 4KB (4096), Apple Silicon uses 16KB (16384) + let page_size: u64 = { + let page_output = Command::new("sysctl") + .args(["-n", "hw.pagesize"]) + .output() + .context("Failed to execute sysctl hw.pagesize")?; + + let page_str = String::from_utf8_lossy(&page_output.stdout); + page_str.trim().parse().unwrap_or(4096) // Default to 4KB if parsing fails + }; + + // Get free pages using vm_stat + let vm_output = Command::new("vm_stat") + .output() + .context("Failed to execute vm_stat")?; + + let vm_stat = String::from_utf8_lossy(&vm_output.stdout); + + // Parse free and inactive pages from vm_stat + let mut pages_free: u64 = 0; + let mut pages_inactive: u64 = 0; + let mut pages_purgeable: u64 = 0; + + for line in vm_stat.lines() { + if line.starts_with("Pages free:") { + pages_free = parse_vm_stat_value(line); + } else if line.starts_with("Pages inactive:") { + pages_inactive = parse_vm_stat_value(line); + } else if line.starts_with("Pages purgeable:") { + pages_purgeable = parse_vm_stat_value(line); + } + } + + // Available = free + inactive + purgeable (conservative estimate) + let available = (pages_free + pages_inactive + pages_purgeable) * page_size; + + // If vm_stat parsing failed, estimate 50% of total as available + if available == 0 { + tracing::debug!("vm_stat parsing returned 0, estimating 50% of total memory as available"); + return Ok(total_bytes / 2); + } + + Ok(available) +} + +#[cfg(target_os = "macos")] +fn parse_vm_stat_value(line: &str) -> u64 { + // Format: "Pages free: 12345." + line.split(':') + .nth(1) + .and_then(|s| s.trim().trim_end_matches('.').parse().ok()) + .unwrap_or(0) +} + +#[cfg(target_os = "windows")] +fn get_available_memory_windows() -> Result { + use std::mem; + + // MEMORYSTATUSEX structure + #[repr(C)] + #[allow(non_snake_case)] + struct MEMORYSTATUSEX { + dwLength: u32, + dwMemoryLoad: u32, + ullTotalPhys: u64, + ullAvailPhys: u64, + ullTotalPageFile: u64, + ullAvailPageFile: u64, + ullTotalVirtual: u64, + ullAvailVirtual: u64, + ullAvailExtendedVirtual: u64, + } + + #[link(name = "kernel32")] + extern "system" { + fn GlobalMemoryStatusEx(lpBuffer: *mut MEMORYSTATUSEX) -> i32; + } + + let mut mem_status: MEMORYSTATUSEX = unsafe { mem::zeroed() }; + mem_status.dwLength = mem::size_of::() as u32; + + let result = unsafe { GlobalMemoryStatusEx(&mut mem_status) }; + + if result == 0 { + bail!("GlobalMemoryStatusEx failed"); + } + + Ok(mem_status.ullAvailPhys) +} + +/// Calculate optimal batch size based on available system memory +/// +/// Automatically determines an appropriate batch size for the sync daemon +/// based on the amount of available system memory. This prevents OOM errors +/// on memory-constrained instances while maximizing throughput on larger ones. +/// +/// # Memory Model +/// +/// The calculation assumes: +/// - Each row consumes approximately 2KB in memory (conservative estimate for wide tables) +/// - We should use at most 25% of available memory for batch processing +/// - Minimum batch size: 1,000 rows (for very constrained systems) +/// - Maximum batch size: 50,000 rows (diminishing returns beyond this) +/// +/// # Returns +/// +/// Optimal batch size in number of rows, or default of 10,000 if detection fails. +/// +/// # Examples +/// +/// ```no_run +/// use database_replicator::utils::calculate_optimal_batch_size; +/// +/// let batch_size = calculate_optimal_batch_size(); +/// println!("Using batch size: {}", batch_size); +/// // On t3.nano (512MB): ~1,000-2,000 +/// // On t3.small (2GB): ~10,000 +/// // On t3.large (8GB): ~50,000 (capped) +/// ``` +pub fn calculate_optimal_batch_size() -> usize { + const BYTES_PER_ROW: u64 = 2048; // Conservative: 2KB per row + const MEMORY_FRACTION: f64 = 0.25; // Use at most 25% of available memory + const MIN_BATCH_SIZE: usize = 1_000; + const MAX_BATCH_SIZE: usize = 50_000; + const DEFAULT_BATCH_SIZE: usize = 10_000; + + match get_available_memory() { + Ok(available_bytes) => { + // Calculate how many rows we can fit in 25% of available memory + let usable_bytes = (available_bytes as f64 * MEMORY_FRACTION) as u64; + let calculated_size = (usable_bytes / BYTES_PER_ROW) as usize; + + // Clamp to min/max range + let batch_size = calculated_size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE); + + tracing::info!( + "Auto-detected batch size: {} (available memory: {} MB)", + batch_size, + available_bytes / 1024 / 1024 + ); + + batch_size + } + Err(e) => { + tracing::warn!( + "Failed to detect available memory: {}. Using default batch size: {}", + e, + DEFAULT_BATCH_SIZE + ); + DEFAULT_BATCH_SIZE + } + } +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn test_get_available_memory() { + // This should work on all supported platforms + let result = get_available_memory(); + + // Should succeed (may fail in very restricted environments) + if let Ok(available) = result { + // Sanity check: should be at least 10MB, less than 1TB + assert!( + available > 10 * 1024 * 1024, + "Available memory too low: {}", + available + ); + assert!( + available < 1024 * 1024 * 1024 * 1024, + "Available memory too high: {}", + available + ); + } + } + + #[test] + fn test_calculate_optimal_batch_size() { + let batch_size = calculate_optimal_batch_size(); + + // Should be within expected range + assert!(batch_size >= 1_000, "Batch size too small: {}", batch_size); + assert!(batch_size <= 50_000, "Batch size too large: {}", batch_size); + } + #[test] fn test_validate_connection_string_valid() { assert!(validate_connection_string("postgresql://user:pass@localhost:5432/dbname").is_ok()); diff --git a/src/xmin/daemon.rs b/src/xmin/daemon.rs index 2baff6b..ecdb7c3 100644 --- a/src/xmin/daemon.rs +++ b/src/xmin/daemon.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use std::time::Duration; use tokio::time::interval; -use super::reader::XminReader; +use super::reader::{detect_wraparound, WraparoundCheck, XminReader}; use super::reconciler::Reconciler; use super::state::SyncState; use super::writer::{get_primary_key_columns, get_table_columns, row_to_values, ChangeWriter}; @@ -35,7 +35,7 @@ impl Default for DaemonConfig { sync_interval: Duration::from_secs(3600), // 1 hour reconcile_interval: Some(Duration::from_secs(86400)), // 1 day state_path: SyncState::default_path(), - batch_size: 1000, + batch_size: 10_000, // 10K rows per batch for good throughput while bounding memory tables: Vec::new(), schema: "public".to_string(), } @@ -203,7 +203,12 @@ impl SyncDaemon { } match reconciler - .reconcile_table(&self.config.schema, table, &pk_columns) + .reconcile_table_batched( + &self.config.schema, + table, + &pk_columns, + self.config.batch_size, + ) .await { Ok(deleted) => { @@ -323,7 +328,11 @@ impl SyncDaemon { Ok(()) } - /// Sync a single table. + /// Sync a single table using batched processing. + /// + /// This method processes rows in batches to avoid loading entire tables into memory. + /// This is critical for large tables (millions of rows) where loading everything + /// at once would cause OOM or connection timeouts. async fn sync_table( &self, reader: &XminReader<'_>, @@ -334,7 +343,7 @@ impl SyncDaemon { ) -> Result { // Get table state let table_state = state.get_or_create_table(schema, table); - let since_xmin = table_state.last_xmin; + let stored_xmin = table_state.last_xmin; // Get table metadata from SOURCE (not target - tables may not exist there yet) let columns = get_table_columns(reader.client(), schema, table).await?; @@ -346,54 +355,109 @@ impl SyncDaemon { let column_names: Vec = columns.iter().map(|(name, _)| name.clone()).collect(); - // Read changes with wraparound detection - let (rows, max_xmin, was_full_sync) = reader - .read_changes_with_wraparound_check(schema, table, &column_names, since_xmin) + // Check for xmin wraparound before starting + let current_xmin = reader.get_current_xmin().await?; + let (since_xmin, is_full_sync) = if detect_wraparound(stored_xmin, current_xmin) + == WraparoundCheck::WraparoundDetected + { + tracing::warn!( + "xmin wraparound detected for {}.{} - performing full table sync", + schema, + table + ); + (0, true) // Start from beginning + } else { + (stored_xmin, false) + }; + + // Use batched reading to avoid loading entire table into memory + let batch_size = self.config.batch_size; + let mut batch_reader = reader + .read_changes_batched(schema, table, &column_names, since_xmin, batch_size) .await?; - if rows.is_empty() { + let mut total_rows = 0u64; + let mut max_xmin = since_xmin; + let mut batch_count = 0u64; + + // Process batches until exhausted + while let Some((rows, batch_max_xmin)) = reader.fetch_batch(&mut batch_reader).await? { + if rows.is_empty() { + break; + } + + batch_count += 1; + let batch_len = rows.len(); + + // Log first batch with total context, then periodic progress + if batch_count == 1 { + if is_full_sync { + tracing::info!( + "Starting full table sync for {}.{} (batch size: {})", + schema, + table, + batch_size + ); + } else { + tracing::info!( + "Found changes in {}.{} (xmin {} -> {}), processing in batches", + schema, + table, + since_xmin, + batch_max_xmin + ); + } + } + + // Convert and apply batch immediately (memory = O(batch_size)) + let values: Vec>> = rows + .iter() + .map(|row| row_to_values(row, &columns)) + .collect(); + + let affected = writer + .apply_batch(schema, table, &pk_columns, &column_names, values) + .await?; + + total_rows += affected; + max_xmin = batch_max_xmin; + + // Update state after each batch for resume capability + state.update_table(schema, table, max_xmin, affected); + + // Log progress every 10 batches or 100K rows + if batch_count.is_multiple_of(10) || total_rows % 100_000 < batch_len as u64 { + tracing::info!( + "Progress: {}.{} - {} rows synced ({} batches), current xmin: {}", + schema, + table, + total_rows, + batch_count, + max_xmin + ); + } + } + + if total_rows == 0 { tracing::debug!( "No changes in {}.{} since xmin {}", schema, table, since_xmin ); - return Ok(0); - } - - if was_full_sync { - tracing::warn!( - "xmin wraparound detected for {}.{} - performed full table sync ({} rows)", - schema, - table, - rows.len() - ); } else { tracing::info!( - "Found {} changed rows in {}.{} (xmin {} -> {})", - rows.len(), + "Completed sync for {}.{}: {} rows in {} batches (xmin {} -> {})", schema, table, + total_rows, + batch_count, since_xmin, max_xmin ); } - // Convert rows to values (excluding the _xmin column we added) - let values: Vec>> = rows - .iter() - .map(|row| row_to_values(row, &columns)) - .collect(); - - // Apply changes - let affected = writer - .apply_batch(schema, table, &pk_columns, &column_names, values) - .await?; - - // Update state - state.update_table(schema, table, max_xmin, affected); - - Ok(affected) + Ok(total_rows) } /// Load existing state or create new state. @@ -431,7 +495,7 @@ mod tests { let config = DaemonConfig::default(); assert_eq!(config.sync_interval, Duration::from_secs(3600)); assert_eq!(config.reconcile_interval, Some(Duration::from_secs(86400))); - assert_eq!(config.batch_size, 1000); + assert_eq!(config.batch_size, 10_000); assert_eq!(config.schema, "public"); } diff --git a/src/xmin/reader.rs b/src/xmin/reader.rs index 6df328f..336e28d 100644 --- a/src/xmin/reader.rs +++ b/src/xmin/reader.rs @@ -167,12 +167,17 @@ impl<'a> XminReader<'a> { table: table.to_string(), columns: columns.to_vec(), current_xmin: since_xmin, + last_ctid: None, batch_size, exhausted: false, }) } /// Execute a batched read query and return the next batch. + /// + /// Uses (xmin, ctid) as the pagination key to correctly handle cases where + /// many rows share the same xmin (e.g., bulk inserts in a single transaction). + /// Without ctid tie-breaking, rows with duplicate xmin values would be skipped. pub async fn fetch_batch( &self, batch_reader: &mut BatchReader, @@ -192,45 +197,81 @@ impl<'a> XminReader<'a> { .join(", ") }; - let query = format!( - "SELECT {}, xmin::text::bigint as _xmin FROM \"{}\".\"{}\" \ - WHERE xmin::text::bigint > $1 \ - ORDER BY xmin::text::bigint \ - LIMIT $2", - column_list, batch_reader.schema, batch_reader.table - ); - - let rows = self - .client - .query( - &query, - &[ - &(batch_reader.current_xmin as i64), - &(batch_reader.batch_size as i64), - ], - ) - .await - .with_context(|| { - format!( - "Failed to read batch from {}.{}", - batch_reader.schema, batch_reader.table + // Use (xmin, ctid) as compound pagination key to handle duplicate xmin values. + // ctid is the physical tuple location and provides a stable tie-breaker. + let (query, rows) = if let Some(ref last_ctid) = batch_reader.last_ctid { + // Subsequent batches: use compound (xmin, ctid) > ($1, $2) filter + let query = format!( + "SELECT {}, xmin::text::bigint as _xmin, ctid::text as _ctid \ + FROM \"{}\".\"{}\" \ + WHERE (xmin::text::bigint, ctid) > ($1, $2::tid) \ + ORDER BY xmin::text::bigint, ctid \ + LIMIT $3", + column_list, batch_reader.schema, batch_reader.table + ); + + let rows = self + .client + .query( + &query, + &[ + &(batch_reader.current_xmin as i64), + &last_ctid, + &(batch_reader.batch_size as i64), + ], + ) + .await + .with_context(|| { + format!( + "Failed to read batch from {}.{}", + batch_reader.schema, batch_reader.table + ) + })?; + (query, rows) + } else { + // First batch: simple xmin > $1 filter + let query = format!( + "SELECT {}, xmin::text::bigint as _xmin, ctid::text as _ctid \ + FROM \"{}\".\"{}\" \ + WHERE xmin::text::bigint > $1 \ + ORDER BY xmin::text::bigint, ctid \ + LIMIT $2", + column_list, batch_reader.schema, batch_reader.table + ); + + let rows = self + .client + .query( + &query, + &[ + &(batch_reader.current_xmin as i64), + &(batch_reader.batch_size as i64), + ], ) - })?; + .await + .with_context(|| { + format!( + "Failed to read batch from {}.{}", + batch_reader.schema, batch_reader.table + ) + })?; + (query, rows) + }; + + // Suppress unused variable warning - query is useful for debugging + let _ = query; if rows.is_empty() { batch_reader.exhausted = true; return Ok(None); } - // Update current_xmin to the max in this batch - let max_xmin = rows - .iter() - .map(|row| { - let xmin: i64 = row.get("_xmin"); - (xmin & 0xFFFFFFFF) as u32 - }) - .max() - .unwrap_or(batch_reader.current_xmin); + // Get xmin and ctid from the last row for next iteration's pagination + let last_row = rows.last().unwrap(); + let last_xmin: i64 = last_row.get("_xmin"); + let last_ctid: String = last_row.get("_ctid"); + + let max_xmin = (last_xmin & 0xFFFFFFFF) as u32; // Mark as exhausted if we got fewer rows than batch_size if rows.len() < batch_reader.batch_size { @@ -238,6 +279,7 @@ impl<'a> XminReader<'a> { } batch_reader.current_xmin = max_xmin; + batch_reader.last_ctid = Some(last_ctid); Ok(Some((rows, max_xmin))) } @@ -437,11 +479,17 @@ impl<'a> XminReader<'a> { } /// Batch reader state for iterating over large result sets. +/// +/// Uses (xmin, ctid) as the pagination key to handle cases where many rows +/// share the same xmin (e.g., bulk inserts in a single transaction). pub struct BatchReader { pub schema: String, pub table: String, pub columns: Vec, pub current_xmin: u32, + /// Last seen ctid for tie-breaking when multiple rows have same xmin. + /// Format: "(page,tuple)" e.g., "(0,1)" + pub last_ctid: Option, pub batch_size: usize, pub exhausted: bool, } @@ -466,6 +514,7 @@ mod tests { table: "users".to_string(), columns: vec!["id".to_string(), "name".to_string()], current_xmin: 0, + last_ctid: None, batch_size: 1000, exhausted: false, }; @@ -473,6 +522,7 @@ mod tests { assert_eq!(reader.schema, "public"); assert_eq!(reader.table, "users"); assert_eq!(reader.current_xmin, 0); + assert!(reader.last_ctid.is_none()); assert!(!reader.exhausted); } diff --git a/src/xmin/reconciler.rs b/src/xmin/reconciler.rs index c0d96f2..18c78ed 100644 --- a/src/xmin/reconciler.rs +++ b/src/xmin/reconciler.rs @@ -2,6 +2,7 @@ // ABOUTME: Compares primary keys between source and target to find orphaned rows use anyhow::{Context, Result}; +use std::cmp::Ordering; use std::collections::HashSet; use tokio_postgres::types::ToSql; use tokio_postgres::Client; @@ -125,6 +126,9 @@ impl<'a> Reconciler<'a> { } /// Get all primary key values from a table. + /// + /// Note: Uses `::text` cast for both SELECT and ORDER BY to ensure consistent + /// lexicographic ordering that matches Rust string comparison. async fn get_all_primary_keys( &self, client: &Client, @@ -132,21 +136,18 @@ impl<'a> Reconciler<'a> { table: &str, primary_key_columns: &[String], ) -> Result>> { - let pk_cols: Vec = primary_key_columns + // Use ::text cast for both SELECT and ORDER BY to match Rust comparison + let pk_cols_text: Vec = primary_key_columns .iter() .map(|c| format!("\"{}\"::text", c)) .collect(); let query = format!( "SELECT {} FROM \"{}\".\"{}\" ORDER BY {}", - pk_cols.join(", "), + pk_cols_text.join(", "), schema, table, - primary_key_columns - .iter() - .map(|c| format!("\"{}\"", c)) - .collect::>() - .join(", ") + pk_cols_text.join(", ") ); let rows = client @@ -202,6 +203,351 @@ impl<'a> Reconciler<'a> { Ok(row.get(0)) } + + /// Reconcile a table using batched streaming comparison (memory-efficient). + /// + /// Uses merge-join comparison on sorted primary keys fetched in batches. + /// This avoids loading all PKs into memory, making it suitable for tables + /// with millions of rows. + /// + /// # Arguments + /// + /// * `schema` - Schema name + /// * `table` - Table name + /// * `primary_key_columns` - Primary key column names + /// * `batch_size` - Number of PKs to fetch per batch + /// + /// # Returns + /// + /// The number of orphaned rows deleted from target. + pub async fn reconcile_table_batched( + &self, + schema: &str, + table: &str, + primary_key_columns: &[String], + batch_size: usize, + ) -> Result { + tracing::info!( + "Starting batched reconciliation for {}.{} (batch size: {})", + schema, + table, + batch_size + ); + + let writer = ChangeWriter::new(self.target_client); + let mut total_deleted = 0u64; + let mut orphans_batch: Vec> = Vec::new(); + + // Initialize batch readers for both source and target + let mut source_reader = PkBatchReader::new( + self.source_client, + schema, + table, + primary_key_columns, + batch_size, + ); + let mut target_reader = PkBatchReader::new( + self.target_client, + schema, + table, + primary_key_columns, + batch_size, + ); + + // Fetch initial batches + let mut source_batch = source_reader.fetch_next().await?; + let mut target_batch = target_reader.fetch_next().await?; + let mut source_idx = 0; + let mut target_idx = 0; + let mut comparisons = 0u64; + + // Merge-join comparison loop + loop { + // Refill source batch if exhausted + if source_idx >= source_batch.len() && !source_reader.exhausted { + source_batch = source_reader.fetch_next().await?; + source_idx = 0; + } + + // Refill target batch if exhausted + if target_idx >= target_batch.len() && !target_reader.exhausted { + target_batch = target_reader.fetch_next().await?; + target_idx = 0; + } + + // Check termination conditions + let source_exhausted = source_idx >= source_batch.len(); + let target_exhausted = target_idx >= target_batch.len(); + + if source_exhausted && target_exhausted { + // Both exhausted - done + break; + } + + if source_exhausted { + // Source exhausted but target has more - all remaining are orphans + while target_idx < target_batch.len() { + orphans_batch.push(target_batch[target_idx].clone()); + target_idx += 1; + + // Delete batch when full + if orphans_batch.len() >= batch_size { + total_deleted += self + .delete_orphan_batch( + &writer, + schema, + table, + primary_key_columns, + &orphans_batch, + ) + .await?; + orphans_batch.clear(); + } + } + + // Fetch more from target + if !target_reader.exhausted { + target_batch = target_reader.fetch_next().await?; + target_idx = 0; + } + continue; + } + + if target_exhausted { + // Target exhausted but source has more - no more orphans possible + break; + } + + // Compare current PKs + let source_pk = &source_batch[source_idx]; + let target_pk = &target_batch[target_idx]; + comparisons += 1; + + match compare_pks(source_pk, target_pk) { + Ordering::Equal => { + // PKs match - both exist, advance both + source_idx += 1; + target_idx += 1; + } + Ordering::Less => { + // Source PK < Target PK - source has row target doesn't + // This is fine, just advance source + source_idx += 1; + } + Ordering::Greater => { + // Source PK > Target PK - target has orphan + orphans_batch.push(target_pk.clone()); + target_idx += 1; + + // Delete batch when full + if orphans_batch.len() >= batch_size { + total_deleted += self + .delete_orphan_batch( + &writer, + schema, + table, + primary_key_columns, + &orphans_batch, + ) + .await?; + orphans_batch.clear(); + } + } + } + + // Log progress periodically + if comparisons.is_multiple_of(100_000) { + tracing::info!( + "Reconciliation progress for {}.{}: {} comparisons, {} orphans found", + schema, + table, + comparisons, + total_deleted + orphans_batch.len() as u64 + ); + } + } + + // Delete remaining orphans + if !orphans_batch.is_empty() { + total_deleted += self + .delete_orphan_batch(&writer, schema, table, primary_key_columns, &orphans_batch) + .await?; + } + + tracing::info!( + "Completed reconciliation for {}.{}: {} comparisons, {} orphans deleted", + schema, + table, + comparisons, + total_deleted + ); + + Ok(total_deleted) + } + + /// Delete a batch of orphan rows. + async fn delete_orphan_batch( + &self, + writer: &ChangeWriter<'_>, + schema: &str, + table: &str, + primary_key_columns: &[String], + orphans: &[Vec], + ) -> Result { + if orphans.is_empty() { + return Ok(0); + } + + tracing::debug!( + "Deleting batch of {} orphan rows from {}.{}", + orphans.len(), + schema, + table + ); + + // Convert string PKs to ToSql values + let pk_values: Vec>> = orphans + .iter() + .map(|pk| { + pk.iter() + .map(|v| Box::new(v.clone()) as Box) + .collect() + }) + .collect(); + + writer + .delete_rows(schema, table, primary_key_columns, pk_values) + .await + } +} + +/// Compare two primary key tuples lexicographically. +fn compare_pks(a: &[String], b: &[String]) -> Ordering { + for (av, bv) in a.iter().zip(b.iter()) { + match av.cmp(bv) { + Ordering::Equal => continue, + other => return other, + } + } + a.len().cmp(&b.len()) +} + +/// Batch reader for primary keys using keyset pagination. +/// +/// Fetches PKs in sorted order using WHERE pk > last_pk LIMIT batch_size, +/// which is more efficient than OFFSET for large tables. +struct PkBatchReader<'a> { + client: &'a Client, + schema: String, + table: String, + pk_columns: Vec, + batch_size: usize, + last_pk: Option>, + pub exhausted: bool, +} + +impl<'a> PkBatchReader<'a> { + fn new( + client: &'a Client, + schema: &str, + table: &str, + pk_columns: &[String], + batch_size: usize, + ) -> Self { + Self { + client, + schema: schema.to_string(), + table: table.to_string(), + pk_columns: pk_columns.to_vec(), + batch_size, + last_pk: None, + exhausted: false, + } + } + + /// Fetch the next batch of primary keys. + /// + /// IMPORTANT: Both SELECT and ORDER BY use `::text` cast to ensure the SQL + /// stream order matches the lexicographic comparison used in Rust. Without + /// this, numeric PKs would be ordered numerically in SQL (1, 2, 10) but + /// compared lexicographically in Rust ("1" < "10" < "2"), causing false + /// orphan detection and data loss. + async fn fetch_next(&mut self) -> Result>> { + if self.exhausted { + return Ok(Vec::new()); + } + + // Cast PKs to text for both SELECT and ORDER BY to ensure SQL stream + // order matches Rust's lexicographic string comparison + let pk_cols_text: Vec = self + .pk_columns + .iter() + .map(|c| format!("\"{}\"::text", c)) + .collect(); + + let query = if self.last_pk.is_some() { + // Keyset pagination: WHERE (pk1::text, pk2::text, ...) > ($1, $2, ...) + // Must use text-cast columns in WHERE to match ORDER BY ordering + let params: Vec = (1..=self.pk_columns.len()) + .map(|i| format!("${}", i)) + .collect(); + + format!( + "SELECT {} FROM \"{}\".\"{}\" WHERE ({}) > ({}) ORDER BY {} LIMIT {}", + pk_cols_text.join(", "), + self.schema, + self.table, + pk_cols_text.join(", "), + params.join(", "), + pk_cols_text.join(", "), + self.batch_size + ) + } else { + // First batch: no WHERE clause + format!( + "SELECT {} FROM \"{}\".\"{}\" ORDER BY {} LIMIT {}", + pk_cols_text.join(", "), + self.schema, + self.table, + pk_cols_text.join(", "), + self.batch_size + ) + }; + + // Build parameters for keyset pagination + let params: Vec<&(dyn ToSql + Sync)> = if let Some(ref last) = self.last_pk { + last.iter().map(|s| s as &(dyn ToSql + Sync)).collect() + } else { + Vec::new() + }; + + let rows = self.client.query(&query, ¶ms).await.with_context(|| { + format!( + "Failed to fetch PK batch from {}.{}", + self.schema, self.table + ) + })?; + + if rows.len() < self.batch_size { + self.exhausted = true; + } + + let pks: Vec> = rows + .iter() + .map(|row| { + (0..self.pk_columns.len()) + .map(|i| row.get::<_, String>(i)) + .collect() + }) + .collect(); + + // Update last_pk for next iteration + if let Some(last_row) = pks.last() { + self.last_pk = Some(last_row.clone()); + } + + Ok(pks) + } } /// Configuration for reconciliation behavior.