diff --git a/src/flow/persistence/mod.rs b/src/flow/persistence/mod.rs index 83df54020..000100bbc 100644 --- a/src/flow/persistence/mod.rs +++ b/src/flow/persistence/mod.rs @@ -3,6 +3,7 @@ use buf_redux::BufWriter; use buf_redux::strategy::WhenFull; use serde_json; +use itertools::Itertools; use std::fs; use std::fs::{File, OpenOptions}; @@ -108,6 +109,8 @@ impl Parameters { } } +const RECOVERY_BATCH_SIZE: usize = 512; + /// Retrieves a vector of packets from the persistent log. pub fn retrieve_recovery_packets( nodes: &DomainNodes, @@ -136,6 +139,14 @@ pub fn retrieve_recovery_packets( entries.ok() }) .flat_map(|r| r) + // Merge packets into batches of RECOVERY_BATCH_SIZE: + .chunks(RECOVERY_BATCH_SIZE) + .into_iter() + .map(|chunk| chunk.fold(Records::default(), |mut acc, ref mut data| { + acc.append(data); + acc + })) + // Then create Packet objects from the data: .enumerate() .map(|(i, data)| { let link = Link::new(*local_addr, *local_addr);