-
Notifications
You must be signed in to change notification settings - Fork 20
/
dataset_query_builder.py
196 lines (171 loc) · 6.29 KB
/
dataset_query_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from typing import (
Dict,
Iterable,
)
from fireant.dataset.totals import scrub_totals_from_share_results
from fireant.reference_helpers import reference_alias
from fireant.utils import (
alias_selector,
immutable,
)
from .query_builder import (
QueryBuilder,
QueryException,
ReferenceQueryBuilderMixin,
WidgetQueryBuilderMixin,
add_hints,
)
from .. import special_cases
from ..execution import fetch_data
from ..finders import (
find_and_group_references_for_dimensions,
find_metrics_for_widgets,
find_operations_for_widgets,
find_share_dimensions,
)
from ..pagination import paginate
from ..sql_transformer import make_slicer_query_with_totals_and_references
class DataSetQueryBuilder(
    ReferenceQueryBuilderMixin, WidgetQueryBuilderMixin, QueryBuilder
):
    """
    Data Set queries consist of widgets, dimensions, filters, orders by and references. At least one widget is
    required. All others are optional.
    """

    def __init__(self, dataset):
        """
        :param dataset:
            The dataset to query. Its ``table`` attribute is passed to the base query builder.
        """
        super().__init__(dataset, dataset.table)
        # Not read anywhere inside this class; presumably consumed by a base class or mixin - TODO confirm.
        self._totals_dimensions = set()
        # Parallel to ``self._filters``: one boolean per filter, appended in ``filter``.
        self._apply_filter_to_totals = []

    def __call__(self, *args, **kwargs):
        """
        Return the builder itself, ignoring all arguments.

        NOTE(review): presumably this lets callers write the builder with or without call parentheses - confirm.
        """
        return self

    @immutable
    def filter(self, *filters, apply_to_totals=True):
        """
        Add one or more filters when building a dataset query.

        :param filters:
            Filters to add to the query
        :param apply_to_totals:
            Whether filters should apply to totals or not
        :return:
            A copy of the query with the filters added.
        """
        # This method returns nothing itself; the copy is presumably produced by the ``immutable`` decorator.
        self._filters += list(filters)
        # Record the flag once per filter so both lists stay the same length.
        self._apply_filter_to_totals += [apply_to_totals] * len(filters)

    @property
    def reference_groups(self):
        """
        :return:
            A list of the values of the mapping returned by ``find_and_group_references_for_dimensions`` for this
            query's dimensions and references.
        """
        grouped_references = find_and_group_references_for_dimensions(
            self._dimensions, self._references
        )
        return list(grouped_references.values())

    @property
    def sql(self):
        """
        Serialize this query builder to a list of Pypika/SQL queries. This function will return one query for every
        combination of reference and rolled up dimension (including null options).

        This collects all of the metrics in each widget, dimensions, and filters and builds a corresponding pypika
        query to fetch the data. When references are used, the base query normally produced is wrapped in an outer
        query and a query for each reference is joined based on the referenced dimension shifted.

        :return: a list of Pypika's Query subclass instances.
        """
        # First run validation for the query on all widgets
        self._validate()

        metrics = find_metrics_for_widgets(self._widgets)
        operations = find_operations_for_widgets(self._widgets)
        share_dimensions = find_share_dimensions(self._dimensions, operations)

        # NOTE(review): ``self._apply_filter_to_totals`` is recorded by ``filter`` but is not forwarded here -
        # confirm whether that is intentional.
        return make_slicer_query_with_totals_and_references(
            self.dataset.database,
            self.table,
            self.dataset.joins,
            self._dimensions,
            metrics,
            operations,
            self._filters,
            self._references,
            self.orders,
            share_dimensions=share_dimensions,
        )

    def fetch(self, hint=None) -> Iterable[Dict]:
        """
        Fetch the data for this query and transform it into the widgets.

        :param hint:
            A query hint label used with database vendors which support it. Adds a label comment to the query.
        :return:
            A list of dict (JSON) objects containing the widget configurations.
        """
        queries = add_hints(self.sql, hint)

        operations = find_operations_for_widgets(self._widgets)
        share_dimensions = find_share_dimensions(self._dimensions, operations)

        data_frame = fetch_data(
            self.dataset.database,
            queries,
            self._dimensions,
            share_dimensions,
            self.reference_groups,
        )

        # Apply operations: one result column per operation and per reference. The leading ``None``
        # presumably stands for the base (non-reference) series - TODO confirm.
        for operation in operations:
            for reference in [None] + self._references:
                df_key = alias_selector(reference_alias(operation, reference))
                data_frame[df_key] = operation.apply(data_frame, reference)

        data_frame = scrub_totals_from_share_results(data_frame, self._dimensions)
        data_frame = special_cases.apply_operations_to_data_frame(
            operations, data_frame
        )
        data_frame = paginate(
            data_frame,
            self._widgets,
            orders=self.orders,
            limit=self._limit,
            offset=self._offset,
        )

        # Apply transformations
        return [
            widget.transform(
                data_frame, self.dataset, self._dimensions, self._references
            )
            for widget in self._widgets
        ]

    def plot(self):
        """
        Fetch the query results and show each transformed widget with ``IPython.display.display``.

        The widgets are displayed in reverse order of how they were added to the query.

        :raises QueryException:
            When IPython is not installed.
        """
        try:
            from IPython.display import display
        except ImportError as error:
            # Chain explicitly so the original import failure is reported as the cause.
            raise QueryException(
                "Optional dependency ipython missing. Please install fireant[ipython] to use plot."
            ) from error

        widgets = self.fetch()
        for widget in reversed(widgets):
            display(widget)

    def __str__(self):
        """Return the string form of the ``sql`` property."""
        return str(self.sql)

    def __repr__(self):
        """
        Render the query as the dotted chain of builder calls that describes it.

        Filters recorded with ``apply_to_totals=False`` show that keyword explicitly; filters using the default
        (``True``) are rendered without it.
        """
        widget_calls = ["widget({})".format(repr(widget)) for widget in self._widgets]
        dimension_calls = [
            "dimension({})".format(repr(dimension)) for dimension in self._dimensions
        ]
        # Use the real keyword name of ``filter`` and print it only for the non-default value, so two
        # queries that differ in this flag never share a repr.
        filter_calls = [
            "filter({}{})".format(
                repr(f),
                "" if apply_to_totals else ", apply_to_totals=False",
            )
            for f, apply_to_totals in zip(self._filters, self._apply_filter_to_totals)
        ]
        reference_calls = [
            "reference({})".format(repr(reference)) for reference in self._references
        ]
        orderby_calls = [
            "orderby({}, {})".format(definition.alias, orientation)
            for (definition, orientation) in self.orders
        ]

        return ".".join(
            [
                "dataset",
                "query",
                *widget_calls,
                *dimension_calls,
                *filter_calls,
                *reference_calls,
                *orderby_calls,
            ]
        )