Learning Objectives:

* Understand the terms multiprocessing, multithreading, and multitasking
* Demonstrate converting pandas DataFrames to Dask DataFrames and applying pandas methods to Dask DataFrames
* Understand why Dask is not always preferable to pandas


https://www.geeksforgeeks.org/difference-between-multitasking-multithreading-and-multiprocessing/

Multiprogramming – A computer running more than one program at a time (like running Excel and Firefox simultaneously).

Multiprocessing – A computer using more than one CPU at a time.

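Multitasking – Multiple tasks sharing a common resource (like 1 CPU), typically by taking turns on it.

Multithreading – An extension of multitasking in which a single process runs several threads that share that process's memory.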
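
To make the multithreading definition concrete, here is a minimal sketch (not part of the original notebook) in which several threads of one process append to the same list. The names `results`, `lock`, and `record_square` are made up for illustration. In standard CPython builds the global interpreter lock generally keeps threads from running Python bytecode in parallel, which is one reason CPU-bound work like the examples in this notebook is usually spread over processes instead.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shared state: every thread in this process sees the same list.
results = []
lock = threading.Lock()


def record_square(n):
    """Append n squared to the shared list, guarding the write with a lock."""
    with lock:
        results.append(n * n)


# Four threads take turns working through five small tasks.
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(record_square, range(5)))

# Completion order is not guaranteed, so sort before inspecting.
print(sorted(results))
```

Separate processes would each get their own copy of `results`, so the same pattern with processes needs an explicit way to send results back (for example the return values of `Pool.map`).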

In [1]:
import pandas as pd
import time
import multiprocessing as mp
from multiprocessing import Pool
from multiprocessing import cpu_count
import numpy as np
from dask import dataframe as dd
from dask.multiprocessing import get

In [2]:
#import the dataset
dataframe = pd.read_csv('/Users/lindseyclark/Documents/formula_1_project/formula-1-race-data-19502017/lapTimes.csv')

In [5]:
dataframe.head(10)

Unnamed: 0,raceId,driverId,lap,position2,time,milliseconds
0,841,20,1,1,01:38.1,98109
1,841,20,2,1,01:33.0,93006
2,841,20,3,1,01:32.7,92713
3,841,20,4,1,01:32.8,92803
4,841,20,5,1,01:32.3,92342
5,841,20,6,1,01:32.6,92605
6,841,20,7,1,01:32.5,92502
7,841,20,8,1,01:32.5,92537
8,841,20,9,1,01:33.2,93240
9,841,20,10,1,01:32.6,92572


In [6]:
#define a function that transforms the dataset: split the time column on "."
#into separate columns (the row-by-row apply is slow, which makes it a useful benchmark)
def create_split_cols(dataframe):
    dataframe2 = dataframe.time.apply(lambda x: pd.Series(str(x).split(".")))
    return dataframe2
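
Before reaching for parallelism, it may be worth checking whether the transformation can be vectorized. The sketch below, on a small made-up `laps` frame rather than the real lap-times file, compares the row-by-row `apply` used above with pandas' built-in `Series.str.split(..., expand=True)`. No timings are claimed here; how much faster the vectorized form is on the full dataset would need to be measured.

```python
import pandas as pd

# Toy stand-in for the lap-times data (values copied from the head() output).
laps = pd.DataFrame({"time": ["01:38.1", "01:33.0", "01:32.7"]})

# Row-by-row version, as in create_split_cols: builds one Series per row.
slow = laps.time.apply(lambda x: pd.Series(str(x).split(".")))

# Vectorized version: one call that splits the whole column at once.
fast = laps.time.astype(str).str.split(".", expand=True)

print(fast)
```

Both versions produce a two-column frame with the part before the "." in column 0 and the part after it in column 1.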

# Without Multiprocessing

In [7]:
#without multiprocessing
start_time = time.time()
dataframe2 = create_split_cols(dataframe)
end_time = time.time()
time_elapsed = end_time-start_time
print('The runtime without multiprocess is %s seconds' %(time_elapsed))

The runtime without multiprocess is 88.06641411781311 seconds


# With Multiprocessing

In [8]:
split_process = mp.Process(target=create_split_cols, args=(dataframe,))

In [9]:
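#a single Process runs the whole function in one child process, so the work is
#not split up; the function's return value is also discarded
start_time = time.time()
split_process.start()
split_process.join() #wait for the child process to finish
end_time = time.time()
time_elapsed = end_time-start_time
print('The runtime with multiprocess is %s seconds' %(time_elapsed))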

The runtime with multiprocess is 92.99149179458618 seconds


# With Multiprocessing and Pool

In [None]:
#http://www.racketracer.com/2016/07/06/pandas-in-parallel/
#The Process class runs a single target function in one new process,
#while the Pool class distributes a collection of tasks across a fixed set of worker processes.
#https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba

In [10]:
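num_partitions = 10 #number of partitions to split dataframe
num_cores = 4 #number of worker processes; ideally no more than the cores on your machine

def parallelize_dataframe(df, func):
    #split the dataframe into chunks, apply func to each chunk in a worker
    #process, then concatenate the results in their original order
    df_split = np.array_split(df, num_partitions)
    with Pool(num_cores) as pool:
        df = pd.concat(pool.map(func, df_split))
    return df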

In [11]:
start_time = time.time()
parallelize_dataframe(dataframe, create_split_cols)
end_time = time.time()
time_elapsed = end_time-start_time
print('The runtime with multiprocess and pool is %s seconds' %(time_elapsed))

The runtime with multiprocess and pool is 29.389177799224854 seconds
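
The split / map / concatenate pattern above can also be written without `np.array_split`, which emits a deprecation warning on DataFrames in some recent pandas versions. The following is a sketch under assumptions, not the notebook's original method: `split_frame`, `split_time`, and `parallel_apply` are hypothetical names, and `n_workers=1` is an added convenience that skips the pool so the logic can be checked sequentially.

```python
from multiprocessing import Pool

import pandas as pd


def split_frame(df, n_parts):
    """Split df into at most n_parts contiguous row chunks of near-equal size."""
    if len(df) == 0:
        return [df]
    size = max(1, -(-len(df) // n_parts))  # ceiling division
    return [df.iloc[i:i + size] for i in range(0, len(df), size)]


def split_time(chunk):
    """Example task: split the 'time' column on '.' into two columns."""
    return chunk["time"].astype(str).str.split(".", expand=True)


def parallel_apply(df, func, n_parts=10, n_workers=4):
    """Apply func to each chunk of df and concatenate the results in order."""
    chunks = split_frame(df, n_parts)
    if n_workers == 1:
        # Sequential fallback: handy for debugging without worker processes.
        pieces = [func(chunk) for chunk in chunks]
    else:
        # func must be defined at module top level so it can be pickled.
        with Pool(n_workers) as pool:
            pieces = pool.map(func, chunks)
    return pd.concat(pieces)
```

When this is run as a script with `n_workers` greater than 1, the call to `parallel_apply` should sit under an `if __name__ == "__main__":` guard, since platforms that start workers by spawning a fresh interpreter re-import the main module.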


# Dask

In [7]:
dataframe_dask = dd.from_pandas(dataframe, npartitions=8)

In [8]:
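#NOTE: the dask versions return a lazy dd.Scalar; nothing is computed until
#.compute() is called on the result, so timing these functions measures only
#the construction of the task graph, not the calculation itself
def compute_mean_dask():
    return dataframe_dask.milliseconds.mean()
def compute_mean_pandas():
    return dataframe.milliseconds.mean()

def compute_max_dask():
    return dataframe_dask.milliseconds.max()
def compute_max_pandas():
    return dataframe.milliseconds.max()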

In [9]:
%time compute_mean_dask()

CPU times: user 4.37 ms, sys: 192 µs, total: 4.56 ms
Wall time: 4.54 ms


dd.Scalar<series-..., dtype=float64>

In [10]:
%time compute_mean_pandas()

CPU times: user 2.07 ms, sys: 797 µs, total: 2.86 ms
Wall time: 1.4 ms


95802.22103540982

In [11]:
%time compute_max_dask()

CPU times: user 1.33 ms, sys: 26 µs, total: 1.36 ms
Wall time: 1.35 ms


dd.Scalar<series-..., dtype=int64>

In [12]:
%time compute_max_pandas()

CPU times: user 1.45 ms, sys: 583 µs, total: 2.04 ms
Wall time: 986 µs


7507547

In [None]:
#https://github.com/dask/dask-tutorial/blob/master/04_dataframe.ipynb