-
Notifications
You must be signed in to change notification settings - Fork 241
/
keys.rs
155 lines (138 loc) · 5.16 KB
/
keys.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use flow::prelude::*;
use petgraph;
use petgraph::graph::NodeIndex;
use std::collections::HashMap;
pub fn provenance_of<F>(
graph: &Graph,
node: NodeIndex,
column: usize,
mut on_join: F,
) -> Vec<Vec<(NodeIndex, Option<usize>)>>
where
F: FnMut(NodeIndex, Option<usize>, &[NodeIndex]) -> Option<NodeIndex>,
{
let path = vec![(node, Some(column))];
trace(graph, &mut on_join, path)
}
fn trace<F>(
graph: &Graph,
on_join: &mut F,
mut path: Vec<(NodeIndex, Option<usize>)>,
) -> Vec<Vec<(NodeIndex, Option<usize>)>>
where
F: FnMut(NodeIndex, Option<usize>, &[NodeIndex]) -> Option<NodeIndex>,
{
// figure out what node/column we're looking up
let (node, column) = path.last().cloned().unwrap();
let parents: Vec<_> = graph
.neighbors_directed(node, petgraph::EdgeDirection::Incoming)
.collect();
if parents.is_empty() {
// this path reached the source node.
// but we should have stopped at base nodes above...
unreachable!();
}
let n = &graph[node];
// have we reached a base node?
if n.is_internal() && n.get_base().is_some() {
return vec![path];
}
// if the column isn't known, our job is trivial -- just map to all ancestors
if column.is_none() {
// except if we're a join and on_join says to only walk through one...
if n.is_internal() && n.is_join() {
if let Some(parent) = on_join(node, column, &parents[..]) {
path.push((parent, None));
return trace(graph, on_join, path);
}
}
let mut paths = Vec::with_capacity(parents.len());
for p in parents {
let mut path = path.clone();
path.push((p, None));
paths.extend(trace(graph, on_join, path));
}
return paths;
}
let column = column.unwrap();
// we know all non-internal nodes use an identity mapping
if !n.is_internal() {
let parent = parents.into_iter().next().unwrap();
path.push((parent, Some(column)));
return trace(graph, on_join, path);
}
// try to resolve the currently selected column
let resolved = n.parent_columns(column);
assert!(!resolved.is_empty());
// is it a generated column?
if resolved.len() == 1 && resolved[0].0 == node {
// how could this be Some?
// path terminates here, and has no connection to ancestors
// so, we depend on *all* our *full* parents
assert!(resolved[0].1.is_none());
let mut paths = Vec::with_capacity(parents.len());
for p in parents {
let mut path = path.clone();
path.push((p, None));
paths.extend(trace(graph, on_join, path));
}
return paths;
}
// no, it resolves to at least one parent column
// if there is only one parent, we can step right to that
if parents.len() == 1 {
let parent = parents.into_iter().next().unwrap();
let resolved = resolved.into_iter().next().unwrap();
assert_eq!(resolved.0, parent);
path.push((parent, resolved.1));
return trace(graph, on_join, path);
}
// there are multiple parents.
// this means we are either a union or a join.
// let's deal with the union case first.
// in unions, all keys resolve to more than one parent.
if !n.is_join() {
// all columns come from all parents
assert_eq!(parents.len(), resolved.len());
// traverse up all the paths
let mut paths = Vec::with_capacity(parents.len());
for (parent, column) in resolved {
let mut path = path.clone();
// we know that the parent is in the same domain for unions, so [] is ok
path.push((parent, column));
paths.extend(trace(graph, on_join, path));
}
return paths;
}
let mut resolved: HashMap<_, _> = resolved
.into_iter()
.map(|(p, col)| {
// we know joins don't generate values.
(p, col.unwrap())
})
.collect();
// okay, so this is a join. it's up to the on_join function to tell us whether to walk up *all*
// the parents of the join, or just one of them. let's ask.
// TODO: provide an early-termination mechanism?
match on_join(node, Some(column), &parents[..]) {
None => {
// our caller wants information about all our parents.
// since the column we're chasing only follows a single path through a join (unless it
// is a join key, which we don't yet handle), we need to produce (_, None) for all the
// others.
let mut paths = Vec::with_capacity(parents.len());
for parent in parents {
let mut path = path.clone();
path.push((parent, resolved.get(&parent).cloned()));
paths.extend(trace(graph, on_join, path));
}
paths
}
Some(parent) => {
// our caller only cares about *one* parent.
// hopefully we can give key information about that parent
path.push((parent, resolved.remove(&parent)));
trace(graph, on_join, path)
}
}
}