Skip to content

Commit

Permalink
Merge pull request #271 from kayak/fix_blender_metrics_with_conflicting_aliases_in_datasets
Browse files Browse the repository at this point in the history

DataSetBlenderQueryBuilder bugfix
  • Loading branch information
twheys committed Jan 13, 2020
2 parents 9b55b3d + d6f0f25 commit 08ceed6
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 43 deletions.
8 changes: 1 addition & 7 deletions fireant/dataset/fields.py
Expand Up @@ -270,13 +270,7 @@ def __hash__(self):
"""
return hash(None)

return hash(
(
self.alias,
self.data_type,
# self.definition
)
)
return id(self)

def get_sql(self, **kwargs):
raise NotImplementedError
Expand Down
62 changes: 33 additions & 29 deletions fireant/queries/builder/dataset_blender_query_builder.py
@@ -1,11 +1,10 @@
from copy import deepcopy
import copy
from functools import reduce
from typing import (
Callable,
List,
)

from fireant.dataset.fields import Field
from fireant.exceptions import DataSetException
from fireant.queries.builder.dataset_query_builder import DataSetQueryBuilder
from fireant.queries.finders import (
Expand Down Expand Up @@ -123,8 +122,8 @@ def _join_criteria_for_blender_subqueries(primary, secondary, dimensions, field_
return reduce(lambda a, b: a & b, join_criteria)


def _blender(dimensions, metrics, orders, field_maps) -> Callable:
raw_dataset_metrics = set(find_dataset_metrics(metrics))
def _blender(datasets, dimensions, metrics, orders, field_maps) -> Callable:
dataset_metrics = set(find_dataset_metrics(metrics))

def _field_subquery_map(dataset_sql):
"""
Expand All @@ -144,31 +143,36 @@ def _field_subquery_map(dataset_sql):
field_subquery_map[dimension] = base[dimension_alias].as_(dimension_alias)

# dataset metrics
for metric in raw_dataset_metrics:
for sql in dataset_sql:
metric_alias = alias_selector(reference_alias(metric, reference))
select_aliases = {select.alias for select in sql._selects}
if metric_alias in select_aliases:
field_subquery_map[metric] = sql[metric_alias].as_(metric_alias)
break

# complex metrics - fields with definitions referring to other fields
complex_metrics = [
metric for metric in metrics if metric not in raw_dataset_metrics
]
for metric in complex_metrics:
for metric in dataset_metrics:
# Get the pypika query for the dataset this metric belongs too
sql = [
sql
for dataset, sql in zip(datasets, dataset_sql)
if metric in dataset.fields
][0]

metric_alias = alias_selector(reference_alias(metric, reference))
term = sql[metric_alias].as_(metric_alias)
field_subquery_map[metric] = term

# ######### WARNING: This is pretty shitty. #########
# A `get_sql` method is monkey patched to the instance of each Field inside the definition of the Field
# containing them. The definition must also be deep copied in case there are reference queries,
# since there will be multiple instances of the field with different aliases.
#
# A better solution for this would be to implement a replace function in pypika which could replace
# specific nodes in the object graph.
definition_copy = deepcopy(metric.definition)
metric_alias = alias_selector(reference_alias(metric, reference))
field_subquery_map[metric] = definition_copy.as_(metric_alias)
for field in definition_copy.find_(Field):
field.get_sql = field_subquery_map[field].get_sql
metric.get_sql = term.get_sql

dataset_blender_metrics = [
metric for metric in metrics if metric not in dataset_metrics
]
for metric in dataset_blender_metrics:
if metric.definition in field_subquery_map:
field_subquery_map[metric] = field_subquery_map[metric.definition]

else:
# Need to copy the metrics if there are references so that the `get_sql` monkey patch does not conflict
definition_copy = copy.deepcopy(metric.definition)
metric_alias = alias_selector(reference_alias(metric, reference))
field_subquery_map[metric] = definition_copy.as_(metric_alias)

return field_subquery_map

Expand All @@ -188,7 +192,7 @@ def _blend_query(base, *rest):

field_subquery_map = _field_subquery_map([base, *rest])
query = query.select(
*[field_subquery_map[select] for select in [*dimensions, *metrics]]
*[field_subquery_map[select] for select in [*dimensions, *metrics]],
)

for field, orientation in orders:
Expand Down Expand Up @@ -258,9 +262,9 @@ def sql(self):
be produced. This next line converts the list of rows of the table in the diagram to a list of columns. Each
set of queries in a column are then reduced to a single data blending sql query.
"""
tx_query_matrix = list(
querysets_tx = list(
zip(*[dataset_query.sql for i, dataset_query in enumerate(dataset_queries)])
)

blend_query = _blender(self._dimensions, metrics, orders, field_maps)
return [blend_query(*cp) for cp in tx_query_matrix]
blend_query = _blender(datasets, self._dimensions, metrics, orders, field_maps)
return [blend_query(*cp) for cp in querysets_tx]
16 changes: 15 additions & 1 deletion fireant/queries/builder/query_builder.py
@@ -1,6 +1,9 @@
from fireant.dataset.fields import Field
from fireant.exceptions import DataSetException
from fireant.utils import immutable
from fireant.utils import (
immutable,
deepcopy,
)
from pypika import Order
from ..execution import fetch_data
from ..finders import find_field_in_modified_field
Expand Down Expand Up @@ -64,6 +67,17 @@ def __init__(self, dataset, table):
self._limit = None
self._offset = None

def __deepcopy__(self, memodict=None):
    """
    Deep-copy this query builder while preserving the identity of its
    dimension and order fields.

    Each field referenced by ``self._dimensions`` and ``self._orders`` is
    registered in the memo dict *before* copying, so the copy machinery
    reuses the original ``Field`` instances instead of cloning them.

    :param memodict:
        Optional memo dict as supplied by ``copy.deepcopy``. Defaults to a
        fresh dict per call.
    :return:
        A deep copy of this query builder with dimension/order fields shared
        with the original.
    """
    # BUGFIX: the original signature used a mutable default (``memodict={}``),
    # which is shared across calls and leaks stale id -> object entries
    # between unrelated deepcopy invocations. A fresh dict per call is the
    # safe equivalent; ``copy.deepcopy(obj)`` always passes its own memo
    # anyway, so that path is unchanged.
    if memodict is None:
        memodict = {}

    fields = list(self._dimensions)
    if self._orders is not None:
        fields += [field for (field, _) in self._orders]

    for field in fields:
        # Unwrap modified fields (e.g. wrapped dimensions) so the memo entry
        # keys on the underlying field object that deepcopy will encounter.
        field = find_field_in_modified_field(field)
        memodict[id(field)] = field

    # NOTE: ``deepcopy`` here is ``fireant.utils.deepcopy`` (see the module
    # imports), not ``copy.deepcopy``.
    return deepcopy(self, memodict)

@immutable
def dimension(self, *dimensions):
"""
Expand Down
3 changes: 0 additions & 3 deletions fireant/tests/queries/test_build_data_blending.py
Expand Up @@ -6,9 +6,6 @@
Rollup,
mock_dataset_blender,
)


# noinspection SqlDialectInspection,SqlNoDataSourceInspection
from pypika import Order


Expand Down
97 changes: 97 additions & 0 deletions fireant/tests/queries/test_data_blending_corner_cases.py
@@ -0,0 +1,97 @@
from unittest import TestCase

from fireant import (
DataSet,
DataType,
Database,
Field,
ReactTable,
)
from pypika import Tables


class DataSetBlenderCornerCasesTests(TestCase):
    """
    Regression tests for corner cases of blended data sets.

    Exercises blending two data sets whose fields share identical aliases
    ("timestamp" and "metric") and checks that the generated SQL keeps each
    data set's metric scoped to its own subquery ("sq0" vs "sq1").
    """

    # Show full diffs on failure -- the expected SQL assertion below is long.
    maxDiff = None

    def test_conflicting_metric_aliases_in_blended_datasets(self):
        """
        A derived metric defined over two same-aliased dataset metrics must
        reference each operand through its own dataset subquery.
        """
        db = Database()
        # NOTE(review): t1 is created but unused -- both data sets are built
        # on t0, which the expected SQL below confirms (both subqueries
        # select FROM "test0"). Verify that using t0 twice is intentional.
        t0, t1 = Tables("test0", "test1")
        base_ds = DataSet(
            table=t0,
            database=db,
            fields=[
                Field(
                    "timestamp",
                    label="Timestamp",
                    definition=t0.timestamp,
                    data_type=DataType.date,
                ),
                Field(
                    "metric",
                    label="Metric",
                    definition=t0.metric,
                    data_type=DataType.number,
                ),
            ],
        )
        # The second data set deliberately declares fields with the same
        # aliases as base_ds to trigger the alias-conflict code path.
        secondary_ds = DataSet(
            table=t0,
            database=db,
            fields=[
                Field(
                    "timestamp",
                    label="Timestamp",
                    definition=t0.timestamp,
                    data_type=DataType.date,
                ),
                Field(
                    "metric",
                    label="Metric",
                    definition=t0.metric,
                    data_type=DataType.number,
                ),
            ],
        )
        # Blend on shared dimensions and add a derived field whose definition
        # divides one data set's metric by the other data set's metric.
        blend_ds = (
            base_ds.blend(secondary_ds)
            .on_dimensions()
            .extra_fields(
                Field(
                    "metric_share",
                    label="Metric Share",
                    definition=base_ds.fields.metric / secondary_ds.fields.metric,
                    data_type=DataType.number,
                ),
            )
        )

        sql = (
            blend_ds.query()
            .dimension(blend_ds.fields.timestamp)
            .widget(ReactTable(blend_ds.fields.metric_share))
        ).sql

        # Exactly one blended query is expected for this widget/dimension.
        (query,) = sql
        self.assertEqual(
            "SELECT "
            '"sq0"."$timestamp" "$timestamp",'
            '"sq0"."$metric"/"sq1"."$metric" "$metric_share" '
            "FROM ("
            "SELECT "
            '"timestamp" "$timestamp",'
            '"metric" "$metric" '
            'FROM "test0" '
            'GROUP BY "$timestamp" '
            'ORDER BY "$timestamp"'
            ') "sq0" '
            "LEFT JOIN ("
            "SELECT "
            '"timestamp" "$timestamp",'
            '"metric" "$metric" '
            'FROM "test0" '
            'GROUP BY "$timestamp" '
            'ORDER BY "$timestamp"'
            ') "sq1" ON "sq0"."$timestamp"="sq1"."$timestamp" '
            'ORDER BY "$timestamp"',
            str(query),
        )
4 changes: 1 addition & 3 deletions fireant/utils.py
Expand Up @@ -39,9 +39,7 @@ def deepcopy(value, memodict):
memodict[id(value)] = result

for k, v in value.__dict__.items():
result.__dict__[k] = (
memodict[id(v)] if id(v) in memodict else copy.deepcopy(v, memodict)
)
result.__dict__[k] = copy.deepcopy(v, memodict)

return result

Expand Down

0 comments on commit 08ceed6

Please sign in to comment.