-
Notifications
You must be signed in to change notification settings - Fork 20
/
finders.py
248 lines (196 loc) · 7.52 KB
/
finders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from collections import (
defaultdict,
namedtuple,
)
from toposort import (
CircularDependencyError,
toposort_flatten,
)
from fireant.dataset.intervals import (
DATETIME_INTERVALS,
DatetimeInterval,
)
from fireant.dataset.modifiers import (
OmitFromRollup,
Rollup,
)
from fireant.dataset.operations import Share
from fireant.exceptions import DataSetException
from fireant.utils import (
groupby,
ordered_distinct_list,
ordered_distinct_list_by_attr,
)
class MissingTableJoinException(DataSetException):
    """Raised when a query requires a table for which no join is defined."""
class CircularJoinsException(DataSetException):
    """Raised when two or more joins form a circular dependency."""
ReferenceGroup = namedtuple("ReferenceGroup", ("dimension", "time_unit", "intervals"))
def find_required_tables_to_join(elements, base_table):
    """
    Collect all the tables required for a given list of dataset elements.

    Each element's ``definition`` attribute (when present) is inspected for the
    tables it references.  The base table is excluded from the result since it
    is always part of the query.

    :param elements:
        The metrics, dimensions, and filters included in this dataset query.
    :param base_table:
        The query's primary table, omitted from the result.
    :return:
        A collection of tables required to execute a query.
    """
    tables = []
    for element in elements:
        definition = getattr(element, "definition", None)
        # Elements without a definition contribute no tables.
        if definition is None:
            continue
        for table in definition.tables_:
            # Omit the base table from this list
            if base_table != table:
                tables.append(table)
    return ordered_distinct_list(tables)
def find_joins_for_tables(joins, base_table, required_tables):
    """
    Given a set of tables required for a dataset query, this function finds the joins required for the query and
    sorts them topologically.

    :param joins:
        The joins available on the dataset, each with a `table` and a `criterion`.
    :param base_table:
        The query's primary table; it never needs a join itself.
    :param required_tables:
        The tables that must be joined.  NOTE: this collection is consumed
        (mutated) in place via `pop`, and must support `+=` with a set (a list).
    :return:
        A list of joins in the order that they must be joined to the query.
    :raises:
        MissingTableJoinException - If a table is required but there is no join for that table
        CircularJoinsException - If there is a circular dependency between two or more joins
    """
    dependencies = defaultdict(set)
    slicer_joins = {join.table: join for join in joins}

    while required_tables:
        table = required_tables.pop()

        if table not in slicer_joins:
            raise MissingTableJoinException(
                "Could not find a join for table {}".format(str(table))
            )

        join = slicer_joins[table]
        # Tables referenced by this join's criterion, other than the base
        # table and the join's own table, must themselves be joined first.
        tables_required_for_join = set(join.criterion.tables_) - {
            base_table,
            join.table,
        }

        dependencies[join] |= {
            slicer_joins[table] for table in tables_required_for_join
        }
        # Queue any newly discovered tables that are not already resolved as
        # dependencies so their own joins get processed too.
        required_tables += tables_required_for_join - {d.table for d in dependencies}

    try:
        return toposort_flatten(dependencies, sort=True)
    except CircularDependencyError as e:
        # Chain the underlying toposort error so the original cycle detail is
        # preserved in the traceback.
        raise CircularJoinsException(str(e)) from e
def find_metrics_for_widgets(widgets):
    """
    :return:
        an ordered, distinct list of metrics used in all widgets as part of this query.
    """
    all_metrics = []
    for widget in widgets:
        all_metrics.extend(widget.metrics)
    return ordered_distinct_list_by_attr(all_metrics)
def find_operations_for_widgets(widgets):
    """
    :return:
        an ordered, distinct list of operations used in all widgets as part of this query.
    """
    all_operations = []
    for widget in widgets:
        all_operations.extend(widget.operations)
    return ordered_distinct_list_by_attr(all_operations)
def find_dataset_metrics(metrics):
    """
    Given a list of metrics used in widgets from a dataset blender query, this function returns a list of metrics that
    are from a dataset. Concretely, this means that if a dataset blender has a metric built on dataset metrics, then
    this will replace that metric with the metrics from the dataset.
    """
    # Local import — presumably to avoid a circular import at module load time;
    # TODO confirm.
    from fireant.dataset.fields import Field
    # NOTE(review): `field or metric` yields `metric` only when a found Field is
    # falsy — presumably Field instances are always truthy, making this
    # effectively just `field`; verify.  Also note that a metric whose
    # definition contains no Field is dropped entirely (the inner loop yields
    # nothing for it) — confirm this is intended.
    return [
        field or metric
        for metric in metrics
        for field in metric.definition.find_(Field)
    ]
def find_share_dimensions(dimensions, operations):
    """
    Returns a subset list of dimensions from the list of dimensions that are used as the over-dimension in share
    operations.

    :param dimensions:
    :param operations:
    :return:
    """
    by_alias = {dimension.alias: dimension for dimension in dimensions}

    result = []
    for operation in operations:
        if not isinstance(operation, Share):
            continue
        over = operation.over
        if over is not None:
            # Map the operation's over-dimension back to the instance from
            # `dimensions` via its alias.
            result.append(by_alias[over.alias])
    return result
def find_totals_dimensions(dimensions, share_dimensions):
    """
    :param dimensions:
    :param share_dimensions:
    :return:
        a list of all dimension fields in the list argument `dimensions` which have the `Rollup` modifier applied to
        them or are used as a basis for a share metric.
    """
    share_aliases = {dim.alias for dim in share_dimensions}

    totals = []
    for dimension in dimensions:
        if isinstance(dimension, Rollup) or dimension.alias in share_aliases:
            totals.append(dimension)
    return totals
def find_filters_for_totals(filters):
    """
    :param filters:
    :return:
        a list of filters that should be applied to totals queries. This removes any filters from the list that have
        the `OmitFromRollup` modifier applied to them.
    """
    totals_filters = []
    for fltr in filters:
        if isinstance(fltr, OmitFromRollup):
            continue
        totals_filters.append(fltr)
    return totals_filters
def find_field_in_modified_field(field):
    """
    Returns the field from a modified field argument (or just the field argument if it is not modified).

    Modifiers wrap their target in a `dimension` attribute; this follows that
    chain until an object without a `dimension` attribute is reached.
    """
    # Sentinel distinguishes "no attribute" from an attribute set to None.
    _missing = object()

    unwrapped = field
    while True:
        inner = getattr(unwrapped, "dimension", _missing)
        if inner is _missing:
            return unwrapped
        unwrapped = inner
def find_fields_in_modified_fields(fields):
    """Unwrap every (possibly modified) field in `fields`, preserving order."""
    return list(map(find_field_in_modified_field, fields))
# When reference intervals must align on weekdays, larger time units are
# approximated in whole weeks: a month as 4 weeks, a quarter as 12 weeks and a
# year as 52 weeks.  Maps time unit -> (replacement unit, interval multiplier).
interval_weekdays = {
    "month": ("week", 4),
    "quarter": ("week", 4 * 3),
    "year": ("week", 4 * 13),
}
def find_and_group_references_for_dimensions(dimensions, references):
    """
    Finds all of the references for dimensions and groups them by dimension, interval unit, number of intervals.

    This structure reflects how the references need to be joined to the dataset query. References of the same
    type (WoW, WoW.delta, WoW.delta_percent) can share a join query.

    :param dimensions:
    :param references:
    :return:
        An `OrderedDict` where the keys are 3-item tuples consisting of "Dimension, interval unit, # of intervals.

    .. code-block:: python

        Example
        {
            (Dimension(date_1), 'weeks', 1): [WoW, WoW.delta],
            (Dimension(date_1), 'years', 1): [YoY],
            (Dimension(date_7), 'days', 1): [DoD, DoD.delta_percent],
        }
    """
    first_dimension = dimensions[0] if dimensions else None
    # Weekday alignment applies only when the leading dimension is a datetime
    # interval whose key is among the first three entries of DATETIME_INTERVALS.
    align_weekdays = isinstance(
        first_dimension, DatetimeInterval
    ) and -1 < DATETIME_INTERVALS.index(first_dimension.interval_key) < 3

    def group_key(reference):
        time_unit, multiplier = reference.time_unit, 1
        if align_weekdays:
            # Substitute week-based units (see `interval_weekdays`) when the
            # reference's unit has a weekday-aligned equivalent.
            time_unit, multiplier = interval_weekdays.get(
                reference.time_unit, (time_unit, multiplier)
            )
        return reference.field, time_unit, multiplier * reference.interval

    return groupby(ordered_distinct_list(references), group_key)