# Multiprocessing

In [1]:
import pandas as pd
import numpy as np
import multiprocessing as mp
import os
import gc

In [2]:
# Input folders in index order: csv, parquet for atmc10, then csv, parquet for atmc50.
fldrs = [f"data/{fmt}/atmc{size}" for size in (10, 50) for fmt in ("csv", "parq")]

In [3]:
def parallelize(fun, vec, pool):
    """Apply ``fun`` to every item of ``vec`` using a pool of worker processes.

    Parameters
    ----------
    fun : callable
        Function run in the workers; it must be picklable for ``multiprocessing``
        (e.g. a module-level function rather than a lambda).
    vec : iterable
        Inputs handed one at a time to ``fun``.
    pool : int or None
        Number of worker processes. ``None`` lets ``multiprocessing`` choose the
        CPU count. Values below 1 (e.g. ``ncpu // 2`` on a single-CPU machine)
        are raised to 1 instead of making ``mp.Pool`` reject them.

    Returns
    -------
    list
        ``fun(item)`` for each item of ``vec``, in input order.
    """
    nproc = None if pool is None else max(1, pool)
    with mp.Pool(nproc) as p:
        res = p.map(fun, vec)
    return res


def funCSV(x):
    """Summarise one single-key CSV file as ``[key, mean_of_values]``.

    Parameters
    ----------
    x : str
        Path to a CSV file with ``key`` and ``values`` columns.

    Returns
    -------
    list
        ``[key, mean]``, where ``key`` is the file's only key value and ``mean``
        is the mean of its ``values`` column.

    Raises
    ------
    ValueError
        If the file has no data rows or holds more than one distinct key.
        Taking the first key in that case would silently label a mixed-key
        mean with the wrong key.
    """
    df = pd.read_csv(x)
    # unique() is what the per-file summary already paid for; reuse it to validate.
    keys = df["key"].unique()
    if len(keys) != 1:
        raise ValueError(f"{x}: expected exactly one distinct key, found {len(keys)}")
    return [keys[0], df["values"].mean()]


def funPARQ(x):
    """Summarise one single-key Parquet file as ``[key, mean_of_values]``.

    Parameters
    ----------
    x : str
        Path to a Parquet file with ``key`` and ``values`` columns.

    Returns
    -------
    list
        ``[key, mean]``, where ``key`` is the file's only key value and ``mean``
        is the mean of its ``values`` column.

    Raises
    ------
    ValueError
        If the file has no rows or holds more than one distinct key.
        Taking the first key in that case would silently label a mixed-key
        mean with the wrong key.
    """
    df = pd.read_parquet(x)
    # unique() is what the per-file summary already paid for; reuse it to validate.
    keys = df["key"].unique()
    if len(keys) != 1:
        raise ValueError(f"{x}: expected exactly one distinct key, found {len(keys)}")
    return [keys[0], df["values"].mean()]

# Logical CPU count used to size the process pools. os.cpu_count() returns None
# when the count cannot be determined, so fall back to 1 to keep integer
# arithmetic such as ``ncpu // 2`` valid.
ncpu = os.cpu_count() or 1

In [11]:
# Show the CPU count that sizes the process pools below (ncpu and ncpu//2 workers).
ncpu

16

## csv

In [6]:
%%timeit -n5
# Time the multiprocessing pipeline on data/csv/atmc10 with ncpu//2 worker processes
# (-n5: 5 loops per timing run).
fldr = fldrs[0]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
# One [key, mean] pair per file, computed in a process pool.
out = parallelize(funCSV, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
# Give keys a uniform str dtype before sorting (presumably to avoid mixed-type keys).
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

3.24 s ± 56.8 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


In [7]:
%%timeit -n5
# Same pipeline on data/csv/atmc10, now with ncpu worker processes.
fldr = fldrs[0]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funCSV, files, ncpu)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

2.53 s ± 114 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


In [8]:
%%timeit -n5
# Multiprocessing pipeline on data/csv/atmc50 with ncpu//2 worker processes.
fldr = fldrs[2]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funCSV, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] = out["key"].astype(str)
out = out.sort_values("key").set_index("key")

15.8 s ± 576 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


In [9]:
%%timeit -n5
# Multiprocessing pipeline on data/csv/atmc50 with ncpu worker processes.
fldr = fldrs[2]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funCSV, files, ncpu)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] = out["key"].astype(str)
out = out.sort_values("key").set_index("key")

11.6 s ± 94.4 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


## parquet

In [4]:
%%timeit -n5
# Multiprocessing pipeline on the Parquet files in data/parq/atmc10, ncpu//2 workers.
fldr = fldrs[1]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funPARQ, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

4.82 s ± 83.4 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


In [5]:
%%timeit -n5
# Multiprocessing pipeline on the Parquet files in data/parq/atmc10, ncpu workers.
fldr = fldrs[1]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funPARQ, files, ncpu)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

3.33 s ± 89.8 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


In [6]:
%%timeit -n5
# Multiprocessing pipeline on the Parquet files in data/parq/atmc50, ncpu//2 workers.
fldr = fldrs[3]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funPARQ, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

24.5 s ± 303 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


In [4]:
%%timeit -n5
# Multiprocessing pipeline on the Parquet files in data/parq/atmc50, ncpu workers.
fldr = fldrs[3]
# Every directory entry is used as an input file; there is no extension filter.
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funPARQ, files, ncpu)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

15.7 s ± 276 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


# Multiprocessing + Dask

In [1]:
import pandas as pd
import numpy as np
import multiprocessing as mp
import os
import gc

def parallelize(fun, vec, pool):
    """Apply ``fun`` to every item of ``vec`` using a pool of worker processes.

    Parameters
    ----------
    fun : callable
        Function run in the workers; it must be picklable for ``multiprocessing``
        (e.g. a module-level function rather than a lambda).
    vec : iterable
        Inputs handed one at a time to ``fun``.
    pool : int or None
        Number of worker processes. ``None`` lets ``multiprocessing`` choose the
        CPU count. Values below 1 (e.g. ``ncpu // 2`` on a single-CPU machine)
        are raised to 1 instead of making ``mp.Pool`` reject them.

    Returns
    -------
    list
        ``fun(item)`` for each item of ``vec``, in input order.
    """
    nproc = None if pool is None else max(1, pool)
    with mp.Pool(nproc) as p:
        res = p.map(fun, vec)
    return res


def funCSV(x):
    """Summarise one single-key CSV file as ``[key, mean_of_values]``.

    Parameters
    ----------
    x : str
        Path to a CSV file with ``key`` and ``values`` columns.

    Returns
    -------
    list
        ``[key, mean]``, where ``key`` is the file's only key value and ``mean``
        is the mean of its ``values`` column.

    Raises
    ------
    ValueError
        If the file has no data rows or holds more than one distinct key.
        Taking the first key in that case would silently label a mixed-key
        mean with the wrong key. This matters for split folders: if their
        files mix keys, this raises instead of returning a wrong summary.
    """
    df = pd.read_csv(x)
    # unique() is what the per-file summary already paid for; reuse it to validate.
    keys = df["key"].unique()
    if len(keys) != 1:
        raise ValueError(f"{x}: expected exactly one distinct key, found {len(keys)}")
    return [keys[0], df["values"].mean()]


def funPARQ(x):
    """Summarise one single-key Parquet file as ``[key, mean_of_values]``.

    Parameters
    ----------
    x : str
        Path to a Parquet file with ``key`` and ``values`` columns.

    Returns
    -------
    list
        ``[key, mean]``, where ``key`` is the file's only key value and ``mean``
        is the mean of its ``values`` column.

    Raises
    ------
    ValueError
        If the file has no rows or holds more than one distinct key.
        Taking the first key in that case would silently label a mixed-key
        mean with the wrong key.
    """
    df = pd.read_parquet(x)
    # unique() is what the per-file summary already paid for; reuse it to validate.
    keys = df["key"].unique()
    if len(keys) != 1:
        raise ValueError(f"{x}: expected exactly one distinct key, found {len(keys)}")
    return [keys[0], df["values"].mean()]

# Logical CPU count used to size the process pools. os.cpu_count() returns None
# when the count cannot be determined, so fall back to 1 to keep integer
# arithmetic such as ``ncpu // 2`` valid.
ncpu = os.cpu_count() or 1

# Input folders in index order: csv, parquet for split10, then csv, parquet for split50.
fldrs = [f"data/{fmt}/split{size}" for size in (10, 50) for fmt in ("csv", "parq")]

## 10M

### csv

In [2]:
%%timeit -n5
# Multiprocessing pipeline on data/csv/split10 (10M dataset) with ncpu//2 workers.
# NOTE(review): funCSV summarises each file as a single [key, mean] pair; if the split
# files hold several keys each, that is not a per-key mean — confirm how the split
# folders were partitioned.
fldr = fldrs[0]
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funCSV, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

823 ms ± 28.7 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


### parquet

In [3]:
%%timeit -n5
# Multiprocessing pipeline on data/parq/split10 (10M dataset) with ncpu//2 workers.
# NOTE(review): funPARQ yields one [key, mean] pair per file; confirm each split file
# holds a single key, otherwise the result is not a per-key mean.
fldr = fldrs[1]
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funPARQ, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

846 ms ± 32.8 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


## 50M
### csv

In [4]:
%%timeit -n5
# Multiprocessing pipeline on data/csv/split50 (50M dataset) with ncpu//2 workers.
# NOTE(review): one [key, mean] pair per file — confirm each split file holds one key.
fldr = fldrs[2]
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funCSV, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

2.39 s ± 38 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


### parquet

In [5]:
%%timeit -n5
# Multiprocessing pipeline on data/parq/split50 (50M dataset) with ncpu//2 workers.
# NOTE(review): one [key, mean] pair per file — confirm each split file holds one key.
fldr = fldrs[3]
files = [os.path.join(fldr, x) for x in os.listdir(fldr)]
out = parallelize(funPARQ, files, ncpu//2)
out = pd.DataFrame(out, columns=["key","values"])
out["key"] =  out["key"].astype(str)
out = out.sort_values("key").set_index("key")

2.18 s ± 21 ms per loop (mean ± std. dev. of 7 runs, 5 loops each)


# Pandas

In [1]:
import pandas as pd

## 10M

### csv

In [2]:
%%time
# Single-process pandas baseline, 10M dataset: one CSV (SRTD, presumably key-sorted),
# then the per-key mean of "values".
df = pd.read_csv("data/file10SRTD.csv")
out = df.groupby("key")["values"].mean()

CPU times: user 4.03 s, sys: 631 ms, total: 4.66 s
Wall time: 3.02 s


In [2]:
%%time
# Pandas baseline, 10M dataset: one CSV (SHFFL, presumably row-shuffled), per-key mean.
df = pd.read_csv("data/file10SHFFL.csv")
out = df.groupby("key")["values"].mean()

CPU times: user 4.94 s, sys: 625 ms, total: 5.56 s
Wall time: 3.94 s


### parquet

In [2]:
%%time
# Pandas baseline, 10M dataset: one Parquet file (SRTD, presumably key-sorted), per-key mean.
df = pd.read_parquet("data/file10SRTD.parq")
out = df.groupby("key")["values"].mean()

CPU times: user 3.18 s, sys: 949 ms, total: 4.13 s
Wall time: 2.59 s


In [2]:
%%time
# Pandas baseline, 10M dataset: one Parquet file (SHFFL, presumably shuffled), per-key mean.
df = pd.read_parquet("data/file10SHFFL.parq")
out = df.groupby("key")["values"].mean()

CPU times: user 3.45 s, sys: 1.08 s, total: 4.53 s
Wall time: 2.99 s


## 50M

### csv

In [2]:
%%time
# Pandas baseline, 50M dataset: one CSV (SRTD, presumably key-sorted), per-key mean.
df = pd.read_csv("data/file50SRTD.csv")
out = df.groupby("key")["values"].mean()

CPU times: user 14.5 s, sys: 2.6 s, total: 17.1 s
Wall time: 15.2 s


In [2]:
%%time
# Pandas baseline, 50M dataset: one CSV (SHFFL, presumably shuffled), per-key mean.
df = pd.read_csv("data/file50SHFFL.csv")
out = df.groupby("key")["values"].mean()

CPU times: user 33.2 s, sys: 2.7 s, total: 35.9 s
Wall time: 34 s


### parquet

In [3]:
%%time
# Pandas baseline, 50M dataset: one Parquet file (SRTD, presumably key-sorted), per-key mean.
df = pd.read_parquet("data/file50SRTD.parq")
out = df.groupby("key")["values"].mean()

CPU times: user 9.88 s, sys: 5.7 s, total: 15.6 s
Wall time: 13.3 s


In [2]:
%%time
# Pandas baseline, 50M dataset: one Parquet file (SHFFL, presumably shuffled), per-key mean.
df = pd.read_parquet("data/file50SHFFL.parq")
out = df.groupby("key")["values"].mean()

CPU times: user 15.3 s, sys: 5.37 s, total: 20.7 s
Wall time: 18.5 s


# Dask

In [1]:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

In [2]:
# Start a local Dask cluster with default settings and connect a client to it.
# The trailing bare `client` makes Jupyter render the cluster summary.
cluster = LocalCluster()
client = Client(address=cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:37609  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 63.32 GB


In [3]:
# Input folders in index order: csv, parquet for atmc10, then csv, parquet for atmc50.
fldrs = [f"data/{fmt}/atmc{size}" for size in (10, 50) for fmt in ("csv", "parq")]

## 10M

In [4]:
%%time
# Dask groupby-mean over every CSV in data/csv/atmc10 (10M dataset);
# .compute() executes the lazy graph and returns the concrete result.
df = dd.read_csv(fldrs[0]+"/*.csv")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 43.1 s, sys: 1.6 s, total: 44.7 s
Wall time: 44.8 s


In [7]:
# Restart all Dask workers (Client.restart) before the next timed run.
client.restart()

0,1
Client  Scheduler: tcp://127.0.0.1:41195  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 63.32 GB


In [8]:
%%time
# Dask groupby-mean over every Parquet file in data/parq/atmc10 (10M dataset).
df = dd.read_parquet(fldrs[1]+"/*.parq")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 27.3 s, sys: 1.57 s, total: 28.8 s
Wall time: 28.9 s


## 50M

In [9]:
# Restart all Dask workers (Client.restart) before the 50M timings.
client.restart()

0,1
Client  Scheduler: tcp://127.0.0.1:41195  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 63.32 GB


In [4]:
%%time
# Dask groupby-mean over every CSV in data/csv/atmc50 (50M dataset).
df = dd.read_csv(fldrs[2]+"/*.csv")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 3min 36s, sys: 7.64 s, total: 3min 44s
Wall time: 3min 42s


In [11]:
# Restart all Dask workers (Client.restart) before the next timed run.
client.restart()

0,1
Client  Scheduler: tcp://127.0.0.1:41195  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 63.32 GB


In [4]:
%%time
# Dask groupby-mean over every Parquet file in data/parq/atmc50 (50M dataset).
df = dd.read_parquet(fldrs[3]+"/*.parq")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 2min 15s, sys: 7.49 s, total: 2min 23s
Wall time: 2min 23s


# Dask Map Partitions

In [1]:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Input folders in index order: csv, parquet for atmc10, then csv, parquet for atmc50.
fldrs = [f"data/{fmt}/atmc{size}" for size in (10, 50) for fmt in ("csv", "parq")]

# Start a local Dask cluster with default settings and connect a client to it.
# The trailing bare `client` makes Jupyter render the cluster summary.
cluster = LocalCluster()
client = Client(address=cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:37229  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 63.32 GB


## 10M

In [2]:
%%time
# Per-partition [key, mean] over data/csv/atmc10 (10M dataset) via map_partitions.
# NOTE(review): the lambda returns a plain list and no `meta=` is passed, so Dask must
# infer the output type itself; it also assumes each partition holds exactly one key
# (a file read as several partitions would give partial means) — confirm both.
df = dd.read_csv(fldrs[0]+"/*.csv")
out = df.map_partitions(lambda x:[x["key"].unique()[0], x["values"].mean()]).compute()

CPU times: user 49.3 s, sys: 2.79 s, total: 52.1 s
Wall time: 50.4 s


In [3]:
%%time
# Per-partition [key, mean] over data/parq/atmc10 (10M dataset) via map_partitions.
# NOTE(review): list return without `meta=`, and one key per partition is assumed —
# confirm the shape of `out` and the partitioning.
df = dd.read_parquet(fldrs[1]+"/*.parq")
out = df.map_partitions(lambda x:[x["key"].unique()[0], x["values"].mean()]).compute()

CPU times: user 31.5 s, sys: 2.29 s, total: 33.8 s
Wall time: 32.5 s


## 50M

In [2]:
%%time
# Per-partition [key, mean] over data/csv/atmc50 (50M dataset) via map_partitions.
# NOTE(review): list return without `meta=`, and one key per partition is assumed —
# confirm the shape of `out` and the partitioning.
df = dd.read_csv(fldrs[2]+"/*.csv")
out = df.map_partitions(lambda x:[x["key"].unique()[0], x["values"].mean()]).compute()

CPU times: user 4min 13s, sys: 13.9 s, total: 4min 27s
Wall time: 4min 17s


In [2]:
%%time
# Per-partition [key, mean] over data/parq/atmc50 (50M dataset) via map_partitions.
# NOTE(review): list return without `meta=`, and one key per partition is assumed —
# confirm the shape of `out` and the partitioning.
df = dd.read_parquet(fldrs[3]+"/*.parq")
out = df.map_partitions(lambda x:[x["key"].unique()[0], x["values"].mean()]).compute()

CPU times: user 2min 45s, sys: 11.7 s, total: 2min 57s
Wall time: 2min 48s


## Dask groupby on split chunks — 10M

In [1]:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Input folders in index order: csv, parquet for split10, then csv, parquet for split50.
fldrs = [f"data/{fmt}/split{size}" for size in (10, 50) for fmt in ("csv", "parq")]

# Start a local Dask cluster with default settings and connect a client to it.
# The trailing bare `client` makes Jupyter render the cluster summary.
cluster = LocalCluster()
client = Client(address=cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:37537  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 63.32 GB


In [2]:
%%time
# Dask groupby-mean over every CSV in data/csv/split10 (10M dataset).
df = dd.read_csv(fldrs[0]+"/*.csv")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 328 ms, sys: 67.3 ms, total: 396 ms
Wall time: 1.46 s


In [2]:
%%time
# Dask groupby-mean over every Parquet file in data/parq/split10 (10M dataset).
df = dd.read_parquet(fldrs[1]+"/*.parq")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 379 ms, sys: 63.1 ms, total: 442 ms
Wall time: 1.34 s


## 50M

In [2]:
%%time
# Dask groupby-mean over every CSV in data/csv/split50 (50M dataset).
df = dd.read_csv(fldrs[2]+"/*.csv")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 801 ms, sys: 97.4 ms, total: 899 ms
Wall time: 3.95 s


In [2]:
%%time
# Dask groupby-mean over every Parquet file in data/parq/split50 (50M dataset).
df = dd.read_parquet(fldrs[3]+"/*.parq")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 715 ms, sys: 138 ms, total: 853 ms
Wall time: 3.87 s


In [2]:
%%time
# Same code as the preceding data/parq/split50 cell — presumably a repeat run to gauge
# run-to-run variation.
df = dd.read_parquet(fldrs[3]+"/*.parq")
out = df.groupby("key")["values"].mean()\
        .compute()

CPU times: user 737 ms, sys: 101 ms, total: 838 ms
Wall time: 3.58 s
