Merge b25a64d into d68965b
twheys committed Jan 13, 2020
2 parents d68965b + b25a64d commit b9f38cc
Showing 5 changed files with 144 additions and 37 deletions.
2 changes: 1 addition & 1 deletion fireant/dataset/references.py
@@ -113,4 +113,4 @@ def ref_field(metric):

def calculate_delta_percent(ref_df, ref_delta_df):
    # pandas raises an exception when dividing by zero
-    return 100.0 * ref_delta_df / ref_df.replace(0, np.nan)
+    return 100.0 * ref_delta_df.divide(ref_df.replace(0, np.nan))
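Note: replacing zeros in the reference values with NaN before dividing keeps the delta percent undefined (NaN) for those rows rather than producing inf. A minimal sketch of the behavior, using toy Series rather than fireant result frames:

import numpy as np
import pandas as pd

ref = pd.Series([2.0, 0.0])    # reference values; the second is zero
delta = pd.Series([1.0, 3.0])  # base minus reference

# 0 -> NaN in the divisor, so the percent is NaN instead of inf for that row
pct = 100.0 * delta.divide(ref.replace(0, np.nan))
# pct: [50.0, NaN]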
82 changes: 48 additions & 34 deletions fireant/queries/execution.py
@@ -1,6 +1,4 @@
-from functools import (
-    reduce,
-)
+from functools import reduce
from typing import (
    Iterable,
    Sized,
@@ -19,25 +17,32 @@
    chunks,
)
from .finders import find_totals_dimensions
+from .pandas_workaround import df_subtract


-def fetch_data(database: Database,
-               queries: Union[Sized, Iterable],
-               dimensions: Iterable[Field],
-               share_dimensions: Iterable[Field] = (),
-               reference_groups=()):
+def fetch_data(
+    database: Database,
+    queries: Union[Sized, Iterable],
+    dimensions: Iterable[Field],
+    share_dimensions: Iterable[Field] = (),
+    reference_groups=(),
+):
    queries = [
-        str(query.limit(min(query._limit or float("inf"), database.max_result_set_size)))
+        str(
+            query.limit(min(query._limit or float("inf"), database.max_result_set_size))
+        )
        for query in queries
    ]
    results = database.fetch_dataframes(*queries)
    return reduce_result_set(results, reference_groups, dimensions, share_dimensions)

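Note on the limit-capping in fetch_data above: each query keeps its own limit unless it exceeds the database's maximum result-set size, and a query with no limit is capped at that maximum. A hypothetical illustration of the expression (the cap value is an assumption for the example, not fireant's default):

max_result_set_size = 200000  # assumed database cap for illustration

def capped_limit(query_limit):
    # None (no explicit limit) is treated as infinity, then capped
    return min(query_limit or float("inf"), max_result_set_size)

assert capped_limit(None) == 200000
assert capped_limit(50) == 50
assert capped_limit(10**9) == 200000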

-def reduce_result_set(results: Iterable[pd.DataFrame],
-                      reference_groups,
-                      dimensions: Iterable[Field],
-                      share_dimensions: Iterable[Field]):
+def reduce_result_set(
+    results: Iterable[pd.DataFrame],
+    reference_groups,
+    dimensions: Iterable[Field],
+    share_dimensions: Iterable[Field],
+):
    """
    Reduces the result sets from individual queries into a single data frame. This effectively joins sets of references
    and concatenates the sets of totals.
@@ -52,26 +57,32 @@ def reduce_result_set(results: Iterable[pd.DataFrame],
    # One result group for each rolled up dimension. Groups contain one member plus one for each reference type used.
    result_groups = chunks(results, 1 + len(reference_groups))

-    dimension_keys = [alias_selector(d.alias)
-                      for d in dimensions]
-    totals_dimension_keys = [alias_selector(d.alias)
-                             for d in find_totals_dimensions(dimensions, share_dimensions)]
+    dimension_keys = [alias_selector(d.alias) for d in dimensions]
+    totals_dimension_keys = [
+        alias_selector(d.alias)
+        for d in find_totals_dimensions(dimensions, share_dimensions)
+    ]
    dimension_dtypes = result_groups[0][0][dimension_keys].dtypes

    # Reduce each group to one data frame per rolled up dimension
    group_data_frames = []
    for i, result_group in enumerate(result_groups):
        if dimension_keys:
-            result_group = [result.set_index(dimension_keys)
-                            for result in result_group]
+            result_group = [result.set_index(dimension_keys) for result in result_group]

        base_df = result_group[0]
-        reference_dfs = [_make_reference_data_frame(base_df, result, reference)
-                         for result, reference_group in zip(result_group[1:], reference_groups)
-                         for reference in reference_group]
-
-        reduced = reduce(lambda left, right: pd.merge(left, right, how='outer', left_index=True, right_index=True),
-                         [base_df] + reference_dfs)
+        reference_dfs = [
+            _make_reference_data_frame(base_df, result, reference)
+            for result, reference_group in zip(result_group[1:], reference_groups)
+            for reference in reference_group
+        ]
+
+        reduced = reduce(
+            lambda left, right: pd.merge(
+                left, right, how="outer", left_index=True, right_index=True
+            ),
+            [base_df] + reference_dfs,
+        )

        # If there are rolled up dimensions in this result set then replace the NaNs for that dimension value with a
        # marker to indicate totals.
@@ -82,8 +93,7 @@ def reduce_result_set(results: Iterable[pd.DataFrame],

        group_data_frames.append(reduced)

-    return pd.concat(group_data_frames, sort=False) \
-        .sort_index(na_position='first')
+    return pd.concat(group_data_frames, sort=False).sort_index(na_position="first")
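The reduction above outer-joins the base frame with each reference frame on the shared dimension index, then concatenates one frame per rollup level. A minimal sketch of the join step with toy frames (the column and index names are illustrative, not fireant's real aliases):

from functools import reduce

import pandas as pd

base = pd.DataFrame({"$metric": [10, 20]}, index=pd.Index(["a", "b"], name="$dim"))
ref = pd.DataFrame({"$metric_dod": [9, 21]}, index=pd.Index(["b", "c"], name="$dim"))

joined = reduce(
    lambda left, right: pd.merge(
        left, right, how="outer", left_index=True, right_index=True
    ),
    [base, ref],
)
# joined has rows a, b, c, with NaN wherever one side had no row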


def _replace_nans_for_totals_values(data_frame, dtypes):
@@ -93,7 +103,9 @@ def _replace_nans_for_totals_values(data_frame, dtypes):
    data_frame.reset_index(inplace=True)

    for dimension_key, dtype in dtypes.items():
-        data_frame[dimension_key] = data_frame[dimension_key].fillna(get_totals_marker_for_dtype(dtype))
+        data_frame[dimension_key] = data_frame[dimension_key].fillna(
+            get_totals_marker_for_dtype(dtype)
+        )

    return data_frame.set_index(index_names)
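Rolled-up (totals) rows come back from the database with NaN in the rolled-up dimension columns; filling them with a per-dtype marker distinguishes totals rows from genuinely missing values. A rough sketch of the idea, with an assumed marker function standing in for get_totals_marker_for_dtype (the marker values are illustrative, not fireant's actual markers):

import numpy as np
import pandas as pd

def totals_marker_for_dtype(dtype):
    # Assumed markers for illustration only
    return "~~totals" if dtype == object else np.iinfo("int64").max

df = pd.DataFrame({"$dim": ["a", np.nan], "$metric": [1, 3]})
df["$dim"] = df["$dim"].fillna(totals_marker_for_dtype(df["$dim"].dtype))
# the NaN dimension value on the totals row becomes "~~totals"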

@@ -110,9 +122,9 @@ def _make_reference_data_frame(base_df, ref_df, reference):
    :param reference:
    :return:
    """
-    mertric_column_indices = [i
-                              for i, column in enumerate(ref_df.columns)
-                              if column not in base_df.columns]
+    mertric_column_indices = [
+        i for i, column in enumerate(ref_df.columns) if column not in base_df.columns
+    ]
    ref_columns = [ref_df.columns[i] for i in mertric_column_indices]

    if not (reference.delta or reference.delta_percent):
@@ -123,10 +135,12 @@ def _make_reference_data_frame(base_df, ref_df, reference):
    # Select just the metric columns from the DF and rename them with the reference key as a suffix
    base_df, ref_df = base_df[base_columns].copy(), ref_df[ref_columns].copy()
    # Both data frame columns are renamed in order to perform the calculation below.
-    base_df.columns = ref_df.columns = [column.replace(reference.reference_type.alias, reference.alias)
-                                        for column in ref_columns]
+    base_df.columns = ref_df.columns = [
+        column.replace(reference.reference_type.alias, reference.alias)
+        for column in ref_columns
+    ]

-    ref_delta_df = base_df.subtract(ref_df, fill_value=0)
+    ref_delta_df = df_subtract(base_df, ref_df, fill_value=0)

    if reference.delta_percent:
        return calculate_delta_percent(ref_df, ref_delta_df)
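The renaming step above swaps the reference-type alias for the specific reference alias so the base and reference columns line up for the subtraction. A tiny sketch with assumed alias strings (the real aliases come from the Reference objects):

# Assumed aliases: reference type "dod", concrete reference "dod_delta"
ref_columns = ["$metric_dod"]
renamed = [column.replace("dod", "dod_delta") for column in ref_columns]
# renamed == ["$metric_dod_delta"]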
25 changes: 25 additions & 0 deletions fireant/queries/pandas_workaround.py
@@ -0,0 +1,25 @@
+import pandas as pd
+
+
+def _reindex_with_nans(df, idx, fill_value=None):
+    missing = pd.DataFrame(index=idx.symmetric_difference(df.index), columns=df.columns)
+    if fill_value is not None:
+        missing = missing.fillna(fill_value)
+    return df.append(missing).reindex(idx)
+
+
+def _reindex_deduplicate(left, right, fill_value=None):
+    combined_index = left.index.append(right.index)
+    dededuplicated_index = combined_index[~combined_index.duplicated()]
+    left_reindex, right_reindex = [
+        _reindex_with_nans(df, dededuplicated_index, fill_value=fill_value)
+        for df in (left, right)
+    ]
+    return left_reindex, right_reindex
+
+
+def df_subtract(left, right, fill_value=None):
+    left_reindex, right_reindex = _reindex_deduplicate(
+        left, right, fill_value=fill_value
+    )
+    return left_reindex.subtract(right_reindex)
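df_subtract sidesteps an alignment problem with DataFrame.subtract(fill_value=...) on partially overlapping indexes (exercised by the new test below): it reindexes both frames against the de-duplicated union of their indexes, fills the gaps, and only then subtracts, so the result index stays unique. A quick usage sketch with toy frames:

import pandas as pd

from fireant.queries.pandas_workaround import df_subtract

left = pd.DataFrame({"x": [1, 2]}, index=pd.Index(["a", "b"], name="k"))
right = pd.DataFrame({"x": [5]}, index=pd.Index(["b"], name="k"))

result = df_subtract(left, right, fill_value=0)
# a: 1 - 0 = 1, b: 2 - 5 = -3; the result index is the union, with no duplicates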
4 changes: 2 additions & 2 deletions fireant/tests/dataset/test_execution.py
@@ -159,7 +159,7 @@ def test_reduce_delta_percent_result_set_with_zeros_in_reference_value(self):
        )

        expected = raw_df.copy()
-        expected["$metric_dod_delta_percent"] = [-50, np.nan]
+        expected["$metric_dod_delta_percent"] = pd.Series([-50, np.nan], dtype=object)
        expected.set_index("$timestamp", inplace=True)

        timestamp = mock_dataset.fields.timestamp
@@ -179,7 +179,7 @@ def test_reduce_delta_result_with_non_aligned_index(self):
        )

        expected = raw_df.copy()
-        expected["$metric_dod_delta"] = [-1.0, 2.0]
+        expected["$metric_dod_delta"] = pd.Series([-1.0, 2.0], dtype=object)
        expected.set_index("$timestamp", inplace=True)

        timestamp = mock_dataset.fields.timestamp
68 changes: 68 additions & 0 deletions fireant/tests/dataset/test_pandas_workaround.py
@@ -0,0 +1,68 @@
+from unittest import TestCase
+
+import numpy as np
+import pandas as pd
+
+from fireant.queries.pandas_workaround import df_subtract
+
+
+class TestSubtract(TestCase):
+    def test_subtract_partially_aligned_multi_index_dataframes_with_nans(self):
+        df0 = pd.DataFrame(
+            data=[
+                [1, 2],
+                [3, 4],
+                [5, 6],
+                [7, 8],
+                [9, 10],
+                [11, 12],
+                [13, 14],
+                [15, 16],
+                [17, 18],
+            ],
+            columns=["happy", "sad"],
+            index=pd.MultiIndex.from_product(
+                [["a", "b", None], [0, 1, np.nan]], names=["l0", "l1"]
+            ),
+        )
+        df1 = pd.DataFrame(
+            data=[
+                [1, 2],
+                [3, 4],
+                [5, 6],
+                [7, 8],
+                [9, 10],
+                [11, 12],
+                [13, 14],
+                [15, 16],
+                [17, 18],
+            ],
+            columns=["happy", "sad"],
+            index=pd.MultiIndex.from_product(
+                [["b", "c", None], [1, 2, np.nan]], names=["l0", "l1"]
+            ),
+        )
+
+        result = df_subtract(df0, df1, fill_value=0)
+        expected = pd.DataFrame.from_records(
+            [
+                ["a", 0, 1 - 0, 2 - 0],
+                ["a", 1, 3 - 0, 4 - 0],
+                ["a", np.nan, 5 - 0, 6 - 0],
+                ["b", 0, 7 - 0, 8 - 0],
+                ["b", 1, 9 - 1, 10 - 2],
+                ["b", np.nan, 11 - 5, 12 - 6],
+                [np.nan, 0, 13 - 0, 14 - 0],
+                [np.nan, 1, 15 - 13, 16 - 14],
+                [np.nan, np.nan, 17 - 17, 18 - 18],
+                ["b", 2, 0 - 3, 0 - 4],
+                ["c", 1, 0 - 7, 0 - 8],
+                ["c", 2, 0 - 9, 0 - 10],
+                ["c", np.nan, 0 - 11, 0 - 12],
+                [np.nan, 2, 0 - 15, 0 - 16],
+            ],
+            columns=["l0", "l1", "happy", "sad"],
+        ).set_index(["l0", "l1"])
+
+        pd.testing.assert_frame_equal(expected, result)
+        self.assertTrue(result.index.is_unique)
