<img align="right" width="400" src="https://www.fhnw.ch/de/++theme++web16theme/assets/media/img/fachhochschule-nordwestschweiz-fhnw-logo.svg" alt="FHNW Logo">


# Text Preprocessor

by Fabian Märki

## Summary
The aim of this notebook is to understand how to write a generic Preprocessor that supports parallel processing and can be used in a [scikit-learn](https://scikit-learn.org) [Pipeline](https://scikit-learn.org/stable/modules/compose.html). A [Pipeline](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline) is useful to chain individual processing steps into a single estimator (e.g. one classifier). This is convenient since data processing often involves a fixed sequence of steps such as cleaning, normalization, tokenization, stemming/lemmatization, vectorization and classification.

Preprocessing steps like cleaning, normalization, tokenization and stemming/lemmatization are often done before model training starts (since these steps only need to be done once, whereas model training is an iterative process). The aim of this generic Preprocessor is to allow for a seamless integration of these preprocessing steps into a production pipeline (where they need to be applied as well) and, thanks to its support for parallel execution, also for faster batch processing.
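To make the chaining idea concrete, here is a minimal, dependency-free sketch of what a Pipeline does conceptually: every step offers `fit`/`transform`, and the output of one step becomes the input of the next. `MiniPipeline`, `Lowercase` and `WhitespaceTokenize` are hypothetical names for illustration only; this is not scikit-learn's implementation.

```python
class Lowercase:
    """Hypothetical normalization step: lowercases every text."""

    def fit(self, X, y=None):
        return self  # stateless, nothing to learn

    def transform(self, X):
        return [text.lower() for text in X]


class WhitespaceTokenize:
    """Hypothetical tokenization step: splits each text on whitespace."""

    def fit(self, X, y=None):
        return self  # stateless, nothing to learn

    def transform(self, X):
        return [text.split() for text in X]


class MiniPipeline:
    """Chains (name, step) pairs; each step's output feeds the next step."""

    def __init__(self, steps):
        self.steps = steps

    def fit(self, X, y=None):
        # fit each step on the output of the previous step
        for _, step in self.steps:
            X = step.fit(X, y).transform(X)
        return self

    def transform(self, X):
        # apply the (already fitted) steps in order
        for _, step in self.steps:
            X = step.transform(X)
        return X


pipe = MiniPipeline([("normalize", Lowercase()), ("tokenize", WhitespaceTokenize())])
print(pipe.fit(["Der Arzt war SEHR nett"]).transform(["Hatte akute Beschwerden"]))
# [['hatte', 'akute', 'beschwerden']]
```

In scikit-learn, `sklearn.pipeline.Pipeline` plays this role; its `steps` are also `(name, estimator)` pairs, and the real class additionally handles parameters and a final estimator (e.g. a classifier) that does not need to implement `transform`.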

## Links
- [Text Preprocessing Steps and Universal Reusable Pipeline](https://towardsdatascience.com/text-preprocessing-steps-and-universal-pipeline-94233cb6725a)
- [Make Pandas Apply Functions Faster using Parallel Processing](https://towardsdatascience.com/make-your-own-super-pandas-using-multiproc-1c04f41944a1)

This notebook contains assignments: <font color='red'>Questions are written in red.</font>

<a href="https://colab.research.google.com/github/markif/2021_HS_CAS_NLP_LAB_Notebooks/blob/master/04_a_Text_Preprocessor.ipynb">
  <img align="left" src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [1]:
%%capture

!pip install 'fhnw-nlp-utils>=0.2.13,<0.3.0'

from fhnw.nlp.utils.processing import is_iterable
from fhnw.nlp.utils.storage import download
from fhnw.nlp.utils.storage import save_dataframe
from fhnw.nlp.utils.storage import load_dataframe


import pandas as pd
import numpy as np

In [2]:
from fhnw.nlp.utils.system import system_info
print(system_info())

OS name: posix
Platform name: Linux
Platform release: 5.11.0-40-generic
Python version: 3.6.9
Tensorflow version: 2.5.1
GPU is available


In [3]:
%%capture

!pip install multiprocess
!pip install scikit-learn

<font color='red'>**TASK: Implement the function `transform` so that it allows for parallel processing.**</font>

Possible Inspirations:
- [Text Preprocessing Steps and Universal Reusable Pipeline](https://towardsdatascience.com/text-preprocessing-steps-and-universal-pipeline-94233cb6725a) (at the very end)

In [4]:
import pandas as pd
import numpy as np

from sklearn.base import TransformerMixin, BaseEstimator
from multiprocess import Pool


class TextPreprocessor(BaseEstimator, TransformerMixin):
    def __init__(self,
                 func,
                 field_read,
                 field_write,
                 n_jobs = -1):
        """  
        func : function
            The preprocessing function to apply (e.g. tokenization)
        field_read : str
            Specifies the data column name to read
        field_write : str
            Specifies the column name to write the result of applying the preprocessing function
        n_jobs : int 
            Specifies the number of parallel processes to spawn
        """
        self.func = func
        self.field_read = field_read
        self.field_write = field_write
        
        if n_jobs <= 0:
            import psutil
            self.n_jobs = psutil.cpu_count(logical=True)
        else:
            self.n_jobs = n_jobs

    def fit(self, X, y=None):
        # nothing to do
        return self

    def transform(self, X, *_):
        
        # TODO: !!! place your code here !!!
        ####################################
        # !!! this needs rework !!!
        write_df = self.transform_sub_df(X)
        
        
        ###################
        # TODO: !!! end !!!
        
        return pd.concat([X, write_df], axis=1)
        
    def transform_sub_df(self, df):
        # apply the preprocessing function to each cell of the column
        # (raise_ is a staticmethod so that it can be called from within the lambda)
        series = df[self.field_read].map(
            lambda x: self.func(x) if isinstance(x, str) or is_iterable(x) else self.raise_(TypeError("Only string or iterable (e.g. list) is supported. Received a " + str(type(x))))
        )
    
        # convert the pd.Series into a pd.DataFrame
        return series.to_frame(self.field_write)
        
    @staticmethod
    def raise_(ex):
        # helper to raise an exception from within an expression (see https://stackoverflow.com/a/8294654)
        raise ex

Download data and load it into a pandas dataframe.

In [5]:
download("https://drive.google.com/uc?id=17nFv7PKC6YJttZT4D1txk4CCUgduq3pc", "data/german_doctor_reviews_original.parq")
data = load_dataframe("data/german_doctor_reviews_original.parq")
data.shape

(357899, 2)

In [6]:
data.head(3)

| | text_original | rating |
|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 |


Let's try a simple use case - tokenization

In [7]:
from nltk.tokenize import word_tokenize

def tokenize(text):
    """Tokenizes a text

    Parameters
    ----------
    text : str
        The text to tokenize (other types raise a TypeError)
        
    Returns
    -------
    list
        The tokenized text
    """
    
    if isinstance(text, str):
        word_tokens = word_tokenize(text)
    else:
        raise TypeError("Only string is supported. Received a "+ str(type(text)))

    return word_tokens

In [8]:
%%capture

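import nltk
from nltk.corpus import stopwords as nltk_stopwords

nltk.download('punkt')
nltk.download('stopwords')

# keep the corpus module under a different name so that re-running this cell still works
stopwords = set(nltk_stopwords.words("german"))
empty_stopwords = set()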

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


Let's see how long it takes on a single core...

In [9]:
%%time

preprocessor = TextPreprocessor(func=tokenize, field_read="text_original", field_write="tokenized", n_jobs=1)
data_test = data.head(50000)
data_test = preprocessor.transform(data_test)
data_test.head(3)

CPU times: user 1.52 s, sys: 188 ms, total: 1.71 s
Wall time: 41.1 s


| | text_original | rating | tokenized |
|---|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 | [Ich, bin, franzose, und, bin, seit, ein, paar... |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 | [Dieser, Arzt, ist, das, unmöglichste, was, mi... |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 | [Hatte, akute, Beschwerden, am, Rücken, ., Her... |


Let's see how long it takes on all available cores...

In [10]:
import psutil

n_jobs = psutil.cpu_count(logical=False)
print(n_jobs)

6


In [11]:
%%time

preprocessor = TextPreprocessor(func=tokenize, field_read="text_original", field_write="tokenized", n_jobs=n_jobs)
data_test = data.head(50000)
data_test = preprocessor.transform(data_test)
data_test.head(3)

CPU times: user 1.43 s, sys: 218 ms, total: 1.65 s
Wall time: 8.12 s


| | text_original | rating | tokenized |
|---|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 | [Ich, bin, franzose, und, bin, seit, ein, paar... |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 | [Dieser, Arzt, ist, das, unmöglichste, was, mi... |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 | [Hatte, akute, Beschwerden, am, Rücken, ., Her... |


Is there a speedup if we take hyperthreading into account...

In [12]:
import psutil

n_jobs = psutil.cpu_count(logical=True)
print(n_jobs)

12


In [13]:
%%time

preprocessor = TextPreprocessor(func=tokenize, field_read="text_original", field_write="tokenized", n_jobs=n_jobs)
data_test = data.head(50000)
data_test = preprocessor.transform(data_test)
data_test.head(3)

CPU times: user 1.91 s, sys: 474 ms, total: 2.39 s
Wall time: 7.42 s


| | text_original | rating | tokenized |
|---|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 | [Ich, bin, franzose, und, bin, seit, ein, paar... |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 | [Dieser, Arzt, ist, das, unmöglichste, was, mi... |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 | [Hatte, akute, Beschwerden, am, Rücken, ., Her... |
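
To quantify these measurements: the speedup is the single-process wall time divided by the parallel wall time, and the parallel efficiency is the speedup divided by the number of processes. The sketch below simply plugs in the wall times reported above for 50,000 reviews (41.1 s, 8.12 s and 7.42 s); it measures nothing itself.

```python
# Wall times (seconds) reported above for 50,000 reviews
t_single = 41.1
runs = {6: 8.12, 12: 7.42}  # n_jobs -> wall time

for n_jobs, t_parallel in runs.items():
    speedup = t_single / t_parallel
    efficiency = speedup / n_jobs
    print(f"n_jobs={n_jobs:2d}: speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
# n_jobs= 6: speedup 5.06x, efficiency 84%
# n_jobs=12: speedup 5.54x, efficiency 46%
```

Going from the 6 physical to the 12 logical cores reduces the wall time by only about 9% (8.12 s to 7.42 s), which is consistent with hyperthreads sharing the execution units of a physical core. Single timings like these are also subject to noise.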


### fhnw-nlp-utils

**Preprocessor** and **parallelize_dataframe** are available through **fhnw-nlp-utils**. 

Here are two functions that preprocess text (as placeholders for any other text preprocessing function).
The function **tokenize** only accesses one field of a row (i.e. it processes a single column), whereas **tokenize_by_row** could potentially access all fields of a row (as an example of a more advanced function that needs several columns per row for its computation).

In [14]:
def tokenize(text, stopwords):
    """Tokenizes a text and removes stopwords

    Parameters
    ----------
    text : str, iterable
        The text either as string or iterable of tokens (in this case tokenization is not applied)
    stopwords : set
        A set of stopwords to remove from the tokens
        
    Returns
    -------
    list
        The tokenized text
    """
    from nltk.tokenize import word_tokenize
    from fhnw.nlp.utils.processing import is_iterable

    if isinstance(text, str):
        word_tokens = word_tokenize(text)
    elif is_iterable(text):
        word_tokens = text
    else:
        raise TypeError("Only string or iterable (e.g. list) is supported. Received a "+ str(type(text)))

    return [word.lower() for word in word_tokens if word.lower() not in stopwords]

def tokenize_by_row(row, stopwords):
    """Tokenizes a text and removes stopwords

    Parameters
    ----------
    row : pd.Series
        A row of the pandas DataFrame
    stopwords : set
        A set of stopwords to remove from the tokens
        
    Returns
    -------
    list
        The tokenized text
    """
    from nltk.tokenize import word_tokenize

    word_tokens = word_tokenize(row["text_original"])
    return [word.lower() for word in word_tokens if word.lower() not in stopwords]

Let's see how this works...

The first example uses a function (i.e. **tokenize**) that accesses a single field of a row (i.e. one column).

In [15]:
from fhnw.nlp.utils.processing import parallelize_dataframe

In [16]:
%%time
data_test = data.head(50000)
data_test = parallelize_dataframe(data_test, tokenize, stopwords=stopwords, field_read="text_original", field_write="tokenized")
data_test.head(3)

CPU times: user 1.22 s, sys: 443 ms, total: 1.66 s
Wall time: 6.09 s


| | text_original | rating | tokenized |
|---|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 | [franzose, seit, paar, wochen, muenchen, ., za... |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 | [arzt, unmöglichste, leben, je, begegnet, ,, u... |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 | [akute, beschwerden, rücken, ., herr, magura, ... |


The second example uses a function (i.e. **tokenize_by_row**) that could potentially access several fields of a row. Note that the parameter *field_read* is not provided in this example: omitting it is what switches from *single field* mode to *complete row* mode.
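
The switch between the two modes can be sketched without pandas: if a field name is given, the function receives only that cell's value; otherwise it receives the whole row. In this sketch rows are plain dicts, and `apply_to_rows`, `drop_stopwords` and `drop_stopwords_by_row` are hypothetical helpers (with a simple whitespace split instead of `word_tokenize`). It mimics the behaviour described above but is not the actual implementation of `parallelize_dataframe`.

```python
def apply_to_rows(rows, func, field_write, field_read=None, **kwargs):
    """Return copies of the rows (dicts) with func's result stored under field_write.

    If field_read is given, func receives only that field's value (single field mode);
    otherwise func receives the whole row (complete row mode). Extra keyword
    arguments (e.g. stopwords) are passed on to func.
    """
    result = []
    for row in rows:
        value = row[field_read] if field_read is not None else row
        result.append({**row, field_write: func(value, **kwargs)})
    return result


def drop_stopwords(text, stopwords):
    # single field mode: receives one text
    return [w for w in text.lower().split() if w not in stopwords]


def drop_stopwords_by_row(row, stopwords):
    # complete row mode: receives the whole row and picks the column itself
    return drop_stopwords(row["text_original"], stopwords)


rows = [{"text_original": "Ich bin sehr zufrieden", "rating": 1.0}]
stop = {"ich", "bin"}
single = apply_to_rows(rows, drop_stopwords, "tokenized", field_read="text_original", stopwords=stop)
by_row = apply_to_rows(rows, drop_stopwords_by_row, "tokenized", stopwords=stop)
print(single[0]["tokenized"])  # ['sehr', 'zufrieden']
```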

In [17]:
%%time
data_test = data.head(50000)
data_test = parallelize_dataframe(data_test, tokenize_by_row, stopwords=stopwords, field_write="tokenized")
data_test.head(3)

CPU times: user 2.08 s, sys: 564 ms, total: 2.65 s
Wall time: 7.87 s


| | text_original | rating | tokenized |
|---|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 | [ich, bin, franzose, und, bin, seit, ein, paar... |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 | [dieser, arzt, ist, das, unmöglichste, was, mi... |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 | [hatte, akute, beschwerden, am, rücken, ., her... |


fhnw-nlp-utils also wraps this into a generic Preprocessor *Estimator* that can be used in a scikit-learn Pipeline.

During inference (i.e. when processing live data in production) it can be handy to build a complete *processing pipeline*. This generic Preprocessor lets you reuse the same functions and settings that were used during training without further modification (simpler to use and less error-prone).

In [18]:
from fhnw.nlp.utils.processing import Preprocessor

In [19]:
%%time

preprocessor = Preprocessor(tokenize, stopwords=stopwords, field_read="text_original", field_write="tokenized")
data_test = data.head(50000)
data_test = preprocessor.transform(data_test)
data_test.head(3)

CPU times: user 1.14 s, sys: 560 ms, total: 1.7 s
Wall time: 6.12 s


| | text_original | rating | tokenized |
|---|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 | [franzose, seit, paar, wochen, muenchen, ., za... |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 | [arzt, unmöglichste, leben, je, begegnet, ,, u... |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 | [akute, beschwerden, rücken, ., herr, magura, ... |


In [20]:
%%time

preprocessor = Preprocessor(tokenize_by_row, stopwords=stopwords, field_write="tokenized")
data_test = data.head(50000)
data_test = preprocessor.transform(data_test)
data_test.head(3)

CPU times: user 1.65 s, sys: 566 ms, total: 2.22 s
Wall time: 7.51 s


| | text_original | rating | tokenized |
|---|---|---|---|
| 0 | Ich bin franzose und bin seit ein paar Wochen ... | 2.0 | [ich, bin, franzose, und, bin, seit, ein, paar... |
| 1 | Dieser Arzt ist das unmöglichste was mir in me... | 6.0 | [dieser, arzt, ist, das, unmöglichste, was, mi... |
| 2 | Hatte akute Beschwerden am Rücken. Herr Magura... | 1.0 | [hatte, akute, beschwerden, am, rücken, ., her... |
