# Dask
Dask is a Python library for parallel and distributed computing

https://www.dask.org/

# pandarallel
pandarallel is a simple and efficient tool to parallelize Pandas operations on all available CPUs.

https://pypi.org/project/pandarallel/

https://nalepae.github.io/pandarallel/

# mapply
provides a sensible multi-core apply function for Pandas.

https://mapply.readthedocs.io/en/stable/README.html#mapply-vs-pandarallel-vs-swifter

In [1]:
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get
import numpy as np
import timeit
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=True)

INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

https://nalepae.github.io/pandarallel/troubleshooting/


In [2]:
N = 5000000
A_list = np.random.randint(1, 200, N)
B_list = np.random.randint(1, 200, N)
data = pd.DataFrame({'A': A_list, 'B': B_list})
data.head()

Unnamed: 0,A,B
0,129,80
1,122,88
2,63,117
3,60,20
4,41,10


In [3]:
import multiprocessing

multiprocessing.cpu_count()

16

In [32]:
ddata = dd.from_pandas(data, npartitions=16)

In [28]:
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(scheduler='threads')  
  
def vectorized(): return myfunc(data['A'], data['B']  )

In [7]:
t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

67.09131490020081


In [33]:
t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1)) # 56 seg

57.42132580000907


In [9]:
def myfunc_2(row): return row.A*(row.B**2+1)

In [10]:
# Benchmark the Pandarallel approach
def pandarallel_benchmark():
    data.parallel_apply(myfunc_2, axis=1)

In [11]:
pandarallel_time = timeit.timeit(pandarallel_benchmark, number=1)
pandarallel_time

VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=625000), Label(value='0 / 625000')…

40.357989199925214

In [11]:
pandarallel_time

40.24564839992672

In [12]:
t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.11390030011534691


In [13]:
67 / 0.11

609.0909090909091

In [14]:
name_series = pd.Series(np.random.choice(['adam', 'chang', 'eliza', 'odom'], replace=True, size=100000))
dname = pd.DataFrame(name_series, columns=["name"])
def parse_name_pp(name):
    if name.lower().startswith('c'):
        return name
    elif name.lower().startswith('a'):
        return 'A'
    elif name.lower().startswith('e'):
        return 'E'
    elif name.lower().startswith('o'):
        return 'O'
    elif name.lower().startswith('i'):
        return 'I'
    elif name.lower().startswith('u'):
        return 'U'
    return name

def parse_name_ppr(x):
    if x["name"].lower().startswith('c'):
        return str(x["name"])
    elif x["name"].lower().startswith('a'):
        return 'A'
    elif x["name"].lower().startswith('e'):
        return 'E'
    elif x["name"].lower().startswith('o'):
        return 'O'
    elif x["name"].lower().startswith('i'):
        return 'I'
    elif x["name"].lower().startswith('u'):
        return 'U'
    return str(x["name"])

In [15]:
dname_series = dd.from_pandas(dname, npartitions=16)

In [16]:
dname_series.partitions[0].tail()

Unnamed: 0,name
6245,odom
6246,chang
6247,chang
6248,adam
6249,adam


In [17]:
len(dname_series.partitions[0])
    

6250

In [18]:
def apply_myfunc_to_S(df): return df.apply(lambda row: parse_name_ppr(row), axis=1)

In [74]:
# def pandas_apply_2(): return parse_name_pp(name_series)
# def dask_apply_2(): return dname_series.map_partitions(apply_myfunc_to_S).compute(scheduler='threads')  

In [19]:
%timeit name_series.apply(parse_name_pp)

140 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [20]:
%timeit dname_series.map_partitions(apply_myfunc_to_S, meta=("name",str)).compute(scheduler='threads')  

11.4 s ± 166 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [21]:
# Benchmark the Pandarallel approach
def pandarallel_benchmark():
    dname.parallel_apply(parse_name_ppr, axis=1)

In [22]:
pandarallel_time = timeit.timeit(pandarallel_benchmark, number=1)
pandarallel_time

VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=12500), Label(value='0 / 12500')))…

4.0005551003851

In [23]:
4000 / 140

28.571428571428573