Merge pull request #361 from gl3nn/move-to-database
Move code around so that Database subclasses can override behaviour
gl3nn committed May 19, 2021
2 parents f204d00 + de34a50 commit ecef5c1
Showing 14 changed files with 248 additions and 309 deletions.
224 changes: 223 additions & 1 deletion fireant/database/base.py
@@ -1,17 +1,37 @@
from datetime import datetime
from functools import partial
from typing import Collection, Dict, Union
from typing import Iterable, List, Sequence, Type

import pandas as pd
from pypika import (
    Query,
    enums,
    functions as fn,
    terms,
)
from pypika import Table, functions as fn
from pypika.queries import QueryBuilder
from pypika.terms import Function

from fireant.dataset.fields import Field
from fireant.dataset.filters import Filter
from fireant.dataset.joins import Join
from fireant.exceptions import QueryCancelled
from fireant.middleware.decorators import apply_middlewares, connection_middleware
from fireant.queries.finders import (
    find_totals_dimensions,
    find_and_group_references_for_dimensions,
    find_required_tables_to_join,
    find_joins_for_tables,
)
from fireant.queries.references import make_reference_dimensions, make_reference_metrics, make_reference_filters
from fireant.queries.special_cases import adjust_daterange_filter_for_rolling_window
from fireant.queries.totals_helper import adapt_for_totals_query
from fireant.utils import (
    alias_selector,
    flatten,
)
from fireant.dataset.intervals import DatetimeInterval


class Database(object):
@@ -126,3 +146,205 @@ def fetch_dataframe(self, query, **kwargs):

    def __str__(self):
        return f'Database|{self.__class__.__name__}|{self.host}'

    def make_slicer_query_with_totals_and_references(
        self,
        table,
        joins,
        dimensions,
        metrics,
        operations,
        filters,
        references,
        orders,
        share_dimensions=(),
    ) -> List[Type[QueryBuilder]]:
"""
The following two loops will run over the spread of the two sets including a NULL value in each set:
- reference group (WoW, MoM, etc.)
- dimension with roll up/totals enabled (totals dimension)
This will result in at least one query where the reference group and totals dimension is NULL, which shall be
called base query. The base query will ALWAYS be present, even if there are zero reference groups or totals
dimensions.
For a concrete example, check the test case in :
```
fireant.tests.queries.test_build_dimensions.QueryBuilderDimensionTotalsTests
#test_build_query_with_totals_cat_dimension_with_references
```
"""

        filters = adjust_daterange_filter_for_rolling_window(dimensions, operations, filters)

        totals_dimensions = find_totals_dimensions(
            dimensions,
            share_dimensions,
        )
        totals_dimensions_and_none = [None] + totals_dimensions[::-1]

        reference_groups = find_and_group_references_for_dimensions(dimensions, references)
        reference_groups_and_none = [(None, None)] + list(reference_groups.items())

        queries = []
        for totals_dimension in totals_dimensions_and_none:
            (dimensions_with_totals, filters_with_totals) = adapt_for_totals_query(
                totals_dimension,
                dimensions,
                filters,
            )

            for reference_parts, references in reference_groups_and_none:
                (dimensions_with_ref, metrics_with_ref, filters_with_ref,) = self.adapt_for_reference_query(
                    reference_parts,
                    dimensions_with_totals,
                    metrics,
                    filters_with_totals,
                    references,
                )
                query = self.make_slicer_query(
                    table,
                    joins,
                    dimensions_with_ref,
                    metrics_with_ref,
                    filters_with_ref,
                    orders,
                )

                # Add these to the query instance so that when the data frames are joined together, the correct
                # references and totals can be applied when combining the separate result sets from each query.
                query._totals = totals_dimension
                query._references = references
                queries.append(query)

        return queries

    def adapt_for_reference_query(self, reference_parts, dimensions, metrics, filters, references):
        if reference_parts is None:
            return dimensions, metrics, filters

        ref_dim, unit, interval = reference_parts

        offset_func = partial(self.date_add, date_part=unit, interval=interval)
        offset_func_inv = partial(self.date_add, date_part=unit, interval=-interval)

        ref_dimensions = make_reference_dimensions(
            dimensions, ref_dim, offset_func, self.transform_field_to_query, self.trunc_date
        )
        ref_metrics = make_reference_metrics(metrics, references[0].reference_type.alias)
        ref_filters = make_reference_filters(filters, ref_dim, offset_func_inv)

        return ref_dimensions, ref_metrics, ref_filters

    def make_slicer_query(
        self,
        base_table: Table,
        joins: Sequence[Join] = (),
        dimensions: Sequence[Field] = (),
        metrics: Sequence[Field] = (),
        filters: Sequence[Filter] = (),
        orders: Sequence = (),
    ) -> Type[QueryBuilder]:
"""
Creates a pypika/SQL query from a list of slicer elements.
This is the base implementation shared by two implementations: the query to fetch data for a slicer request and
the query to fetch choices for dimensions.
This function only handles dimensions (select+group by) and filtering (where/having), which is everything needed
for the query to fetch choices for dimensions.
The slicer query extends this with metrics, references, and totals.
:param base_table:
pypika.Table - The base table of the query, the one in the FROM clause
:param joins:
A collection of joins available in the slicer. This should include all slicer joins. Only joins required for
the query will be used.
:param dimensions:
A collection of dimensions to use in the query.
:param metrics:
A collection of metrics to use in the query.
:param filters:
A collection of filters to apply to the query.
:param orders:
A collection of orders as tuples of the metric/dimension to order by and the direction to order in.
:return:
"""
        query = self.query_cls.from_(base_table, immutable=False)
        elements = flatten([metrics, dimensions, filters])

        # Add joins
        join_tables_needed_for_query = find_required_tables_to_join(elements, base_table)

        for join in find_joins_for_tables(joins, base_table, join_tables_needed_for_query):
            query = query.join(join.table, how=join.join_type).on(join.criterion)

        # Add dimensions
        for dimension in dimensions:
            dimension_term = self.transform_field_to_query(dimension, self.trunc_date)
            query = query.select(dimension_term)

            if dimension.groupable:
                query = query.groupby(dimension_term)

        # Add filters
        for fltr in filters:
            query = query.having(fltr.definition) if fltr.is_aggregate else query.where(fltr.definition)

        # Add metrics
        metric_terms = [self.transform_field_to_query(metric) for metric in metrics]
        if metric_terms:
            query = query.select(*metric_terms)

        # If an order is determined by a field that is not already selected as a metric or dimension, then it needs
        # to be added to the query.
        select_aliases = {el.alias for el in query._selects}
        for (orderby_field, orientation) in orders:
            orderby_term = self.transform_field_to_query(orderby_field)
            query = query.orderby(orderby_term, order=orientation)

            if orderby_term.alias not in select_aliases:
                query = query.select(orderby_term)

        return query

    def make_latest_query(
        self,
        base_table: Table,
        joins: Iterable[Join] = (),
        dimensions: Iterable[Field] = (),
    ):
        query = self.query_cls.from_(base_table, immutable=False)

        # Add joins
        join_tables_needed_for_query = find_required_tables_to_join(dimensions, base_table)
        for join in find_joins_for_tables(joins, base_table, join_tables_needed_for_query):
            query = query.join(join.table, how=join.join_type).on(join.criterion)

        for dimension in dimensions:
            f_dimension_key = alias_selector(dimension.alias)
            query = query.select(fn.Max(dimension.definition).as_(f_dimension_key))

        return query

    def transform_field_to_query(self, field, window=None):
        """
        Makes a pypika term for a given dataset field.
        :param field:
            A field from a dataset.
        :param window:
            A window function to apply to the dimension definition if it is a continuous dimension.
        :return:
            A single pypika term used to select (and group by) the given dataset field, aliased with the field's
            selector alias. If the field is a datetime interval and a window function is given, the field definition
            is wrapped in that window function.
        """
        f_alias = alias_selector(field.alias)

        if window and isinstance(field, DatetimeInterval):
            return window(field.definition, field.interval_key).as_(f_alias)

        return field.definition.as_(f_alias)
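Because these helpers are now instance methods on `Database` rather than module-level functions in `fireant.queries.sql_transformer`, a backend-specific subclass can override or extend them. A minimal sketch of what this enables; the `LimitedDatabase` class and its row cap are hypothetical and not part of this commit:

```python
from pypika import MySQLQuery

from fireant.database import Database


class LimitedDatabase(Database):
    """Hypothetical subclass illustrating the new override points."""

    # self.query_cls is what make_slicer_query/make_latest_query build from
    query_cls = MySQLQuery

    def make_slicer_query(self, base_table, joins=(), dimensions=(), metrics=(), filters=(), orders=()):
        # Reuse the shared base implementation, then adjust the resulting pypika
        # query, e.g. cap the number of fetched rows (a made-up backend policy).
        query = super().make_slicer_query(base_table, joins, dimensions, metrics, filters, orders)
        return query.limit(100_000)
```

Before this change, the same customisation meant replacing the module-level helpers wholesale; with the methods on the class, delegating to `super()` is enough.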
1 change: 0 additions & 1 deletion fireant/queries/__init__.py
@@ -1 +0,0 @@
from .builder import *
6 changes: 1 addition & 5 deletions fireant/queries/builder/dataset_blender_query_builder.py
@@ -12,7 +12,6 @@
find_operations_for_widgets,
find_share_dimensions,
)
from fireant.queries.sql_transformer import make_slicer_query_with_totals_and_references
from fireant.reference_helpers import reference_type_alias
from fireant.utils import alias_selector, filter_nones, listify, ordered_distinct_list_by_attr
from fireant.widgets.base import Widget
@@ -187,12 +186,9 @@ def _build_dataset_query(
dataset_references = map_blender_fields_to_dataset_fields(references, field_map, dataset)
dataset_share_dimensions = map_blender_fields_to_dataset_fields(share_dimensions, field_map, dataset)
dataset_metrics = ordered_distinct_list_by_attr(dataset_metrics)

# TODO: It's possible that we have to adapt/map the operations for @apply_special_cases
dataset_operations = operations

return make_slicer_query_with_totals_and_references(
database=dataset.database,
return dataset.database.make_slicer_query_with_totals_and_references(
table=dataset.table,
joins=dataset.joins,
dimensions=dataset_dimensions,
11 changes: 2 additions & 9 deletions fireant/queries/builder/dataset_query_builder.py
@@ -28,10 +28,6 @@
find_share_dimensions,
)
from ..pagination import paginate
from ..sql_transformer import (
make_slicer_query,
make_slicer_query_with_totals_and_references,
)

if TYPE_CHECKING:
from pypika import PyPikaQueryBuilder
@@ -91,8 +87,7 @@ def sql(self) -> List[Type['PyPikaQueryBuilder']]:
operations = find_operations_for_widgets(self._widgets)
share_dimensions = find_share_dimensions(dimensions, operations)

queries = make_slicer_query_with_totals_and_references(
database=self.dataset.database,
queries = self.dataset.database.make_slicer_query_with_totals_and_references(
table=self.table,
joins=self.dataset.joins,
dimensions=dimensions,
@@ -116,7 +111,6 @@ def fetch(self, hint=None) -> Union[Iterable[Dict], Dict]:
A list of dict (JSON) objects containing the widget configurations.
"""
queries = add_hints(self.sql, hint)

operations = find_operations_for_widgets(self._widgets)
dimensions = self.dimensions

@@ -203,8 +197,7 @@ def fetch_annotation(self):

annotation_dimensions = [annotation_alignment_field, annotation.field]

annotation_query = make_slicer_query(
database=self.dataset.database,
annotation_query = self.dataset.database.make_slicer_query(
base_table=annotation.table,
dimensions=annotation_dimensions,
filters=annotation_alignment_dimension_filters,
13 changes: 5 additions & 8 deletions fireant/queries/builder/dimension_choices_query_builder.py
@@ -8,11 +8,9 @@
add_hints,
get_column_names,
)
from ..execution import fetch_data
from ..field_helper import make_term_for_field
from ..finders import find_joins_for_tables
from ..sql_transformer import make_slicer_query
from ...formats import display_value
from fireant.queries.execution import fetch_data
from fireant.queries.finders import find_joins_for_tables
from fireant.formats import display_value


class DimensionChoicesQueryBuilder(QueryBuilder):
@@ -76,7 +74,7 @@ def _make_terms_for_hint_dimensions(self):
"""
dimension_terms = []
for dimension in self.dimensions:
dimension_term = make_term_for_field(dimension, self.dataset.database.trunc_date)
dimension_term = self.dataset.database.transform_field_to_query(dimension, self.dataset.database.trunc_date)
dimension_term = dimension_term.replace_table(dimension_term.table, self.hint_table)
dimension_terms.append(dimension_term)

@@ -95,8 +93,7 @@ def sql(self):
filters = self._extract_hint_filters() if self.hint_table else self.filters

query = (
make_slicer_query(
database=self.dataset.database,
self.dataset.database.make_slicer_query(
base_table=self.dataset.table,
joins=self.dataset.joins,
dimensions=dimensions,
8 changes: 3 additions & 5 deletions fireant/queries/builder/dimension_latest_query_builder.py
@@ -5,9 +5,8 @@
alias_for_alias_selector,
immutable,
)
from .query_builder import QueryBuilder, QueryException, add_hints
from ..execution import fetch_data
from ..sql_transformer import make_latest_query
from fireant.queries.builder.query_builder import QueryBuilder, QueryException, add_hints
from fireant.queries.execution import fetch_data


class DimensionLatestQueryBuilder(QueryBuilder):
@@ -32,8 +31,7 @@ def sql(self):
if not self.dimensions:
raise QueryException("Must select at least one dimension to query latest values")

query = make_latest_query(
database=self.dataset.database,
query = self.dataset.database.make_latest_query(
base_table=self.table,
joins=self.dataset.joins,
dimensions=self.dimensions,
7 changes: 4 additions & 3 deletions fireant/queries/execution.py
@@ -6,12 +6,13 @@

from fireant.database import Database
from fireant.dataset.fields import DataType, Field
from fireant.dataset.modifiers import RollupValue
from fireant.dataset.references import calculate_delta_percent
from fireant.dataset.totals import get_totals_marker_for_dtype
from fireant.queries.finders import find_field_in_modified_field, find_totals_dimensions
from fireant.queries.pandas_workaround import df_subtract
from fireant.utils import alias_selector, chunks
from .finders import find_field_in_modified_field, find_totals_dimensions
from .pandas_workaround import df_subtract
from ..dataset.modifiers import RollupValue


logger = logging.getLogger(__name__)

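Since query construction now lives on the database object, it can also be exercised directly, for example in tests, without going through a query builder. A rough usage sketch; the `accounts` table, the `updated` field, and instantiating the `Database` base class directly are assumptions for illustration only:

```python
from pypika import Table

from fireant.database import Database
from fireant.dataset.fields import Field

accounts = Table('accounts')
db = Database()  # in practice a concrete backend subclass would be used

# Hypothetical dimension: Field takes an alias and a pypika definition.
latest = db.make_latest_query(
    base_table=accounts,
    dimensions=[Field('updated', definition=accounts.updated_at)],
)
print(latest.get_sql())  # roughly: SELECT MAX("updated_at") "$updated" FROM "accounts"
```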
