In [1]:
%load_ext autoreload
%autoreload 2
import pandas as pd
import time
from pandarallel import pandarallel
import math
import numpy as np

# Initialize pandarallel

In [2]:
pandarallel.initialize()

INFO: Pandarallel will run on 32 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


# DataFrame.apply

In [3]:
df_size = int(5e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                       b=np.random.rand(df_size)))

In [4]:
def func(x):
    """Row-wise benchmark kernel for ``DataFrame.apply(..., axis=1)``.

    ``x`` is a row exposing attributes ``a`` and ``b``; the result is
    ``sin(x.a**2) + sin(x.b**2)``.
    """
    sin_a = math.sin(x.a ** 2)
    sin_b = math.sin(x.b ** 2)
    return sin_a + sin_b

In [5]:
%%time
res = df.apply(func, axis=1)

CPU times: user 1min 19s, sys: 1.6 s, total: 1min 21s
Wall time: 1min 21s


In [6]:
%%time
res_parallel = df.parallel_apply(func, axis=1)

CPU times: user 128 ms, sys: 252 ms, total: 380 ms
Wall time: 7.67 s


In [10]:
res.equals(res_parallel)

True

# DataFrame.applymap

In [11]:
df_size = int(1e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                       b=np.random.rand(df_size)))

In [12]:
def func(x):
    """Element-wise benchmark kernel for ``DataFrame.applymap``.

    Returns ``sin(x**2) - cos(x**2)`` for a scalar ``x``.
    """
    squared = x ** 2
    return math.sin(squared) - math.cos(squared)

In [13]:
%%time
res = df.applymap(func)

CPU times: user 13 s, sys: 1.35 s, total: 14.4 s
Wall time: 14.4 s


In [14]:
%%time
res_parallel = df.parallel_applymap(func)

CPU times: user 128 ms, sys: 560 ms, total: 688 ms
Wall time: 1.81 s


In [15]:
res.equals(res_parallel)

True

# DataFrame.groupby.apply

In [16]:
df_size = int(3e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),
                       b=np.random.rand(df_size)))

In [17]:
def func(df):
    """Per-group benchmark kernel for ``DataFrame.groupby(...).apply``.

    Returns the mean, over column ``b`` of the group, of
    ``log10(sqrt(exp(b**2)))``. The values are accumulated with an explicit
    Python loop, left to right.

    Raises:
        ZeroDivisionError: if column ``b`` is empty.
        OverflowError: from ``math.exp`` when ``b**2`` is too large
            (``|b|`` above roughly 26.6).
    """
    column = df.b
    total = 0
    for value in column:
        total += math.log10(math.sqrt(math.exp(value ** 2)))
    return total / len(column)

In [18]:
%%time
res = df.groupby("a").apply(func)

CPU times: user 16.8 s, sys: 4.6 s, total: 21.4 s
Wall time: 21.4 s


In [19]:
%%time
res_parallel = df.groupby("a").parallel_apply(func)

CPU times: user 4.84 s, sys: 6.18 s, total: 11 s
Wall time: 12.3 s


In [20]:
res.equals(res_parallel)

True

# DataFrame.groupby.rolling.apply

In [21]:
df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),
                       b=np.random.rand(df_size)))

In [22]:
def func(x):
    """Window kernel for ``groupby(...).rolling(4).apply(..., raw=False)``.

    Uses the first four positions of the window Series ``x`` and returns
    ``x[0] + x[1]**2 + x[2]**3 + x[3]**4``. ``x`` must support ``.iloc``,
    hence ``raw=False``. A window shorter than four raises ``IndexError``.
    """
    w0, w1, w2, w3 = (x.iloc[pos] for pos in range(4))
    return w0 + w1 ** 2 + w2 ** 3 + w3 ** 4

In [23]:
%%time
res = df.groupby('a').b.rolling(4).apply(func, raw=False)

CPU times: user 46.7 s, sys: 88 ms, total: 46.8 s
Wall time: 46.8 s


In [24]:
%%time
res_parallel = df.groupby('a').b.rolling(4).parallel_apply(func, raw=False)

CPU times: user 720 ms, sys: 356 ms, total: 1.08 s
Wall time: 4.99 s


In [25]:
res.equals(res_parallel)

True

# DataFrame.groupby.expanding.apply

In [26]:
df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),
                       b=np.random.rand(df_size)))

In [27]:
def func(x):
    """Window kernel for ``groupby(...).expanding(4).apply(..., raw=False)``.

    Only the first four positions of the (possibly longer) expanding window
    ``x`` are used. The result is ``x[0] + x[1]**2 + x[2]**3 + x[3]**4``,
    summed left to right. A window shorter than four raises ``IndexError``.
    """
    total = x.iloc[0]
    for power in range(2, 5):
        total = total + x.iloc[power - 1] ** power
    return total

In [28]:
%%time
res = df.groupby('a').b.expanding(4).apply(func, raw=False)

CPU times: user 45.9 s, sys: 64 ms, total: 45.9 s
Wall time: 45.9 s


In [29]:
%%time
res_parallel = df.groupby('a').b.expanding(4).parallel_apply(func, raw=False)

CPU times: user 680 ms, sys: 332 ms, total: 1.01 s
Wall time: 5.02 s


# Series.map

In [30]:
df_size = int(5e7)
df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1))

In [31]:
def func(x):
    """Element-wise benchmark kernel for ``Series.map``.

    Returns ``log10(sqrt(exp(x**2)))`` for a scalar ``x``. ``math.exp`` raises
    ``OverflowError`` when ``x**2`` is too large (``|x|`` above roughly 26.6).
    """
    exponent = x ** 2
    return math.log10(math.sqrt(math.exp(exponent)))

In [32]:
%%time
res = df.a.map(func)

CPU times: user 26.4 s, sys: 5.49 s, total: 31.9 s
Wall time: 31.8 s


In [33]:
%%time
res_parallel = df.a.parallel_map(func)

CPU times: user 236 ms, sys: 1.84 s, total: 2.08 s
Wall time: 4.63 s


In [34]:
res.equals(res_parallel)

True

# Series.apply

In [35]:
df_size = int(3.5e7)
df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1))

In [36]:
def func(x, power, bias=0):
    """Element-wise kernel for ``Series.apply`` showing extra arguments.

    Args:
        x: scalar value supplied by ``Series.apply``.
        power: exponent applied to ``x`` (the cell passes it via ``args=``).
        bias: constant added to the result (the cell passes it as a keyword).

    Returns:
        ``log10(sqrt(exp(x**power))) + bias``.
    """
    core = math.log10(math.sqrt(math.exp(x ** power)))
    return core + bias

In [37]:
%%time
res = df.a.apply(func, args=(2,), bias=3)

CPU times: user 28.3 s, sys: 2.68 s, total: 31 s
Wall time: 31 s


In [38]:
%%time
res_parallel = df.a.parallel_apply(func, args=(2,), bias=3)

CPU times: user 212 ms, sys: 952 ms, total: 1.16 s
Wall time: 4 s


In [39]:
res.equals(res_parallel)

True

# Series.rolling.apply

In [40]:
df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                       b=list(range(df_size))))

In [41]:
def func(x):
    """Window kernel for ``Series.rolling(4).apply(..., raw=False)``.

    Reads the first four positions of the window Series ``x`` via ``.iloc``
    and returns ``x[0] + x[1]**2 + x[2]**3 + x[3]**4``. A window shorter than
    four raises ``IndexError``.
    """
    head = [x.iloc[i] for i in range(4)]
    return head[0] + head[1] ** 2 + head[2] ** 3 + head[3] ** 4

In [42]:
%%time
res = df.b.rolling(4).apply(func, raw=False)

CPU times: user 44.4 s, sys: 28 ms, total: 44.4 s
Wall time: 44.4 s


In [45]:
%%time
res_parallel = df.b.rolling(4).parallel_apply(func, raw=False)

CPU times: user 76 ms, sys: 152 ms, total: 228 ms
Wall time: 3.94 s


In [46]:
res.equals(res_parallel)

True

# CountDownLatch

In [59]:
import threading

def process(*a):
    """Print which thread handled which task.

    Accepts any number of positional arguments but uses only the first two:
    ``a[0]`` is printed as the thread identifier and ``a[1]`` as the task id.
    Extra arguments are ignored. Fewer than two raise ``IndexError`` before
    anything is printed.
    """
    thread_id, task_id = a[0], a[1]
    print("process in thread {} - taskId {}".format(thread_id, task_id))

# https://stackoverflow.com/questions/10236947/does-python-have-a-similar-control-mechanism-to-javas-countdownlatch
class CountDownLatch(object):
    """Minimal analogue of Java's ``CountDownLatch`` built on ``threading.Condition``.

    Each call to :meth:`count_down` decrements ``count`` and reports its
    arguments through ``process``. :meth:`wait` blocks until ``count`` has
    dropped to zero or below.
    """

    def __init__(self, count=1):
        """
        Args:
            count: number of ``count_down`` calls needed before ``wait`` returns.
        """
        self.count = count
        self.lock = threading.Condition()

    def count_down(self, *a):
        """Decrement the counter, call ``process(*a)``, and wake waiters at zero.

        ``process`` runs while the condition's lock is held, so calls are
        serialized across threads. The lock is always released, and waiters
        are still notified, even if ``process`` raises.
        """
        with self.lock:
            self.count -= 1
            try:
                process(*a)
            finally:
                if self.count <= 0:
                    # notify_all replaces the deprecated notifyAll alias.
                    self.lock.notify_all()

    def wait(self):
        """Block the calling thread until the counter is zero or below."""
        with self.lock:
            self.lock.wait_for(lambda: self.count <= 0)

In [61]:
# ``import multiprocessing`` alone does not load the ``multiprocessing.pool``
# submodule, so import it explicitly for ``ThreadPool`` to resolve in a fresh kernel.
import multiprocessing.pool
import threading

tasks = range(100)
latch = CountDownLatch(len(tasks))


def start_process(taskId):
    """Worker body: count the latch down, reporting the current thread id and ``taskId``."""
    latch.count_down(threading.get_ident(), taskId)


print("start")
# The context manager terminates the 32 worker threads once ``map`` has
# returned, instead of leaving an idle pool alive in the kernel.
with multiprocessing.pool.ThreadPool(processes=32) as pool:
    pool.map(start_process, tasks)
latch.wait()  # already satisfied here: ``map`` blocks until every task has run
print("finished")

start
process in thread 139809686427392 - taskId 0
process in thread 139809686427392 - taskId 1
process in thread 139809716668160 - taskId 2
process in thread 139809716668160 - taskId 5
process in thread 139809699882752 - taskId 4
process in thread 139809686427392 - taskId 3
process in thread 139809686427392 - taskId 11
process in thread 139809678034688 - taskId 7
process in thread 139809699882752 - taskId 8
process in thread 139809708275456 - taskId 9
process in thread 139809708275456 - taskId 18
process in thread 139809716668160 - taskId 6
process in thread 139809716668160 - taskId 22
process in thread 139809678034688 - taskId 13
process in thread 139809661249280 - taskId 14
process in thread 139809652856576 - taskId 15
process in thread 139809699882752 - taskId 16
process in thread 139809699882752 - taskId 35
process in thread 139809669641984 - taskId 10
process in thread 139809708275456 - taskId 19
process in thread 139809708275456 - taskId 40
process in thread 139809708275456 - ta