### Multiprocessing over Pandas Dataframe Columns

<br>

#### Objective:
Iterate over a pandas dataframe and perform column-wise operations in separate processes that share the parent's memory via _copy-on-write_ (a consequence of _fork()_)

#### Steps:
  * Monitor used and available memory
  * Allocate large dataframe
  * Do row counts in multiple processes on columns of that dataframe


In [1]:
import multiprocessing
from functools import partial
import numpy as np
import pandas as pd
import time
import psutil

System memory check-in

In [2]:
print('used: {}% free: {:.2f}GB'.format(psutil.virtual_memory().percent, float(psutil.virtual_memory().free)/1024**3))

used: 55.8% free: 5.25GB
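The same check is repeated several times in this notebook, so it can be wrapped in a small helper. The sketch below is one way to do that; the names `format_memory` and `memory_report` are hypothetical and not part of the original notebook. It takes a single `psutil.virtual_memory()` snapshot so that the percentage and the free figure come from the same reading.

```python
def format_memory(percent_used, free_bytes):
    """Format a memory reading the same way as the notebook's print calls."""
    return 'used: {}% free: {:.2f}GB'.format(percent_used, free_bytes / 1024**3)


def memory_report():
    """Take one snapshot of system memory and return it as a formatted string."""
    import psutil  # third-party; the same dependency the notebook already uses
    vm = psutil.virtual_memory()
    return format_memory(vm.percent, vm.free)
```

Calling `print(memory_report())` would then replace the longer `print(...)` line. psutil also exposes `virtual_memory().available`, which may be a better indicator than `free` of how much memory new allocations can actually use.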


Allocation of a reasonably large frame (for `cols=6` and `rows=10^8` this results in `4.47GB` of data)

In [3]:
cols = 6
columns = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
rows = 10**8
df = pd.DataFrame(np.random.randn(rows,cols), columns=columns[0:cols])
print('{0:.2f}GB'.format(float(df.memory_usage(index=True).sum())/1024**3))

4.47GB
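The `4.47GB` figure can be checked by hand: `np.random.randn` produces `float64` values of 8 bytes each, so the frame holds `rows * cols * 8` bytes plus a negligible index. A minimal sketch of that arithmetic (the helper name `estimate_frame_gib` is hypothetical):

```python
def estimate_frame_gib(rows, cols, itemsize=8):
    """Approximate size in GiB of a dense numeric frame, ignoring the index."""
    return rows * cols * itemsize / 1024**3


# 10**8 rows * 6 columns * 8 bytes = 4.8e9 bytes
print('{:.2f}GB'.format(estimate_frame_gib(10**8, 6)))
```

Note that the notebook's "GB" is computed with `1024**3`, i.e. it is a gibibyte figure.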


In [4]:
# Another memory check-in
print('used: {}% free: {:.2f}GB'.format(psutil.virtual_memory().percent, float(psutil.virtual_memory().free)/1024**3))

used: 85.1% free: 0.65GB


Perform multiprocessing across each of the columns

In [10]:
from multiprocessing import Pool, cpu_count
import subprocess
import shlex

def parallelise_cols(func, df, num_processes=None):
    # default: one worker per column, capped at the number of CPUs
    if num_processes is None:
        num_processes = min(df.shape[1], cpu_count())

    with Pool(num_processes) as pool:
        # df.items() yields (column name, Series) pairs, one per column
        # (DataFrame.iteritems() was removed in pandas 2.0)
        # pool.map returns results as a list; it is unused here because
        # action() only prints and returns None
        results_list = pool.map(func, df.items())
        # to return processed columns concatenated together as a new dataframe:
        #return pd.concat(results_list, axis=1)

def action(data):
    # data is a (column name, Series) pair
    print('name: {}; row.count: {}; used: {}% free: {:.2f}GB'.format(data[0],
                                                                     len(data[1].index),
                                                                     psutil.virtual_memory().percent,
                                                                     float(psutil.virtual_memory().free)/1024**3))
    time.sleep(2)
    print('done sleeping {}'.format(multiprocessing.current_process()))

In [7]:
parallelise_cols(action, df, num_processes=2)


name: a; row.count: 100000000; used: 94.3% free: 0.86GB
done sleeping <ForkProcess(ForkPoolWorker-1, started daemon)>
name: b; row.count: 100000000; used: 92.5% free: 1.14GB
done sleeping <ForkProcess(ForkPoolWorker-2, started daemon)>
name: c; row.count: 100000000; used: 94.4% free: 0.85GB
done sleeping <ForkProcess(ForkPoolWorker-1, started daemon)>
name: d; row.count: 100000000; used: 90.4% free: 1.44GB
done sleeping <ForkProcess(ForkPoolWorker-2, started daemon)>
name: e; row.count: 100000000; used: 90.5% free: 1.40GB
done sleeping <ForkProcess(ForkPoolWorker-1, started daemon)>
name: f; row.count: 100000000; used: 84.0% free: 2.38GB
done sleeping <ForkProcess(ForkPoolWorker-2, started daemon)>


In [11]:
parallelise_cols(action, df, num_processes=4)


name: a; row.count: 100000000; used: 85.3% free: 2.05GB
done sleeping <ForkProcess(ForkPoolWorker-7, started daemon)>
name: b; row.count: 100000000; used: 86.5% free: 1.86GB
done sleeping <ForkProcess(ForkPoolWorker-8, started daemon)>
name: c; row.count: 100000000; used: 87.3% free: 1.74GB
done sleeping <ForkProcess(ForkPoolWorker-9, started daemon)>
name: d; row.count: 100000000; used: 84.1% free: 2.22GB
done sleeping <ForkProcess(ForkPoolWorker-10, started daemon)>
name: e; row.count: 100000000; used: 86.3% free: 1.88GB
done sleeping <ForkProcess(ForkPoolWorker-7, started daemon)>
name: f; row.count: 100000000; used: 79.8% free: 2.86GB
done sleeping <ForkProcess(ForkPoolWorker-8, started daemon)>
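One thing to keep in mind when reading these numbers: `parallelise_cols` passes each column itself to `pool.map`, and `pool.map` pickles its arguments to send them to the workers, so every column is serialised and copied once on its way to a worker. A possible variation, sketched below under the assumption of a Unix-like system with the `fork` start method, sends only the column *names* and lets the workers read the frame through a module-level reference inherited at fork time. The names `_shared_df`, `count_rows` and `parallel_row_counts` are hypothetical and not part of the original notebook.

```python
import multiprocessing

import numpy as np
import pandas as pd

# Module-level reference to the frame. Workers created by fork() inherit it;
# the underlying array data should stay shared as long as they only read it.
_shared_df = None


def count_rows(col_name):
    """Look up one column in the inherited frame and return (name, row count)."""
    return col_name, len(_shared_df[col_name].index)


def parallel_row_counts(df, num_processes=2):
    """Count rows per column while sending only column names to the workers."""
    global _shared_df
    _shared_df = df  # must be set before the pool forks its workers
    ctx = multiprocessing.get_context('fork')  # assumption: fork is available
    with ctx.Pool(num_processes) as pool:
        return dict(pool.map(count_rows, list(df.columns)))


if __name__ == '__main__':
    small = pd.DataFrame(np.random.randn(1000, 3), columns=['a', 'b', 'c'])
    print(parallel_row_counts(small))
```

Only short strings and integers cross the process boundary here, so the memory figures reported during a run should reflect copy-on-write sharing rather than pickled column copies. This is untested on the 4.47GB frame above and is meant as a starting point, not a measured result.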
