In [1]:
#https://www.geeksforgeeks.org/difference-between-multitasking-multithreading-and-multiprocessing/
import pandas as pd
import time
import multiprocessing as mp
from multiprocessing import Pool
from multiprocessing import cpu_count
import numpy as np
from dask import dataframe as dd
from dask.multiprocessing import get

In [2]:
#import the dataset
dataframe = pd.read_csv('/Users/lindseyclark/Documents/formula_1_project/formula-1-race-data-19502017/lapTimes.csv')

In [3]:
#describe the dataset
dataframe.describe()

Unnamed: 0,raceId,driverId,lap,position,milliseconds
count,426633.0,426633.0,426633.0,426633.0,426633.0
mean,423.120853,186.503817,29.827172,9.646896,95802.22
std,381.542422,316.123374,18.402245,5.580664,74755.41
min,1.0,1.0,1.0,1.0,67411.0
25%,100.0,14.0,14.0,5.0,82382.0
50%,205.0,26.0,29.0,9.0,90800.0
75%,881.0,71.0,44.0,14.0,102738.0
max,988.0,843.0,78.0,24.0,7507547.0


In [4]:
dataframe

Unnamed: 0,raceId,driverId,lap,position,time,milliseconds
0,841,20,1,1,1:38.109,98109
1,841,20,2,1,1:33.006,93006
2,841,20,3,1,1:32.713,92713
3,841,20,4,1,1:32.803,92803
4,841,20,5,1,1:32.342,92342
...,...,...,...,...,...,...
426628,988,825,50,13,1:43.928,103928
426629,988,825,51,13,1:44.138,104138
426630,988,825,52,13,1:43.934,103934
426631,988,825,53,13,1:44.164,104164


In [5]:
dataframe.head(10)

Unnamed: 0,raceId,driverId,lap,position,time,milliseconds
0,841,20,1,1,1:38.109,98109
1,841,20,2,1,1:33.006,93006
2,841,20,3,1,1:32.713,92713
3,841,20,4,1,1:32.803,92803
4,841,20,5,1,1:32.342,92342
5,841,20,6,1,1:32.605,92605
6,841,20,7,1,1:32.502,92502
7,841,20,8,1,1:32.537,92537
8,841,20,9,1,1:33.240,93240
9,841,20,10,1,1:32.572,92572


In [6]:
#define a function to perform a transformation on the dataset, here is splitting a column
def create_split_cols(dataframe):
    dataframe2 = dataframe.time.apply(lambda x: pd.Series(str(x).split(".")))
    return dataframe2

# Without Multiprocess

In [7]:
#without multiprocessing
start_time = time.time()
dataframe2 = create_split_cols(dataframe)
end_time = time.time()
time_elapsed = end_time-start_time
print('The runtime without multiprocess is %s seconds' %(time_elapsed))

The runtime without multiprocess is 88.06641411781311 seconds


# With Multiprocess

In [8]:
split_process = mp.Process(target=create_split_cols, args=(dataframe,))

In [9]:
start_time = time.time()
split_process.start()
split_process.join()
split_process.terminate()
end_time = time.time()
time_elapsed = end_time-start_time
print('The runtime with multiprocess is %s seconds' %(time_elapsed))

The runtime with multiprocess is 92.99149179458618 seconds


# With Multiprocess and Pool

In [None]:
#http://www.racketracer.com/2016/07/06/pandas-in-parallel/
#The Process class sends each task to a different processor, 
#and the Pool class sends sets of tasks to different processors.
#https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba

In [10]:
num_partitions = 10 #number of partitions to split dataframe
num_cores = 4 #number of cores on your machine

def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

In [11]:
start_time = time.time()
parallelize_dataframe(dataframe, create_split_cols)
end_time = time.time()
time_elapsed = end_time-start_time
print('The runtime with multiprocess and pool is %s seconds' %(time_elapsed))

The runtime with multiprocess and pool is 29.389177799224854 seconds


# Dask

In [12]:
dataframe_dask = dd.from_pandas(dataframe, npartitions=8)

In [18]:
def compute_mean_dask():
    return dataframe_dask.milliseconds.mean()#.compute()
def compute_mean_pandas():
    return dataframe.milliseconds.mean()

def compute_max_dask():
    return dataframe_dask.milliseconds.max()#.compute()
def compute_max_pandas():
    return dataframe.milliseconds.max()

In [19]:
%time compute_mean_dask()

CPU times: user 1.65 ms, sys: 5 µs, total: 1.66 ms
Wall time: 1.67 ms


dd.Scalar<series-..., dtype=float64>

In [20]:
%time compute_mean_pandas()

CPU times: user 1.45 ms, sys: 620 µs, total: 2.07 ms
Wall time: 1.01 ms


95802.22103540982

In [21]:
%time compute_max_dask()

CPU times: user 992 µs, sys: 25 µs, total: 1.02 ms
Wall time: 1.01 ms


dd.Scalar<series-..., dtype=int64>

In [22]:
%time compute_max_pandas()

CPU times: user 1.11 ms, sys: 457 µs, total: 1.57 ms
Wall time: 768 µs


7507547

In [None]:
#https://github.com/dask/dask-tutorial/blob/master/04_dataframe.ipynb