Merge pull request #270 from kayak/dataset_blender_ordering
More data blending related clean up
twheys committed Jan 9, 2020
2 parents b5a3110 + db57a34 commit d68965b
Showing 11 changed files with 222 additions and 164 deletions.
4 changes: 4 additions & 0 deletions fireant/dataset/data_blending.py
@@ -79,6 +79,10 @@ def __deepcopy__(self, memodict={}):
def table(self):
return None

@property
def database(self):
return self.primary_dataset.database

@immutable
def extra_fields(self, *fields):
for field in fields:
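
The new `database` property simply exposes the primary dataset's database on the blended dataset. A minimal sketch of the delegation, using stand-in classes rather than the real fireant ones:

class PrimaryDataset:
    def __init__(self, database):
        self.database = database

class BlendedDataset:
    """Stand-in for the blender: the blend borrows the primary dataset's database."""

    def __init__(self, primary_dataset, *secondary_datasets):
        self.primary_dataset = primary_dataset
        self.secondary_datasets = secondary_datasets

    @property
    def database(self):
        # Delegate so code that only needs a connection can treat the
        # blender like an ordinary dataset.
        return self.primary_dataset.database

blended = BlendedDataset(PrimaryDataset(database="vertica"))
assert blended.database == "vertica"
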
9 changes: 2 additions & 7 deletions fireant/queries/builder/dataset_blender_query_builder.py
@@ -126,9 +126,6 @@ def _join_criteria_for_blender_subqueries(primary, secondary, dimensions, field_
def _blender(dimensions, metrics, orders, field_maps) -> Callable:
raw_dataset_metrics = set(find_dataset_metrics(metrics))

if orders is None:
orders = [(dimension, None) for dimension in dimensions]

def _field_subquery_map(dataset_sql):
"""
This nasty little function returns a dictionary that tells how to select dimensions and metrics in the
@@ -227,6 +224,7 @@ def sql(self):
datasets, field_maps = _datasets_and_field_maps(self.dataset)
metrics = find_metrics_for_widgets(self._widgets)
raw_dataset_metrics = find_dataset_metrics(metrics)
orders = self.orders
dataset_queries = [
_build_dataset_query(
dataset,
@@ -264,8 +262,5 @@ def sql(self):
zip(*[dataset_query.sql for i, dataset_query in enumerate(dataset_queries)])
)

blend_query = _blender(self._dimensions, metrics, self._orders, field_maps)
blend_query = _blender(self._dimensions, metrics, orders, field_maps)
return [blend_query(*cp) for cp in tx_query_matrix]

def __str__(self):
return str(self.sql)
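
With the default ordering removed from `_blender` (see the `orders` property added to query_builder.py further down), `sql` now reads `orders = self.orders` and passes it straight through. The blend itself still zips the per-dataset queries together before calling `_blender`. A hedged sketch of that transpose step, assuming each dataset's `.sql` yields one sub-query per reference/totals variant:

primary_queries = ["primary_base", "primary_dod"]        # hypothetical sub-queries
secondary_queries = ["secondary_base", "secondary_dod"]

# zip(*...) regroups the i-th sub-query of every dataset so matching variants
# are blended together.
tx_query_matrix = list(zip(primary_queries, secondary_queries))
# -> [('primary_base', 'secondary_base'), ('primary_dod', 'secondary_dod')]

blended = ["BLEND({}, {})".format(*group) for group in tx_query_matrix]
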
76 changes: 29 additions & 47 deletions fireant/queries/builder/dataset_query_builder.py
@@ -9,6 +9,7 @@
alias_selector,
immutable,
)

from .query_builder import (
QueryBuilder,
QueryException,
@@ -18,7 +19,6 @@
)
from .. import special_cases
from ..execution import fetch_data
from ..field_helper import make_term_for_dimension
from ..finders import (
find_and_group_references_for_dimensions,
find_metrics_for_widgets,
@@ -68,28 +68,6 @@ def reference_groups(self):
).values()
)

@property
def orders(self):
"""
Initialize the DataSetQueryBuilder values for orders so that the SQL queries can be built. This also initializes
the default values for orders, which is all of the dimensions, if no order is specified.
"""
if self._orders is not None:
return [
(field.definition.as_(alias_selector(field.alias)), orientation)
for (field, orientation) in self._orders
]

# Initialize ordering to be by all dimensions

# Use the same function to make the definition terms to force it to be consistent.
# Always take the last element in order to prefer the display definition.
definitions = [
make_term_for_dimension(dimension) for dimension in self._dimensions
]

return [(definition, None) for definition in definitions]

@property
def sql(self):
"""
@@ -110,7 +88,6 @@ def sql(self):
share_dimensions = find_share_dimensions(self._dimensions, operations)

return make_slicer_query_with_totals_and_references(
self.dataset,
self.dataset.database,
self.table,
self.dataset.joins,
@@ -188,27 +165,32 @@ def __str__(self):

def __repr__(self):
return ".".join(
["dataset", "query"]
+ ["widget({})".format(repr(widget)) for widget in self._widgets]
+ [
"dimension({})".format(repr(dimension))
for dimension in self._dimensions
]
+ [
"filter({}{})".format(
repr(f),
", apply_filter_to_totals=True" if apply_filter_to_totals else "",
)
for f, apply_filter_to_totals in zip(
self._filters, self._apply_filter_to_totals
)
]
+ [
"reference({})".format(repr(reference))
for reference in self._references
]
+ [
"orderby({}, {})".format(definition.alias, orientation)
for (definition, orientation) in self.orders
]
[
"dataset",
"query",
*["widget({})".format(repr(widget)) for widget in self._widgets],
*[
"dimension({})".format(repr(dimension))
for dimension in self._dimensions
],
*[
"filter({}{})".format(
repr(f),
", apply_filter_to_totals=True"
if apply_filter_to_totals
else "",
)
for f, apply_filter_to_totals in zip(
self._filters, self._apply_filter_to_totals
)
],
*[
"reference({})".format(repr(reference))
for reference in self._references
],
*[
"orderby({}, {})".format(definition.alias, orientation)
for (definition, orientation) in self.orders
],
],
)
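
The reworked `__repr__` assembles the same clauses, but as one flat list built with star-unpacking instead of concatenating lists with `+`. A toy illustration of the pattern (names invented for the example):

class ToyQueryBuilder:
    def __init__(self, widgets, dimensions):
        self.widgets = widgets
        self.dimensions = dimensions

    def __repr__(self):
        # Each clause renders as "name(args)"; the pieces are joined with dots.
        return ".".join(
            [
                "dataset",
                "query",
                *["widget({})".format(w) for w in self.widgets],
                *["dimension({})".format(d) for d in self.dimensions],
            ]
        )

print(repr(ToyQueryBuilder(["Pandas()"], ["timestamp"])))
# dataset.query.widget(Pandas()).dimension(timestamp)
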
4 changes: 2 additions & 2 deletions fireant/queries/builder/dimension_choices_query_builder.py
@@ -8,7 +8,7 @@
get_column_names,
)
from ..execution import fetch_data
from ..field_helper import make_term_for_dimension
from ..field_helper import make_term_for_field
from ..finders import find_joins_for_tables
from ..sql_transformer import make_slicer_query
from ...formats import display_value
@@ -82,7 +82,7 @@ def _make_terms_for_hint_dimensions(self):
"""
dimension_terms = []
for dimension in self._dimensions:
dimension_term = make_term_for_dimension(
dimension_term = make_term_for_field(
dimension, self.dataset.database.trunc_date
)
dimension_term = dimension_term.replace_table(
6 changes: 6 additions & 0 deletions fireant/queries/builder/query_builder.py
@@ -135,6 +135,12 @@ def offset(self, offset):
"""
self._offset = offset

@property
def orders(self):
if self._orders is None:
return [(dimension, None) for dimension in self._dimensions]
return self._orders

@property
def sql(self):
"""
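
The `orders` property added here gives query builders a shared fallback: if no `orderby(...)` was called, order by each selected dimension with no explicit direction. A small sketch of that behaviour with a stand-in class:

class StubQueryBuilder:
    def __init__(self, dimensions, orders=None):
        self._dimensions = dimensions
        self._orders = orders

    @property
    def orders(self):
        if self._orders is None:
            # Default: one (dimension, orientation) pair per dimension,
            # orientation left unset (None).
            return [(dimension, None) for dimension in self._dimensions]
        return self._orders

assert StubQueryBuilder(["date", "device"]).orders == [("date", None), ("device", None)]
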
21 changes: 8 additions & 13 deletions fireant/queries/field_helper.py
@@ -2,27 +2,22 @@
from fireant.utils import alias_selector


def make_term_for_metrics(metric):
f_alias = alias_selector(metric.alias)
return metric.definition.as_(f_alias)


def make_term_for_dimension(dimension, window=None):
def make_term_for_field(field, window=None):
"""
Makes a list of pypika terms for a given dataset definition.
Makes a list of pypika terms for a given dataset field.
:param dimension:
A dataset dimension.
:param field:
A field from a dataset.
:param window:
A window function to apply to the dimension definition if it is a continuous dimension.
:return:
a list of terms required to select and group by in a SQL query given a dataset dimension. This list will contain
either one or two elements. A second element will be included if the dimension has a definition for its display
field.
"""
f_alias = alias_selector(dimension.alias)
f_alias = alias_selector(field.alias)

if window and isinstance(dimension, DatetimeInterval):
return window(dimension.definition, dimension.interval_key).as_(f_alias)
if window and isinstance(field, DatetimeInterval):
return window(field.definition, field.interval_key).as_(f_alias)

return dimension.definition.as_(f_alias)
return field.definition.as_(f_alias)
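
The metric-specific helper disappears from this module and `make_term_for_dimension` is generalised to `make_term_for_field`: any field only needs its alias and definition, and the optional `window` (a date-trunc function) is still applied only to datetime intervals. A hedged sketch with stub objects (the real code returns pypika terms; the `$` alias prefix is an assumption):

class StubInterval:
    def __init__(self, alias, definition, interval_key):
        self.alias = alias
        self.definition = definition
        self.interval_key = interval_key

def stub_trunc_date(definition, interval_key):
    return "TRUNC({}, '{}')".format(definition, interval_key)

def make_term_for_field_sketch(field, window=None):
    f_alias = "$" + field.alias  # assumption: alias_selector prefixes aliases with '$'
    if window and hasattr(field, "interval_key"):
        # Wrap the definition in the database's date-trunc function.
        return "{} AS {}".format(window(field.definition, field.interval_key), f_alias)
    return "{} AS {}".format(field.definition, f_alias)

print(make_term_for_field_sketch(StubInterval("timestamp", "created_at", "day"), stub_trunc_date))
# TRUNC(created_at, 'day') AS $timestamp
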
52 changes: 28 additions & 24 deletions fireant/queries/pagination.py
@@ -1,19 +1,22 @@
import pandas as pd
from fireant.utils import alias_selector

from pypika import Order


def _get_window(limit, offset):
start = offset
end = offset + limit \
if None not in (offset, limit) \
else limit
end = offset + limit if None not in (offset, limit) else limit
return start, end


def _apply_sorting(orders):
sort_values, ascending = zip(*[(order[0].alias, order[1] is Order.asc)
for order in orders])
sort_values, ascending = zip(
*[
(alias_selector(field.alias), orientation is Order.asc)
for field, orientation in orders
]
)
return list(sort_values), ascending


@@ -37,9 +40,9 @@ def paginate(data_frame, widgets, orders=(), limit=None, offset=None):
return data_frame

start, end = _get_window(limit, offset)
group_pagination = isinstance(data_frame.index, pd.MultiIndex) \
and any([getattr(widget, 'group_pagination', False)
for widget in widgets])
group_pagination = isinstance(data_frame.index, pd.MultiIndex) and any(
[getattr(widget, "group_pagination", False) for widget in widgets]
)

if group_pagination:
return _group_paginate(data_frame, start, end, orders)
@@ -62,17 +65,16 @@ def _simple_paginate(data_frame, start=None, end=None, orders=()):
"""
if orders:
sort, ascending = _apply_sorting(orders)
data_frame = data_frame.sort_values(by=sort,
ascending=ascending)
data_frame = data_frame.sort_values(by=sort, ascending=ascending)

return data_frame[start:end]


def _index_isnull(data_frame):
if isinstance(data_frame.index, pd.MultiIndex):
return [any(pd.isnull(value)
for value in level)
for level in list(data_frame.index)]
return [
any(pd.isnull(value) for value in level) for level in list(data_frame.index)
]

return pd.isnull(data_frame.index)

@@ -97,26 +99,27 @@ def _group_paginate(data_frame, start=None, end=None, orders=()):

# Do not apply ordering on the 0th dimension !!!
# This would not have any result since the X-Axis on a chart is ordered sequentially
orders = [order
for order in orders
if order[0].alias != data_frame.index.names[0]]
orders = [order for order in orders if order[0].alias != data_frame.index.names[0]]

if orders:
# FIXME this should aggregate according to field definition, instead of sum
# Need a way to interpret definitions in python code in order to do that
aggregated_df = dimension_groups.sum()

sort, ascending = _apply_sorting(orders)
sorted_df = aggregated_df.sort_values(by=sort,
ascending=ascending)
sorted_df = aggregated_df.sort_values(by=sort, ascending=ascending)
sorted_dimension_values = tuple(sorted_df.index)[start:end]

else:
sorted_dimension_values = tuple(dimension_groups.apply(lambda g: g.name))[start:end]
sorted_dimension_values = tuple(dimension_groups.apply(lambda g: g.name))[
start:end
]

sorted_dimension_values = pd.Index(sorted_dimension_values, name=dimension_levels[0]) \
if len(dimension_levels) == 1 \
sorted_dimension_values = (
pd.Index(sorted_dimension_values, name=dimension_levels[0])
if len(dimension_levels) == 1
else pd.MultiIndex.from_tuples(sorted_dimension_values, names=dimension_levels)
)

def _apply_pagination(df):
# This function applies sorting by using the sorted dimension values as an index to select values in the right
Expand All @@ -138,7 +141,8 @@ def _apply_pagination(df):

return dfx.loc[index_slice, :].append(dfx[isnull])

return data_frame \
.sort_values(data_frame.index.names[0], ascending=True) \
.groupby(level=0) \
return (
data_frame.sort_values(data_frame.index.names[0], ascending=True)
.groupby(level=0)
.apply(_apply_pagination)
)
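
The pagination module is mostly reformatted, but `_apply_sorting` now runs each order's field alias through `alias_selector` so the sort keys match the data frame's column names. A pandas-only usage sketch of the sort-then-slice flow (the column names and `$` alias prefix are assumptions for illustration):

import pandas as pd

df = pd.DataFrame(
    {"$clicks": [30, 10, 20]},
    index=pd.Index(["banner_a", "banner_b", "banner_c"], name="$ad"),
)

# Roughly what _apply_sorting would yield for one descending order on clicks.
sort_columns, ascending = ["$clicks"], [False]

# _get_window(limit=2, offset=0) -> take the first two rows after sorting.
start, end = 0, 2
page = df.sort_values(by=sort_columns, ascending=ascending)[start:end]
print(page)
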
4 changes: 2 additions & 2 deletions fireant/queries/references.py
@@ -2,7 +2,7 @@

from fireant.dataset.fields import Field

from .field_helper import make_term_for_dimension
from .field_helper import make_term_for_field
from .finders import find_field_in_modified_field


@@ -23,7 +23,7 @@ def adapt_for_reference_query(


def _replace_reference_dimension(dimension, offset_func, trunc_date=None):
ref_definition = offset_func(make_term_for_dimension(dimension, trunc_date))
ref_definition = offset_func(make_term_for_field(dimension, trunc_date))
field = Field(
alias=dimension.alias,
definition=ref_definition,
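
In references.py only the call site changes with the rename. For context, `_replace_reference_dimension` wraps the dimension's term in the reference's offset function so that, for example, a week-over-week reference compares each row against the same dimension value shifted back a week. A hypothetical sketch of the idea (the offset SQL and names are invented; the real code builds pypika terms via make_term_for_field):

def week_ago(term):
    # Hypothetical offset: shift the dimension's SQL term back one week.
    return "TIMESTAMPADD('week', -1, {})".format(term)

def replace_reference_dimension_sketch(alias, definition, offset_func):
    # Keep the alias, replace the definition with the offset version so the
    # reference query lines up with the base query on the same alias.
    return {"alias": alias, "definition": offset_func(definition)}

print(replace_reference_dimension_sketch("timestamp", "created_at", week_ago))
# {'alias': 'timestamp', 'definition': "TIMESTAMPADD('week', -1, created_at)"}
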
