In [1]:
import sys
sys.path.insert(0, '..')

# MLMR tutorial

This library helps you parallelize your Python code for all kinds of data transformations. Its core functions are built on the MapReduce paradigm, and the **Map** part is parallelized using the native Python `multiprocessing` module. Let's first define what the MapReduce paradigm is about.

## Installation

```bash
pip install mlmr
```

## MapReduce
Wikipedia says:
> MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster.

That sounds complex, like something you could use only on a big cluster or on an enormous dataset, right? In fact the idea is simple, so let's explain it in a couple of sentences and one example.

MapReduce is divided into two parts:
1. **Map** - a function that transforms an input into an output. It is called on every sample, or sequence of samples, of the data to be processed. This step is easy to parallelize, because the same operation is applied independently to each sample or sequence of samples.
1. **Reduce** - a function that aggregates all outputs of the map function. This step is sequential.
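
To make the two phases concrete, here is a minimal sketch that uses Python's standard `multiprocessing.Pool` for the parallel Map step and the builtin `sum` for the sequential Reduce step. It only illustrates the paradigm and is not how `mlmr` is implemented internally; the names `word_length` and `parallel_total_length` are made up for this illustration.

```python
from multiprocessing import Pool


def word_length(word):
    """Map: transform a single sample (here, a word into its length)."""
    return len(word)


def parallel_total_length(words, n_processes=2):
    """Run Map in a pool of worker processes, then Reduce sequentially."""
    with Pool(processes=n_processes) as pool:
        # Map step: the samples are distributed among the worker processes.
        lengths = pool.map(word_length, words)
    # Reduce step: aggregate all map outputs in the parent process.
    return sum(lengths)


if __name__ == "__main__":
    print(parallel_total_length(["map", "reduce", "python"]))
```

The map function is defined at module level with `def`, and the pool is created under the `if __name__ == "__main__":` guard, which is the usual pattern for `multiprocessing` code.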

## Sum of squares example
For example, suppose you have to calculate the sum of squares of an array of data. A sequential implementation would look like this:

In [2]:
arr = [1, 2, 3, 4, 5]
result = 0
for i in arr:
    result += i**2
print(result)

55


Let’s convert this task into MapReduce:
1. Map — calculate the square of the number
1. Reduce — calculate the sum of map results

A sequential implementation in Python:

In [3]:
from functools import reduce
arr = [1, 2, 3, 4, 5]
map_arr = map(lambda x: x**2, arr)
result = reduce(lambda x, y: x + y, map_arr)
print(result)

55


## MapReduce task definition
By now you should have a basic understanding of the MapReduce concept, so we can move on to what this library offers. For any MapReduce task you have to define:
1. **Map function**
1. **Map function input** - how your data is split into parts that are fed to the map function in parallel. The most obvious splitting methods are splitting the data into single samples or into N equal-size slices.
1. **Data split function** - the function that turns the data into a list of map inputs.
1. **Reduce function**
1. **Reduce function input** - the reduce input is the list of map outputs.
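
The pieces listed above fit together as in the following sequential sketch. `map_reduce_sketch`, `split_in_halves`, and `sum_of_squares` are hypothetical helpers written for this illustration, not the library's implementation; the sketch only shows the order in which the split, map, and reduce functions are called. A parallel version would run the map step in several processes.

```python
def map_reduce_sketch(data, data_split_func, map_func, reduce_func):
    """Sequential illustration of how a MapReduce task is wired together."""
    # 1. Split the data into the inputs of the map function.
    splits = data_split_func(data)
    # 2. Apply the map function to every split (the parallelizable part).
    map_results = [map_func(split) for split in splits]
    # 3. Reduce receives the list of map outputs and aggregates it.
    return reduce_func(map_results)


def split_in_halves(data):
    """Hypothetical split function: cut a list into two halves."""
    middle = len(data) // 2
    return [data[:middle], data[middle:]]


def sum_of_squares(chunk):
    """Map function: partial sum of squares of one split."""
    return sum(x ** 2 for x in chunk)


result = map_reduce_sketch([1, 2, 3, 4, 5], split_in_halves, sum_of_squares, sum)
print(result)  # 55
```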

Hands on! Let's see how the library functions are used on real examples, starting with the example from the section above.

## mlmr.function API, explained with examples
```python
def map_reduce(
    data,
    data_split_func,
    map_func,
    reduce_func,
    n_jobs
):

```
Base function for performing parallel MapReduce on data. First, the data is split using `data_split_func`. The number of processes to run in parallel is derived from the `n_jobs` argument. Then `map_func` is applied to each data split in parallel. Once the computation is complete, `reduce_func` is applied sequentially to the list of `map_func` results, and its result is returned. The initial ordering of the data is preserved.

**Arguments:**
1. `data` (*Iterable*) - data on which MapReduce would be performed.
2. `data_split_func` (*Callable*) - function that would be used to split data to perform Map operation. Data split function signature: *func(Iterable) -> Iterable\[Any\]*
3. `map_func` (*Callable*) - Map function. Map function signature: *func(DataSplit) -> Any*. **Function can't be lambda or local function!**
4. `reduce_func` (*Callable*) - Reduce function. Reduce function signature: *func(Iterable\[MapResult\]) -> Any*.
5. `n_jobs` - number of jobs to run in parallel. `-1` means using all processors.

**Returns**: Transformed (MapReduced) data.
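
The restriction on lambdas and local functions most likely comes from `multiprocessing` itself (this is an assumption, based on the statement that the Map step uses this module): worker processes receive the function through `pickle`, and `pickle` stores functions by their importable name. The sketch below uses a hypothetical `is_picklable` helper to show the difference.

```python
import pickle


def is_picklable(obj):
    """Return True if `obj` can be serialized with pickle."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_local_square():
    """Return a function defined inside another function (a local function)."""
    def local_square(x):
        return x ** 2
    return local_square


print(is_picklable(lambda x: x ** 2))     # False: a lambda has no importable name
print(is_picklable(make_local_square()))  # False: a local function cannot be looked up by name
print(is_picklable(len))                  # True: importable by name
```

Defining `map_func` with `def` at module level, as the examples in this tutorial do, avoids the problem.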

Let's apply it to the sum of squares task:

In [4]:
from mlmr.function import map_reduce

def square(x): # our map function
    return x**2

arr = [1, 2, 3, 4, 5]

split_data = lambda x: x # identity function, because we want to split it by element

result = map_reduce(
    data=arr,
    data_split_func=split_data,
    map_func=square,
    reduce_func=sum, # python builtin function
    n_jobs=2
)

print(result)

55


We can also define an alternative splitting function. Let's say we want each process to calculate the sum of squares on its own slice of the data, which decreases the inter-process communication overhead.

This change can be made in several ways:

**Map** calculates the square of each number and returns a list of squares. **Reduce** flattens the **Map** results and calculates the sum over all numbers. (This can be the slower version, because the sequential part performs more work.)


In [5]:
import numpy as np
from mlmr.function import map_reduce

arr = [1, 2, 3, 4, 5]

def squares_of_slice(arr_slice): # our map function
    return list(map(lambda x: x**2, arr_slice))

def flatten_sum(map_results): # our reduce function
    return sum([item for sublist in map_results for item in sublist])

def get_split_data_func(n_slices): # wrapper function of split data function
    def split_data(data):
        return np.array_split(data, n_slices)
    return split_data

n_jobs = 2

result = map_reduce(
    data=arr,
    data_split_func=get_split_data_func(n_jobs), # split data into n_jobs slices
    map_func=squares_of_slice,
    reduce_func=flatten_sum,
    n_jobs=n_jobs
)

print(result)

55


**Map** calculates the square of each number and also their sum (that is, it performs a partial reduction). **Reduce** then receives a list of partial sums, so the builtin `sum` can be used unchanged.

In [6]:
import numpy as np
from mlmr.function import map_reduce

arr = [1, 2, 3, 4, 5]

def squares_of_slice(arr_slice): # our map function, with partial reduction
    return sum(map(lambda x: x**2, arr_slice))

def get_split_data_func(n_slices): # wrapper function of split data function
    def split_data(data):
        return np.array_split(data, n_slices)
    return split_data

n_jobs = 2

result = map_reduce(
    data=arr,
    data_split_func=get_split_data_func(n_jobs), # split data into n_jobs slices
    map_func=squares_of_slice,
    reduce_func=sum,
    n_jobs=n_jobs
)

print(result)

55
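
Both variants above rely on `np.array_split` to cut the data into slices. If you prefer not to depend on NumPy for this step, a pure-Python split function can play the same role. The sketch below is a hypothetical helper (`get_even_split_func` is not part of `mlmr`) that mimics how `np.array_split` distributes elements: when the data does not divide evenly, the first slices get one extra element.

```python
def get_even_split_func(n_slices):
    """Build a split function that cuts a list into `n_slices` contiguous parts."""
    def split_data(data):
        base_size, remainder = divmod(len(data), n_slices)
        slices, start = [], 0
        for i in range(n_slices):
            # The first `remainder` slices are one element longer.
            size = base_size + (1 if i < remainder else 0)
            slices.append(data[start:start + size])
            start += size
        return slices
    return split_data


split_in_two = get_even_split_func(2)
print(split_in_two([1, 2, 3, 4, 5]))  # [[1, 2, 3], [4, 5]]
```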


```python
def map_reduce_splits(
    data_splits,
    map_func,
    reduce_func,
    n_jobs
):

```
Base function for performing parallel MapReduce on `data_splits`. The number of processes to run in parallel is derived from the `n_jobs` argument. Then `map_func` is applied to each element of `data_splits` in parallel. Once the computation is complete, `reduce_func` is applied sequentially to the list of `map_func` results, and its result is returned. The initial ordering of the data is preserved.

**Arguments:**
1. `data_splits` (*Iterable*) - data splits on which MapReduce would be performed.
2. `map_func` (*Callable*) - Map function. Map function signature: *func(DataSplit) -> Any*. **Function can't be lambda or local function!**
3. `reduce_func` (*Callable*) - Reduce function. Reduce function signature: *func(Iterable\[MapResult\]) -> Any*.
4. `n_jobs` - number of jobs to run in parallel. `-1` means using all processors.

**Returns**: Transformed (MapReduced) data splits.

This function is just a variation of the previous one, so we only need to rewrite the code slightly to use it.

In [7]:
import numpy as np
from mlmr.function import map_reduce_splits

arr = [1, 2, 3, 4, 5]

def squares_of_slice(arr_slice): # our map function, with partial reduction
    return sum(map(lambda x: x**2, arr_slice))

def get_split_data_func(n_slices): # wrapper function of split data function
    def split_data(data):
        return np.array_split(data, n_slices)
    return split_data

n_jobs = 2

data_slices = get_split_data_func(n_jobs)(arr)

result = map_reduce_splits(
    data_splits=data_slices,
    map_func=squares_of_slice,
    reduce_func=sum,
    n_jobs=n_jobs
)

print(result)

55


## Data transformation parallelization for basic ML stack (pandas+sklearn)

```python
def transform_concat(
    data: Iterable,
    transform_func: Callable[[Iterable], Any],
    n_jobs: int = 1
):

```
Function for performing parallel data transformations on `data` (pd.DataFrame, pd.Series). The number of processes to run in parallel is derived from the `n_jobs` argument. The data is divided evenly into as many slices as there are processes. Then `transform_func` is applied to each slice in parallel. Once the computation is complete, the transformation results are flattened into a single result, which is returned. The initial ordering of the data is preserved.

**Arguments:**
1. `data` (*Iterable*) - data on which transformation using MapReduce would be performed.
2. `transform_func` (*Callable*) - transformation function of a `data` **split**. Transform function signature: *func(Union\[pd.DataFrame, pd.Series\]) -> Union\[pd.DataFrame, pd.Series\]*. **Function can't be lambda or local function!**
3. `n_jobs` - number of jobs to run in parallel. `-1` means using all processors.

**Returns**: Transformed data.
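
The steps described above (split evenly, transform each slice, concatenate in the original order) can be sketched sequentially as follows. This is only an illustration under simplifying assumptions: plain lists stand in for pandas objects, the slices are processed one after another instead of in parallel, and `transform_concat_sketch` and `upper_case_slice` are hypothetical names, not the library's code.

```python
from itertools import chain


def transform_concat_sketch(data, transform_func, n_slices=2):
    """Sequential stand-in: split, transform every slice, concatenate."""
    base_size, remainder = divmod(len(data), n_slices)
    results, start = [], 0
    for i in range(n_slices):
        size = base_size + (1 if i < remainder else 0)
        # The transform function receives a whole slice, not a single item.
        results.append(transform_func(data[start:start + size]))
        start += size
    # Concatenating the slice results in slice order keeps the original ordering.
    return list(chain.from_iterable(results))


def upper_case_slice(texts):
    """Transform function applied to one slice of the data."""
    return [text.upper() for text in texts]


print(transform_concat_sketch(["a", "b", "c", "d", "e"], upper_case_slice, n_slices=2))
# ['A', 'B', 'C', 'D', 'E']
```

Note that the transform function works on a slice as a whole, which is why the pandas example in this section wraps `apply` in a function that takes the entire slice.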

Here I want to draw your attention to something. During data exploration I often suffer from slow data transformations; sometimes it takes tens of minutes to get the result of a single one. I searched the web for a long time and did not find a good way to parallelize the usual ML stack transformations without giving up pandas (of course you can use Dask and similar tools, but sometimes that is just too much). So I have prepared an easy solution that you can use without much effort.

I'll show an example of parallelizing the `pd.Series.apply` method with `mlmr.function.transform_concat`, taken from one of my old projects. The task is straightforward: a common NLP pipeline for text lemmatization.

In [8]:
import numpy as np
import pandas as pd

texts = pd.Series([
    """
    Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
    Ut maximus consequat turpis et condimentum. 
    Duis ullamcorper dictum posuere.
    Curabitur auctor quis sapien congue aliquet. 
    Aliquam dignissim suscipit rhoncus. 
    Fusce vitae cursus dui, eu aliquam dui. 
    Nulla et ultrices lacus, at iaculis arcu. 
    Sed fermentum metus libero, sed egestas libero ultrices sed. 
    Duis erat leo, ultricies quis dapibus non, lacinia ut tellus.
    """
]*50000)
print('Dataset size:', len(texts))

Dataset size: 50000


In [9]:
import string
import spacy
import en_core_web_sm

nlp = en_core_web_sm.load()

def preprocess_texts_df(df): # our transform(map) function
    return df.apply(preprocess_text)

def remove_punct(doc):
    return [t for t in doc if t.text not in string.punctuation]

def remove_stop_words(doc):
    return [t for t in doc if not t.is_stop]

def lemmatize(doc):
    return ' '.join([t.lemma_ for t in doc])

def preprocess_text(text):
    doc = nlp(text)
    removed_punct = remove_punct(doc)
    removed_stop_words = remove_stop_words(removed_punct)
    return lemmatize(removed_stop_words)

In [10]:
from mlmr.function import transform_concat

This example shows the execution speed-up. In the timings below, the speed-up is close to linear up to 4 processes and then starts to stagnate: going to 8 and 12 processes brings only small additional gains. You can't speed things up indefinitely just by adding worker processes.

In [11]:
%%time

texts_preprocessed = preprocess_texts_df(texts) # sequential implementation

CPU times: user 8min 36s, sys: 31.7 ms, total: 8min 36s
Wall time: 8min 36s


In [12]:
%%time

texts_preprocessed = transform_concat(texts, preprocess_texts_df, n_jobs=2) # using 2 processes

CPU times: user 173 ms, sys: 72.1 ms, total: 245 ms
Wall time: 5min 22s


In [13]:
%%time

texts_preprocessed = transform_concat(texts, preprocess_texts_df, n_jobs=4) # using 4 processes

CPU times: user 140 ms, sys: 64.1 ms, total: 204 ms
Wall time: 2min 51s


In [14]:
%%time

texts_preprocessed = transform_concat(texts, preprocess_texts_df, n_jobs=8) # using 8 processes

CPU times: user 152 ms, sys: 92.1 ms, total: 244 ms
Wall time: 2min 30s


In [15]:
%%time

texts_preprocessed = transform_concat(texts, preprocess_texts_df, n_jobs=12) # using 12 processes

CPU times: user 192 ms, sys: 132 ms, total: 324 ms
Wall time: 2min 19s
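
To quantify the timings above, the wall times can be converted into speed-up (sequential time divided by parallel time) and parallel efficiency (speed-up divided by the number of processes). The numbers below are copied from the cell outputs above and converted to seconds; the variable names are only illustrative.

```python
# Wall times from the runs above, in seconds, keyed by number of processes.
# The entry for 1 process is the sequential run.
wall_times = {
    1: 8 * 60 + 36,
    2: 5 * 60 + 22,
    4: 2 * 60 + 51,
    8: 2 * 60 + 30,
    12: 2 * 60 + 19,
}

sequential_time = wall_times[1]
speedups = {n: sequential_time / t for n, t in wall_times.items()}
efficiencies = {n: speedups[n] / n for n in wall_times}

for n in wall_times:
    print(f"{n:>2} processes: speed-up {speedups[n]:.2f}x, efficiency {efficiencies[n]:.0%}")
```

With these numbers the speed-up is about 1.6x with 2 processes and 3.0x with 4, but only about 3.4x with 8 and 3.7x with 12, so efficiency drops from roughly 80% to roughly 31%.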
