Skip to content

Commit

Permalink
Properly chunk based on individual records
Browse files: browse the repository at this point in the history
  • Loading branch information
ekmartin committed Nov 8, 2017
1 parent 1a62618 commit 8c0d498
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions dataflow/src/domain/mod.rs
Expand Up @@ -1546,16 +1546,17 @@ impl Domain {
let entries: Result<Vec<Records>, _> = serde_json::from_str(&line);
entries.ok()
})
// Parsing each individual line gives us an iterator over Vec<Records>.
// We're interested in chunking each record, so let's flat_map twice:
// Iter<Vec<Records>> -> Iter<Records> -> Iter<Record>
.flat_map(|r| r)
// Merge packets into batches of RECOVERY_BATCH_SIZE:
.flat_map(|r| r)
// Merge individual records into batches of RECOVERY_BATCH_SIZE:
.chunks(RECOVERY_BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.fold(Records::default(), |mut acc, ref mut data| {
acc.append(data);
acc
}))
// Then create Packet objects from the data:
.map(|data| {
.map(|chunk| {
let data: Records = chunk.collect();
let link = Link::new(local_addr, local_addr);
if is_transactional {
let (ts, prevs) = checktable.recover(global_addr).unwrap();
Expand Down

0 comments on commit 8c0d498

Please sign in to comment.