In [2]:
import multiprocessing
import os
import time
from functools import partial

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.multiprocessing import get
from gensim.models.doc2vec import TaggedDocument
from nltk.tokenize import word_tokenize


# Pools and Processes

In [3]:
# Number of worker processes used by the benchmarks in this notebook.
# os.sched_getaffinity(0) reports the CPUs this process is allowed to run on
# (e.g. when restricted by taskset or a container cpuset), whereas
# multiprocessing.cpu_count() reports every CPU in the machine.
# NOTE: neither accounts for CPU *quotas* (e.g. Docker --cpus).
# sched_getaffinity is not available on every platform, hence the fallback.
try:
    cores = len(os.sched_getaffinity(0))
except AttributeError:
    cores = multiprocessing.cpu_count()

In [4]:
def square_function(lst):
    """Return a new array holding the element-wise square of ``lst``.

    ``lst`` is indexed as a 2-D array (rows via ``shape[0]``, columns via
    ``shape[1]``). The result is built by ``np.zeros_like`` and therefore has
    the same shape and dtype as ``lst``. The squaring is done with an explicit
    Python double loop rather than ``lst ** 2``, which makes this a CPU-heavy
    workload for the timing cells in this notebook.
    """
    squared = np.zeros_like(lst)
    n_rows = lst.shape[0]
    for i in range(n_rows):
        source_row = lst[i]
        target_row = squared[i]  # a view: writes land in `squared`
        for j in range(lst.shape[1]):
            target_row[j] = source_row[j] ** 2
    return squared

# Benchmark input: a 1024 x 10000 matrix of random ints in [1, 9), split
# row-wise into one chunk per core.
SEED = 42
N_ROWS, N_COLS = 2**10, 10000
rng = np.random.default_rng(SEED)  # seeded so every re-run benchmarks the same data
array = rng.integers(1, 9, size=(N_ROWS, N_COLS))  # upper bound 9 is exclusive
data = np.array_split(array, cores)

# Without multiprocessing (serial baseline)

In [6]:
%%timeit
res =[]
for arr in data:
    res.append(square_function(arr))
    

9.68 s ± 175 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# `multiprocessing.Pool` and one `Process` per chunk

In [8]:
from multiprocessing import Pool, Process

In [34]:
%%timeit
with Pool(cores) as p:
    res = p.map(square_function, data)
    p.close()
    p.join()

1.8 s ± 41.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
%%timeit
processes = []
for i in range(cores):
    p = Process(target=square_function, args=(data[i],))
    processes.append(p)
    p.start()
for p in processes:
    p.join()

1.32 s ± 6.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# Dask vs. pandas `apply` vs. a plain `iterrows` loop

In [11]:
n = 30000  # number of rows to benchmark on
DATA_PATH = "/usr/share/data_quality_poc/data.csv"
# nrows=n stops parsing after n rows instead of reading the whole file and
# then keeping only .head(n). Column dtypes are now inferred from these n rows only.
# NOTE: this rebinds `data` (previously the list of NumPy chunks used by the
# multiprocessing cells), so those cells break if re-run after this one.
data = pd.read_csv(DATA_PATH, nrows=n)

  interactivity=interactivity, compiler=compiler, result=result)


In [3]:
def get_tagged_doc(row):
    """Build a gensim ``TaggedDocument`` from one DataFrame row.

    The row's ``'features'`` text is tokenized with NLTK's ``word_tokenize``,
    and its ``'outcome'`` value becomes the document's single tag.
    """
    tokens = word_tokenize(row['features'])
    tag = row['outcome']
    return TaggedDocument(words=tokens, tags=[tag])


In [4]:
def tag_data_dask(data, npartitions=60, scheduler='processes'):
    """Tag every row of ``data`` in parallel with Dask.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame with the ``'features'`` and ``'outcome'`` columns read by
        ``get_tagged_doc``.
    npartitions : int, default 60
        Number of partitions requested from ``dd.from_pandas``.
    scheduler : str, default 'processes'
        Dask scheduler passed to ``compute``.

    Returns
    -------
    pandas.Series
        One ``TaggedDocument`` per row of ``data``.
    """
    ddata = dd.from_pandas(data, npartitions=npartitions)
    # Declaring the output up front as an object-dtype Series (unnamed, like
    # the plain apply result) stops Dask inferring metadata by calling
    # get_tagged_doc on fake sample data.
    tagged = ddata.map_partitions(
        lambda df: df.apply(get_tagged_doc, axis=1),
        meta=pd.Series(dtype='object'),
    )
    return tagged.compute(scheduler=scheduler)



In [5]:
def tag_data_pandas(data):
    """Tag every row of ``data`` serially with ``DataFrame.apply(axis=1)``.

    Each row is passed to ``get_tagged_doc``; the result has one
    ``TaggedDocument`` per row.
    """
    return data.apply(get_tagged_doc, axis=1)

In [6]:
def tag_for_loop(data):
    """Tag every row of ``data`` with an explicit ``iterrows`` loop.

    Returns a list with one ``TaggedDocument`` per row, in iteration order.
    """
    return [get_tagged_doc(row) for _, row in data.iterrows()]


In [56]:
# Single run of the plain iterrows loop. perf_counter is a monotonic,
# high-resolution clock meant for measuring intervals, unlike time.time().
start = time.perf_counter()
tag_for_loop(data)
print(time.perf_counter() - start)

24.968457460403442


In [54]:
# Single run of the pandas DataFrame.apply version. perf_counter is a
# monotonic, high-resolution clock meant for measuring intervals.
start = time.perf_counter()
tag_data_pandas(data)
print(time.perf_counter() - start)

20.084665536880493


In [8]:
# Single run of the Dask version (use %%timeit for repeated runs).
# perf_counter is a monotonic, high-resolution clock meant for measuring intervals.
start = time.perf_counter()
tag_data_dask(data)
print(time.perf_counter() - start)

17.158838033676147


# Numba
# Dask on a cluster
# Spark
# Containerize and run on a cluster