Skip to content

Commit

Permalink
Produce even better DOT graphs
Browse files Browse the repository at this point in the history
Output DOT graphs without using petgraph's stock formatter so that we
can output each node's configuration using a dense, human-readable,
relational-algebra-like format.
  • Loading branch information
benesch committed Nov 11, 2016
1 parent f2c8b42 commit 5a3c950
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 2 deletions.
36 changes: 35 additions & 1 deletion src/flow/mod.rs
Expand Up @@ -2,7 +2,7 @@ use petgraph;
use clocked_dispatch;

use std::fmt;
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use std::sync::mpsc;
use std::sync;
use std::thread;
Expand Down Expand Up @@ -953,6 +953,40 @@ impl<Q, U, D> FlowGraph<Q, U, D>
}
}

/// Renders the flow graph in Graphviz DOT syntax.
///
/// Every node becomes a `record`-shaped DOT node. A node without an operator
/// is labelled `(source)`; any other node is labelled with its index, name,
/// operator description and argument list. Every edge of the underlying graph
/// is emitted as `source -> target`.
impl<Q, U, D> Display for FlowGraph<Q, U, D>
    where Q: 'static + Clone + Send + Sync,
          U: 'static + Clone + Send,
          D: 'static + Clone + Send
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use ops::NodeOp;

        /// Escapes text for use inside a double-quoted DOT record label.
        ///
        /// `"` would terminate the label string, and `{`, `}`, `|`, `<`, `>`
        /// are structural characters in record labels, so each of them (and
        /// the escape character `\` itself) is prefixed with a backslash.
        fn escape<T: Display>(text: T) -> String {
            let raw = text.to_string();
            let mut escaped = String::with_capacity(raw.len());
            for c in raw.chars() {
                match c {
                    '"' | '\\' | '{' | '}' | '|' | '<' | '>' => {
                        escaped.push('\\');
                        escaped.push(c);
                    }
                    _ => escaped.push(c),
                }
            }
            escaped
        }

        let indent = " ";
        try!(writeln!(f, "digraph {{"));
        try!(writeln!(f, "{}node [shape=record, fontsize=10]", indent));

        // One DOT node statement per graph node.
        for index in self.graph.node_indices() {
            try!(write!(f, "{}{}", indent, index.index()));
            try!(write!(f, " [label=\""));
            match self.graph[index].as_ref() {
                None => try!(write!(f, "(source)")),
                Some(n) => {
                    // The braces and pipes in the format string are the
                    // record layout itself and must stay unescaped; only the
                    // interpolated text is escaped.
                    // NOTE(review): `operator()` is unwrapped here; presumably
                    // every non-source node carries an operator -- confirm.
                    try!(write!(f, "{{ {{ {} / {} | {} }} | {} }}",
                                index.index(),
                                escape(n.name()),
                                escape(n.operator().unwrap().description()),
                                escape(n.args().join(", "))));
                }
            }
            try!(writeln!(f, "\"]"));
        }

        // One DOT edge statement per graph edge.
        for edge in self.graph.raw_edges() {
            try!(writeln!(f, "{}{} -> {}",
                          indent,
                          edge.source().index(),
                          edge.target().index()));
        }

        try!(write!(f, "}}"));
        Ok(())
    }
}

impl<Q, U, D> Debug for FlowGraph<Q, U, D>
where Q: Clone + Debug + Send + Sync,
U: Clone + Send,
Expand Down
4 changes: 4 additions & 0 deletions src/ops/base.rs
Expand Up @@ -65,4 +65,8 @@ impl NodeOp for Base {
/// Marks this operator as a base node by always returning `true`.
fn is_base(&self) -> bool {
true
}

/// Returns the fixed symbol `𝓕` that describes a base node.
fn description(&self) -> String {
    String::from("𝓕")
}
}
4 changes: 4 additions & 0 deletions src/ops/gatedid.rs
Expand Up @@ -73,6 +73,10 @@ impl NodeOp for GatedIdentity {
/// Resolves output column `col` to the same column of the source node `self.src`.
fn resolve(&self, col: usize) -> Option<Vec<(flow::NodeIndex, usize)>> {
Some(vec![(self.src, col)])
}

/// Returns the fixed label `GatedIdentity` for this operator.
fn description(&self) -> String {
    String::from("GatedIdentity")
}
}

#[cfg(test)]
Expand Down
23 changes: 23 additions & 0 deletions src/ops/grouped/aggregate.rs
Expand Up @@ -98,6 +98,16 @@ impl GroupedOperation for Aggregator {
unreachable!();
}
}

/// Describes the aggregation as `<function> γ[<group columns>]`.
///
/// The function part is `Count` for `COUNT` and `Sum(<over>)` for `SUM`;
/// the group columns are listed in the order they are stored in `self.group`.
fn description(&self) -> String {
    let columns: Vec<String> = self.group.iter().map(|col| col.to_string()).collect();
    let function = match self.op {
        Aggregation::COUNT => String::from("Count"),
        Aggregation::SUM => format!("Sum({})", self.over),
    };
    format!("{} γ[{}]", function, columns.join(", "))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -144,6 +154,19 @@ mod tests {
}
}

// Checks the description text produced for a COUNT and for a SUM aggregator.
#[test]
fn it_describes() {
let s = 0.into();

// COUNT does not mention the aggregated column, only the group columns.
let c = ops::new("count", &["x", "z", "ys"], true,
Aggregation::COUNT.over(s, 1, &[0, 2]));
assert_eq!(c.inner.description(), "Count γ[0, 2]");

// SUM names the aggregated column; group columns keep the order given.
let c = ops::new("sum", &["x", "z", "ys"], true,
Aggregation::SUM.over(s, 1, &[2, 0]));
assert_eq!(c.inner.description(), "Sum(1) γ[2, 0]");
}

#[test]
fn it_forwards() {
let src = flow::NodeIndex::new(0);
Expand Down
26 changes: 26 additions & 0 deletions src/ops/grouped/concat.rs
Expand Up @@ -188,6 +188,26 @@ impl GroupedOperation for GroupConcat {
new.truncate(real_len);
new.into()
}

/// Describes the concatenation as
/// `Concat(<components> / "<separator>") γ[<group columns>]`.
///
/// Literal components are rendered in double quotes, column components as
/// their column index. Group columns are listed in ascending order.
fn description(&self) -> String {
    let mut pieces = Vec::new();
    for component in self.components.iter() {
        pieces.push(match *component {
            TextComponent::Literal(s) => format!("\"{}\"", s),
            TextComponent::Column(i) => i.to_string(),
        });
    }

    // Work on a sorted copy of the group columns so the output is stable.
    let mut keys = self.group.clone();
    keys.sort();
    let keys: Vec<String> = keys.iter().map(|k| k.to_string()).collect();

    format!("Concat({} / \"{}\") γ[{}]",
            pieces.join(", "),
            self.separator,
            keys.join(", "))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -234,6 +254,12 @@ mod tests {
}
}

// Checks the description text produced for the operator built by `setup`.
#[test]
fn it_describes() {
let c = setup(true, true);
assert_eq!(c.inner.description(), "Concat(\".\", 1, \";\" / \"#\") γ[0, 2]");
}

#[test]
fn it_forwards() {
let src = flow::NodeIndex::new(0);
Expand Down
6 changes: 6 additions & 0 deletions src/ops/grouped/mod.rs
Expand Up @@ -63,6 +63,8 @@ pub trait GroupedOperation: fmt::Debug {
/// Given the `current` value of a group, and a number of changes for that group (`diffs`),
/// compute the updated group value.
fn apply(&self, current: &query::DataType, diffs: Vec<(Self::Diff, i64)>) -> query::DataType;

/// Returns a human-readable description of this grouped operation.
fn description(&self) -> String;
}

#[derive(Debug)]
Expand Down Expand Up @@ -369,4 +371,8 @@ impl<T: GroupedOperation> NodeOp for GroupedOperator<T> {
}
Some(vec![(self.src, self.colfix[col])])
}

/// Returns the description provided by the wrapped grouped operation (`self.inner`).
fn description(&self) -> String {
self.inner.description()
}
}
4 changes: 4 additions & 0 deletions src/ops/identity.rs
Expand Up @@ -59,6 +59,10 @@ impl NodeOp for Identity {
/// Resolves output column `col` to the same column of the source node `self.src`.
fn resolve(&self, col: usize) -> Option<Vec<(flow::NodeIndex, usize)>> {
Some(vec![(self.src, col)])
}

/// Returns the fixed symbol `≡` that describes an identity node.
fn description(&self) -> String {
    String::from("≡")
}
}

#[cfg(test)]
Expand Down
32 changes: 32 additions & 0 deletions src/ops/join.rs
Expand Up @@ -439,6 +439,25 @@ impl NodeOp for Joiner {
/// Resolves output column `col` to the single `(node, column)` entry stored at `self.emit[col]`.
fn resolve(&self, col: usize) -> Option<Vec<(flow::NodeIndex, usize)>> {
Some(vec![self.emit[col].clone()])
}

/// Describes the join as a comma-separated list of field pairings of the
/// form `<left node>:<left column> <op> <right node>:<right column>`.
///
/// `<op>` is `⋉` when the pairing is marked `outer` and `⋈` otherwise. Each
/// pair of nodes is listed once, from the side with the smaller node index.
fn description(&self) -> String {
    let mut pairings = Vec::new();
    for (left, targets) in self.join.iter() {
        for (right, target) in targets.against.iter() {
            // Skip the mirrored entry so that each pairing appears only once.
            if !(left < right) {
                continue;
            }
            let op = if target.outer { "⋉" } else { "⋈" };
            for &(li, ri) in target.fields.iter() {
                pairings.push(format!("{}:{} {} {}:{}",
                                      left.index(), li, op, right.index(), ri));
            }
        }
    }
    pairings.join(", ")
}
}

#[cfg(test)]
Expand Down Expand Up @@ -486,6 +505,19 @@ mod tests {
(ops::new("join", &["j0", "j1", "j2"], false, c), l, r)
}


// Checks the description of the join built by `setup(false)`, which uses the `⋈` symbol.
#[test]
fn it_describes() {
let (j, _, _) = setup(false);
assert_eq!(j.inner.description(), "0:0 ⋈ 1:0");
}

// Checks the description of the join built by `setup(true)`, which uses the `⋉` symbol.
#[test]
fn it_describes_left() {
let (j, _, _) = setup(true);
assert_eq!(j.inner.description(), "0:0 ⋉ 1:0");
}

fn forward_non_weird(j: ops::Node, l: flow::NodeIndex, r: flow::NodeIndex) {
// these are the data items we have to work with
// these are in left
Expand Down
12 changes: 12 additions & 0 deletions src/ops/latest.rs
Expand Up @@ -258,6 +258,12 @@ impl NodeOp for Latest {
/// Resolves output column `col` to the same column of the source node `self.src`.
fn resolve(&self, col: usize) -> Option<Vec<(flow::NodeIndex, usize)>> {
Some(vec![(self.src, col)])
}

/// Describes the operator as `Latest γ[<key columns>]`, listing the key
/// columns in the order `self.key` yields them.
fn description(&self) -> String {
    let mut key_cols = Vec::new();
    for k in self.key.iter() {
        key_cols.push(k.to_string());
    }
    format!("Latest γ[{}]", key_cols.join(", "))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -308,6 +314,12 @@ mod tests {
}
}

// Checks the description text produced for the operator built by `setup`.
#[test]
fn it_describes() {
let c = setup(vec![0, 2], false);
assert_eq!(c.inner.description(), "Latest γ[2, 0]");
}

#[test]
fn it_forwards() {
let src = flow::NodeIndex::new(0);
Expand Down
24 changes: 23 additions & 1 deletion src/ops/mod.rs
Expand Up @@ -150,6 +150,8 @@ pub trait NodeOp: Debug {
/// Reports whether this operator is a base node; the default implementation returns `false`.
fn is_base(&self) -> bool {
false
}

/// Returns a short, human-readable description of this operator's configuration.
fn description(&self) -> String;
}

/// The set of node types supported by distributary.
Expand Down Expand Up @@ -270,6 +272,22 @@ impl NodeOp for NodeType {
false
}
}

/// Returns the description of whichever operator this `NodeType` wraps.
fn description(&self) -> String {
// Every variant simply forwards to its inner operator.
match *self {
NodeType::Base(ref n) => n.description(),
NodeType::Aggregate(ref n) => n.description(),
NodeType::Join(ref n) => n.description(),
NodeType::Latest(ref n) => n.description(),
NodeType::Union(ref n) => n.description(),
NodeType::Identity(ref n) => n.description(),
NodeType::GroupConcat(ref n) => n.description(),
// The two arms below are only compiled in test builds.
#[cfg(test)]
NodeType::Test(ref n) => n.description(),
#[cfg(test)]
NodeType::GatedIdentity(ref n) => n.description(),
}
}
}

impl Debug for NodeType {
Expand Down Expand Up @@ -315,7 +333,7 @@ impl Node {

impl Debug for Node {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.name)
write!(f, "{:?}({:#?})", self.name, *self.inner)
}
}

Expand Down Expand Up @@ -568,6 +586,10 @@ mod tests {
/// Always returns `None`: no column of this test operator is resolved to a source column.
fn resolve(&self, _: usize) -> Option<Vec<(flow::NodeIndex, usize)>> {
None
}

/// Returns the fixed label `Tester` for this test-only operator.
fn description(&self) -> String {
    String::from("Tester")
}
}
fn e2e_test(mat: bool) {
use std::collections::HashSet;
Expand Down
20 changes: 20 additions & 0 deletions src/ops/union.rs
Expand Up @@ -176,6 +176,20 @@ impl NodeOp for Union {
/// Resolves output column `col` to one `(source, column)` pair per entry in `self.emit`,
/// using the `col`-th emitted column of each source.
fn resolve(&self, col: usize) -> Option<Vec<(flow::NodeIndex, usize)>> {
Some(self.emit.iter().map(|(src, emit)| (*src, emit[col])).collect())
}

/// Describes the union as `<node>:[<columns>]` terms joined by ` ∪ `, one
/// term per source in `self.emit`.
fn description(&self) -> String {
    // Sort the sources first so the output is the same on every call.
    let mut sources = self.emit.iter().collect::<Vec<_>>();
    sources.sort();

    let mut terms = Vec::with_capacity(sources.len());
    for &(src, columns) in sources.iter() {
        let cols: Vec<String> = columns.iter().map(|c| c.to_string()).collect();
        terms.push(format!("{}:[{}]", src.index(), cols.join(", ")));
    }
    terms.join(" ∪ ")
}
}

#[cfg(test)]
Expand Down Expand Up @@ -223,6 +237,12 @@ mod tests {
(ops::new("union", &["u0", "u1"], false, c), l, r)
}

// Checks the description text produced for the union built by `setup`.
#[test]
fn it_describes() {
let (u, _, _) = setup();
assert_eq!(u.inner.description(), "0:[0, 1] ∪ 1:[0, 2]");
}

#[test]
fn it_works() {
let (u, l, r) = setup();
Expand Down

0 comments on commit 5a3c950

Please sign in to comment.