In [1]:
# Import pandas and numpy, and create our test dataframe (100k rows, random normal)
import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.randn(100000, 2), columns=['A','B'])

In [3]:
# The naive implementation of our calculation function.
def intensive_calculation(row):
    x = row['A'] + row['B']
    return x * (row['A'] * row['A'] / row['B'] * row['B'])

In [4]:
%%timeit
# Running our naive implementation and measuring the time.
df['C'] = df.apply(intensive_calculation, axis=1)

1 loop, best of 3: 11.4 s per loop


In [21]:
%load_ext cython

In [23]:
%%cython
# Inline Cython implementation
cpdef double intensive_calc_cython(double a, double b) except *:
    cdef double x

    x = a + b

    return x * (a * a / b * b)

In [6]:
%%timeit
# Running our Cython implementation
import pyximport
pyximport.install()

from helper import intensive_calc_cython

df['C'] = df.apply(lambda row: intensive_calc_cython(row['A'], row['B']), axis=1)

1 loop, best of 3: 4.46 s per loop


In [15]:
from datetime import datetime
start_ts = datetime.now()

# Running our naive implementation with multiprocessing
from multiprocessing import Pool
from multiprocessing import cpu_count

def parallelize(df, func):
    """
    This function splits our dataframe and performs the passed function on each split, then combines.
    """
    
    num_cores = cpu_count()

    df_splits = np.array_split(df, num_cores)
    pool = Pool(num_cores)

    out_df = pd.concat(pool.map(func, df_splits))

    pool.close()
    pool.join()

    return out_df


def intensive_calc_wrapper(df):
    """
    This is the actual function that is being applied.
    """
    return df.apply(lambda row: intensive_calculation(row), axis=1)

df['C'] = parallelize(df, intensive_calc_wrapper)

print "1 loop: " + str((datetime.now() - start_ts).total_seconds()) + "s per loop."

1 loop: 3.131078s per loop.


In [14]:
from datetime import datetime
start_ts = datetime.now()

# Running our Cython implementation with multiprocessing
import pyximport
pyximport.install()

from helper import intensive_calc_cython

from multiprocessing import Pool
from multiprocessing import cpu_count

def parallelize(df, func):
    """
    This function splits our dataframe and performs the passed function on each split, then combines.
    """
    
    num_cores = cpu_count()

    df_splits = np.array_split(df, num_cores)
    pool = Pool(num_cores)

    out_df = pd.concat(pool.map(func, df_splits))

    pool.close()
    pool.join()

    return out_df


def intensive_calc_wrapper(df):
    """
    This is the actual function that is being applied.
    """
    return df.apply(lambda row: intensive_calc_cython(row['A'], row['B']), axis=1)

df['C'] = parallelize(df, intensive_calc_wrapper)

print "1 loop: " + str((datetime.now() - start_ts).total_seconds()) + "s per loop."

1 loop: 1.288652s per loop.


In [28]:
from datetime import datetime
start_ts = datetime.now()

# using Numba
from numba import jit, float32

# The Numba implementation of our calculation function.
@jit(float32(float32, float32), nopython=True, cache=True)
def intensive_calculation(a, b):
    x = a + b
    return x * (a * a / b * b)

df['C'] = df.apply(lambda row: intensive_calculation(row['A'], row['B']), axis=1)

print "1 loop: " + str((datetime.now() - start_ts).total_seconds()) + "s per loop."

1 loop: 5.293451s per loop.


In [16]:
%%timeit
# The fully vectorized time
df['C'] = (df['A'] + df['B']) * (df['A'] * df['A'] / df['B'] * df['B'])

100 loops, best of 3: 4.82 ms per loop
