From 57cd580f1b75406e40c837e832058bbb5cf87fed Mon Sep 17 00:00:00 2001 From: Martin Ek Date: Wed, 25 Oct 2017 17:59:48 +0200 Subject: [PATCH] Merge recovery packets into batches --- src/flow/persistence/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/flow/persistence/mod.rs b/src/flow/persistence/mod.rs index 83df54020..000100bbc 100644 --- a/src/flow/persistence/mod.rs +++ b/src/flow/persistence/mod.rs @@ -3,6 +3,7 @@ use buf_redux::BufWriter; use buf_redux::strategy::WhenFull; use serde_json; +use itertools::Itertools; use std::fs; use std::fs::{File, OpenOptions}; @@ -108,6 +109,8 @@ impl Parameters { } } +const RECOVERY_BATCH_SIZE: usize = 512; + /// Retrieves a vector of packets from the persistent log. pub fn retrieve_recovery_packets( nodes: &DomainNodes, @@ -136,6 +139,14 @@ pub fn retrieve_recovery_packets( entries.ok() }) .flat_map(|r| r) + // Merge packets into batches of RECOVERY_BATCH_SIZE: + .chunks(RECOVERY_BATCH_SIZE) + .into_iter() + .map(|chunk| chunk.fold(Records::default(), |mut acc, ref mut data| { + acc.append(data); + acc + })) + // Then create Packet objects from the data: .enumerate() .map(|(i, data)| { let link = Link::new(*local_addr, *local_addr);