Skip to content

Commit

Permalink
Fixed a bug in how the data frames are reduced after executing multiple queries so that the totals will be properly marked
Browse files Browse the repository at this point in the history

Fixed a bug in how share metrics are calculated when there are 3 or more dimensions active
  • Loading branch information
twheys committed Jan 10, 2019
1 parent 245ecc4 commit 420d72d
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 10 deletions.
11 changes: 9 additions & 2 deletions fireant/slicer/operations.py
Expand Up @@ -250,8 +250,15 @@ def apply(self, data_frame, reference):

totals = reduce_data_frame_levels(data_frame.loc[totals_key, f_metric_key], group_levels)

def apply_totals(df):
return 100 * reduce_data_frame_levels(df / totals, group_levels)
def apply_totals(group_df):
    """
    Compute the share (as a percentage) of one group of the metric
    relative to its corresponding total.

    Closes over ``totals`` from the enclosing scope, which is either a
    scalar (a single grand total) or a ``pd.Series`` indexed by the
    leading dimension levels shared with ``group_df``.
    """
    if not isinstance(totals, pd.Series):
        # Scalar total: every row in the group shares one denominator.
        return 100 * group_df / totals

    # ``totals`` is indexed by fewer levels than ``group_df``; drop the
    # extra trailing index levels from the group so the division aligns
    # on the levels the totals series actually has.
    n_index_levels = len(totals.index.names)
    extra_level_names = group_df.index.names[n_index_levels:]
    group_df = group_df.reset_index(extra_level_names, drop=True)
    share = 100 * group_df / totals[group_df.index]
    # Rebuild as a plain Series on the group's (reduced) index to strip
    # any alignment artifacts introduced by the division.
    # NOTE(review): assumes the division preserves group_df's row order —
    # confirm against pandas alignment behavior.
    return pd.Series(share.values, index=group_df.index)

return data_frame[f_metric_key] \
.groupby(level=group_levels) \
Expand Down
15 changes: 8 additions & 7 deletions fireant/slicer/queries/execution.py
@@ -1,4 +1,3 @@
import time
from functools import (
reduce,
wraps,
Expand All @@ -11,6 +10,7 @@
)

import pandas as pd
import time

from fireant.database import Database
from fireant.slicer.totals import get_totals_marker_for_dtype
Expand Down Expand Up @@ -38,7 +38,7 @@ def fetch_data(database: Database,
results = pool.map(_exec, iterable)
pool.close()

return _reduce_result_set(results, reference_groups, dimensions, share_dimensions)
return reduce_result_set(results, reference_groups, dimensions, share_dimensions)


def _exec(args):
Expand Down Expand Up @@ -93,17 +93,18 @@ def _do_fetch_data(query: str, database: Database):
return pd.read_sql(query, connection, coerce_float=True, parse_dates=True)


def _reduce_result_set(results: Iterable[pd.DataFrame],
reference_groups,
dimensions: Iterable[Dimension],
share_dimensions: Dimension):
def reduce_result_set(results: Iterable[pd.DataFrame],
reference_groups,
dimensions: Iterable[Dimension],
share_dimensions: Iterable[Dimension]):
"""
Reduces the result sets from individual queries into a single data frame. This effectively joins sets of references
and concats the sets of totals.
:param results: A list of data frame
:param reference_groups: A list of groups of references (grouped by interval such as WoW, etc)
:param dimensions: A list of dimensions, used for setting the index on the result data frame.
:param share_dimensions: A list of dimensions from which the totals are used for calculating share operations.
:return:
"""

Expand Down Expand Up @@ -136,7 +137,7 @@ def _reduce_result_set(results: Iterable[pd.DataFrame],
# The data frames will be ordered so that the first group will contain the data without any rolled up
# dimensions, then followed by the groups with them, ordered by the last rollup dimension first.
if totals_dimension_keys[:i]:
reduced = _replace_nans_for_totals_values(reduced, dimension_dtypes[-i - 1:])
reduced = _replace_nans_for_totals_values(reduced, dimension_dtypes)

group_data_frames.append(reduced)

Expand Down
13 changes: 13 additions & 0 deletions fireant/tests/slicer/mocks.py
Expand Up @@ -368,6 +368,16 @@ def _totals(df):

if groupby_levels:
level_totals_df = data_frame[columns].groupby(level=groupby_levels).apply(_totals)

missing_dims = set(data_frame.index.names) - set(level_totals_df.index.names)
if missing_dims:
for dim in missing_dims:
dtype = data_frame.index.levels[data_frame.index.names.index(dim)].dtype
level_totals_df[dim] = get_totals_marker_for_dtype(dtype)
level_totals_df.set_index(dim, append=True, inplace=True)

level_totals_df = level_totals_df.reorder_levels(data_frame.index.names)

else:
totals_index_values = [get_totals_marker_for_dtype(level.dtype)
for level in data_frame.index.levels]
Expand Down Expand Up @@ -399,7 +409,10 @@ def _totals(df):

cat_dim_totals_df = totals(cat_dim_df, [fd('political_party')], _columns)
cont_cat_dim_totals_df = totals(cont_cat_dim_df, [fd('political_party')], _columns)
cont_cat_dim_all_totals_df = totals(cont_cat_dim_df, [fd('timestamp'), fd('political_party')], _columns)
cont_uni_dim_totals_df = totals(cont_uni_dim_df, [fd('state')], _columns)
cont_uni_dim_all_totals_df = totals(cont_uni_dim_df, [fd('timestamp'), fd('state')], _columns)
cont_cat_uni_dim_all_totals_df = totals(cont_cat_uni_dim_df, [fd('timestamp'), fd('political_party'), fd('state')],
_columns)

ElectionOverElection = ReferenceType('eoe', 'EoE', 'year', 4)
218 changes: 218 additions & 0 deletions fireant/tests/slicer/test_execution.py
@@ -0,0 +1,218 @@
from unittest import (
TestCase,
skip,
)

import numpy as np
import pandas as pd
import pandas.testing

from fireant.slicer.queries.execution import reduce_result_set
from fireant.slicer.totals import get_totals_marker_for_dtype
from .mocks import (
cat_dim_df,
cat_dim_totals_df,
cat_uni_dim_df,
cont_cat_dim_all_totals_df,
cont_cat_dim_df,
cont_cat_dim_totals_df,
cont_cat_uni_dim_all_totals_df,
cont_cat_uni_dim_df,
cont_dim_df,
single_metric_df,
slicer,
)

pd.set_option('display.expand_frame_repr', False)


def replace_totals(data_frame):
    """
    Reverse the totals marking on a data frame, producing a "raw" frame.

    The input frame is indexed by its dimension levels, where rolled-up
    (totals) rows carry a dtype-specific marker value. The index is
    flattened back into regular columns and each marker is replaced with
    ``np.nan``, yielding a frame shaped like a raw database result.

    :param data_frame: A data frame whose index levels are dimension values.
    :return: A new data frame with the index reset and every totals marker
        replaced by NaN. The input frame is not modified.
    """
    index_names = data_frame.index.names

    raw = data_frame.reset_index()
    for name in index_names:
        marker = get_totals_marker_for_dtype(raw[name].dtype)
        # Assign the replacement back rather than using inplace=True on the
        # column selection: mutating through __getitem__ is chained
        # assignment, which is deprecated under pandas Copy-on-Write and
        # may silently fail to update the frame.
        raw[name] = raw[name].replace(marker, np.nan)

    return raw


class ReduceResultSetsTests(TestCase):
    """
    Verifies that ``reduce_result_set`` reassembles a single raw query
    result into the expected indexed data frame for each combination of
    dimension types (continuous, categorical, unique).
    """

    @staticmethod
    def _reduce(raw_df, dimensions):
        # No reference groups and no share dimensions: exercise only the
        # plain reduction path.
        return reduce_result_set([raw_df], (), dimensions, ())

    def test_reduce_single_result_set_no_dimensions(self):
        # With no dimensions there is nothing to index; the frame passes
        # through unchanged.
        result = self._reduce(single_metric_df, ())
        pandas.testing.assert_frame_equal(single_metric_df, result)

    def test_reduce_single_result_set_with_cont_dimension(self):
        result = self._reduce(replace_totals(cont_dim_df),
                              (slicer.dimensions.timestamp,))
        pandas.testing.assert_frame_equal(cont_dim_df, result)

    def test_reduce_single_result_set_with_cat_dimension(self):
        result = self._reduce(replace_totals(cat_dim_df),
                              (slicer.dimensions.political_party,))
        pandas.testing.assert_frame_equal(cat_dim_df, result)

    def test_reduce_single_result_set_with_cont_cat_dimensions(self):
        dims = (slicer.dimensions.timestamp, slicer.dimensions.political_party)
        result = self._reduce(replace_totals(cont_cat_dim_df), dims)
        pandas.testing.assert_frame_equal(cont_cat_dim_df, result)

    def test_reduce_single_result_set_with_cat_uni_dimensions(self):
        # The mock frame is unsorted; the reduction emits a sorted index.
        expected = cat_uni_dim_df.sort_index()
        dims = (slicer.dimensions.political_party, slicer.dimensions.candidate)
        result = self._reduce(replace_totals(expected), dims)
        pandas.testing.assert_frame_equal(expected, result)

    def test_reduce_single_result_set_with_cont_cat_uni_dimensions(self):
        dims = (slicer.dimensions.timestamp,
                slicer.dimensions.political_party,
                slicer.dimensions.state)
        result = self._reduce(replace_totals(cont_cat_uni_dim_df), dims)
        pandas.testing.assert_frame_equal(cont_cat_uni_dim_df, result)


class ReduceResultSetsWithTotalsTests(TestCase):
    """
    Verifies that ``reduce_result_set`` concatenates a raw result frame
    with a separately-queried totals frame, marking the rolled-up rows
    with the dtype-specific totals marker in the correct index positions.

    Each test hand-builds the totals frame a database rollup query would
    return (totals dimensions as NULL/None columns) and compares against
    a slice of the precomputed all-totals mock frames.
    """

    def test_reduce_single_result_set_with_cat_dimension(self):
        expected = cat_dim_totals_df
        raw_df = replace_totals(cat_dim_df)
        # Grand-total row: a single None for the rolled-up dimension merged
        # with the column sums, mimicking a totals query result.
        totals_df = pd.merge(pd.DataFrame([None], columns=['$d$political_party']),
                             pd.DataFrame([raw_df[['$m$votes', '$m$wins']].sum(axis=0)]),
                             how='outer',
                             left_index=True,
                             right_index=True)

        dimensions = (slicer.dimensions.political_party.rollup(),)
        result = reduce_result_set([raw_df, totals_df], (), dimensions, ())

        pandas.testing.assert_frame_equal(expected, result)

    def test_reduce_single_result_set_with_cont_cat_dimensions_cont_totals(self):
        # Rolling up the first (continuous) dimension: expect all detail
        # rows plus only the final grand-total row.
        expected = cont_cat_dim_all_totals_df.loc[(slice(None), slice('d', 'r')), :] \
            .append(cont_cat_dim_all_totals_df.iloc[-1])
        raw_df = replace_totals(cont_cat_dim_df)
        totals_df = pd.merge(pd.DataFrame([[None, None]], columns=['$d$timestamp', '$d$political_party']),
                             pd.DataFrame([raw_df[['$m$votes', '$m$wins']].sum(axis=0)]),
                             how='outer',
                             left_index=True,
                             right_index=True)

        dimensions = (slicer.dimensions.timestamp.rollup(), slicer.dimensions.political_party)
        result = reduce_result_set([raw_df, totals_df], (), dimensions, ())

        pandas.testing.assert_frame_equal(expected, result)

    def test_reduce_single_result_set_with_cont_cat_dimensions_cat_totals(self):
        expected = cont_cat_dim_totals_df
        raw_df = replace_totals(cont_cat_dim_df)
        # Totals per timestamp with the rolled-up category nulled out.
        totals_df = raw_df.groupby('$d$timestamp').sum().reset_index()
        totals_df['$d$political_party'] = None
        totals_df = totals_df[['$d$timestamp', '$d$political_party', '$m$votes', '$m$wins']]

        dimensions = (slicer.dimensions.timestamp, slicer.dimensions.political_party.rollup())
        result = reduce_result_set([raw_df, totals_df], (), dimensions, ())

        pandas.testing.assert_frame_equal(expected, result)

    def test_reduce_single_result_set_with_cont_cat_uni_dimensions_cont_totals(self):
        # Rollup on the leading dimension: detail rows plus the final
        # grand-total row only.
        expected = cont_cat_uni_dim_all_totals_df.loc[(slice(None), slice('d', 'r'), slice('1', '2')), :] \
            .append(cont_cat_uni_dim_all_totals_df.iloc[-1])
        raw_df = replace_totals(cont_cat_uni_dim_df)
        totals_df = pd.merge(pd.DataFrame([[None, None, None, None]],
                                          columns=['$d$timestamp', '$d$political_party',
                                                   '$d$state', '$d$state_display']),
                             pd.DataFrame([raw_df[['$m$votes', '$m$wins']].sum(axis=0)]),
                             how='outer',
                             left_index=True,
                             right_index=True)
        # Reorder to match the raw result's column order.
        totals_df = totals_df[['$d$timestamp', '$d$political_party', '$d$state', '$d$state_display',
                               '$m$votes', '$m$wins']]

        dimensions = (slicer.dimensions.timestamp.rollup(), slicer.dimensions.political_party, slicer.dimensions.state)
        result = reduce_result_set([raw_df, totals_df], (), dimensions, ())

        pandas.testing.assert_frame_equal(expected, result)

    def test_reduce_single_result_set_with_cont_cat_uni_dimensions_cat_totals(self):
        # Rollup on the middle dimension: detail rows plus the per-timestamp
        # totals rows (excluding the grand total), sorted back into index order.
        expected = cont_cat_uni_dim_all_totals_df.loc[(slice(None), slice(None), slice('1', '2')), :] \
            .append(cont_cat_uni_dim_all_totals_df.loc[(slice(None), '~~totals'), :].iloc[:-1]) \
            .sort_index()
        raw_df = replace_totals(cont_cat_uni_dim_df)
        totals_df = raw_df.groupby('$d$timestamp').sum().reset_index()
        totals_df['$d$political_party'] = None
        totals_df['$d$state'] = None
        totals_df['$d$state_display'] = None
        totals_df = totals_df[['$d$timestamp', '$d$political_party', '$d$state', '$d$state_display',
                               '$m$votes', '$m$wins']]

        dimensions = (slicer.dimensions.timestamp, slicer.dimensions.political_party.rollup(), slicer.dimensions.state)
        result = reduce_result_set([raw_df, totals_df], (), dimensions, ())

        pandas.testing.assert_frame_equal(expected, result)

    def test_reduce_single_result_set_with_cont_cat_uni_dimensions_uni_totals(self):
        # Rollup on the last dimension: totals rows interleave naturally,
        # so the whole all-totals frame (minus cat totals) is expected.
        expected = cont_cat_uni_dim_all_totals_df.loc[(slice(None), slice('d', 'r')), :]
        raw_df = replace_totals(cont_cat_uni_dim_df)
        totals_df = raw_df.groupby(['$d$timestamp', '$d$political_party']).sum().reset_index()
        totals_df['$d$state'] = None
        totals_df['$d$state_display'] = None
        totals_df = totals_df[['$d$timestamp', '$d$political_party', '$d$state', '$d$state_display',
                               '$m$votes', '$m$wins']]

        dimensions = (slicer.dimensions.timestamp, slicer.dimensions.political_party, slicer.dimensions.state.rollup())
        result = reduce_result_set([raw_df, totals_df], (), dimensions, ())

        pandas.testing.assert_frame_equal(expected, result)

    @skip('BAN-2594')
    def test_reduce_single_result_set_with_cont_cat_uni_dimensions_cat_totals_with_null_in_cont_dim(self):
        # Known open issue (BAN-2594): NaN values in the continuous dimension
        # collide with NaN-based totals detection.
        index_names = list(cont_cat_uni_dim_all_totals_df.index.names)
        # Extra detail rows with a null timestamp, plus their totals row.
        nulls = pd.DataFrame([[np.nan, 'd', '1', 'Texas', 5, 0], [np.nan, 'd', '2', 'California', 2, 0],
                              [np.nan, 'i', '1', 'Texas', 5, 0], [np.nan, 'i', '2', 'California', 7, 0],
                              [np.nan, 'r', '1', 'Texas', 11, 0], [np.nan, 'r', '2', 'California', 3, 0]],
                             columns=index_names + list(cont_cat_uni_dim_all_totals_df.columns))
        nulls_totals = pd.DataFrame([nulls[['$m$votes', '$m$wins']].sum()])
        nulls_totals[index_names[0]] = np.nan
        nulls_totals[index_names[1]] = '~~totals'
        nulls_totals[index_names[2]] = '~~totals'

        expected = cont_cat_uni_dim_all_totals_df.loc[(slice(None), slice(None), slice('1', '2')), :] \
            .append(cont_cat_uni_dim_all_totals_df.loc[(slice(None), '~~totals'), :].iloc[:-1]) \
            .append(nulls.set_index(index_names)) \
            .append(nulls_totals.set_index(index_names)) \
            .sort_index()
        raw_df = replace_totals(cont_cat_uni_dim_df)
        raw_df = nulls \
            .append(raw_df) \
            .sort_values(['$d$timestamp', '$d$political_party', '$d$state'])

        # groupby drops the NaN timestamp group, so its totals row must be
        # computed and appended separately.
        totals_df = raw_df.groupby('$d$timestamp').sum().reset_index()
        null_totals_df = pd.DataFrame([raw_df[raw_df['$d$timestamp'].isnull()]
                                       [['$m$votes', '$m$wins']].sum()])
        null_totals_df['$d$timestamp'] = None
        totals_df = totals_df.append(null_totals_df)
        totals_df['$d$political_party'] = None
        totals_df['$d$state'] = None
        totals_df['$d$state_display'] = None
        totals_df = totals_df[['$d$timestamp', '$d$political_party', '$d$state', '$d$state_display',
                               '$m$votes', '$m$wins']]

        dimensions = (slicer.dimensions.timestamp, slicer.dimensions.political_party.rollup(), slicer.dimensions.state)
        result = reduce_result_set([raw_df, totals_df], (), dimensions, ())

        pandas.testing.assert_frame_equal(expected, result)
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,6 +1,6 @@
six
pandas==0.23.4
pypika==0.19.0
pypika==0.20.1
toposort==1.5
typing==3.6.2
python-dateutil==2.7.3
Expand Down

0 comments on commit 420d72d

Please sign in to comment.