Skip to content

Commit

Permalink
Merge recovery packets into batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ekmartin committed Oct 25, 2017
1 parent a4dcad4 commit 57cd580
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/flow/persistence/mod.rs
Expand Up @@ -3,6 +3,7 @@ use buf_redux::BufWriter;
use buf_redux::strategy::WhenFull;

use serde_json;
use itertools::Itertools;

use std::fs;
use std::fs::{File, OpenOptions};
Expand Down Expand Up @@ -108,6 +109,8 @@ impl Parameters {
}
}

const RECOVERY_BATCH_SIZE: usize = 512;

/// Retrieves a vector of packets from the persistent log.
pub fn retrieve_recovery_packets(
nodes: &DomainNodes,
Expand Down Expand Up @@ -136,6 +139,14 @@ pub fn retrieve_recovery_packets(
entries.ok()
})
.flat_map(|r| r)
// Merge packets into batches of RECOVERY_BATCH_SIZE:
.chunks(RECOVERY_BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.fold(Records::default(), |mut acc, ref mut data| {
acc.append(data);
acc
}))
// Then create Packet objects from the data:
.enumerate()
.map(|(i, data)| {
let link = Link::new(*local_addr, *local_addr);
Expand Down

0 comments on commit 57cd580

Please sign in to comment.