Skip to content

Commit

Permalink
Merge 69e02ca into e490e30
Browse files Browse the repository at this point in the history
  • Loading branch information
twheys committed Dec 13, 2018
2 parents e490e30 + 69e02ca commit 0c65015
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 265 deletions.
14 changes: 11 additions & 3 deletions fireant/formats.py
@@ -1,11 +1,16 @@
import numpy as np
import pandas as pd
from datetime import (
date,
datetime,
time,
)

import numpy as np
import pandas as pd
from fireant.utils import (
MAX_NUMBER,
MAX_STRING,
MAX_TIMESTAMP,
)

INF_VALUE = "Inf"
NULL_VALUE = 'null'
Expand Down Expand Up @@ -60,9 +65,12 @@ def dimension_value(value):
When True, dates and datetimes will be converted to ISO strings. The time is omitted for dates. When False, the
datetime will be converted to a POSIX timestamp (millis-since-epoch).
"""
if pd.isnull(value):
if value in {MAX_STRING, MAX_NUMBER, MAX_TIMESTAMP}:
return 'Totals'

if pd.isnull(value):
return NAN_VALUE

if isinstance(value, date):
if not hasattr(value, 'time') or value.time() == NO_TIME:
return value.strftime('%Y-%m-%d')
Expand Down
3 changes: 2 additions & 1 deletion fireant/slicer/queries/builder.py
Expand Up @@ -224,7 +224,8 @@ def fetch(self, hint=None) -> Iterable[Dict]:
data_frame = paginate(data_frame,
self._widgets,
orders=self._orders,
limit=self._limit, offset=self._offset)
limit=self._limit,
offset=self._offset)

# Apply transformations
return [widget.transform(data_frame, self.slicer, self._dimensions, self._references)
Expand Down
157 changes: 45 additions & 112 deletions fireant/slicer/queries/database.py
@@ -1,47 +1,43 @@
import time
from functools import (
partial,
reduce,
wraps,
)
from multiprocessing.pool import ThreadPool

import pandas as pd
from typing import (
Iterable,
Sized,
Union,
)

import numpy as np
import pandas as pd

from fireant.database import Database
from fireant.formats import (
NULL_VALUE,
TOTALS_VALUE,
)
from fireant.utils import (
MAX_NUMBER,
MAX_STRING,
MAX_TIMESTAMP,
chunks,
format_dimension_key,
)
from .logger import (
query_logger,
slow_query_logger,
)
from ..dimensions import (
ContinuousDimension,
Dimension,
)
from ..dimensions import Dimension


def fetch_data(database: Database, queries: Union[Sized, Iterable], dimensions: Iterable[Dimension],
reference_groups=()):
iterable = [(str(query.limit(int(database.max_result_set_size))), database, dimensions)
iterable = [(str(query.limit(int(database.max_result_set_size))), database)
for query in queries]

with ThreadPool(processes=database.max_processes) as pool:
results = pool.map(_exec, iterable)
pool.close()

return _reduce_result_set(results, reference_groups)
return _reduce_result_set(results, reference_groups, dimensions)


def _exec(args):
Expand Down Expand Up @@ -81,46 +77,73 @@ def wrapper(query, database, *args):

@db_cache
@log
def _do_fetch_data(query: str, database: Database, dimensions: Iterable[Dimension]):
def _do_fetch_data(query: str, database: Database):
"""
Executes a query to fetch data from database middleware and builds/cleans the data as a data frame. The query
execution is logged with its duration.
:param database:
instance of `fireant.Database`, database middleware
:param query: Query string
:param dimensions: A list of dimensions, used for setting the index on the result data frame.
:return: `pd.DataFrame` constructed from the result of the query
"""
with database.connect() as connection:
data_frame = pd.read_sql(query, connection, coerce_float=True, parse_dates=True)
return _clean_and_apply_index(data_frame, dimensions)
return pd.read_sql(query, connection, coerce_float=True, parse_dates=True)


def _reduce_result_set(results: Iterable[pd.DataFrame], reference_groups=()):
def _reduce_result_set(results: Iterable[pd.DataFrame], reference_groups, dimensions: Iterable[Dimension]):
"""
Reduces the result sets from individual queries into a single data frame. This effectively joins sets of references
and concats the sets of totals.
:param results: A list of data frame
:param reference_groups: A list of groups of references (grouped by interval such as WoW, etc)
:param dimensions: A list of dimensions, used for setting the index on the result data frame.
:return:
"""

# One result group for each rolled up dimension. Groups contain one member plus one for each reference type used.
result_groups = chunks(results, 1 + len(reference_groups))

groups = []
for result_group in result_groups:
dimension_keys = [format_dimension_key(d.key)
for d in dimensions]
rollup_dimension_keys = [format_dimension_key(d.key)
for d in dimensions
if d.is_rollup]
rollup_dimension_dtypes = result_groups[0][0][rollup_dimension_keys].dtypes

# Reduce each group to one data frame per rolled up dimension
group_data_frames = []
for i, result_group in enumerate(result_groups):
base_df = result_group[0]
reference_dfs = [_make_reference_data_frame(base_df, result, reference)
for result, reference_group in zip(result_group[1:], reference_groups)
for reference in reference_group]

reduced = reduce(lambda left, right: pd.merge(left, right, how='outer', left_index=True, right_index=True),
[base_df] + reference_dfs)
groups.append(reduced)

return pd.concat(groups, sort=False)
if rollup_dimension_keys[:i]:
reduced = _replace_nans_for_rollup_values(reduced, rollup_dimension_dtypes[-i:])

reduced = reduced.set_index(dimension_keys)
group_data_frames.append(reduced)

return pd.concat(group_data_frames, sort=False) \
.sort_index(na_position='first')


def _replace_nans_for_rollup_values(data_frame, dtypes):
    """
    Overwrites each rolled-up dimension column in the data frame with a sentinel "totals" marker value chosen by
    the column's data type.

    :param data_frame:
        The data frame whose rollup dimension columns should be overwritten. Mutated in place.
    :param dtypes:
        A mapping (e.g. `pd.Series`) of dimension column key to numpy dtype, one entry per rolled-up dimension
        column.
    :return:
        The same data frame, with every column listed in `dtypes` set to its type's totals marker.
    """
    # Sentinel marker per dtype — presumably type-compatible values so the markers remain comparable/sortable
    # within the column (TODO confirm). Any dtype not listed here falls back to the string marker.
    marker_for_dtype = {
        np.dtype('<M8[ns]'): MAX_TIMESTAMP,
        np.dtype('int64'): MAX_NUMBER,
    }

    for column_key, column_dtype in dtypes.items():
        marker = marker_for_dtype.get(column_dtype, MAX_STRING)
        data_frame[column_key] = marker

    return data_frame


def _make_reference_data_frame(base_df, ref_df, reference):
Expand Down Expand Up @@ -156,93 +179,3 @@ def _make_reference_data_frame(base_df, ref_df, reference):
if reference.delta_percent:
return 100. * ref_delta_df / ref_df
return ref_delta_df


def _clean_and_apply_index(data_frame: pd.DataFrame, dimensions: Iterable[Dimension]):
    """
    Sets the index on a data frame to the dimension key columns, disambiguating NaN values first.

    For non-continuous dimensions, real database NULLs are replaced with a literal marker string (`NULL_VALUE`, via
    `_fill_nans_in_level`) so that remaining NaNs unambiguously represent Totals rows; Totals end up indexed with
    Nones.

    :param data_frame:
        The query result set, with one column per dimension key.
    :param dimensions:
        The dimensions selected in the query. Used both to pick the index columns and to decide which columns need
        NULL/Totals disambiguation.
    :return:
        The data frame indexed by the dimension key columns, or unchanged if `dimensions` is empty.
    """
    if not dimensions:
        return data_frame

    dimension_keys = [format_dimension_key(d.key)
                      for d in dimensions]

    for i, dimension in enumerate(dimensions):
        if isinstance(dimension, ContinuousDimension):
            # Continuous dimensions can never contain null values since they are selected as windows of values.
            # With that in mind, we leave the NaNs in them to represent Totals.
            continue

        level = format_dimension_key(dimension.key)
        # Pass this dimension together with all preceding ones: a rollup in a higher-level dimension produces NaNs
        # in this level too, and `_fill_nans_in_level` needs the preceding keys to tell them apart.
        data_frame[level] = _fill_nans_in_level(data_frame, dimensions[:i + 1]) \
            .apply(
            # Handles an annoying case of pandas in which the ENTIRE data frame gets converted from int to float if
            # there are NaNs, even if there are no NaNs in the column :/
            lambda x: int(x) if isinstance(x, float) and float.is_integer(x) else x) \
            .apply(lambda x: str(x) if not pd.isnull(x) else None)

    # Set index on dimension columns
    return data_frame.set_index(dimension_keys)


def _fill_nans_in_level(data_frame, dimensions):
    """
    In case there are NaN values representing both totals (from ROLLUP) and database nulls, we need to replace the
    real nulls with a literal marker string in order to distinguish between them. We choose to replace the actual
    database nulls and let NaNs represent Totals because mixing string values into the pandas index types used by
    continuous dimensions does not work.
    :param data_frame:
        The data_frame we are replacing values in.
    :param dimensions:
        A list of dimensions with the last item in the list being the dimension to fill nans for. This function requires
        the dimension being processed as well as the preceding dimensions since a roll up in a higher level dimension
        results in nulls for lower level dimension.
    :return:
        The level in the data_frame with the nulls replaced with a literal marker string (`NULL_VALUE`)
    """
    level = format_dimension_key(dimensions[-1].key)

    # Count how many of the dimensions up to and including this level are rolled up — each rollup contributes
    # totals rows whose NaNs must NOT be treated as database nulls.
    number_rollup_dimensions = sum(dimension.is_rollup for dimension in dimensions)
    if 0 < number_rollup_dimensions:
        # Pre-bind the expected number of totals NaNs so the same filler can be applied per group or globally.
        fill_nan_for_nulls = partial(_fill_nan_for_nulls, n_rolled_up_dimensions=number_rollup_dimensions)

        if 1 < len(dimensions):
            preceding_dimension_keys = [format_dimension_key(d.key)
                                        for d in dimensions[:-1]]

            # Fill within each group of preceding-dimension values, since each group carries its own totals rows.
            return (data_frame
                    .groupby(preceding_dimension_keys)[level]
                    .apply(fill_nan_for_nulls))

        return fill_nan_for_nulls(data_frame[level])

    # No rollups: every NaN in this level is a true database null.
    return data_frame[level].fillna(NULL_VALUE)


def _fill_nan_for_nulls(df, n_rolled_up_dimensions=1):
    """
    Fills NaN values in a series, distinguishing a genuine database NULL from rollup totals.

    When the series holds more NaNs than the rolled up dimensions can account for, the first NaN is treated as a
    true database NULL and filled with the literal "null" marker; all remaining NaNs are filled as totals.
    Otherwise every NaN is a totals value.

    :param df:
        The series (a single dimension level) whose NaNs should be filled.
    :param n_rolled_up_dimensions:
        The number of rolled up dimensions preceding and including the dimension, i.e. how many NaNs are expected
        from totals rows alone.
    :return:
        The series with each NaN replaced by either the NULL marker or the totals marker.
    """
    null_count = pd.isnull(df).sum()

    # Totals rows account for exactly `n_rolled_up_dimensions` NaNs. Any surplus NaN must be a true database NULL;
    # the assertion enforces that there is exactly one such surplus value, which is the first NaN filled.
    if null_count > n_rolled_up_dimensions:
        assert null_count == n_rolled_up_dimensions + 1
        return df.fillna(NULL_VALUE, limit=1).fillna(TOTALS_VALUE)

    return df.fillna(TOTALS_VALUE)
2 changes: 1 addition & 1 deletion fireant/slicer/queries/makers.py
Expand Up @@ -80,7 +80,7 @@ def make_slicer_query_with_rollup_and_references(database,
```
"""
rollup_dimensions = find_rolled_up_dimensions(dimensions)
rollup_dimensions_and_none = [None] + rollup_dimensions
rollup_dimensions_and_none = [None] + rollup_dimensions[::-1]

reference_groups = find_and_group_references_for_dimensions(references)
reference_groups_and_none = [(None, None)] + list(reference_groups.items())
Expand Down
8 changes: 7 additions & 1 deletion fireant/slicer/widgets/datatables.py
Expand Up @@ -7,7 +7,11 @@
formats,
utils,
)
from fireant.formats import NAN_VALUE
from fireant.utils import (
MAX_NUMBER,
MAX_STRING,
MAX_TIMESTAMP,
format_dimension_key,
format_metric_key,
)
Expand Down Expand Up @@ -40,8 +44,10 @@ def _render_dimension_cell(dimension_value: str, display_values: dict):
dimension_cell = {'value': formats.dimension_value(dimension_value)}

if display_values is not None:
if pd.isnull(dimension_value):
if dimension_value in {MAX_STRING, MAX_NUMBER, MAX_TIMESTAMP}:
dimension_cell['display'] = 'Totals'
elif pd.isnull(dimension_value):
dimension_cell['display'] = NAN_VALUE
else:
display_value = display_values.get(dimension_value, dimension_value)
dimension_cell['display'] = formats.dimension_value(display_value)
Expand Down
14 changes: 12 additions & 2 deletions fireant/slicer/widgets/helpers.py
@@ -1,13 +1,23 @@
import numpy as np
import pandas as pd

from fireant import utils
from fireant.formats import (
INF_VALUE,
NAN_VALUE,
)
from fireant.utils import (
MAX_NUMBER,
MAX_STRING,
MAX_TIMESTAMP,
)
from ..references import reference_label

TOTALS_MARKERS = {
MAX_NUMBER,
MAX_STRING,
MAX_TIMESTAMP,
}


def extract_display_values(dimensions, data_frame):
"""
Expand Down Expand Up @@ -77,7 +87,7 @@ def render_series_label(dimension_values, metric=None, reference=None):
dimension_labels = [utils.getdeepattr(dimension_display_values,
(utils.format_dimension_key(dimension.key), dimension_value),
dimension_value)
if not pd.isnull(dimension_value)
if dimension_value not in TOTALS_MARKERS
else 'Totals'
for dimension, dimension_value in zip(used_dimensions, dimension_values)]

Expand Down
9 changes: 8 additions & 1 deletion fireant/slicer/widgets/highcharts.py
@@ -1,6 +1,7 @@
import itertools

import pandas as pd
from datetime import timedelta

from fireant import (
DatetimeDimension,
Expand Down Expand Up @@ -57,6 +58,7 @@
)

SERIES_NEEDING_MARKER = (ChartWidget.LineSeries, ChartWidget.AreaSeries)
TS_UPPER_BOUND = pd.Timestamp.max - timedelta(seconds=1)


class HighCharts(ChartWidget, TransformableWidget):
Expand Down Expand Up @@ -96,6 +98,12 @@ def transform(self, data_frame, slicer, dimensions, references):

dimension_display_values = extract_display_values(dimensions, data_frame)
render_series_label = dimensional_metric_label(dimensions, dimension_display_values)
is_timeseries = dimensions and isinstance(dimensions[0], DatetimeDimension)

# Timestamp.max is used as a marker for rolled up dimensions (totals). Filter out the totals value for the
# dimension used for the x-axis
if is_timeseries:
data_frame = data_frame.loc[:TS_UPPER_BOUND]

# Group the results by index levels after the 0th, one for each series
# This will result in a series for every combination of dimension values and each series will contain a data set
Expand All @@ -118,7 +126,6 @@ def transform(self, data_frame, slicer, dimensions, references):
y_axes[0:0] = self._render_y_axis(axis_idx,
axis_color if 1 < total_num_series else None,
references)
is_timeseries = dimensions and isinstance(dimensions[0], DatetimeDimension)

series += self._render_series(axis,
axis_idx,
Expand Down

0 comments on commit 0c65015

Please sign in to comment.