Skip to content

Commit

Permalink
Merge pull request #9 from mansenfranzen/refactoring_structure
Browse files Browse the repository at this point in the history
Refactoring structure
  • Loading branch information
mansenfranzen committed Jul 7, 2019
2 parents fa6ebb7 + 326a501 commit 6b91924
Show file tree
Hide file tree
Showing 54 changed files with 664 additions and 556 deletions.
16 changes: 8 additions & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
language: python

python:
- '3.5'
- '3.6'
# - '3.5'
# - '3.6'
- '3.7'

env:
- ENV_STRING=master
- ENV_STRING=pandas0.24.1
# - ENV_STRING=pandas0.24.1
# - ENV_STRING=pandas0.24.0
#
# - ENV_STRING=pandas0.23.4
Expand All @@ -33,9 +33,9 @@ env:
# - ENV_STRING=pandas0.19.0
#
# - ENV_STRING=pyspark2.4.0
- ENV_STRING=pyspark2.3.1
# - ENV_STRING=pyspark2.3.1

- ENV_STRING=dask1.1.5
# - ENV_STRING=dask1.1.5


# Remove python/pandas version interactions which do not have wheels on pypi
Expand Down Expand Up @@ -67,17 +67,17 @@ matrix:
dist: xenial

before_install:
- source tests/travis_java_install.sh
- source travisci/java_install.sh

install:
- travis_retry pip install --upgrade pip
- travis_retry pip install --upgrade setuptools
- travis_retry pip install coveralls flake8 tox

script:
- tox -e $(echo py$TRAVIS_PYTHON_VERSION-$ENV_STRING | tr -d .)
- source travisci/tox_invocation.sh

after_success:
- source tests/travis_coveralls_master.sh
- source travisci/coveralls_master.sh

cache: pip
5 changes: 5 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ testing =
pytest-cov
tox
memory_profiler
pyarrow

dev =
sphinx
Expand All @@ -53,6 +54,10 @@ norecursedirs =
dist
build
.tox
markers =
pandas: marks all pandas tests
pyspark: marks all pyspark tests
dask: marks all dask tests

testpaths = tests

Expand Down
11 changes: 9 additions & 2 deletions src/pywrangler/wranglers/base.py → src/pywrangler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
"""

from abc import ABC, abstractmethod

from pywrangler.util import _pprint
from pywrangler.util.helper import get_param_names


class BaseWrangler:
class BaseWrangler(ABC):
"""Defines the basic interface common to all data wranglers.
In analogy to sklearn transformers (see link below), all wranglers have to
Expand All @@ -24,7 +26,7 @@ class BaseWrangler:
should contain any logic behind parameter parsing and conversion.
In contrast to sklearn, wranglers do only accept dataframes like objects
(like pandas, spark or dask dataframes) as inputs to `fit` and `transform`.
(like pandas/pyspark/dask dataframes) as inputs to `fit` and `transform`.
The relevant columns and their respective meaning is provided via the
`__init__` method. In addition, wranglers may accept multiple input
dataframes with different shapes. Also, the number of samples may also
Expand All @@ -42,10 +44,12 @@ class BaseWrangler:
"""

@property
@abstractmethod
def preserves_sample_size(self) -> bool:
raise NotImplementedError

@property
@abstractmethod
def computation_engine(self) -> str:
raise NotImplementedError

Expand Down Expand Up @@ -91,12 +95,15 @@ def set_params(self, **params):

return self

@abstractmethod
def fit(self, *args, **kwargs):
raise NotImplementedError

@abstractmethod
def transform(self, *args, **kwargs):
raise NotImplementedError

@abstractmethod
def fit_transform(self, *args, **kwargs):
raise NotImplementedError

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from dask.dataframe import DataFrame

from pywrangler.wranglers.base import BaseWrangler
from pywrangler.base import BaseWrangler


class DaskWrangler(BaseWrangler):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dask.diagnostics import ResourceProfiler

from pywrangler.benchmark import MemoryProfiler, TimeProfiler
from pywrangler.wranglers.dask.base import DaskWrangler
from pywrangler.dask.base import DaskWrangler


class DaskBaseProfiler:
Expand Down
File renamed without changes.
82 changes: 82 additions & 0 deletions src/pywrangler/pandas/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""This module contains the pandas base wrangler.
"""

import pandas as pd

from pywrangler.base import BaseWrangler


class PandasWrangler(BaseWrangler):
    """Pandas wrangler base class.

    Provides the pandas-specific `computation_engine` identifier and a
    shared sample-size validation helper for concrete pandas wranglers.
    """

    @property
    def computation_engine(self) -> str:
        return "pandas"

    def _validate_output_shape(self, df_in: pd.DataFrame,
                               df_out: pd.DataFrame):
        """If wrangler implementation preserves sample size, assert equal
        sample sizes between input and output dataframe.

        Using pandas, all data is in memory. Hence, getting shape information
        is cheap and this check can be done regularly (in contrast to pyspark
        where `df.count()` can be very expensive).

        Parameters
        ----------
        df_in: pd.DataFrame
            Input dataframe.
        df_out: pd.DataFrame
            Output dataframe.

        Raises
        ------
        ValueError
            If the wrangler claims to preserve sample size but the row counts
            of input and output differ.
        """

        if self.preserves_sample_size:
            shape_in = df_in.shape[0]
            shape_out = df_out.shape[0]

            if shape_in != shape_out:
                # Note: fixed typo in error message ('ouput' -> 'output').
                raise ValueError('Number of input samples ({}) does not match '
                                 'number of output samples ({}) which should '
                                 'be the case because wrangler is supposed to '
                                 'preserve the number of samples.'
                                 .format(shape_in, shape_out))


class PandasSingleNoFit(PandasWrangler):
    """Mixin class defining `fit` and `fit_transform` for all wranglers with
    a single data frame input and output with no fitting necessary.
    """

    def fit(self, df: pd.DataFrame):
        """Do nothing and return the wrangler unchanged.

        Exists purely to satisfy the common wrangler API so that instances
        remain usable inside pipelines.

        Parameters
        ----------
        df: pd.DataFrame
        """

        return self

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run `fit` followed by `transform` in a single call.

        Parameters
        ----------
        df: pd.DataFrame

        Returns
        -------
        result: pd.DataFrame
        """

        fitted = self.fit(df)
        return fitted.transform(df)
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import pandas as pd

from pywrangler.benchmark import MemoryProfiler, TimeProfiler
from pywrangler.pandas.base import PandasWrangler
from pywrangler.util import sanitizer
from pywrangler.wranglers.pandas.base import PandasWrangler


class PandasTimeProfiler(TimeProfiler):
Expand Down Expand Up @@ -133,7 +133,7 @@ def profile(self, *dfs: pd.DataFrame, **kwargs):

# usage output
dfs_output = self._wrangler.fit_transform(*dfs)
dfs_output = sanitizer.ensure_tuple(dfs_output)
dfs_output = sanitizer.ensure_iterable(dfs_output)
self._usage_output = self._memory_usage_dfs(*dfs_output)

# usage during fit_transform
Expand Down
75 changes: 75 additions & 0 deletions src/pywrangler/pandas/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""This module contains utility functions (e.g. validation) commonly used by
pandas wranglers.
"""

import numpy as np
import pandas as pd

from pywrangler.util.sanitizer import ensure_iterable
from pywrangler.util.types import TYPE_ASCENDING, TYPE_COLUMNS


def validate_empty_df(df: pd.DataFrame):
    """Raise an error for empty dataframes.

    By definition, wranglers operate on non-empty dataframes only, so an
    empty input indicates a usage error by the caller.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe to check against.

    Raises
    ------
    ValueError
        If `df` has no elements.
    """

    if not df.empty:
        return

    raise ValueError('Dataframe is empty.')


def validate_columns(df: pd.DataFrame, columns: TYPE_COLUMNS):
    """Ensure that every requested column exists in `df`; raise an error for
    the first missing one. A falsy `columns` value is treated as "nothing to
    validate" and passes silently.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe to check against.
    columns: TYPE_COLUMNS
        Single column name or iterable of column names to be validated.

    Raises
    ------
    ValueError
        If a requested column is not present in `df`.
    """

    if not columns:
        return

    existing = df.columns
    for candidate in ensure_iterable(columns):
        if candidate not in existing:
            raise ValueError('Column with name `{}` does not exist. '
                             'Please check parameter settings.'
                             .format(candidate))


def sort_values(df: pd.DataFrame,
                order_columns: TYPE_COLUMNS,
                ascending: TYPE_ASCENDING) -> pd.DataFrame:
    """Return `df` sorted by `order_columns` with the given sort direction(s).

    When no order columns are provided (falsy), the dataframe is returned
    untouched, sparing callers a conditional at every call site.
    """

    if not order_columns:
        return df

    return df.sort_values(order_columns, ascending=ascending)


def groupby(df: pd.DataFrame, groupby_columns: TYPE_COLUMNS):
    """Group `df` by the given columns, always yielding a `DataFrameGroupBy`.

    When no groupby columns are provided (falsy), all rows are collapsed into
    one group by grouping on a constant dummy key, so downstream code can rely
    on receiving a grouped object either way.
    """

    if not groupby_columns:
        # A vector of zeros assigns every row to the same single group.
        return df.groupby(np.zeros(df.shape[0]))

    return df.groupby(groupby_columns)
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

import pandas as pd

from pywrangler.wranglers.interfaces import IntervalIdentifier
from pywrangler.wranglers.pandas.base import PandasSingleNoFit
from pywrangler.pandas import util
from pywrangler.pandas.base import PandasSingleNoFit
from pywrangler.wranglers import IntervalIdentifier


class _BaseIntervalIdentifier(PandasSingleNoFit, IntervalIdentifier):
Expand All @@ -33,10 +34,10 @@ def validate_input(self, df: pd.DataFrame):
"""

self.validate_columns(df, self.marker_column)
self.validate_columns(df, self.order_columns)
self.validate_columns(df, self.groupby_columns)
self.validate_empty_df(df)
util.validate_columns(df, self.marker_column)
util.validate_columns(df, self.order_columns)
util.validate_columns(df, self.groupby_columns)
util.validate_empty_df(df)

def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Extract interval ids from given dataframe.
Expand All @@ -56,8 +57,8 @@ def transform(self, df: pd.DataFrame) -> pd.DataFrame:
self.validate_input(df)

# transform
df_ordered = self.sort_values(df, self.order_columns, self.ascending)
df_grouped = self.groupby(df_ordered, self.groupby_columns)
df_ordered = util.sort_values(df, self.order_columns, self.ascending)
df_grouped = util.groupby(df_ordered, self.groupby_columns)

df_result = df_grouped[self.marker_column]\
.transform(self._transform)\
Expand All @@ -66,7 +67,7 @@ def transform(self, df: pd.DataFrame) -> pd.DataFrame:
.to_frame(self.target_column_name)

# check output
self.validate_output_shape(df, df_result)
self._validate_output_shape(df, df_result)

return df_result

Expand Down
File renamed without changes.
Loading

0 comments on commit 6b91924

Please sign in to comment.