-
Notifications
You must be signed in to change notification settings - Fork 20
/
dimension_choices_query_builder.py
181 lines (147 loc) · 6.22 KB
/
dimension_choices_query_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import pandas as pd
from pypika import Order
from fireant.utils import alias_selector
from .query_builder import (
QueryBuilder,
add_hints,
get_column_names,
)
from ..execution import fetch_data
from ..field_helper import make_term_for_field
from ..finders import find_joins_for_tables
from ..sql_transformer import make_slicer_query
from ...formats import display_value
class DimensionChoicesQueryBuilder(QueryBuilder):
"""
This builder is used for building dataset queries for fetching the choices for a dimension given a set of filters.
"""
def __init__(self, dataset, dimension):
super(DimensionChoicesQueryBuilder, self).__init__(dataset, dataset.table)
self.hint_table = getattr(dimension, "hint_table", None)
self._dimensions.append(dimension)
# TODO remove after 3.0.0
display_alias = dimension.alias + "_display"
if display_alias in dataset.fields:
self._dimensions.append(dataset.fields[display_alias])
def _extract_hint_filters(self):
"""
Extracts filters that can be applied when using the hint table.
:return:
A list of filters.
"""
base_table = self.dataset.table
hint_column_names = get_column_names(self.dataset.database, self.hint_table)
filters = []
for filter_ in self._filters:
base_fields = [
field
for field in filter_.definition.fields_()
if all(table == base_table for table in field.tables_)
]
join_tables = [
table
for field in filter_.definition.fields_()
for table in field.tables_
if table != base_table
]
required_joins = find_joins_for_tables(
self.dataset.joins, self.dataset.table, join_tables
)
base_fields.extend(
[
field
for join in required_joins
for field in join.criterion.fields_()
if all(table == base_table for table in field.tables_)
]
)
if all(field.name in hint_column_names for field in base_fields):
filters.append(filter_)
return filters
def _make_terms_for_hint_dimensions(self):
"""
Makes a list pypika terms using the hint table instead of their original table.
:return:
A list of pypika terms.
"""
dimension_terms = []
for dimension in self._dimensions:
dimension_term = make_term_for_field(
dimension, self.dataset.database.trunc_date
)
dimension_term = dimension_term.replace_table(
dimension_term.table, self.hint_table
)
dimension_terms.append(dimension_term)
return dimension_terms
@property
def sql(self):
"""
Serializes this query builder as a set of SQL queries. This method will always return a list of one query since
only one query is required to retrieve dimension choices.
The dataset query extends this with metrics, references, and totals.
"""
dimensions = [] if self.hint_table else self._dimensions
filters = self._extract_hint_filters() if self.hint_table else self._filters
query = (
make_slicer_query(
database=self.dataset.database,
base_table=self.dataset.table,
joins=self.dataset.joins,
dimensions=dimensions,
filters=filters,
)
.limit(self._limit)
.offset(self._offset)
)
if self.hint_table:
hint_dimension_terms = self._make_terms_for_hint_dimensions()
query = query.select(*hint_dimension_terms).groupby(*hint_dimension_terms)
query = query.replace_table(self.dataset.table, self.hint_table)
return [query]
def fetch(self, hint=None, force_include=()) -> pd.Series:
"""
Fetch the data for this query and transform it into the widgets.
:param hint:
For database vendors that support it, add a query hint to collect analytics on the queries triggered by
fireant.
:param force_include:
A list of dimension values to include in the result set. This can be used to avoid having necessary results
cut off due to the pagination. These results will be returned at the head of the results.
:return:
A list of dict (JSON) objects containing the widget configurations.
"""
query = add_hints(self.sql, hint)[0]
dimension = self._dimensions[0]
alias_definition = dimension.definition.as_(alias_selector(dimension.alias))
dimension_definition = dimension.definition
if self.hint_table:
alias_definition = alias_definition.replace_table(
alias_definition.table, self.hint_table
)
dimension_definition = dimension.definition.replace_table(
dimension_definition.table, self.hint_table
)
if force_include:
include = self.dataset.database.to_char(dimension_definition).isin(
[str(x) for x in force_include]
)
# Ensure that these values are included
query = query.orderby(include, order=Order.desc)
# Order by the dimension definition that the choices are for
query = query.orderby(alias_definition)
data = fetch_data(self.dataset.database, [query], self._dimensions)
if len(data.index.names) > 1:
display_alias = data.index.names[1]
data.reset_index(display_alias, inplace=True)
choices = data[display_alias]
else:
data["display"] = data.index.tolist()
choices = data["display"]
dimension_display = self._dimensions[-1]
return choices.map(lambda raw: display_value(raw, dimension_display) or raw)
def __repr__(self):
return ".".join(
["dataset", self._dimensions[0].alias, "choices"]
+ ["filter({})".format(repr(f)) for f in self._filters]
)