# Multiprocessing Pandas

In this notebook we demonstrate the pandas multiprocessing functionality from the `utils` module.

To do so, we build a bigger frame by repeatedly concatenating our training dataframe. Let's begin by importing our modules.

In [26]:
# Import necessary packages
import dateutil.parser  # import the submodule explicitly so dateutil.parser is available
from importlib import reload
import multiprocessing
import os
from pathlib import Path
import time

import numpy as np
import pandas as pd
import psutil

# The Docker environment adds our source directory to the Python path;
# run this inside the Docker environment so that the following imports work
import utils
import datasets.smartmeter as smartmeter

## I. Massive DataFrame

Now we generate the larger dataframe by simply repeating the sample training dataframe (here, two copies of `block_0.csv`).

In [29]:
dataset_folder = Path(os.getenv("DATA"), "smart-meter-london")
data_df = pd.read_csv(Path(dataset_folder, "halfhourly_dataset", "halfhourly_dataset", "block_0.csv"), low_memory=False)
massive_df = pd.concat([data_df] * 2) 

# Lets look at the result
print(len(massive_df))
data_df.head()

2445340


| | LCLid | tstp | energy(kWh/hh) |
|---|---|---|---|
| 0 | MAC000002 | 2012-10-12 00:30:00.0000000 | 0 |
| 1 | MAC000002 | 2012-10-12 01:00:00.0000000 | 0 |
| 2 | MAC000002 | 2012-10-12 01:30:00.0000000 | 0 |
| 3 | MAC000002 | 2012-10-12 02:00:00.0000000 | 0 |
| 4 | MAC000002 | 2012-10-12 02:30:00.0000000 | 0 |
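Note that `pd.concat` keeps the row labels of each input, so `massive_df` contains every index label twice. This is worth keeping in mind whenever results are realigned later, because `reindex` can raise a `ValueError` on an axis with duplicate labels. A minimal sketch on a toy frame (the values are hypothetical):

```python
import pandas as pd

# Toy stand-in for data_df (hypothetical values)
small_df = pd.DataFrame({"energy": [0.1, 0.2, 0.3]})

# By default, concat keeps the labels 0, 1, 2 of each copy
repeated = pd.concat([small_df] * 2)
print(repeated.index.tolist())    # [0, 1, 2, 0, 1, 2]

# ignore_index=True builds a fresh 0..n-1 index instead
renumbered = pd.concat([small_df] * 2, ignore_index=True)
print(renumbered.index.tolist())  # [0, 1, 2, 3, 4, 5]

# Reindexing the duplicated axis onto other labels fails
try:
    repeated["energy"].reindex([0, 1, 2])
except ValueError:
    reindex_failed = True
else:
    reindex_failed = False
print(reindex_failed)  # True
```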


With about 2.4 million samples, the execution time is long enough to be clearly noticeable.
Let's define a simple function with which we benchmark the speed of our multi-core approach.

## II. Benchmark

In [30]:
# Parse a timestamp string into a datetime object, a conversion we use frequently
def convert_time(x):
    return dateutil.parser.parse(x)

Let's measure how long this function takes when applied to the timestamp column with the `apply` method.

In [31]:
# start measuring
start = time.time()
# execute 
slow_df = massive_df['tstp'].apply(convert_time)
# end measuring
end = time.time()
print(end-start)

120.14152312278748


## III. Concurrent Processing

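We use the `concurrent.futures` module with its `ProcessPoolExecutor` class, as opposed to `multiprocessing.Pool`: the complexity of our application is relatively high, and we prefer the customizability of submitting each task individually and handling its future.

Let's begin by importing `concurrent.futures`. We also set the `multiprocessing` start method to `fork`, so that the worker processes inherit the functions defined in this notebook (`fork` is not available on Windows).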
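`ProcessPoolExecutor` offers both `map`, similar to `multiprocessing.Pool.map`, and `submit`, which returns one `Future` per task. The notebook uses `submit` because each task calls a different bound method (the `apply` of its own chunk). A minimal sketch comparing the two; `run_all` and `square` are hypothetical names. The executor class is a parameter because `ThreadPoolExecutor` shares the same `Executor` interface, which makes the pattern easy to check without processes:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    # Defined at module level so that worker processes can unpickle it
    return x * x


def run_all(items, executor_cls=ProcessPoolExecutor, max_workers=2):
    """Hypothetical helper comparing Executor.map with Executor.submit."""
    items = list(items)
    with executor_cls(max_workers=max_workers) as executor:
        # map: one callable for all items, results in input order
        mapped = list(executor.map(square, items))
        # submit: one Future per call; every call could use a different callable
        futures = [executor.submit(square, item) for item in items]
        submitted = [future.result() for future in futures]
    return mapped, submitted


if __name__ == "__main__":
    # The guard keeps spawned workers from re-running this part on import
    print(run_all(range(5)))  # ([0, 1, 4, 9, 16], [0, 1, 4, 9, 16])
    print(run_all(range(5), executor_cls=ThreadPoolExecutor))
```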

In [49]:
import concurrent.futures
import multiprocessing
multiprocessing.set_start_method('fork', force=True)
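Calling `set_start_method(..., force=True)` changes the global default for every later use of `multiprocessing` in the kernel. An alternative is to hand a context to the executor through its `mp_context` argument. A sketch under the assumption that `fork` is preferred where the platform supports it; `pick_context` and `make_executor` are hypothetical helpers:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def pick_context(preferred="fork"):
    """Hypothetical helper: use `preferred` if this platform supports it."""
    if preferred in multiprocessing.get_all_start_methods():
        return multiprocessing.get_context(preferred)
    # Fall back to the platform default (e.g. "spawn" on Windows)
    return multiprocessing.get_context()


def make_executor(max_workers=None, preferred="fork"):
    """Hypothetical helper: a process pool with an explicit start method."""
    return ProcessPoolExecutor(max_workers=max_workers,
                               mp_context=pick_context(preferred))


print(pick_context().get_start_method())  # "fork" on Linux and macOS
```

The global setting is left untouched this way, so other code in the same kernel keeps its default start method.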

Next, let's define the input parameters.

### Input Parameters

In [41]:
function = 'apply'                    # name of the pandas method to call, resolved with the getattr built-in
args = [convert_time]                 # positional arguments for apply; a list keeps the option open for more
kwargs = {}                           # no keyword arguments are needed here
max_cores = 10                        # upper limit, in case we do not want to allocate all the compute power
data_df = massive_df['tstp'].copy()   # copy to avoid accidentally modifying massive_df
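The `function` parameter is either the name of a pandas method, resolved with `getattr` on each chunk, or a plain callable that receives the chunk as its first argument. A minimal sketch of that dispatch without any parallelism; `call_on_chunk` is a hypothetical helper, not part of `utils`:

```python
import pandas as pd


def call_on_chunk(chunk, function, args=(), kwargs=None):
    """Hypothetical helper mirroring the dispatch used in this notebook."""
    kwargs = kwargs or {}
    if isinstance(function, str):
        # A string names a pandas method, e.g. "apply" -> chunk.apply(*args)
        return getattr(chunk, function)(*args, **kwargs)
    # Otherwise the callable receives the chunk as its first argument
    return function(chunk, *args, **kwargs)


series = pd.Series(["a", "b", "c"])
print(call_on_chunk(series, "apply", args=[str.upper]).tolist())      # ['A', 'B', 'C']
print(call_on_chunk(series, lambda s, sep: sep.join(s), args=["-"]))  # a-b-c
```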

Next, let's turn these parameters into values that `concurrent.futures.ProcessPoolExecutor` can use: the number of worker processes and the chunks of data. We split the series so that each chunk can be processed independently.

In [42]:
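# Use at most max_cores cores (all of them if max_cores is not set),
# keep two cores free for the rest of the system, but use at least one
n_available = psutil.cpu_count()
if max_cores:
    n_available = min(max_cores, n_available)
n_cores = max(1, n_available - 2)

# Split by position into n_cores contiguous chunks; each chunk keeps its original index
# (np.array_split on the Series itself relies on swapaxes, deprecated in recent pandas)
split_array = [data_df.iloc[positions]
               for positions in np.array_split(np.arange(len(data_df)), n_cores)]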
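The core count and the split can be packaged as two small helpers. This sketch (hypothetical names `choose_workers` and `split_series`) uses `os.cpu_count` instead of `psutil` to avoid the extra dependency, and splits by position because calling `np.array_split` on a pandas object directly relies on its `swapaxes` method, which recent pandas versions deprecate:

```python
import os

import numpy as np
import pandas as pd


def choose_workers(max_cores=None, reserved=2):
    """Hypothetical helper: cap at max_cores, keep `reserved` cores free, use at least one."""
    n_available = os.cpu_count() or 1
    if max_cores:
        n_available = min(max_cores, n_available)
    return max(1, n_available - reserved)


def split_series(series, n_chunks):
    """Split into contiguous chunks by position; each chunk keeps its labels."""
    positions = np.array_split(np.arange(len(series)), n_chunks)
    return [series.iloc[pos] for pos in positions]


data = pd.Series(range(10), index=list("abcdefghij"))
chunks = split_series(data, 3)
print([len(chunk) for chunk in chunks])  # [4, 3, 3]
print(pd.concat(chunks).equals(data))    # True
```

Because every chunk keeps its original labels, concatenating the processed chunks in order restores the original index without any reindexing.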

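Now for the multiprocessing part. This is where it gets interesting, so we measure the time.

**Note:** the following cell may fail inside a Jupyter kernel, because the worker processes must be able to unpickle the submitted function, which is unreliable for functions defined interactively in a notebook. If it fails, use the implementation in `utils` instead.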

In [50]:
# start timer
start = time.time()

futures = []          # one future per chunk
return_values = []    # processed chunks, in submission order

# Leaving the `with` block calls executor.shutdown(wait=True), i.e. it blocks
# until every submitted task has finished, then releases the worker processes
with concurrent.futures.ProcessPoolExecutor(max_workers=n_cores) as executor:
    for chunk in split_array:
        if isinstance(function, str):
            # A string is interpreted as a method of the chunk, e.g. chunk.apply
            func = getattr(chunk, function)
            futures.append(executor.submit(func, *args, **kwargs))
        else:
            # Otherwise the callable receives the chunk as its first argument
            futures.append(executor.submit(function, chunk, *args, **kwargs))

# Retrieve the results once, in submission order (= original row order)
for future in futures:
    return_values.append(future.result())

# Every chunk kept its original index labels, so concatenating the chunks
# in order reproduces the index of data_df; no reindexing is necessary
fast_df = pd.concat(return_values)
end = time.time()
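Putting the pieces together, a self-contained sketch of the chunked approach could look like the following. `parallel_apply` is a hypothetical name and not the `utils` API. With the default `ProcessPoolExecutor`, the callable must be picklable (defined at module level); `ThreadPoolExecutor` is handy for checking the split-and-concatenate logic quickly:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np
import pandas as pd


def parallel_apply(obj, function, args=(), kwargs=None, n_chunks=4,
                   executor_cls=ProcessPoolExecutor):
    """Hypothetical sketch: run `function` on contiguous chunks of `obj` in parallel.

    `function` is the name of a pandas method (e.g. "apply") or a callable
    taking the chunk as its first argument. Only row-wise operations give
    the same result as the unsplit call (see the section on row interdependency).
    """
    kwargs = kwargs or {}
    positions = np.array_split(np.arange(len(obj)), n_chunks)
    chunks = [obj.iloc[pos] for pos in positions]
    with executor_cls(max_workers=n_chunks) as executor:
        if isinstance(function, str):
            futures = [executor.submit(getattr(chunk, function), *args, **kwargs)
                       for chunk in chunks]
        else:
            futures = [executor.submit(function, chunk, *args, **kwargs)
                       for chunk in chunks]
        # Collect in submission order, which is the original row order
        results = [future.result() for future in futures]
    # Each chunk kept its original labels, so concatenation restores the index
    return pd.concat(results)


def double(x):
    return 2 * x


if __name__ == "__main__":
    s = pd.Series(range(10))
    print(parallel_apply(s, "apply", args=[double]).equals(s.apply(double)))  # True
    print(parallel_apply(s, "apply", args=[double],
                         executor_cls=ThreadPoolExecutor).equals(s.apply(double)))  # True
```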



The same functionality is implemented in the `utils` module. Below, we first print the duration of the manual version and check its result, then call `utils.pandas.multiprocess`, measure its execution time, and compare the results as well.

In [51]:
print(f"multithreading needed {end-start}")

multithreading needed 25.73172116279602


In [54]:
fast_df.equals(slow_df)

True

In [53]:
reload(utils)
start = time.time()
fast_df = utils.pandas.multiprocess(data_df, function, args=args)
end = time.time()

>
> M: Multiprocessing --------------
> M: Creating processes
> M: Running processes
> M: Duration: 22.76 sec ----------
>


In [55]:
fast_df.equals(slow_df)

True

Hurray! Results are equivalent. We can now have a look at the timing!

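Both parallel runs need roughly a fifth of the single-process time (about 26 s and 23 s versus 120 s). We expect this ratio to improve further with larger inputs, as the fixed overhead of starting the worker processes is spread over more rows.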
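The ratio follows directly from the durations printed above; a quick check of the arithmetic:

```python
# Durations in seconds, copied from the outputs in this notebook
single = 120.14152312278748   # single process, Series.apply
manual = 25.73172116279602    # manual ProcessPoolExecutor version
utils_run = 22.76             # utils.pandas.multiprocess

for name, parallel in [("manual", manual), ("utils", utils_run)]:
    speedup = single / parallel
    share = parallel / single
    print(f"{name}: speed-up {speedup:.2f}x, {share:.0%} of the single-process time")
```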

## IV. Row Interdependency

Some functions have so-called row interdependencies, such as the `diff` method: it needs the value of the preceding row to compute the current row. When we split the dataframe into a list of frames, the first row of every split has no predecessor, so `diff` produces a `NaN` there. To alleviate this issue, we implement an overlap function based on the example from [stackoverflow](https://stackoverflow.com/questions/55964768/split-data-frames-into-multiple-with-overlap-rows).
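The problem is easy to reproduce. A minimal sketch using the first nine `ID` values of the example dataframe: computing `diff` chunk by chunk leaves a `NaN` at the start of every chunk, whereas the unsplit `diff` only has one at the very first row.

```python
import numpy as np
import pandas as pd

values = pd.Series([178, 153, 193, 195, 214, 157, 205, 212, 219])
n_chunks = 3

# Split by position into contiguous chunks and diff each chunk separately
chunks = [values.iloc[pos] for pos in np.array_split(np.arange(len(values)), n_chunks)]
naive = pd.concat([chunk.diff() for chunk in chunks])

print(int(values.diff().isna().sum()))  # 1 (only the first row)
print(int(naive.isna().sum()))          # 3 (first row of every chunk)
```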

The idea is to let the dataframe splits overlap, so that the overhanging rows can be used for the computation and then discarded before the concatenation.

The dataframes with overhang need to look like: ![](./pics/dataframe-overlapsplit.png)

The dataframe from the example is:

In [56]:
data = {'ID': [178,153,193,195,214,157,205,212,219,166,217,186,170,207,204,201,179,215,213,170,217,199], 
'Unit_Weight': [178,153,193,195,214,157,205,212,219,166,217,186,170,207,204,201,179,215,213,170,217,199]}

df = pd.DataFrame(data)

### Input Parameters

We use the following input parameters:
- sections: number of dataframe splits
- overlap: number of rows shared by two neighbouring splits (must be a multiple of 2, since half of it is trimmed from each side)
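In terms of row positions, the blocks created below have length `(n_rows - overlap) / sections + overlap` and start every `(n_rows - overlap) / sections` rows. A sketch of this arithmetic, assuming `n_rows - overlap` is divisible by `sections` as in the example; `block_bounds` is a hypothetical helper:

```python
def block_bounds(n_rows, sections, overlap):
    """Hypothetical helper: half-open (start, stop) row positions of overlapping blocks."""
    if (n_rows - overlap) % sections:
        raise ValueError("this sketch assumes (n_rows - overlap) divisible by sections")
    step = (n_rows - overlap) // sections   # block length without the overlap
    length = step + overlap                 # block length with the overlap
    return [(start, start + length) for start in range(0, n_rows - overlap, step)]


print(block_bounds(22, 5, 2))  # [(0, 6), (4, 10), (8, 14), (12, 18), (16, 22)]
```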


In [57]:
sections = 5
overlap = 2  # in utils we use row_interdependency=1 instead, meaning each row
             # depends on its direct neighbourhood

Let's create the overlapping blocks from our input parameters.

In [58]:
blockl_clean = (len(df) - overlap) / sections    # block length without the overlap
blockl_overlap = int(blockl_clean + overlap)     # block length with the overlap

# Create the blocks with a list comprehension; each block starts
# blockl_overlap - overlap rows after the previous one (positions, via iloc)
block_array = [df.iloc[n_block:n_block + blockl_overlap]
               for n_block in range(0, len(df), blockl_overlap - overlap)
               if n_block < len(df) - overlap]

# Print the blocks so that we can check them
print("Unprocessed blocks:")
for block_overlap in block_array:
    print(block_overlap)


Unprocessed blocks:
    ID  Unit_Weight
0  178          178
1  153          153
2  193          193
3  195          195
4  214          214
5  157          157
    ID  Unit_Weight
4  214          214
5  157          157
6  205          205
7  212          212
8  219          219
9  166          166
     ID  Unit_Weight
8   219          219
9   166          166
10  217          217
11  186          186
12  170          170
13  207          207
     ID  Unit_Weight
12  170          170
13  207          207
14  204          204
15  201          201
16  179          179
17  215          215
     ID  Unit_Weight
16  179          179
17  215          215
18  213          213
19  170          170
20  217          217
21  199          199


Now let's process each block and put the results back together.

In [59]:
# Process each block, trim the overlap and put the blocks back together
processed_blocks = list()
truncate = int(overlap / 2)   # rows to drop on each side of a block

for index, block in enumerate(block_array):
    # Call the function on each block
    block = block.diff()

    # Then remove the overlap (the last block keeps its tail)
    if index == len(block_array) - 1:
        block = block.iloc[truncate:]
    else:
        block = block.iloc[truncate:-truncate]
    print(block)
    processed_blocks.append(block)

result_df = pd.concat(processed_blocks)
# The first row was trimmed away; reindexing restores it (as NaN) at the right label
result_df = result_df.reindex(index=df.index)
print(f"The block is equal to the original: {result_df.equals(df.diff())}")
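The split-process-trim pattern can be packaged into one function. This sketch, with the hypothetical name `overlap_apply`, differs slightly from the cell above: it keeps the head of the first block and the tail of the last block instead of trimming the head and restoring it with `reindex`, so no reindexing is needed. Like the notebook, it assumes an even `overlap` and that `n_rows - overlap` is divisible by `sections`:

```python
import pandas as pd


def overlap_apply(df, func, sections, overlap):
    """Hypothetical sketch: apply `func` to overlapping blocks and stitch the trimmed results."""
    if overlap % 2 or (len(df) - overlap) % sections:
        raise ValueError("sketch assumes an even overlap and evenly sized blocks")
    step = (len(df) - overlap) // sections
    half = overlap // 2
    pieces = []
    for i, start in enumerate(range(0, len(df) - overlap, step)):
        block = func(df.iloc[start:start + step + overlap])
        lo = 0 if i == 0 else half                                     # first block keeps its head
        hi = len(block) if i == sections - 1 else len(block) - half   # last block keeps its tail
        pieces.append(block.iloc[lo:hi])
    return pd.concat(pieces)


ids = [178, 153, 193, 195, 214, 157, 205, 212, 219, 166, 217,
       186, 170, 207, 204, 201, 179, 215, 213, 170, 217, 199]
df = pd.DataFrame({"ID": ids, "Unit_Weight": ids})
print(overlap_apply(df, lambda block: block.diff(), sections=5, overlap=2).equals(df.diff()))  # True
```

With `overlap=2`, each kept row has one extra row of context on either side, so the same sketch also reproduces a centred window of size 3 such as `rolling(3, center=True)`.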

     ID  Unit_Weight
1 -25.0        -25.0
2  40.0         40.0
3   2.0          2.0
4  19.0         19.0
     ID  Unit_Weight
5 -57.0        -57.0
6  48.0         48.0
7   7.0          7.0
8   7.0          7.0
      ID  Unit_Weight
9  -53.0        -53.0
10  51.0         51.0
11 -31.0        -31.0
12 -16.0        -16.0
      ID  Unit_Weight
13  37.0         37.0
14  -3.0         -3.0
15  -3.0         -3.0
16 -22.0        -22.0
      ID  Unit_Weight
17  36.0         36.0
18  -2.0         -2.0
19 -43.0        -43.0
20  47.0         47.0
21 -18.0        -18.0
The block is equal to the original: True


Finally, let's recreate this using the `utils` module. The split function seen above can be invoked through:

In [62]:
overlaps = utils.pandas.overlap_split(df, 5, 2)

The entire multiprocessing operation can be reproduced with:

In [63]:
reload(utils)
result_df = utils.pandas.multiprocess(df, 'diff', row_interdependency=1, max_cores=5)
print(f"The block is equal to the original: {result_df.equals(df.diff())}")

>
> M: Multiprocessing --------------
> M: Creating processes
> M: Running processes
> M: Duration: 0.51 sec -----------
>
The block is equal to the original: False


In [64]:
result_df

| | ID | Unit_Weight |
|---|---|---|
| 0 | -25.0 | -25.0 |
| 1 | 40.0 | 40.0 |
| 2 | 2.0 | 2.0 |
| 3 | 19.0 | 19.0 |
| 4 | -57.0 | -57.0 |
| 5 | 48.0 | 48.0 |
| 6 | 7.0 | 7.0 |
| 7 | 7.0 | 7.0 |
| 8 | -53.0 | -53.0 |
| 9 | 51.0 | 51.0 |


In [65]:
df.diff()

| | ID | Unit_Weight |
|---|---|---|
| 0 | NaN | NaN |
| 1 | -25.0 | -25.0 |
| 2 | 40.0 | 40.0 |
| 3 | 2.0 | 2.0 |
| 4 | 19.0 | 19.0 |
| 5 | -57.0 | -57.0 |
| 6 | 48.0 | 48.0 |
| 7 | 7.0 | 7.0 |
| 8 | 7.0 | 7.0 |
| 9 | -53.0 | -53.0 |
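Comparing the two outputs shows why the check fails: `result_df` lacks the leading `NaN` row and its labels start at 0, so every value sits one row higher than in `df.diff()`. This is consistent with renumbering the index of the stitched blocks instead of reindexing them onto `df.index`, as the manual cell in section IV does. A minimal sketch of both variants on the first four rows of the example:

```python
import pandas as pd

# First four rows of the example dataframe
df = pd.DataFrame({"ID": [178, 153, 193, 195], "Unit_Weight": [178, 153, 193, 195]})
expected = df.diff()

# Stand-in for the stitched blocks: everything except the leading NaN row
stitched = expected.iloc[1:]

# Renumbering the labels shifts every value up by one row
renumbered = stitched.reset_index(drop=True)
# Reindexing onto the original labels puts the NaN row back where it belongs
realigned = stitched.reindex(df.index)

print(renumbered.equals(expected))  # False
print(realigned.equals(expected))   # True
```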


In [None]:
### I replaced this reindexing logic with something in the load_data function, but I can't remember the details
### TODO: write unit tests for all of this