# DataFrame PipeLine
> A pandas dataframe pipeline

This is a tutorial about a tabular data processing pipeline.

In [1]:
# default_exp pipe

## Dependencies
This tool works on pandas DataFrames.

In [2]:
# export
from collections import Counter
from functools import wraps
from itertools import chain

import numpy as np
import pandas as pd

## Basic Structure

* ```Node``` is the root structure of a pipeline; it holds the state of the data
* ```Edge``` is the **change** between the nodes, usually a function

### Node

* Node works on a single DataFrame
* chunkNode is for when the entire DataFrame is too large to fit in memory; it works on an iterator of DataFrame chunks (e.g. `pd.read_csv(..., chunksize=...)`)

In [9]:
# export
class Node(object):
    """
    A pipeline node: wraps a piece of data plus the ordered list of edges to apply to it.

    Build the pipeline with ``node | edge | edge`` and execute it with ``run()``.
    """

    def __init__(self, df, verbose=1):
        """
        :param df: the data the pipeline starts from (normally a pandas DataFrame)
        :param verbose: when > 0, ``run`` prints each edge name before applying it
        """
        super(Node, self).__init__()
        self.df = df
        self.verbose = verbose
        self.pipenames = []
        self.pipe = []

    def __repr__(self):
        header = "<forge pipeline node>\n\t|"
        body = "\n\t|".join(self.pipenames)
        return "{}{}".format(header, body)

    def __or__(self, edge):
        """
        Append ``edge`` to the pipeline, enabling ``node|edge|edge``.

        The node is modified in place and returned, so ``node2 = node1 | edge``
        makes ``node2`` the very same object as ``node1``.

        :param edge: an object exposing ``edge_name`` and a ``pro(df)`` method
        :return: this node
        """
        self.pipe.append(edge)
        self.pipenames.append(edge.edge_name)
        return self

    def run(self):
        """
        Apply every edge, in insertion order, to ``self.df``.

        The result replaces ``self.df``, so calling ``run`` again re-applies the
        whole pipeline to the already processed data.

        :return: the processed data
        """
        for idx, edge in enumerate(self.pipe):
            if self.verbose > 0:
                print("[df edge]:%s" % self.pipenames[idx])
            self.df = edge.pro(self.df)
        return self.df

class chunkNode(Node):
    def __init__(self, df_chunk, verbose=1):
        """
        Use this class instead of Node class, for huge data sources like a big csv or a huge SQL table
        chunkNode(pd.read_csv("xxx.csv",chunksize = 1000), verbose =1)
        :param df_chunk: an iterable of DataFrames, e.g. pandas reader created with the chunksize parameter
        :param verbose: when > 0, print each edge name as it is applied (once per chunk)
        """
        super(chunkNode, self).__init__(df=df_chunk, verbose=verbose)
        # Alias kept for callers that read the chunk source under this name.
        self.df_chunk = self.df

    def run(self):
        """
        Running iterations on the entire dataset.

        Each chunk goes through every edge in order; results are not collected,
        so side-effecting edges (e.g. saving edges) are where output ends up.
        A one-shot iterator (such as a chunked pandas reader) is consumed by this call.
        :return: None
        """
        for df in self.df:
            for pro_i in range(len(self.pipe)):
                if self.verbose > 0: print("[df edge]:%s" % self.pipenames[pro_i])
                pro = self.pipe[pro_i]
                df = pro.pro(df)

    def testrun(self):
        """
        testing for 1 iteration
        :return: the result dataframe
        """
        # iter() makes this work for re-iterable sources (list/tuple of DataFrames)
        # as well as for iterators, for which iter() returns the iterator itself.
        testdf = next(iter(self.df))
        print("Please restart the generator after running test",flush=True)
        for pro_i in range(len(self.pipe)):
            if self.verbose > 0: print("[df edge]:%s" % self.pipenames[pro_i])
            pro = self.pipe[pro_i]
            testdf = pro.pro(testdf)

        return testdf

### Edges
There are 2 types of edges: `frameEdge` (works on a whole DataFrame) and `colEdge` (works on selected columns)

In [3]:
# export
class frameEdge(object):
    """
    An edge that transforms a whole DataFrame into another DataFrame.

    Either subclass it and implement ``pro(df)``, or create an instance and
    register the transform with the ``define`` decorator.
    """

    def __init__(self, edge_name=None):
        """
        :param edge_name: label shown in the pipeline layout and verbose logs
        """
        super(frameEdge, self).__init__()
        if edge_name is None:
            edge_name = "DataFrame_Processing_Edge"
        self.edge_name = edge_name
        # NOTE(review): not read anywhere in this file; kept for compatibility.
        self.i = None

    def __mul__(self, cols):
        # Raise explicitly instead of `assert 0`: assert statements are removed
        # under `python -O`, which would make `edge * cols` silently return None.
        raise AssertionError("Only colEdge support * columns operation")

    def define(self, f):
        """
        Decorator registering ``f(df) -> df`` as this edge's ``pro`` step.

        :param f: function taking and returning a DataFrame
        :return: the wrapper stored as ``self.pro`` (keeps ``f``'s name/docstring)
        """
        @wraps(f)
        def wraper(df):
            return f(df)

        self.pro = wraper
        return wraper


class colEdge(object):
    """
    An edge that transforms selected columns one by one.

    Select columns with ``edge * ["a", "b"]`` or ``edge % "a"``; provide the
    per-column transform by subclassing (``colpro``) or via ``define``.

    NOTE: ``*`` and ``%`` store the columns on this same instance and return it,
    so using one instance twice in a pipeline with different columns makes both
    steps use whichever column selection was made last.
    """

    def __init__(self, edge_name=None):
        """
        :param edge_name: label shown in the pipeline layout and verbose logs
        """
        super().__init__()
        if edge_name is None:
            edge_name = "DataSeries_Processing_Edge"
        self.edge_name = edge_name
        self.cols = []

    def __mul__(self, cols):
        """
        Select the columns to process.

        :param cols: iterable of column names; a single string is treated as one column name
        :return: this edge
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(cols, str):
            cols = [cols]
        # Materialise so one-shot iterables (generators) still work for every chunk.
        self.cols = list(cols)
        return self

    def __mod__(self,col):
        """Select a single column to process; returns this edge."""
        self.cols = [col]
        return self

    def define(self, f):
        """
        Decorator registering ``f(col) -> col`` as this edge's ``colpro`` step.

        :param f: function taking a column (pandas Series) and returning its replacement
        :return: the wrapper stored as ``self.colpro`` (keeps ``f``'s name/docstring)
        """
        @wraps(f)
        def wraper(col):
            return f(col)

        self.colpro = wraper
        return wraper

    def pro(self, df):
        """
        Replace each selected column with ``self.colpro`` applied to it.

        The passed-in ``df`` is modified in place and also returned.
        """
        for c in self.cols:
            df[c] = self.colpro(df[c])
        return df

nothing = frameEdge("empety_process")


def donothing(df):
    """Identity transform: hand the DataFrame back untouched."""
    return df


donothing = nothing.define(donothing)

* A frameEdge defines a transformation from one dataframe to another dataframe.

In [None]:
grpby = frameEdge("groupby")


def grpby_func(df):
    """Per-``item_id`` non-null counts of every other column."""
    grouped = df.groupby("item_id")
    return grouped.count()


grpby_func = grpby.define(grpby_func)

* A colEdge defines a transformation from a column (a pandas Series) to an edited version of that column

In [None]:
double_value = colEdge("value double")


def double_value_func(col):
    """Return ``col * 2`` (doubles numeric values)."""
    doubled = col * 2
    return doubled


double_value_func = double_value.define(double_value_func)

## Some example edges

In [29]:
# export
class fillNaEdge(colEdge):
    """Column edge that replaces missing values with a constant."""

    def __init__(self, fill=0.):
        """
        :param fill: value substituted for missing entries (also shown in the edge name)
        """
        label = "fillna_%s" % fill
        super().__init__(label)
        self.fill = fill

    def colpro(self, col):
        """Return ``col`` with missing values replaced by ``self.fill``."""
        filled = col.fillna(self.fill)
        return filled


class engTokEdge(colEdge):
    """Column edge that tokenizes English text into a list of tokens per row."""

    def __init__(self, tokenizer=None, max_len=None):
        """
        Default using tweet tokenizer.

        :param tokenizer: object with a ``tokenize(str) -> list`` method;
            when None, nltk's TweetTokenizer is imported and used
        :param max_len: keep at most this many tokens per row (None keeps all)
        """
        super().__init__("En Tokenization")
        if tokenizer is None:
            # Imported lazily so nltk is only required when the default is used.
            from nltk.tokenize import TweetTokenizer
            tokenizer = TweetTokenizer()
        self.tokenizer = tokenizer
        self.max_len = max_len

    def colpro(self, c):
        """Tokenize every value of ``c``; values are passed to the tokenizer as-is (fill NaN first)."""
        return c.apply(lambda x: self.tokenizer.tokenize(x)[:self.max_len])

# Ready-made instance; note that creating it imports nltk when this cell/module runs.
eng_twt_tk = engTokEdge()

class CNTok(colEdge):
    """Column edge that segments Chinese text with jieba into a list of words per row."""

    def __init__(self):
        """
        cntok = CNTok()
        datanode = start_node|cntok*["col1","col2"]
        datanode.run()
        """
        super().__init__("chinese_tokenize")
        # jieba is only imported when a CNTok is actually built.
        from jieba import cut
        self.cut = cut

    def colpro(self, col):
        """Segment each value (converted to str first) into a list of words."""
        def _segment(value):
            return list(self.cut(str(value)))

        return col.apply(_segment)

class capMinMaxEdge(colEdge):
    """
    Column edge that caps values into the range [min_, max_].

    Either bound may be None, meaning that side is left uncapped.
    """

    def __init__(self, min_ = None, max_ = None):
        """
        :param min_: lower cap, or None for no lower cap
        :param max_: upper cap, or None for no upper cap
        """
        super().__init__("cap min:%s max:%s"%(min_,max_))
        self.min_ = min_
        self.max_ = max_

    def colpro(self, col):
        """
        :param col: pandas Series to cap
        :return: a Series with the same index and name as ``col``, clipped to the bounds
        """
        # Series.clip keeps the Series (index/name) like the other colpro steps,
        # whereas np.clip on col.values returned a bare ndarray.
        return col.clip(lower=self.min_, upper=self.max_)

class trackVocabEdge(colEdge):
    """
    a colEdge
    input column should contain python list
    This edge will keep track a vocabulary pandas DataFrame
    tck_vcb = trackVocabEdge()
    tck_vcb.vocab is the accumulated vocabulary (columns: idx, token, cnt),
    sorted by descending count; the column itself passes through unchanged.
    """
    def __init__(self):
        super().__init__("track vocab")
        # Typed empty frame so concatenating integer counts keeps an integer dtype.
        self.vocab = pd.DataFrame({"token": pd.Series([], dtype=object),
                                   "cnt": pd.Series([], dtype="int64")})

    def colpro(self, col):
        """Add the tokens of every list in ``col`` to the vocabulary; return ``col`` as-is."""
        biglist = list(chain.from_iterable(col))
        self.vocab = self.combine_vocab(self.build_vocab(biglist))
        return col

    def get_token_count_dict(self, full_tok):
        """count the token to a list"""
        return Counter(full_tok)

    def build_vocab(self, full_tok):
        """Build a (token, cnt) frame from a flat list of tokens."""
        ct_dict = self.get_token_count_dict(full_tok)
        tk, ct = list(ct_dict.keys()), list(ct_dict.values())

        return pd.DataFrame({"token": tk, "cnt": ct})

    def combine_vocab(self, new_vocab):
        """
        Merge ``new_vocab`` into the accumulated vocabulary.

        :return: frame with columns idx, token, cnt sorted by cnt descending;
            idx is the row position of the token in the grouped table before sorting
        """
        # Only carry token/cnt forward: the derived "idx" column from a previous
        # call would otherwise be summed by the groupby and then duplicated.
        previous = self.vocab[["token", "cnt"]]
        combinedf = pd.concat([previous, new_vocab]).groupby("token").sum().reset_index()
        return combinedf.sort_values(by="cnt", ascending=False).reset_index().rename(columns = {"index":"idx"})

    def save_vocab(self, json_url):
        """Write the accumulated vocabulary with ``DataFrame.to_json``."""
        self.vocab.to_json(json_url)


class saveCSV(frameEdge):
    """
    DataFrame Edge that appends each incoming DataFrame to a csv file.
    saveCSV("/path/to/file.csv")

    The header is written only on the first call of this instance; the file is
    always opened in append mode, so existing content is kept.
    """
    def __init__(self, csvpath, tocsv_conf=None):
        """
        :param csvpath: destination path passed to ``DataFrame.to_csv``
        :param tocsv_conf: extra ``to_csv`` keyword arguments;
            defaults to ``{"sep": "\\t", "index": False}``
        """
        super().__init__("save to csv")
        self.csvpath = csvpath
        # Build the default per instance: a shared mutable default dict would
        # leak edits made through one instance into every other instance.
        if tocsv_conf is None:
            tocsv_conf = {"sep": "\t", "index": False}
        self.tocsv_conf = tocsv_conf
        self.header = True

    def pro(self, df):
        """Append ``df`` to the csv file and pass it through unchanged."""
        df.to_csv(self.csvpath,header=self.header, mode="a", **self.tocsv_conf)
        self.header = False
        return df

class saveSQL(frameEdge):
    """
    DataFrame Edge that writes each incoming DataFrame to a SQL table.
    saveSQL("table_name", con)
    """
    def __init__(self, table_name, con, tosql_conf=None):
        """
        :param table_name: destination table name
        :param con: connection passed to ``DataFrame.to_sql``
        :param tosql_conf: extra ``to_sql`` keyword arguments;
            defaults to ``{"index": False, "if_exists": "append"}``
        """
        super().__init__("save to sql_table")
        self.table_name = table_name
        self.con = con
        # Build the default per instance so instances never share one mutable dict.
        if tosql_conf is None:
            tosql_conf = {"index": False, "if_exists": "append"}
        self.tosql_conf = tosql_conf

    def pro(self, df):
        """Write ``df`` to the table and pass it through unchanged."""
        df.to_sql(self.table_name, con=self.con, **self.tosql_conf)
        return df

## Pipe in simple look

#### Column edges
```edge``` defines a method of processing data.
* colEdge and its sub-class process column
* frameEdge and its sub-classes process a dataframe (multiple columns)

### Nodes/Edges relationship mapping
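* node1 ==edge1==> node2 :
    ```python
    node2 = node1 | edge1
    ```
* node1 ==edge1==> edge2 ==> node2 :
    ```python
    node2 = node1 | edge1 | edge2
    ```
* node2 ==edge3==> node3 :
    ```python
    node3 = node2 | edge3
    ```
* Note: `|` appends the edge to the node in place and returns that same node, so `node2` and `node1` are the same object.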
    
### Relationship mapping with **columns specified**
* Specifying 1 column

In [None]:
# Illustrative only: node1 and edge1 are placeholders, not defined in this notebook.
node2 = node1 | (edge1 % "column_1")

* Specifying multiple columns

In [None]:
# Illustrative only: node1 and edge1 are placeholders, not defined in this notebook.
node2 = node1 | (edge1 * ["column_2", "column_3", "column_4"])

## Tutorial

In [5]:
import pandas as pd
import numpy as np
from pathlib import Path
import os

Sample dataset: [New York City Airbnb Open Data](https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data)

In [6]:
# Data directory: ~/data. Path.home() also works where $HOME is unset (e.g. Windows).
HOME = Path.home()/"data"
DATA = HOME/"AB_NYC_2019.csv"

A preview of the dataset

In [17]:
# Load the full dataset into memory and preview its first rows.
df = pd.read_csv(filepath_or_buffer=DATA)
df.head(5)

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.1,1,0


The start node wraps a pandas DataFrame

### Real Data Example

In [None]:
from forgebox.pipe import *

In [30]:
# colEdge steps overwrite columns of the frame they receive, so give the node a
# copy: `df` keeps matching its preview and this cell can be re-run safely.
start_node = Node(df.copy())

In [31]:
# Cap values at 100 (no lower bound) and fill missing text with an empty string.
cap_minmax_edge = capMinMaxEdge(min_=None, max_=100)
fill_na_edge = fillNaEdge(fill="")

Setting the node/edge pipeline

In [32]:
# clip minmax value on 1 column
# fill na to "", on 2 columns
# tokenize, on 2 columns
end_node = (
    start_node
    | cap_minmax_edge % "number_of_reviews"
    | fill_na_edge * ["name", "room_type"]
    | eng_twt_tk * ["name", "room_type"]
)

Print out the pipeline layout

In [33]:
# Display the pipeline layout: the node's repr lists each edge name on its own line.
end_node

<forge pipeline node>
	|cap min:None max:100
	|fillna_
	|En Tokenization

Execute the processing

In [34]:
# Apply every edge in order; for a Node, run() returns the processed DataFrame.
end_df = end_node.run()

[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization


Checking the processed data

In [35]:
# Preview the first rows of the processed DataFrame.
end_df.head()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,"[Clean, &, quiet, apt, home, by, the, park]",2787,John,Brooklyn,Kensington,40.64749,-73.97237,"[Private, room]",149,1,9,2018-10-19,0.21,6,365
1,2595,"[Skylit, Midtown, Castle]",2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,"[Entire, home, /, apt]",225,1,45,2019-05-21,0.38,2,355
2,3647,"[THE, VILLAGE, OF, HARLEM, ..., NEW, YORK, !]",4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,"[Private, room]",150,3,0,,,1,365
3,3831,"[Cozy, Entire, Floor, of, Brownstone]",4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,"[Entire, home, /, apt]",89,1,100,2019-07-05,4.64,1,194
4,5022,"[Entire, Apt, :, Spacious, Studio, /, Loft, by...",7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,"[Entire, home, /, apt]",80,10,9,2018-11-19,0.1,1,0


### Working with bigger data
There are often cases where we have to deal with a CSV larger than 500 MB or a huge SQL table.

In some cases we want to process/save the data batch by batch.

Gladly, pandas offers a `chunksize` option, so we can process huge structured data in batches (defined by the chunk size).

In [36]:
from forgebox.pipe import chunkNode,saveCSV, saveSQL

```saveCSV```: saving to a file can be part of the pipeline; we can follow that edge with ```saveSQL``` if we like. Note that ```saveCSV``` appends to the target file.

In [37]:
start_node = chunkNode(pd.read_csv(DATA, chunksize=5000), verbose=1)

# saveCSV appends to its file, so remove output left by a previous run;
# otherwise re-running the notebook duplicates rows (and header lines).
processed_csv = HOME/"nyc_processed.csv"
if processed_csv.exists():
    processed_csv.unlink()

end_node = (
    start_node
    | cap_minmax_edge % "number_of_reviews"
    | fill_na_edge * ["name", "room_type"]
    | eng_twt_tk * ["name", "room_type"]
    | saveCSV(processed_csv)
)

Pipeline layout summary

In [38]:
# Layout of the chunked pipeline, ending with the csv-saving edge.
end_node

<forge pipeline node>
	|cap min:None max:100
	|fillna_
	|En Tokenization
	|save to csv

**Notice**
* The ```run``` function of a pipeline that
    * starts with a chunkNode has **no return value**,
    * starts with a Node will return the **result dataframe**.
* This is by design: the result data could also be huge and unsuitable to keep in RAM in its entirety.

Execute the processing. If you are annoyed by the verbosity, set ```verbose=0```

In [39]:
# Process every chunk; chunkNode.run returns None, and the saveCSV edge writes the output.
end_node.run()

[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv
[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:save to csv


In [40]:
!head -n3 {HOME/"nyc_processed.csv"}

0	2539	['Clean', '&', 'quiet', 'apt', 'home', 'by', 'the', 'park']	2787	John	Brooklyn	Kensington	40.647490000000005	-73.97237	['Private', 'room']	149	1	9	2018-10-19	0.21	6	365
1	2595	['Skylit', 'Midtown', 'Castle']	2845	Jennifer	Manhattan	Midtown	40.75362	-73.98376999999999	['Entire', 'home', '/', 'apt']	225	1	45	2019-05-21	0.38	2	355
2	3647	['THE', 'VILLAGE', 'OF', 'HARLEM', '...', 'NEW', 'YORK', '!']	4632	Elisabeth	Manhattan	Harlem	40.809020000000004	-73.9419	['Private', 'room']	150	3	0			1	365


### Define a new edge
Create a new edge

Define the processing function with the ```define``` decorator.

* `col` is a pandas Series, i.e. a ```column``` in pandas terms
* In this case we use the ```apply``` method of the Series; any decorated function works as long as it returns another Series

In [16]:
# A custom column edge; its transform is attached below with the define decorator.
lower_case = colEdge(edge_name="lower case")


def lowerList(x):
    """Convert every element of ``x`` to ``str``, lower-case it, and return a new list."""
    return [str(item).lower() for item in x]

def lower(col):
    """Lower-case the token list in every row of ``col``."""
    lowered = col.apply(lowerList)
    return lowered


lower = lower_case.define(lower)

In [17]:
# The DIYed edge will work on columns "name" and "room_type" after tokenization
df = pd.read_csv(DATA)
start_node = Node(df)
end_node = (
    start_node
    | cap_minmax_edge % "number_of_reviews"
    | fill_na_edge * ["name", "room_type"]
    | eng_twt_tk * ["name", "room_type"]
    | lower_case * ["name", "room_type"]
)

In [18]:
# Layout now includes the custom "lower case" edge at the end.
end_node

<forge pipeline node>
	|cap min:None max:100
	|fillna_
	|En Tokenization
	|lower case

In [19]:
# Run the pipeline (a Node returns the processed DataFrame) and preview the result.
end_df = end_node.run()
end_df.head()

[df edge]:cap min:None max:100
[df edge]:fillna_
[df edge]:En Tokenization
[df edge]:lower case


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,"[clean, &, quiet, apt, home, by, the, park]",2787,John,Brooklyn,Kensington,40.64749,-73.97237,"[private, room]",149,1,9,2018-10-19,0.21,6,365
1,2595,"[skylit, midtown, castle]",2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,"[entire, home, /, apt]",225,1,45,2019-05-21,0.38,2,355
2,3647,"[the, village, of, harlem, ..., new, york, !]",4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,"[private, room]",150,3,0,,,1,365
3,3831,"[cozy, entire, floor, of, brownstone]",4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,"[entire, home, /, apt]",89,1,100,2019-07-05,4.64,1,194
4,5022,"[entire, apt, :, spacious, studio, /, loft, by...",7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,"[entire, home, /, apt]",80,10,9,2018-11-19,0.1,1,0
