In [1]:
import pandas as pd
import numpy as np

# Modern Pipelines

In the notebooks so far, we have focussed on what pandas has to offer, but we have not yet discussed how to use it in practice. In this notebook we will highlight a problem with writing one-off chains of commands and demonstrate a workflow that could be adopted instead.

Let's pretend that we've read in a timeseries and that this is the new data.

In [2]:
def make_ts_df():
    dates = [str(_) for _ in pd.date_range("2018-01-01", "2019-01-01")]
    values = [np.nan if np.random.random() < 0.05 else _ for _ in np.random.normal(0, 1, 366)]
    return pd.DataFrame({"date": dates, "value": values})

date_df = make_ts_df()
date_df

Unnamed: 0,date,value
0,2018-01-01 00:00:00,0.154148
1,2018-01-02 00:00:00,-0.091428
2,2018-01-03 00:00:00,-0.998811
3,2018-01-04 00:00:00,-0.336875
4,2018-01-05 00:00:00,-0.592172
...,...,...
361,2018-12-28 00:00:00,0.316704
362,2018-12-29 00:00:00,0.519023
363,2018-12-30 00:00:00,-0.954728
364,2018-12-31 00:00:00,1.495983


In [8]:
date_df.sample(10)

Unnamed: 0,date,value
240,2018-08-29 00:00:00,-0.527705
285,2018-10-13 00:00:00,-2.117248
320,2018-11-17 00:00:00,-2.007343
276,2018-10-04 00:00:00,-1.198087
297,2018-10-25 00:00:00,1.765589
348,2018-12-15 00:00:00,3.411831
81,2018-03-23 00:00:00,0.265947
203,2018-07-23 00:00:00,1.575602
169,2018-06-19 00:00:00,0.771236
248,2018-09-06 00:00:00,-1.274859


In [7]:
date_df.dtypes

date      object
value    float64
dtype: object

Before we start analysing the data, let's imagine we want to do the following:

- Get rid of the redundant hours.
- Clean the `nan` values.
- Remove outliers. 

One way of doing it could be like so.

In [None]:
def parse_dates(dataf):
    return dataf.assign(date=lambda d: pd.to_datetime(d['date']))

In [9]:
(
    date_df
    .assign(date=lambda d: pd.to_datetime(d['date']))
    .dropna()
    .loc[lambda d: d.value > -2.0]
    .loc[lambda d: d.value < 2.0]
)

Unnamed: 0,date,value
0,2018-01-01,0.154148
1,2018-01-02,-0.091428
2,2018-01-03,-0.998811
3,2018-01-04,-0.336875
4,2018-01-05,-0.592172
...,...,...
361,2018-12-28,0.316704
362,2018-12-29,0.519023
363,2018-12-30,-0.954728
364,2018-12-31,1.495983


This is the way we've been doing it so far, but we can do better.

If you were to just look at the code above it could be a bit hard to understand what is going on.

Also, if we were to get a new date dataframe, we'd have to start all over again. 

Whilst this is not a big issue with only three processing steps, it becomes time-consuming as the amount of processing grows.

## Pipeline abstraction

In [23]:
def add_two_numbers(a, b):
    return a + b

add_two_numbers(4, 5)

9

In [22]:
def new_function(dataf):
    return dataf.dropna()

def a_new_function(df):
    return df.assign(new_col = 2)

a_new_function(date_df)

Unnamed: 0,date,value,new_col
0,2018-01-01 00:00:00,0.154148,2
1,2018-01-02 00:00:00,-0.091428,2
2,2018-01-03 00:00:00,-0.998811,2
3,2018-01-04 00:00:00,-0.336875,2
4,2018-01-05 00:00:00,-0.592172,2
...,...,...,...
361,2018-12-28 00:00:00,0.316704,2
362,2018-12-29 00:00:00,0.519023,2
363,2018-12-30 00:00:00,-0.954728,2
364,2018-12-31 00:00:00,1.495983,2


In [43]:
def parse_types(dataf):
    """Parse the date strings into datetimes, so the redundant hours are no longer shown"""
    return (dataf
            .assign(date=lambda d: pd.to_datetime(d['date'])))

def clean_nan(dataf):
    """Get rid of rows with missing values"""
    return (dataf.dropna())

def fill_nan(dataf):
    """Fill nan values with 0"""
    return (dataf.fillna(0))

def remove_outliers(dataf, min_val, max_val):
    """Keep only rows whose value lies strictly between min_val and max_val"""
    return (dataf
            .loc[lambda d: d['value'] > min_val]
            .loc[lambda d: d['value'] < max_val])

prep_df = (date_df
           .pipe(parse_types)
           .pipe(clean_nan)
           .pipe(remove_outliers, min_val=0, max_val=1)
          )
prep_df

Unnamed: 0,date,value
2,2018-01-03,0.333304
4,2018-01-05,0.067505
8,2018-01-09,0.113506
9,2018-01-10,0.128021
11,2018-01-12,0.544540
...,...,...
351,2018-12-18,0.172514
357,2018-12-24,0.093678
361,2018-12-28,0.432925
363,2018-12-30,0.994623


In [15]:
remove_outliers?

The `.pipe()` method allows us to pass a function that accepts a dataframe as its first argument. The dataframe is handed to that function and the result is returned, so the calls can be chained. This is a very nice flow.

- We can easily use this pipeline (or parts of this pipeline) for different datasets.

<img src="../images/lego.jpg" width="400" height="400" align="center"/>

- If there is ever a bug, this pipeline will make it easier for us to figure out where it is. Since every step is merely a function, we'll know exactly where the process is breaking.

- We can give the function a descriptive name and on a pipeline level this allows us to see "what" is happening "when". 
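
As a rough illustration of the debugging point above, each step could be wrapped in a small decorator that reports what it did. This is only a sketch: the `log_step` name and the `toy_df` frame are made up for this example and are not part of pandas, and the step functions are redefined here so the block runs on its own.

```python
import functools

import numpy as np
import pandas as pd


def log_step(func):
    """Hypothetical helper: print the input and output shape of a pipeline step."""
    @functools.wraps(func)
    def wrapper(dataf, *args, **kwargs):
        result = func(dataf, *args, **kwargs)
        print(f"{func.__name__}: {dataf.shape} -> {result.shape}")
        return result
    return wrapper


@log_step
def clean_nan(dataf):
    """Get rid of rows with missing values"""
    return dataf.dropna()


@log_step
def remove_outliers(dataf, min_val, max_val):
    """Keep only rows whose value lies strictly between min_val and max_val"""
    return (dataf
            .loc[lambda d: d["value"] > min_val]
            .loc[lambda d: d["value"] < max_val])


# Small hand-made frame so the row counts are easy to follow.
toy_df = pd.DataFrame({"value": [0.5, np.nan, 3.0, -0.2]})

logged_df = (toy_df
             .pipe(clean_nan)
             .pipe(remove_outliers, min_val=-2.0, max_val=2.0))
```

Because each step prints its own name and the row counts before and after, a step that unexpectedly drops (or keeps) rows should stand out in the printed log.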


In [None]:
# e.g. You may not have seen how the parse_types function works yet
parse_types?

### Caveats 

We should be careful when we are writing `.pipe`-lines. The function going into a `.pipe()` might not be stateless: it may modify the dataframe that it receives. Here's an example:

In [30]:
def rename_columns(dataf):
    dataf.columns = ["a", "b"]
    return dataf 

date_df = make_ts_df()
date_df.pipe(rename_columns).columns, date_df.columns

(Index(['a', 'b'], dtype='object'), Index(['a', 'b'], dtype='object'))

In [31]:
date_df.head()

Unnamed: 0,a,b
0,2018-01-01 00:00:00,1.998746
1,2018-01-02 00:00:00,-1.223321
2,2018-01-03 00:00:00,1.551503
3,2018-01-04 00:00:00,-0.663083
4,2018-01-05 00:00:00,1.380297


In such a situation it is best to call `.copy()` at the start of the function, so that the changes are applied to a copy rather than to the original dataframe.

In [32]:
def rename_columns(dataf):
    dataf = dataf.copy()
    dataf.columns = ["a", "b"]
    return dataf 

date_df = make_ts_df()
date_df.pipe(rename_columns).columns, date_df.columns

(Index(['a', 'b'], dtype='object'), Index(['date', 'value'], dtype='object'))

Be careful with this. We want our functions to be stateless; otherwise a step can silently change the data that other code relies on, and we lose the benefits of the pipeline.

Alternatively, you could use the `.rename()` method.

In [33]:
def rename_columns(dataf):
    return dataf.rename(columns={"date": "a", "value": "b"})

date_df = make_ts_df()
date_df.pipe(rename_columns).columns, date_df.columns

(Index(['a', 'b'], dtype='object'), Index(['date', 'value'], dtype='object'))
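
One possible way to guard against this kind of accidental mutation is to compare a dataframe before and after a step has run. The sketch below assumes a hypothetical helper called `mutates_input`; it is not a pandas function, and the two rename functions are given different names here only so they can live side by side.

```python
import pandas as pd


def mutates_input(func, dataf):
    """Hypothetical check: return True if func changes the frame it is given."""
    probe = dataf.copy()  # work on a copy so the caller's frame is never touched
    func(probe)
    same_columns = probe.columns.equals(dataf.columns)
    same_values = probe.equals(dataf)
    return not (same_columns and same_values)


def rename_in_place(dataf):
    # Assigning to .columns modifies the frame that was passed in.
    dataf.columns = ["a", "b"]
    return dataf


def rename_safely(dataf):
    # .rename() returns a new frame and leaves the input alone.
    return dataf.rename(columns={"date": "a", "value": "b"})


example_df = pd.DataFrame({"date": ["2018-01-01", "2018-01-02"],
                           "value": [0.1, -0.3]})

print(mutates_input(rename_in_place, example_df))  # True
print(mutates_input(rename_safely, example_df))    # False
```

A check like this could be run once per step while developing a pipeline; it only detects changes to the column labels and values, so it should be treated as a sanity check rather than a guarantee.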

## Pipeline abstraction on higher levels

To fully appreciate what the pandas pipelines can do, let us rewrite one function.

In [47]:
def remove_outliers(dataf, min_value=-2.0, max_value=2.0):
    return (dataf
            .loc[lambda d: d['value'] > min_value]
            .loc[lambda d: d['value'] < max_value])

prep_df = (date_df
           .pipe(parse_types)
           .pipe(clean_nan)
           .pipe(remove_outliers, max_value=3))
prep_df

Unnamed: 0,date,value
0,2018-01-01,-0.182247
1,2018-01-02,-0.078724
2,2018-01-03,0.333304
3,2018-01-04,-0.568482
4,2018-01-05,0.067505
...,...,...
361,2018-12-28,0.432925
362,2018-12-29,-1.618329
363,2018-12-30,0.994623
364,2018-12-31,0.318080


The `.pipe()` method can accept keyword arguments, which it passes on to the function. This allows you to change, say, threshold values at the level of the pipeline, without touching the original function. This is great because it will encourage you to write functions that are general.
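
To make the keyword-argument behaviour concrete, here is a minimal sketch on a tiny hand-made frame. The `toy_df` frame and the `outlier_settings` dictionary are assumptions made for this example; `remove_outliers` is repeated so the block runs on its own.

```python
import pandas as pd


def remove_outliers(dataf, min_value=-2.0, max_value=2.0):
    return (dataf
            .loc[lambda d: d["value"] > min_value]
            .loc[lambda d: d["value"] < max_value])


toy_df = pd.DataFrame({"value": [-2.5, -1.0, 0.0, 2.5, 3.5]})

# Hypothetical settings, e.g. defined once at the top of a notebook.
outlier_settings = {"min_value": -2.0, "max_value": 3.0}

# .pipe() forwards the keyword arguments to the function ...
via_pipe = toy_df.pipe(remove_outliers, **outlier_settings)

# ... so it gives the same result as calling the function directly.
via_call = remove_outliers(toy_df, **outlier_settings)

# Without keyword arguments the defaults (-2.0 and 2.0) are used.
with_defaults = toy_df.pipe(remove_outliers)
```

Keeping the thresholds in one place like this means the pipeline can be re-run with different settings without editing any of the step functions.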

# Conclusion 

> **"Pipelines are the only correct way to write pandas."**

This is a bold statement, but some people feel very strongly about this.

Even if you take this statement with a grain of salt, it is important to write your code in such a way that your notebooks remain clear. If it takes a lot of effort to understand the code of your colleagues, then your team will be slower than you want it to be.

A notebook is a great scratchpad, but that is no excuse to write unclear code!

In [48]:
def remove_outliers(dataf, min_value=-2.0, max_value=2.0):
    return (dataf
            .loc[lambda d: d['value'] > min_value]
            .loc[lambda d: d['value'] < max_value])

df.????
