diff --git a/dataflow/src/domain/mod.rs b/dataflow/src/domain/mod.rs index e59f2abbb..dcf176d0f 100644 --- a/dataflow/src/domain/mod.rs +++ b/dataflow/src/domain/mod.rs @@ -1546,16 +1546,17 @@ impl Domain { let entries: Result, _> = serde_json::from_str(&line); entries.ok() }) + // Parsing each individual line gives us an iterator over Vec. + // We're interested in chunking each record, so let's flat_map twice: + // Iter> -> Iter -> Iter .flat_map(|r| r) - // Merge packets into batches of RECOVERY_BATCH_SIZE: + .flat_map(|r| r) + // Merge individual records into batches of RECOVERY_BATCH_SIZE: .chunks(RECOVERY_BATCH_SIZE) .into_iter() - .map(|chunk| chunk.fold(Records::default(), |mut acc, ref mut data| { - acc.append(data); - acc - })) // Then create Packet objects from the data: - .map(|data| { + .map(|chunk| { + let data: Records = chunk.collect(); let link = Link::new(local_addr, local_addr); if is_transactional { let (ts, prevs) = checktable.recover(global_addr).unwrap();