Problem
The `sync` command's xmin-based daemon loads the entire set of changed rows for a table into memory before processing any of them, causing:
- Massive memory usage: 10GB+ VSZ for tables with millions of rows
- Connection timeouts: long-running queries exceed ELB/proxy idle timeouts (typically 60s-10min)
- Failed syncs: all tables fail with "connection closed" errors
Evidence
Production logs showing the issue:
```text
2025-12-10T17:43:46Z INFO Found 14446803 changed rows in public.serendb_coingeckoprice (xmin 0 -> 2471865127)
2025-12-10T18:01:37Z ERROR Failed to sync public.serendb_coingeckoprice: Failed to upsert batch
Caused by: connection closed
```
- 14.4 million rows were read into memory by a single query
- ~18 minutes (17:43:46 to 18:01:37) elapsed before the connection was closed
- The process showed 10GB VSZ (virtual memory)
Root Cause
In `src/xmin/daemon.rs:350-386`, `sync_table` loads all rows at once:
```rust
// PROBLEM: Loads ALL rows into memory
let (rows, max_xmin, was_full_sync) = reader
    .read_changes_with_wraparound_check(schema, table, &column_names, since_xmin)
    .await?;

// PROBLEM: Creates another full copy in memory
let values: Vec<Vec<Box<dyn ToSql>>> = rows
    .iter()
    .map(|row| row_to_values(row, &columns))
    .collect();
```
Both `read_changes()` and `read_all_rows()` in `src/xmin/reader.rs` use `.query()`, which buffers the entire result set before returning.
Ironically, a batched reader already exists but isn't used: `read_changes_batched()` and `fetch_batch()` in `reader.rs` (lines 157-243).
Solution
Modify `sync_table` to use streaming/batched processing:
1. Use the existing batched reader
```rust
async fn sync_table(&self, ...) -> Result<u64> {
    // ... setup code ...

    let batch_size = self.config.batch_size; // Currently defaults to 1,000; proposed 10,000
    let mut batch_reader = reader
        .read_changes_batched(schema, table, &column_names, since_xmin, batch_size)
        .await?;

    let mut total_rows = 0u64;
    let mut max_xmin = since_xmin;

    while let Some((rows, batch_max_xmin)) = reader.fetch_batch(&mut batch_reader).await? {
        if rows.is_empty() {
            break;
        }

        tracing::debug!(
            "Processing batch of {} rows for {}.{} (xmin {} -> {})",
            rows.len(), schema, table, max_xmin, batch_max_xmin
        );

        // Convert and apply this batch immediately (memory = O(batch_size))
        let values: Vec<Vec<Box<dyn ToSql + Sync + Send>>> = rows
            .iter()
            .map(|row| row_to_values(row, &columns))
            .collect();

        let affected = writer
            .apply_batch(schema, table, &pk_columns, &column_names, values)
            .await?;

        total_rows += affected;
        max_xmin = batch_max_xmin;

        // Update state after each batch for resume capability. If `state` only
        // lives in memory, it must also be persisted here to survive a crash.
        state.update_table(schema, table, max_xmin, affected);
    }

    Ok(total_rows)
}
```
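To make the memory and checkpoint behaviour of the loop above concrete, here is a synchronous, std-only model of it. `Row`, `BatchSource`, `MockSource`, `SyncReport` and `sync_batched` are all hypothetical stand-ins for the reader, row type and `sync_table`; the real code is async and goes through the database driver. The model only shows that at most one batch is held at a time and that the checkpoint advances after every applied batch. It keeps the larger of the running and per-batch maxima instead of overwriting, which is equivalent when batches arrive in xmin order.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a changed row; only its xmin matters here.
#[derive(Clone, Debug)]
struct Row {
    xmin: u32,
}

/// Hypothetical stand-in for the batched reader: yields up to `batch_size`
/// rows plus the batch's max xmin per call, then `None` once exhausted.
trait BatchSource {
    fn fetch_batch(&mut self) -> Option<(Vec<Row>, u32)>;
}

/// In-memory mock that hands out pre-sorted rows in fixed-size chunks.
struct MockSource {
    rows: Vec<Row>,
    pos: usize,
    batch_size: usize,
}

impl BatchSource for MockSource {
    fn fetch_batch(&mut self) -> Option<(Vec<Row>, u32)> {
        if self.pos >= self.rows.len() {
            return None;
        }
        let end = (self.pos + self.batch_size).min(self.rows.len());
        let batch = self.rows[self.pos..end].to_vec();
        self.pos = end;
        let batch_max = batch.iter().map(|r| r.xmin).max().unwrap_or(0);
        Some((batch, batch_max))
    }
}

/// Rows applied, largest batch held at once, and every checkpoint written.
struct SyncReport {
    total_rows: u64,
    peak_batch_len: usize,
    checkpoints: Vec<u32>,
}

/// Model of the proposed loop: apply each batch, then checkpoint.
fn sync_batched<S, W>(
    source: &mut S,
    since_xmin: u32,
    mut apply: W,
    state: &mut HashMap<String, u32>,
    table: &str,
) -> SyncReport
where
    S: BatchSource,
    W: FnMut(&[Row]) -> u64,
{
    let mut report = SyncReport { total_rows: 0, peak_batch_len: 0, checkpoints: Vec::new() };
    let mut max_xmin = since_xmin;
    while let Some((rows, batch_max_xmin)) = source.fetch_batch() {
        if rows.is_empty() {
            break;
        }
        report.peak_batch_len = report.peak_batch_len.max(rows.len());
        report.total_rows += apply(&rows);
        max_xmin = max_xmin.max(batch_max_xmin);
        // Checkpoint after each batch so a restart can resume from here.
        state.insert(table.to_string(), max_xmin);
        report.checkpoints.push(max_xmin);
        // `rows` goes out of scope here, so memory stays O(batch_size).
    }
    report
}
```

With 25 rows and a batch size of 10, the model applies three batches (10, 10, 5 rows) and writes checkpoints 10, 20 and 25.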
2. Handle wraparound detection
Because `read_changes_with_wraparound_check` is no longer called, add a wraparound check before starting the batched read:
```rust
// Check for wraparound at start
let current_xmin = reader.get_current_xmin().await?;
let since_xmin = if detect_wraparound(table_state.last_xmin, current_xmin)
    == WraparoundCheck::WraparoundDetected
{
    tracing::warn!(
        "xmin wraparound detected for {}.{} - performing full table sync",
        schema, table
    );
    0 // Start from beginning
} else {
    table_state.last_xmin
};
```
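The body of `detect_wraparound` isn't shown in this issue. A minimal sketch of one plausible heuristic, assuming both arguments are raw 32-bit transaction IDs: PostgreSQL xids are 32-bit counters, so if the current xid is smaller than the last synced one, the counter must have wrapped in between. Only `WraparoundDetected` appears in the issue; the `Normal` variant and the `starting_xmin` helper are hypothetical. This heuristic cannot see a wrap that has already advanced past `last_xmin` again (roughly 4 billion transactions later), and it does not apply if `get_current_xmin` returns 64-bit epoch-extended IDs.

```rust
/// Hypothetical mirror of the `WraparoundCheck` enum referenced above;
/// the `Normal` variant is assumed.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WraparoundCheck {
    Normal,
    WraparoundDetected,
}

/// Assumed heuristic: a 32-bit xid that went *backwards* must have wrapped.
fn detect_wraparound(last_xmin: u32, current_xmin: u32) -> WraparoundCheck {
    if current_xmin < last_xmin {
        WraparoundCheck::WraparoundDetected
    } else {
        WraparoundCheck::Normal
    }
}

/// Hypothetical helper: choose where the batched read starts,
/// falling back to a full sync (xmin 0) after a wraparound.
fn starting_xmin(last_xmin: u32, current_xmin: u32) -> u32 {
    match detect_wraparound(last_xmin, current_xmin) {
        WraparoundCheck::WraparoundDetected => 0,
        WraparoundCheck::Normal => last_xmin,
    }
}
```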
3. Add progress logging
For large tables, log progress periodically:
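```rust
// Inside the batch loop, right after `total_rows += affected`.
// Log whenever the running total crosses a multiple of 100,000. A plain
// `total_rows % 100_000 == 0` check skips the message whenever a batch's
// `affected` count carries the total past a multiple without landing on it.
if total_rows / 100_000 > (total_rows - affected) / 100_000 {
    tracing::info!("Synced {} rows so far for {}.{}", total_rows, schema, table);
}
```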
Benefits
| Metric | Before | After |
|---|---|---|
| Memory | O(total_rows), ~10GB | O(batch_size), ~10MB |
| Connection idle time | Minutes (query duration) | Seconds (batch query) |
| Resume capability | None (all or nothing) | Per-batch (state saved) |
| Progress visibility | None until complete | Logged per batch |
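A rough back-of-envelope check of the memory figures, treating the 10GB VSZ from the production logs as if it were all row data. VSZ is virtual memory and also covers the second `values` copy and allocator overhead, so the per-row size here is an overestimate and the per-batch figure an upper bound:

```latex
\frac{10 \times 10^{9}\ \text{bytes}}{14\,446\,803\ \text{rows}} \approx 692\ \text{bytes/row}
\qquad
10\,000\ \text{rows/batch} \times 692\ \text{bytes/row} \approx 6.9\ \text{MB/batch}
```

That is in line with the ~10MB estimate for O(batch_size) memory at a batch size of 10,000.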
Testing
- Unit test: mock the batched reader and verify that batches are processed correctly
- Integration test: sync a table with >100K rows and verify that memory stays bounded
- Manual test: sync the production `serendb_coingeckoprice` table (14.4M rows)
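One case worth covering in the unit tests is resuming after a failure partway through a table. Many rows can share one xmin (every row written by the same transaction does), so a checkpoint of `max_xmin` alone is only safe if a batch never ends in the middle of an xmin group. The std-only sketch below uses hypothetical names (`Row`, `Checkpoint`, `next_batch`, `sync_until_failure`) and checkpoints an `(xmin, id)` pair instead, where `id` stands in for the primary key. It checks that a rerun after a failed batch applies every row exactly once. Whether `fetch_batch` in `reader.rs` needs this depends on how it pages, which this issue doesn't show.

```rust
/// Hypothetical changed row: `id` stands in for the primary key.
#[derive(Clone, Copy, Debug)]
struct Row {
    xmin: u32,
    id: u64,
}

/// Resume point: the (xmin, id) of the last row successfully applied.
type Checkpoint = Option<(u32, u64)>;

/// Keyset pagination over a table pre-sorted by (xmin, id): return at most
/// `batch_size` rows that sort strictly after the checkpoint.
fn next_batch(table: &[Row], after: Checkpoint, batch_size: usize) -> Vec<Row> {
    table
        .iter()
        .filter(|r| after.map_or(true, |k| (r.xmin, r.id) > k))
        .take(batch_size)
        .copied()
        .collect()
}

/// Apply batches until batch number `fail_on_batch` (0-based) is reached,
/// simulating a dropped connection. Returns the last saved checkpoint.
fn sync_until_failure(
    table: &[Row],
    start: Checkpoint,
    batch_size: usize,
    fail_on_batch: Option<usize>,
    applied: &mut Vec<u64>,
) -> Checkpoint {
    let mut checkpoint = start;
    let mut batch_no = 0;
    loop {
        let batch = next_batch(table, checkpoint, batch_size);
        // Stop when the table is exhausted, or simulate the failure.
        if batch.is_empty() || Some(batch_no) == fail_on_batch {
            return checkpoint;
        }
        // Record applied ids so the test can detect skipped or repeated rows.
        applied.extend(batch.iter().map(|r| r.id));
        // Checkpoint only after the whole batch has been applied.
        let last = batch[batch.len() - 1];
        checkpoint = Some((last.xmin, last.id));
        batch_no += 1;
    }
}
```

With xmins `[10, 10, 10, 11, 11, 12, 12]` and a batch size of 2, a failure on the third batch leaves the checkpoint at `(11, 4)`, in the middle of the xmin-11 group. Resuming from "xmin > 11" would skip row 5; resuming from `(11, 4)` does not.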
Related
- The connection timeout troubleshooting section in CLAUDE.md mentions ELB idle timeouts
- Batch size is already configurable via `DaemonConfig::batch_size` (default 1000; should be increased to 10000)
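The case for raising the default is mostly about round trips: each batch is a separate query, so a 10x larger batch means roughly 10x fewer queries while each batch stays small. A minimal sketch of the arithmetic; `batch_count` is a hypothetical helper, not part of the codebase:

```rust
/// Number of batch queries needed to read `total_rows` rows at
/// `batch_size` rows per batch (ceiling division).
fn batch_count(total_rows: u64, batch_size: u64) -> u64 {
    assert!(batch_size > 0, "batch_size must be positive");
    total_rows.div_ceil(batch_size)
}
```

For the 14,446,803-row `serendb_coingeckoprice` table this gives 14,447 batches at the current default of 1,000 and 1,445 batches at 10,000.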