-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
17 changed files
with
1,203 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
[flake8] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
"""This module contains common helper functions for sanity checks and | ||
conversions. | ||
""" | ||
|
||
import collections | ||
from typing import Any, Tuple | ||
|
||
|
||
def ensure_tuple(values: Any) -> Tuple[Any]: | ||
"""For convenience, some parameters may accept a single value (string | ||
for a column name) or multiple values (list of strings for column | ||
names). This function ensures that the output is always a tuple of values. | ||
Parameters | ||
---------- | ||
values: Any | ||
Input values to be converted to tuples. | ||
Returns | ||
------- | ||
tupled: Tuple[Any] | ||
""" | ||
|
||
# None remains None | ||
if values is None: | ||
return None | ||
|
||
# if not iterable, return tuple with single value | ||
elif not isinstance(values, collections.Iterable): | ||
return (values, ) | ||
|
||
# handle single string which is iterable but still is only one value | ||
elif isinstance(values, str): | ||
return (values, ) | ||
|
||
# anything else should ok to be converted to tuple | ||
else: | ||
return tuple(values) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
"""This module contains computation engine independent wrangler interfaces | ||
and corresponding descriptions. | ||
""" | ||
|
||
from typing import Any, Iterable, Union | ||
|
||
from pywrangler.util import sanitizer | ||
from pywrangler.wranglers.base import BaseWrangler | ||
|
||
TYPE_COLUMNS = Union[str, Iterable[str]] | ||
|
||
|
||
class IntervalIdentifier(BaseWrangler): | ||
"""Defines the reference interface for the interval identification | ||
wrangler. | ||
An interval is defined as a range of values beginning with an opening | ||
marker and ending with a closing marker (e.g. the interval daylight may be | ||
defined as all events/values occurring between sunrise and sunset). | ||
The interval identification wrangler assigns ids to values such that values | ||
belonging to the same interval share the same interval id. For example, all | ||
values of the first daylight interval are assigned with id 1. All values of | ||
the second daylight interval will be assigned with id 2 and so on. | ||
Values which do not belong to any valid interval are assigned the value 0 | ||
by definition. | ||
Only the shortest valid interval is identified. Given multiple opening | ||
markers in sequence without an intermittent closing marker, only the last | ||
opening marker is relevant and the rest is ignored. Given multiple | ||
closing markers in sequence without an intermittent opening marker, only | ||
the first closing marker is relevant and the rest is ignored. | ||
Opening and closing markers are included in their corresponding interval. | ||
Parameters | ||
---------- | ||
marker_column: str | ||
Name of column which contains the opening and closing markers. | ||
marker_start: Any | ||
A value defining the start of an interval. | ||
marker_end: Any | ||
A value defining the end of an interval. | ||
order_columns: str, Iterable[str], optional | ||
Column names which define the order of the data (e.g. a timestamp | ||
column). Sort order can be defined with the parameter `ascending`. | ||
groupby_columns: str, Iterable[str], optional | ||
Column names which define how the data should be grouped/split into | ||
separate entities. For distributed computation engines, groupby columns | ||
should ideally reference partition keys to avoid data shuffling. | ||
ascending: bool, Iterable[bool], optional | ||
Sort ascending vs. descending. Specify list for multiple sort orders. | ||
If a list is specified, length of the list must equal length of | ||
`order_columns`. Default is True. | ||
target_column_name: str, optional | ||
Name of the resulting target column. | ||
""" | ||
|
||
def __init__(self, | ||
marker_column: str, | ||
marker_start: Any, | ||
marker_end: Any, | ||
order_columns: TYPE_COLUMNS = None, | ||
groupby_columns: TYPE_COLUMNS = None, | ||
ascending: Union[bool, Iterable[bool]] = None, | ||
target_column_name: str = "iids"): | ||
|
||
self.marker_column = marker_column | ||
self.marker_start = marker_start | ||
self.marker_end = marker_end | ||
self.order_columns = sanitizer.ensure_tuple(order_columns) | ||
self.groupby_columns = sanitizer.ensure_tuple(groupby_columns) | ||
self.ascending = sanitizer.ensure_tuple(ascending) | ||
self.target_column_name = target_column_name | ||
|
||
# sanity checks for sort order | ||
if self.ascending: | ||
|
||
# check for equal number of items of order and sort columns | ||
if len(self.order_columns) != len(self.ascending): | ||
raise ValueError('`order_columns` and `ascending` must have ' | ||
'equal number of items.') | ||
|
||
# check for correct sorting keywords | ||
if not all([isinstance(x, bool) for x in self.ascending]): | ||
raise ValueError('Only `True` and `False` are ' | ||
'as arguments for `ascending`') | ||
|
||
# set default sort order if None is given | ||
elif self.order_columns: | ||
self.ascending = [True] * len(self.order_columns) | ||
|
||
@property | ||
def preserves_sample_size(self) -> bool: | ||
return True |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
"""This module contains the pandas base wrangler. | ||
""" | ||
|
||
from typing import Tuple | ||
|
||
import pandas as pd | ||
|
||
from pywrangler.wranglers.base import BaseWrangler | ||
|
||
|
||
class PandasWrangler(BaseWrangler): | ||
"""Contains methods common to all pandas based wranglers. | ||
""" | ||
|
||
@property | ||
def computation_engine(self): | ||
return "pandas" | ||
|
||
def validate_output_shape(self, df_in: pd.DataFrame, df_out: pd.DataFrame): | ||
"""If wrangler implementation preserves sample size, assert equal | ||
sample sizes between input and output dataframe. | ||
Using pandas, all data is in memory. Hence, getting shape information | ||
is cheap and this check can be done regularly (in contrast to pyspark | ||
where `df.count()` can be very expensive). | ||
Parameters | ||
---------- | ||
df_in: pd.DataFrame | ||
Input dataframe. | ||
df_out: pd.DataFrame | ||
Output dataframe. | ||
""" | ||
|
||
if self.preserves_sample_size: | ||
shape_in = df_in.shape[0] | ||
shape_out = df_out.shape[0] | ||
|
||
if shape_in != shape_out: | ||
raise ValueError('Number of input samples ({}) does not match ' | ||
'number of ouput samples ({}) which should ' | ||
'be the case because wrangler is supposed to ' | ||
'preserve the number of samples.' | ||
.format(shape_in, shape_out)) | ||
|
||
@staticmethod | ||
def validate_empty_df(df: pd.DataFrame): | ||
"""Check for empty dataframe. By definition, wranglers operate on non | ||
empty dataframe. Therefore, raise error if dataframe is empty. | ||
Parameters | ||
---------- | ||
df: pd.DataFrame | ||
Dataframe to check against. | ||
""" | ||
|
||
if df.empty: | ||
raise ValueError('Dataframe is empty.') | ||
|
||
@staticmethod | ||
def validate_columns(df: pd.DataFrame, columns: Tuple[str]): | ||
"""Check that columns exist in dataframe and raise error if otherwise. | ||
Parameters | ||
---------- | ||
df: pd.DataFrame | ||
Dataframe to check against. | ||
columns: Tuple[str] | ||
Columns to be validated. | ||
""" | ||
|
||
for column in columns: | ||
if column not in df.columns: | ||
raise ValueError('Column with name `{}` does not exist. ' | ||
'Please check parameter settings.') |
Oops, something went wrong.