-
Notifications
You must be signed in to change notification settings - Fork 20
/
special_cases.py
77 lines (55 loc) · 2.9 KB
/
special_cases.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from datetime import timedelta
import pandas as pd
from fireant.slicer.dimensions import DatetimeDimension
from fireant.slicer.filters import RangeFilter
from fireant.slicer.operations import RollingOperation
def _shift_back(value, interval, periods):
    """
    Return ``value`` moved ``periods`` units of ``interval`` into the past.

    ``datetime.timedelta`` only understands fixed-length units (hours, days, weeks, ...); it has no ``months`` or
    ``years`` keyword.  Month, quarter and year intervals are therefore handled with calendar month arithmetic, and
    the day of month is clamped to the length of the target month (e.g. Mar 31 minus one month -> Feb 28/29).

    :param value:
        A date-like object supporting ``.year``, ``.month``, ``.day``, ``.replace()`` and subtraction of a
        ``timedelta`` (e.g. ``datetime.date`` or ``datetime.datetime``).
    :param interval:
        Singular interval name such as 'day', 'week', 'month', 'quarter' or 'year'.
    :param periods:
        Number of intervals to move back.
    :return:
        The shifted value.  For month/quarter/year the result comes from ``value.replace`` and so keeps the type of
        ``value``.
    :raises TypeError:
        If ``interval`` is not month/quarter/year and ``interval + 's'`` is not a valid ``timedelta`` keyword.
    """
    import calendar  # local import: only needed for the calendar-based intervals

    months_per_unit = {'month': 1, 'quarter': 3, 'year': 12}
    if interval not in months_per_unit:
        # Fixed-length intervals map directly onto a timedelta keyword, e.g. 'day' -> timedelta(days=periods)
        return value - timedelta(**{interval + 's': periods})

    # Count months on a single zero-based axis so that crossing a year boundary is plain integer arithmetic.
    absolute_month = value.year * 12 + (value.month - 1) - periods * months_per_unit[interval]
    year, month_index = divmod(absolute_month, 12)
    month = month_index + 1
    last_day_of_month = calendar.monthrange(year, month)[1]
    return value.replace(year=year, month=month, day=min(value.day, last_day_of_month))


def adjust_daterange_filter_for_rolling_window(dimensions, operations, filters):
    """
    Move the start of date range filters back so that a rolling operation has data for its whole window.

    The filters are only touched when all of the following hold:

    - the first dimension is a ``DatetimeDimension``,
    - at least one operation is a ``RollingOperation``,
    - at least one ``RangeFilter`` is defined on the same term as the first dimension.

    In that case the start of every such filter is moved back by the largest rolling window, measured in units of
    the first dimension's interval.

    NOTE: the matching filters are modified in place (``definition.start.value`` is reassigned), so calling this
    function twice with the same filter objects moves the start twice.

    :param dimensions:
        Sequence of dimensions; only the first one is inspected.
    :param operations:
        Iterable of operations; ``RollingOperation`` instances contribute their ``window``.
    :param filters:
        Iterable of filters.
    :return:
        The ``filters`` object that was passed in.
    """
    # Only a datetime dimension in the first position is relevant for rolling windows.
    if not dimensions or not isinstance(dimensions[0], DatetimeDimension):
        return filters

    rolling_windows = [operation.window
                       for operation in operations
                       if isinstance(operation, RollingOperation)]
    if not rolling_windows:
        return filters

    dim0 = dimensions[0]
    filters_on_dim0 = [filter_
                       for filter_ in filters
                       if isinstance(filter_, RangeFilter)
                       and str(filter_.definition.term) == str(dim0.definition)]
    if not filters_on_dim0:
        return filters

    max_rolling_period = max(rolling_windows)
    for filter_ in filters_on_dim0:
        # Monkey patch the start date on the date filter so the query also loads the rows preceding the range
        start = filter_.definition.start
        start.value = _shift_back(start.value, dim0.interval, max_rolling_period)

    return filters
def adjust_dataframe_for_rolling_window(operations, data_frame):
    """
    Drop the leading rows of a data frame according to the largest rolling window.

    With a largest window of ``n``, the first ``n - 1`` rows are removed: from the whole frame when it is indexed by
    a ``DatetimeIndex``, or from each group of the remaining index levels when it has a ``MultiIndex`` whose first
    level is a ``DatetimeIndex``.  Frames with any other index, and calls without a ``RollingOperation``, are
    returned unchanged.

    :param operations:
        Iterable of operations; ``RollingOperation`` instances contribute their ``window``.
    :param data_frame:
        The pandas data frame to trim.
    :return:
        The trimmed data frame, or ``data_frame`` itself when nothing applies.
    """
    rolling_windows = [op.window for op in operations if isinstance(op, RollingOperation)]
    if not rolling_windows:
        return data_frame

    # Number of leading rows to discard
    skip = max(rolling_windows) - 1
    index = data_frame.index

    if isinstance(index, pd.DatetimeIndex):
        return data_frame.iloc[skip:]

    if isinstance(index, pd.MultiIndex) and isinstance(index.levels[0], pd.DatetimeIndex):
        level_count = len(index.levels)
        non_date_levels = list(range(1, level_count))
        leading_levels = list(range(level_count - 1))

        # Trim each group (one per combination of the non-date levels) separately
        trimmed = data_frame.groupby(level=non_date_levels).apply(lambda group: group.iloc[skip:])

        # Drop the first ``level_count - 1`` index levels of the result -- presumably the group keys that
        # ``apply`` prepends; confirm against the pandas version in use.
        return trimmed.reset_index(level=leading_levels, drop=True)

    return data_frame
def apply_to_query_args(database, table, joins, dimensions, metrics, filters, reference_groups, totals_dimensions,
                        operations):
    """
    Apply the special-case adjustments to the arguments of a query.

    Only ``filters`` is adjusted (via ``adjust_daterange_filter_for_rolling_window``); every other argument is
    passed through untouched.  ``operations`` is used for the adjustment but is not part of the result.

    :return:
        A tuple ``(database, table, joins, dimensions, metrics, filters, reference_groups, totals_dimensions)``.
    """
    adjusted_filters = adjust_daterange_filter_for_rolling_window(dimensions, operations, filters)
    return (
        database,
        table,
        joins,
        dimensions,
        metrics,
        adjusted_filters,
        reference_groups,
        totals_dimensions,
    )
def apply_operations_to_data_frame(operations, data_frame):
    """
    Apply the special-case adjustments to a result data frame.

    Delegates to ``adjust_dataframe_for_rolling_window``.

    :param operations:
        The operations, passed through to ``adjust_dataframe_for_rolling_window``.
    :param data_frame:
        The data frame to adjust.
    :return:
        The adjusted data frame.
    """
    return adjust_dataframe_for_rolling_window(operations, data_frame)