-
Notifications
You must be signed in to change notification settings - Fork 109
/
join.rs
207 lines (181 loc) · 6.37 KB
/
join.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
use std::collections::{HashMap, HashSet};
use mir::NodeIndex;
use nom_sql::Relation;
use readyset_errors::{internal_err, invariant, unsupported, ReadySetResult};
use super::JoinKind;
use crate::controller::sql::mir::SqlToMirConverter;
use crate::controller::sql::query_graph::{QueryGraph, QueryGraphEdge};
/// A group of relations that have already been joined together while building the joins for a
/// query, along with the node at the bottom of that partial join tree.
struct JoinChain {
    /// The relations joined so far in this chain.
    tables: HashSet<Relation>,
    /// The most recent node in the chain; the next join involving any of `tables` is made against
    /// this node.
    last_node: NodeIndex,
}

impl JoinChain {
    /// Merges `self` and `other` into a single chain ending at `last_node` (the join node that was
    /// just created to combine the two chains).
    pub(super) fn merge_chain(self, other: JoinChain, last_node: NodeIndex) -> JoinChain {
        // Both chains are owned, so reuse `self`'s set and move `other`'s relations into it rather
        // than cloning every relation out of both sets.
        let mut tables = self.tables;
        tables.extend(other.tables);
        JoinChain { tables, last_node }
    }

    /// Returns whether `table` has already been joined into this chain.
    pub(super) fn has_table(&self, table: &Relation) -> bool {
        self.tables.contains(table)
    }
}
// Generate join nodes for the query.
// This is done by creating/merging join chains as each predicate is added.
// If a predicate's parent tables appear in a previous predicate, the
// current predicate is added to the on-going join chain of the previous
// predicate.
// If a predicate's parent tables haven't been used by any previous predicate,
// a new join chain is started for the current predicate. And we assume that
// a future predicate will bring these chains together.
pub(super) fn make_joins(
mir_converter: &mut SqlToMirConverter,
query_name: &Relation,
name: Relation,
qg: &QueryGraph,
node_for_rel: &HashMap<&Relation, NodeIndex>,
correlated_nodes: &HashSet<NodeIndex>,
) -> ReadySetResult<Vec<NodeIndex>> {
let mut join_nodes: Vec<NodeIndex> = Vec::new();
let mut join_chains = Vec::new();
for jref in qg.join_order.iter() {
let (mut join_kind, jps) = match &qg.edges[&(jref.src.clone(), jref.dst.clone())] {
QueryGraphEdge::Join { on } => (JoinKind::Inner, on),
QueryGraphEdge::LeftJoin {
on,
left_local_preds,
right_local_preds,
global_preds,
params,
} => {
if !(left_local_preds.is_empty()
&& right_local_preds.is_empty()
&& global_preds.is_empty()
&& params.is_empty())
{
unsupported!("Non equal-join predicates not (yet) supported in left joins");
}
(JoinKind::Left, on)
}
};
let (left_chain, right_chain) =
pick_join_chains(&jref.src, &jref.dst, &mut join_chains, node_for_rel)?;
if correlated_nodes.contains(&right_chain.last_node) {
match join_kind {
JoinKind::Left => {
join_kind = JoinKind::DependentLeft;
}
JoinKind::Inner => {
join_kind = JoinKind::DependentInner;
}
JoinKind::DependentInner | JoinKind::DependentLeft => {}
}
}
let jn = mir_converter.make_join_node(
query_name,
mir_converter.generate_label(&name),
jps,
left_chain.last_node,
right_chain.last_node,
join_kind,
)?;
// merge node chains
let new_chain = left_chain.merge_chain(right_chain, jn);
join_chains.push(new_chain);
join_nodes.push(jn);
}
Ok(join_nodes)
}
/// Makes cartesian (cross) joins for the given list of nodes, returning the join nodes created,
/// in order.
///
/// The nodes are joined left-deep: the first two nodes are joined, then each subsequent node is
/// joined onto the previously created join node. No join predicates are used. A node contained
/// in `correlated_nodes` is joined as `JoinKind::DependentInner`; all others use
/// `JoinKind::Inner`.
///
/// One join node is created for every node after the first, so passing a single node returns an
/// empty list.
///
/// # Errors
///
/// Returns an error if passed an empty list of `nodes`, and propagates any error from creating a
/// join node.
pub(super) fn make_cross_joins(
    mir_converter: &mut SqlToMirConverter,
    query_name: &Relation,
    name: &str,
    nodes: Vec<NodeIndex>,
    correlated_nodes: &HashSet<NodeIndex>,
) -> ReadySetResult<Vec<NodeIndex>> {
    let mut nodes = nodes.into_iter();
    let mut previous = nodes
        .next()
        .ok_or_else(|| internal_err!("make_cross_joins called with empty nodes"))?;

    let mut join_nodes = Vec::with_capacity(nodes.len());
    for node in nodes {
        let join_kind = if correlated_nodes.contains(&node) {
            JoinKind::DependentInner
        } else {
            JoinKind::Inner
        };
        previous = mir_converter.make_join_node(
            query_name,
            mir_converter.generate_label(&name.into()),
            &[],
            previous,
            node,
            join_kind,
        )?;
        join_nodes.push(previous);
    }

    Ok(join_nodes)
}
/// Builds the chain of join nodes that combines the query's aggregate `ancestors`.
///
/// The first two ancestors are joined with `mir_converter.make_join_aggregates_node`. Each
/// further ancestor is then joined onto the most recently created join node, producing a
/// left-deep chain. With exactly two ancestors only a single node (of type
/// `MirNodeInner::JoinAggregates`) is created.
///
/// Returns the created join nodes in creation order; the last one is the bottom of the chain.
///
/// # Errors
///
/// Fewer than two ancestors are rejected by the `invariant!` check. Errors from creating a join
/// node are propagated.
pub(super) fn make_joins_for_aggregates(
    mir_converter: &mut SqlToMirConverter,
    query_name: &Relation,
    name: &str,
    ancestors: &[NodeIndex],
) -> ReadySetResult<Vec<NodeIndex>> {
    invariant!(ancestors.len() >= 2);

    // Seed the chain by joining the first two ancestors together.
    let mut previous = mir_converter.make_join_aggregates_node(
        query_name,
        mir_converter.generate_label(&name.into()),
        ancestors[0],
        ancestors[1],
    )?;
    let mut created = vec![previous];

    // Hang every remaining ancestor off the join node built in the previous step.
    for &next_ancestor in &ancestors[2..] {
        previous = mir_converter.make_join_aggregates_node(
            query_name,
            mir_converter.generate_label(&name.into()),
            previous,
            next_ancestor,
        )?;
        created.push(previous);
    }

    Ok(created)
}
fn pick_join_chains(
src: &Relation,
dst: &Relation,
join_chains: &mut Vec<JoinChain>,
node_for_rel: &HashMap<&Relation, NodeIndex>,
) -> ReadySetResult<(JoinChain, JoinChain)> {
let left_chain = match join_chains
.iter()
.position(|chain| chain.has_table(&src.clone()))
{
Some(idx) => join_chains.swap_remove(idx),
None => JoinChain {
tables: std::iter::once(src.clone()).collect(),
last_node: node_for_rel[src],
},
};
let right_chain = match join_chains
.iter()
.position(|chain| chain.has_table(&dst.clone()))
{
Some(idx) => join_chains.swap_remove(idx),
None => JoinChain {
tables: std::iter::once(dst.clone()).collect(),
last_node: node_for_rel[dst],
},
};
Ok((left_chain, right_chain))
}