<img src='images/pandas_pipelines/panda-paint.jpeg' width='200px' align='right' style="padding: 15px">

# Modern Pipelines in pandas

This notebook looks at `pandas` in practice and at how we can adopt good practices when working with data.

Let's pretend that we've read in a timeseries and that this is the new data.

In [None]:
import pandas as pd
import numpy as np

In [None]:
def make_ts_df():
    dates = [str(_) for _ in pd.date_range("2018-01-01", "2019-01-01")]
    values = [np.nan if np.random.random() < 0.05 else _ for _ in np.random.normal(0, 1, 366)]
    return pd.DataFrame({"date": dates, "value": values})

date_df = make_ts_df()
date_df

Before we start analysing the data, let's imagine we want to do the following:

- Get rid of the redundant hours.
- Clean the `nan` values.
- Remove outliers. 

One way of doing it could be like so.

In [None]:
(
    date_df
    .assign(date=lambda df: pd.to_datetime(df['date']).dt.normalize())
    .dropna()
    .loc[lambda df: df['value'] > -2.0]
    .loc[lambda df: df['value'] < 2.0]
)

This is the way we've been doing it so far, but we can do better.

If you were to just look at the code above it could be a bit hard to understand what is going on.

Also, if we were to get a new date dataframe, we'd have to start all over again. 

Whilst this is not a big issue when we are only doing 3 processing steps, as the amount of processing increases it could become time consuming.

## Pipeline abstraction

In [None]:
def parse_dates(dataf):
    """Removes the hours from dates"""
    return (dataf
            .assign(date=lambda d: pd.to_datetime(d['date']).dt.normalize()))

def remove_nan_rows(dataf):
    """Removes rows with missing values"""
    return (dataf.dropna())

def fill_nan(dataf):
    """Replaces NaN values with 0"""
    return (dataf.fillna(0))

def remove_outliers(dataf):
    """Removes values less than -2 and greater than 2"""
    return (dataf
            .loc[lambda d: d['value'] > -2.0]
            .loc[lambda d: d['value'] < 2.0])

prep_df = (date_df
           .pipe(parse_dates)
           .pipe(remove_nan_rows)
           .pipe(remove_outliers))
prep_df

The `.pipe()` method takes a function that accepts a dataframe as its first argument and calls that function on the dataframe. This is a very nice flow.

- We can easily use this pipeline (or parts of this pipeline) for different datasets.

<img src='images/lego.png' width='400px'  style="padding: 15px">

- If there is ever a bug, this pipeline will make it easier for us to figure out where it is. Since every step is merely a function, we'll know exactly where the process is breaking.

- We can give the function a descriptive name and on a pipeline level this allows us to see "what" is happening "when". 

In [None]:
# e.g. You may not have seen how the parse_dates function works yet
help(parse_dates)
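
The reuse point above can be made concrete by wrapping the chain in a function. This is a minimal sketch, assuming every dataset shares the `date`/`value` layout. The helper `clean_timeseries` and the two toy frames are hypothetical names introduced for illustration, and the step functions are repeated so the block runs on its own.

```python
import numpy as np
import pandas as pd


def parse_dates(dataf):
    """Removes the hours from dates"""
    return dataf.assign(date=lambda d: pd.to_datetime(d["date"]).dt.normalize())


def remove_nan_rows(dataf):
    """Removes rows with missing values"""
    return dataf.dropna()


def clean_timeseries(dataf):
    """Hypothetical helper: bundles the steps so the chain can be reused."""
    return dataf.pipe(parse_dates).pipe(remove_nan_rows)


# Two toy datasets with the same layout (made-up values)
sales_df = pd.DataFrame(
    {"date": ["2018-01-01 10:30:00", "2018-01-02 08:00:00"], "value": [1.0, np.nan]}
)
visits_df = pd.DataFrame({"date": ["2019-05-01 23:59:00"], "value": [0.5]})

clean_sales = clean_timeseries(sales_df)
clean_visits = clean_timeseries(visits_df)

# A function that takes and returns a dataframe is itself a valid pipe step
same_result = sales_df.pipe(clean_timeseries)
```

Because `clean_timeseries` has the same shape as the individual steps (dataframe in, dataframe out), it can be nested inside a larger pipeline just like any other step.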

### Caveats 

We should be careful when we are writing `.pipe`-lines. The function going into a `.pipe()` might not be ***stateless***. Here's an example:

In [None]:
date_df = make_ts_df() 

In [None]:
def rename_columns(dataf):
    dataf.columns = ["a", "b"]
    return dataf 

In [None]:
date_df.pipe(rename_columns).columns, date_df.columns

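Both outputs show the new column names, which means `rename_columns` also changed the original `date_df` in place. In such a situation it is best to call `.copy()` inside the function or, better, to use a stateless method like `.rename()`.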

In [None]:
# let's recreate the random data
date_df = make_ts_df()

def rename_columns(dataf):
    return dataf.rename(columns = {'date':'a','value':'b'}) 

In [None]:
date_df.pipe(rename_columns).columns, date_df.columns

Be careful with this. We want our functions to be stateless, otherwise we might accidentally change the original data.
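
For completeness, here is a sketch of the `.copy()` alternative mentioned earlier. The function name `rename_columns_copy` and the one-row `toy_df` are hypothetical and exist only for this illustration.

```python
import pandas as pd


def rename_columns_copy(dataf):
    """Hypothetical variant: works on a copy so the caller's frame is untouched."""
    dataf = dataf.copy()
    dataf.columns = ["a", "b"]
    return dataf


toy_df = pd.DataFrame({"date": ["2018-01-01"], "value": [0.1]})
renamed = toy_df.pipe(rename_columns_copy)

print(list(renamed.columns))  # ['a', 'b']
print(list(toy_df.columns))   # ['date', 'value']
```

The copy keeps the original intact, possibly at the cost of extra memory on large dataframes, which is one reason to prefer methods such as `.rename()` or `.assign()` that already return a new object.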

## Pipeline abstraction on higher levels

To fully appreciate what the pandas pipelines can do, let us rewrite one function.

In [None]:
def remove_outliers(dataf, min_value=None, max_value=None):
    """Removes outliers less than min_value and greater than max_value"""
    
    # compare against None explicitly so that a threshold of 0 is still accepted
    if min_value is None or max_value is None:
        raise ValueError('Hey silly, you need to state a min and max!')
    
    return (dataf
            .loc[lambda d: d['value'] > min_value]
            .loc[lambda d: d['value'] < max_value])

(
    date_df
    .pipe(parse_dates)
    .pipe(remove_nan_rows)
    .pipe(remove_outliers, min_value=-2, max_value=2)
)

`.pipe()` forwards any extra positional and keyword arguments to the function. This lets you change, say, threshold values at the pipeline level without touching the function itself. This is great because it encourages you to write functions that are general.
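
To illustrate how arguments travel through `.pipe()`, the sketch below generalises the outlier step a little further. The function `remove_outliers_from`, its `column` parameter and the toy data are assumptions made for this example, not part of the notebook's pipeline.

```python
import pandas as pd


def remove_outliers_from(dataf, column="value", min_value=-2.0, max_value=2.0):
    """Keeps rows where `column` lies strictly between min_value and max_value."""
    return dataf.loc[lambda d: (d[column] > min_value) & (d[column] < max_value)]


toy_df = pd.DataFrame(
    {"value": [-3.0, -1.5, 0.0, 1.5, 3.0], "other": [0.0, 5.0, 0.1, 0.2, 0.3]}
)

loose = toy_df.pipe(remove_outliers_from)  # defaults: keep values in (-2, 2)
strict = toy_df.pipe(remove_outliers_from, min_value=-1, max_value=1)
on_other = toy_df.pipe(remove_outliers_from, column="other", min_value=-1, max_value=1)

# .pipe(func, **kwargs) gives the same result as calling func(dataf, **kwargs)
direct = remove_outliers_from(toy_df, min_value=-1, max_value=1)
```

The same function now serves three different filters, and the choice of column and thresholds is visible where the pipeline is assembled rather than hidden inside the step.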

## <mark>Exercise</mark>

Rewrite the following as a pandas pipeline:

In [None]:
sanfran = pd.read_csv('data/san_fran_crime_sample.csv')
sanfran.head()

In [None]:
(
    sanfran
    .rename(columns=str.lower)
    .rename(columns={'dates': 'date'})
    .assign(date = lambda df: pd.to_datetime(df['date']).dt.normalize())
    .set_index('date')
    .sort_index()
    .loc['2004':'2014']
    .resample('ME')[['category']].count()
    .assign(category_rolling = lambda df: df['category'].rolling(10).mean())
    .plot(figsize=(9,5), title='Crime Count in San Francisco')
)

In [None]:
# %load answers/pandas_pipelines/pipeline.py

### <mark>Bonus Exercise: Add a decorator</mark>

Familiar with decorators? Add a decorator to log:
    
- the shape of the dataframe before and after (see decorator `log_shape`)
- the time it takes to run the function (create a decorator called `log_time`)
    
We can add a little more power here and add some logging functionality with **a decorator**:

**Example**: See below for example with decorators on the dataframe `date_df`

In [None]:
from functools import wraps


def log_shape(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # record the input shape before the step runs, in case the step mutates its input
        shape_before = args[0].shape
        result = func(*args, **kwargs)
        shape_after = result.shape
        print(f"{func.__name__} => before shape:{shape_before} after shape:{shape_after}")
        return result
    return wrapper

@log_shape
def parse_dates(dataf):
    return (dataf
            .assign(date=lambda d: pd.to_datetime(d.date)))

@log_shape
def remove_nan_rows(dataf):
    return (dataf.dropna())

@log_shape
def remove_outliers(dataf, min_val=-2.0, max_val=2.0):
    return (dataf
            .loc[lambda d: d['value'] > min_val]
            .loc[lambda d: d['value'] < max_val])

prep_df = (date_df
           .pipe(parse_dates)
           .pipe(remove_nan_rows)
           .pipe(remove_outliers, min_val=-1))

In [None]:
# %load answers/pandas_pipelines/pipeline-decorator.py

Note the benefit of having a standard decorator that can log pandas steps: 

1. When writing code, this can help you discover what is happening. If rows disappear when they shouldn't, the log can point you to the step responsible.
2. When this pandas code goes to production, you will have some logging for free, for example in Airflow. If something goes wrong there, you may also be able to debug it more easily.
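
For production use, the `print` call is usually swapped for Python's standard `logging` module so that a scheduler can collect the messages. The block below is one possible sketch of that idea, not a definitive implementation: the logger name `"pipeline"` and the toy dataframe are assumptions made for illustration.

```python
import logging
from functools import wraps

import pandas as pd

logger = logging.getLogger("pipeline")  # hypothetical logger name


def log_shape(func):
    """Logs the dataframe shape before and after a pipeline step."""
    @wraps(func)
    def wrapper(dataf, *args, **kwargs):
        shape_before = dataf.shape
        result = func(dataf, *args, **kwargs)
        logger.info(
            "%s => before shape:%s after shape:%s",
            func.__name__, shape_before, result.shape,
        )
        return result
    return wrapper


@log_shape
def remove_nan_rows(dataf):
    return dataf.dropna()


# Send INFO messages to the console; in production the handler is configured elsewhere
logging.basicConfig(level=logging.INFO)

toy_df = pd.DataFrame({"value": [1.0, None, 3.0]})
cleaned = toy_df.pipe(remove_nan_rows)
```

Using a named logger leaves the choice of destination and log level to whoever runs the code, while the pipeline steps stay unchanged.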

## Conclusion

> **"Pipelines are the only correct way to write pandas."**

This is a bold statement, but some people feel very strongly about this.

Even if you take this statement with a grain of salt, it is important to write your code in such a way that your notebooks remain clear. If it takes a lot of effort to understand the code of your colleagues, then your team will be slower than you want it to be.

A notebook is a great scratchpad, but that is no excuse to write unclear code!