Skip to content

Commit

Permalink
Merge 057a750 into 1582491
Browse files Browse the repository at this point in the history
  • Loading branch information
twheys committed Jul 10, 2018
2 parents 1582491 + 057a750 commit 05ccb9f
Show file tree
Hide file tree
Showing 14 changed files with 2,626 additions and 2,391 deletions.
47 changes: 41 additions & 6 deletions fireant/slicer/operations.py
Expand Up @@ -19,6 +19,10 @@ class Operation(object):
def apply(self, data_frame):
raise NotImplementedError()

@property
def metrics(self):
raise NotImplementedError()

@property
def operations(self):
return []
Expand All @@ -32,6 +36,17 @@ def __init__(self, key, label, prefix=None, suffix=None, precision=None):
self.suffix = suffix
self.precision = precision

def apply(self, data_frame):
raise NotImplementedError()

@property
def metrics(self):
raise NotImplementedError()

@property
def operations(self):
raise NotImplementedError()

def _group_levels(self, index):
"""
Get the index levels that need to be grouped. This is to avoid apply the cumulative function across separate
Expand All @@ -57,6 +72,9 @@ def __init__(self, arg):

self.arg = arg

def apply(self, data_frame):
raise NotImplementedError()

@property
def metrics(self):
return [metric
Expand All @@ -70,9 +88,6 @@ def operations(self):
if isinstance(operation, Operation)
for op_and_children in [operation] + operation.operations]

def apply(self, data_frame):
raise NotImplementedError()

def __repr__(self):
return self.key

Expand Down Expand Up @@ -123,9 +138,9 @@ def apply(self, data_frame):
return self.cummean(data_frame[df_key])


class _Rolling(_BaseOperation):
class RollingOperation(_BaseOperation):
def __init__(self, arg, window, min_periods=None):
super(_Rolling, self).__init__(
super(RollingOperation, self).__init__(
key='{}({})'.format(self.__class__.__name__.lower(),
getattr(arg, 'key', arg)),
label='{}({})'.format(self.__class__.__name__,
Expand All @@ -139,11 +154,31 @@ def __init__(self, arg, window, min_periods=None):
self.window = window
self.min_periods = min_periods

def _should_adjust(self, other_operations):
# Need to figure out if this rolling operation is has the largest window, and if it's the first of multiple
# rolling operations if there are more than one operation sharing the largest window.
first_max_rolling = list(sorted(other_operations, key=lambda operation: operation.window))[0]

return first_max_rolling is self

def apply(self, data_frame):
raise NotImplementedError()

@property
def metrics(self):
return [metric
for metric in [self.arg]
if isinstance(metric, Metric)]

@property
def operations(self):
return [op_and_children
for operation in [self.arg]
if isinstance(operation, Operation)
for op_and_children in [operation] + operation.operations]


class RollingMean(_Rolling):
class RollingMean(RollingOperation):
def rolling_mean(self, x):
return x.rolling(self.window, self.min_periods).mean()

Expand Down
23 changes: 15 additions & 8 deletions fireant/slicer/queries/builder.py
Expand Up @@ -13,6 +13,7 @@
format_metric_key,
immutable,
)
from . import special_cases
from .database import fetch_data
from .finders import (
find_and_group_references_for_dimensions,
Expand Down Expand Up @@ -154,14 +155,18 @@ def query(self):
reference_groups = find_and_group_references_for_dimensions(self._references)
totals_dimensions = find_dimensions_with_totals(self._dimensions)

query = make_slicer_query_with_references_and_totals(self.slicer.database,
self.table,
self.slicer.joins,
self._dimensions,
find_metrics_for_widgets(self._widgets),
self._filters,
reference_groups,
totals_dimensions)
operations = find_operations_for_widgets(self._widgets)
args = special_cases.apply_to_query_args(self.slicer.database,
self.table,
self.slicer.joins,
self._dimensions,
find_metrics_for_widgets(self._widgets),
self._filters,
reference_groups,
totals_dimensions,
operations)

query = make_slicer_query_with_references_and_totals(*args)

# Add ordering
orders = (self._orders or make_orders_for_dimensions(self._dimensions))
Expand Down Expand Up @@ -193,6 +198,8 @@ def fetch(self, hint=None) -> Iterable[Dict]:
df_key = format_metric_key(operation.key)
data_frame[df_key] = operation.apply(data_frame)

data_frame = special_cases.apply_operations_to_data_frame(operations, data_frame)

# Apply transformations
return [widget.transform(data_frame, self.slicer, self._dimensions, self._references)
for widget in self._widgets]
Expand Down
77 changes: 77 additions & 0 deletions fireant/slicer/queries/special_cases.py
@@ -0,0 +1,77 @@
from datetime import timedelta

import pandas as pd

from fireant.slicer.dimensions import DatetimeDimension
from fireant.slicer.filters import RangeFilter
from fireant.slicer.operations import RollingOperation


def adjust_daterange_filter_for_rolling_window(dimensions, operations, filters):
has_datetime_dimension_in_first_dimension_pos = not len(dimensions) \
or not isinstance(dimensions[0], DatetimeDimension)
if has_datetime_dimension_in_first_dimension_pos:
return filters

has_rolling = any([isinstance(operation, RollingOperation)
for operation in operations])
if not has_rolling:
return filters

dim0 = dimensions[0]
filters_on_dim0 = [filter_
for filter_ in filters
if isinstance(filter_, RangeFilter)
and str(filter_.definition.term) == str(dim0.definition)]
if not 0 < len(filters_on_dim0):
return filters

max_rolling_period = max(operation.window
for operation in operations
if isinstance(operation, RollingOperation))

for filter_ in filters_on_dim0:
# Monkey patch the update start date on the date filter
args = {dim0.interval + 's': max_rolling_period} \
if 'quarter' != dim0.interval \
else {'months': max_rolling_period * 3}
filter_.definition.start.value -= timedelta(**args)

return filters


def adjust_dataframe_for_rolling_window(operations, data_frame):
has_rolling = any([isinstance(operation, RollingOperation)
for operation in operations])
if not has_rolling:
return data_frame

max_rolling_period = max(operation.window
for operation in operations
if isinstance(operation, RollingOperation))

if isinstance(data_frame.index, pd.DatetimeIndex):
return data_frame.iloc[max_rolling_period - 1:]

if isinstance(data_frame.index, pd.MultiIndex) \
and isinstance(data_frame.index.levels[0], pd.DatetimeIndex):
num_levels = len(data_frame.index.levels)

return data_frame.groupby(level=list(range(1, num_levels))) \
.apply(lambda df: df.iloc[max_rolling_period - 1:]) \
.reset_index(level=list(range(num_levels - 1)), drop=True)

return data_frame


def apply_to_query_args(database, table, joins, dimensions, metrics, filters, reference_groups, totals_dimensions,
operations):
filters = adjust_daterange_filter_for_rolling_window(dimensions, operations, filters)

return database, table, joins, dimensions, metrics, filters, reference_groups, totals_dimensions


def apply_operations_to_data_frame(operations, data_frame):
data_frame = adjust_dataframe_for_rolling_window(operations, data_frame)

return data_frame

0 comments on commit 05ccb9f

Please sign in to comment.