Skip to content

Commit

Permalink
Fixed bug in export-trace command
Browse files Browse the repository at this point in the history
  • Loading branch information
josephg committed May 25, 2024
1 parent 9e10d8a commit fe4b2d4
Showing 1 changed file with 75 additions and 16 deletions.
91 changes: 75 additions & 16 deletions crates/dt-cli/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::cmp::Ordering;
/// Write a second export script which outputs the data to some dt-json style format (making this a
/// non-issue). Or just add these fields in and demand people ignore them.

use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap};
use std::ffi::OsString;
use std::fs::File;
use std::io::{BufRead, BufReader};
Expand Down Expand Up @@ -260,21 +260,36 @@ fn safe_assignments_needed_for_agent(oplog: &ListOpLog, agent: AgentId) -> (RleV
let mut last_lv = vec![];
let mut map = RleVec::new();

for (seq, lv, len) in oplog.cg.agent_assignment.iter_lv_map_for_agent(agent) {
// Find the first item in last_lv which is strictly before lv.
let slot = if let Some(slot) = last_lv.iter().position(|other_lv| {
oplog.cg.graph.version_cmp(*other_lv, lv) == Some(Ordering::Less)
}) {
last_lv[slot] = lv + len - 1;
slot
} else {
let slot = last_lv.len();
last_lv.push(lv + len - 1);
slot
};
for (_seq, outer_lv, outer_len) in oplog.cg.agent_assignment.iter_lv_map_for_agent(agent) {
// There's a potential bug with just using (seq,lv,len) as-is: The chunk described from
// lv..lv+len may contain a series of individual graph entries, which each need to be
// assigned to a different agent.
//
// So we'll split up the span by iterating through the graph...
for entry in oplog.cg.iter_range((outer_lv..outer_lv + outer_len).into()) {
let lv = entry.start;
// let len = entry.len();
// let seq = entry.span.seq_range.start;
assert_eq!(entry.span.agent, agent);

let last_lv_in_span = lv + entry.len() - 1;

// Find the first item in last_lv which is strictly before lv.
let slot = if let Some(slot) = last_lv.iter().position(|other_lv| {
oplog.cg.graph.version_cmp(*other_lv, lv) == Some(Ordering::Less)
}) {
last_lv[slot] = last_lv_in_span;
slot
} else {
let slot = last_lv.len();
last_lv.push(last_lv_in_span);
slot
};

// map.push(KVPair(lv, RleRun::new(slot, len)));
map.push(KVPair(entry.span.seq_range.start, RleRun::new(slot, entry.len())));
}

// map.push(KVPair(lv, RleRun::new(slot, len)));
map.push(KVPair(seq, RleRun::new(slot, len)));
}

// dbg!(map);
Expand All @@ -284,6 +299,47 @@ fn safe_assignments_needed_for_agent(oplog: &ListOpLog, agent: AgentId) -> (RleV
}


fn check_history(num_agents: usize, txns: &[TraceExportTxn]) {
// Each entry in the history must come causally after all other entries with the same agent.
// Let's check thats actually true!

let mut last_idx_for_agent = vec![usize::MAX; num_agents];
for (i, e) in txns.iter().enumerate() {
let agent = e.agent;
let prev = last_idx_for_agent[agent];

if prev != usize::MAX {
// Check that prev comes causally before i. The first item with the same agent that
// we run into in the BFS expansion must be prev.
let mut queue = BinaryHeap::new();
for parent in e.parents.iter() {
queue.push(*parent);
}

while let Some(p_i) = queue.pop() {
let p_e = &txns[p_i];

while let Some(peek_i) = queue.peek() { // Handle graph merging.
if *peek_i != p_i { break; }
queue.pop();
}

if p_e.agent == agent {
assert_eq!(p_i, prev, "Nonlinear edits from agent {agent}: {i} should come after {prev} but instead we found {p_i} / {:?} || {:?}", e._dt_span, p_e._dt_span);
break;
}

for parent in p_e.parents.iter() {
queue.push(*parent);
}
}

}

last_idx_for_agent[agent] = i;
}
}

pub fn export_trace_to_json(oplog: &ListOpLog, timestamp_filename: Option<OsString>, shatter: bool) -> TraceExportData {
let timestamps = timestamp_filename.map(Timestamps::from_file);

Expand Down Expand Up @@ -323,7 +379,8 @@ pub fn export_trace_to_json(oplog: &ListOpLog, timestamp_filename: Option<OsStri
let mut next = 0;
for &agent in sorted_agents.iter() {
agent_map[agent as usize] = next;
next += mappings[agent as usize].1;
let slots_used = mappings[agent as usize].1;
next += slots_used;
}

let mut txns = vec![];
Expand Down Expand Up @@ -420,6 +477,8 @@ pub fn export_trace_to_json(oplog: &ListOpLog, timestamp_filename: Option<OsStri
}
}

check_history(num_agents, &txns);

let end_content = oplog.checkout_tip().into_inner().to_string();
TraceExportData {
kind: "concurrent",
Expand Down

0 comments on commit fe4b2d4

Please sign in to comment.