Skip to content

Commit

Permalink
Fix set feature for data blending.
Browse files Browse the repository at this point in the history
  • Loading branch information
x8lucas8x committed Jul 14, 2020
1 parent d1bbd48 commit 840f7ed
Show file tree
Hide file tree
Showing 6 changed files with 351 additions and 54 deletions.
19 changes: 17 additions & 2 deletions fireant/dataset/fields.py
Expand Up @@ -116,6 +116,12 @@ def __init__(
self.precision = precision
self.hyperlink_template = hyperlink_template

# An artificial field is created dynamically as the query is mounted, instead of being defined during
# instantiation. That's the case for set dimensions, for instance. The only practical aspect of this
# is that artificial dimensions are hashed differently (see dunder hash method). Instead of using the
# memory reference, artificial dimensions use a set of its fields.
self.is_artificial = False

@property
def is_aggregate(self):
return self.definition.is_aggregate
Expand Down Expand Up @@ -271,13 +277,22 @@ def __hash__(self):
"""
return hash(None)

if self.is_artificial:
"""
Python's deep copy built-in are a problem when:
Handling artificial fields (e.g. set field) in data blending.
Given artificial fields are dynamically generated, they can't be properly compared when hashed
by memory id. Therefore we hash them differently.
"""
return hash((self.alias, self.label, self.data_type, self.definition))

return id(self)

def for_(self, replacement):
return replacement

def get_sql(self, **kwargs):
raise NotImplementedError
def get_sql(self, *args, **kwargs):
return self.definition.get_sql(*args, **kwargs)

@property
@immutable
Expand Down
101 changes: 72 additions & 29 deletions fireant/queries/builder/dataset_blender_query_builder.py
Expand Up @@ -15,7 +15,10 @@
from fireant.utils import alias_selector, listify, ordered_distinct_list_by_attr
from fireant.widgets.base import Widget
from pypika import Query, JoinType

from ..sets import (
apply_set_dimensions,
omit_set_filters,
)

@listify
def _find_dataset_fields_needed_to_be_mapped(dataset):
Expand Down Expand Up @@ -58,56 +61,95 @@ def _find_dataset_fields_needed_to_be_mapped(dataset):


@listify
def _map_field(dataset, fields, dimension_map=None):
for field in fields:
def _map_field(dataset_fields, blender_dataset_fields, dimension_map=None):
"""
Returns a list of tuples shaped as blended dataset field and its matching field in the provided fields. Blender
fields that have to match in the dataset fields will be ignored. Please note that the Field class has a custom
dunder hash method.
:param dataset_fields: A set of Field instances pertaining to a single dataset (i.e. primary or secondary).
:param blender_dataset_fields: A list of Field instances in the blender dataset.
:param dimension_map: A dict mapping Field instances from different datasets.
Often used to express the equivalency of primary and secondary dataset fields.
:return: A list of tuples shaped as blended dataset field and its matching field in the provided fields.
"""
for field in blender_dataset_fields:
if dimension_map is not None and field.definition in dimension_map:
yield field, dimension_map[field.definition]
if field.definition in dataset.fields:
if field.definition in dataset_fields:
yield field, field.definition
# also yield the dataset field mapped to itself so that any reference to this field (from blender or
# from dataset) can be used.
yield field.definition, field.definition
if field in dataset.fields:
if field in dataset_fields:
yield field, field


def _datasets_and_field_maps(blender):
def _datasets_and_field_maps(blender_dataset, filters):
"""
Returns a list of tuples shaped as dataset and its fields mapped per the blender dataset ones. The field maps
are later used for knowing which columns to query on the respective primary and secondary datasets, given the
columns selected for the blender dataset.
:param blender: A DataSetBlender instance.
:param filters: A list of Filter instances, present on the blender query. This is used for creating dimensions
derived from filters, such as the set dimension.
:return: A list of tuples shaped as dataset and its fields mapped per the blender dataset.
"""
from fireant.dataset.data_blending import DataSetBlender

def _flatten_blend_datasets(dataset) -> List:
primary = dataset.primary_dataset
secondary = dataset.secondary_dataset
dataset_fields = _find_dataset_fields_needed_to_be_mapped(dataset)
primary_dataset = dataset.primary_dataset
secondary_dataset = dataset.secondary_dataset
blender_dataset_fields = apply_set_dimensions(
_find_dataset_fields_needed_to_be_mapped(dataset), filters, dataset,
)

primary_dataset_fields = set(apply_set_dimensions(primary_dataset.fields, filters, primary_dataset))
secondary_dataset_fields = set(apply_set_dimensions(secondary_dataset.fields, filters, secondary_dataset))

primary_dimension_per_blender_dimension = dict(_map_field(
primary_dataset_fields, blender_dataset_fields,
))

primary_mapped_dimensions = list(
apply_set_dimensions(dataset.dimension_map.keys(), filters, primary_dataset)
)
secondary_mapped_dimensions = list(
apply_set_dimensions(dataset.dimension_map.values(), filters, secondary_dataset)
)
secondary_dimension_per_primary_dimension = dict(zip(
primary_mapped_dimensions, secondary_mapped_dimensions,
))

blender2primary_field_map = dict(_map_field(primary, dataset_fields))
blender2secondary_field_map = dict(
_map_field(secondary, dataset_fields, dataset.dimension_map)
secondary_dimension_per_blender_dimension = dict(
_map_field(secondary_dataset_fields, blender_dataset_fields, secondary_dimension_per_primary_dimension)
)

if not isinstance(primary, DataSetBlender):
if not isinstance(primary_dataset, DataSetBlender):
return [
(primary, blender2primary_field_map),
(secondary, blender2secondary_field_map),
(primary_dataset, primary_dimension_per_blender_dimension),
(secondary_dataset, secondary_dimension_per_blender_dimension),
]

# get the dataset children of the blender (`dataset.primary_dataset`) and their corresponding field_maps,
# then update the field map to reference this blender's field (`dataset`)
datasets_and_field_maps = []
for ds, fm in _flatten_blend_datasets(primary):
for ds, fm in _flatten_blend_datasets(primary_dataset):
remapped_field_map = {**fm}
for field in dataset_fields:
for field in blender_dataset_fields:
if (
field not in blender2primary_field_map
or blender2primary_field_map[field] not in fm
field not in primary_dimension_per_blender_dimension
or primary_dimension_per_blender_dimension[field] not in fm
):
continue
remapped_field_map[field] = fm[blender2primary_field_map[field]]
remapped_field_map[field] = fm[primary_dimension_per_blender_dimension[field]]

datasets_and_field_maps.append((ds, remapped_field_map))

return [*datasets_and_field_maps, (secondary, blender2secondary_field_map)]
return [*datasets_and_field_maps, (secondary_dataset, secondary_dimension_per_blender_dimension)]

return zip(*_flatten_blend_datasets(blender))
return zip(*_flatten_blend_datasets(blender_dataset))


class EmptyWidget(Widget):
Expand Down Expand Up @@ -139,7 +181,7 @@ def _map_fields(fields):

dataset_metrics = ordered_distinct_list_by_attr(_map_fields(metrics))
dataset_dimensions = _map_fields(dimensions)
dataset_filters = _map_fields(filters)
dataset_filters = _map_fields(omit_set_filters(filters))
dataset_references = _map_fields(references)
dataset_share_dimensions = _map_fields(share_dimensions)

Expand Down Expand Up @@ -299,10 +341,11 @@ def sql(self):
# First run validation for the query on all widgets
self._validate()

datasets, field_maps = _datasets_and_field_maps(self.dataset)
datasets, field_maps = _datasets_and_field_maps(self.dataset, self._filters)
dimensions = self.dimensions
metrics = find_metrics_for_widgets(self._widgets)
metrics_aliases = {metric.alias for metric in metrics}
dimensions_aliases = {dimension.alias for dimension in self.dimensions}
dimensions_aliases = {dimension.alias for dimension in dimensions}

orders = self.orders
if orders is None:
Expand All @@ -318,7 +361,7 @@ def sql(self):

dataset_metrics = find_dataset_fields(metrics)
operations = find_operations_for_widgets(self._widgets)
share_dimensions = find_share_dimensions(self.dimensions, operations)
share_dimensions = find_share_dimensions(dimensions, operations)

datasets_queries = []
for dataset, field_map in zip(datasets, field_maps):
Expand All @@ -327,8 +370,8 @@ def sql(self):
dataset,
field_map,
dataset_metrics,
self.dimensions,
self.filters,
dimensions,
self._filters,
self._references,
operations,
share_dimensions
Expand Down Expand Up @@ -366,7 +409,7 @@ def sql(self):
blended_queries = []
for queryset in query_sets:
blended_query = _blend_query(
self.dimensions, metrics, orders, field_maps, queryset
dimensions, metrics, orders, field_maps, queryset
)

if blended_query:
Expand Down
3 changes: 1 addition & 2 deletions fireant/queries/builder/query_builder.py
Expand Up @@ -12,7 +12,6 @@
apply_set_dimensions,
omit_set_filters,
)
from ...dataset.modifiers import DimensionModifier


class QueryException(DataSetException):
Expand Down Expand Up @@ -168,7 +167,7 @@ def dimensions(self):
:return: A list of Field instances.
"""
return apply_set_dimensions(self._dimensions, self._filters)
return apply_set_dimensions(self._dimensions, self._filters, self.dataset)

@property
def filters(self):
Expand Down

0 comments on commit 840f7ed

Please sign in to comment.