# Embarrassingly Parallel For Loops in Python: Joblib Demo

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import datetime, itertools, shutil, os, time, math
import tempfile
import numpy as np
from joblib import Parallel, delayed
from joblib.externals.loky import get_reusable_executor

### Single For Loop

In [3]:
def do_work(i, out_memmap:np.memmap = None):
    """Compute row ``i`` of the demo workload: ``sqrt((i + j) ** 2)`` for ``j`` in ``range(M)``.

    The row length comes from the module-level global ``M``.

    Args:
        i: Row index; also added to every column index before squaring.
        out_memmap: Optional 2-D array indexed as ``out_memmap[i, j]``. When
            given, the row is written into it in place instead of returned.

    Returns:
        The computed row as a list when ``out_memmap`` is ``None``; otherwise
        an empty list (the values live in ``out_memmap``).
    """
    collected = []
    write_in_place = out_memmap is not None
    for col in range(M):
        value = math.sqrt((i + col) ** 2)
        if write_in_place:
            out_memmap[i, col] = value
        else:
            collected.append(value)
    return collected

def _shape_dims(out_shape):
    """Return ``out_shape`` as a tuple of dimensions (a scalar becomes a 1-tuple)."""
    try:
        return tuple(out_shape)
    except TypeError:
        return (out_shape,)


def _remove_memmap_backing(path, folder=None):
    """Best-effort delete of a memmap backing file, then of ``folder`` if given and now empty."""
    try:
        os.remove(path)
        if folder is not None:
            # rmdir (not rmtree): a folder that gained other files is never wiped.
            os.rmdir(folder)
    except OSError:
        # e.g. some platforms refuse to delete a file that is still memory-mapped.
        print('Could not clean-up automatically.')


def parfor(func, iterable, n_jobs=32, use_memmap=True, memmap_folder=None, out_shape=(None,None)):
    """Run ``func`` once per item of ``iterable`` in parallel with joblib and collect the results.

    Two modes:

    * ``use_memmap=True``: a float64 ``np.memmap`` of shape ``out_shape`` is
      created on disk and every call is ``func(item, out_memmap)``; ``func`` is
      expected to write its results into the memmap in place.
    * ``use_memmap=False``: every call is ``func(item)``; the returned values
      are stacked with ``np.array`` and reshaped to ``out_shape``.

    Args:
        func: Worker function (signature depends on the mode, see above).
        iterable: Items to dispatch, one ``func`` call per item.
        n_jobs: Number of joblib workers.
        use_memmap: Select the shared-memmap mode.
        memmap_folder: Directory for the backing file (default ``<cwd>/memmap``).
            Afterwards only this call's backing file is removed, and the
            directory itself only if this call created it.
        out_shape: Result shape. Must be fully specified in memmap mode. In
            list mode a single ``None`` dimension is inferred (like ``-1``);
            with several ``None`` dimensions the stacked array is returned
            without reshaping.

    Returns:
        The filled ``np.memmap`` (memmap mode) or an ``np.ndarray`` (list mode).
        NOTE: in memmap mode the backing file is deleted before returning; on
        POSIX the mapping stays readable, but its ``filename`` no longer exists,
        so do not try to reopen or re-pickle it by reference.

    Raises:
        ValueError: ``use_memmap=True`` and ``out_shape`` contains ``None``.
    """
    dims = _shape_dims(out_shape)

    if use_memmap:
        if any(dim is None for dim in dims):
            raise ValueError("out_shape must be fully specified when use_memmap=True, got %r" % (out_shape,))
        if not memmap_folder:
            memmap_folder = os.path.join(os.getcwd(), "memmap")
        created_folder = not os.path.exists(memmap_folder)
        os.makedirs(memmap_folder, exist_ok=True)
        # Unique file per call so concurrent runs sharing a folder cannot clobber each other.
        fd, memmap_path = tempfile.mkstemp(prefix="curr_memmap_", dir=memmap_folder)
        os.close(fd)
        print("Storing memmap in: ", memmap_folder)
        try:
            out_memmap = np.memmap(memmap_path, dtype='float', mode='w+', shape=dims)
            Parallel(n_jobs=n_jobs, verbose=0)(delayed(func)(item, out_memmap) for item in iterable)
        finally:
            # Clean up even if a worker raised, and only what this call created.
            _remove_memmap_backing(memmap_path, memmap_folder if created_folder else None)
        return out_memmap

    out = np.array(Parallel(n_jobs=n_jobs, verbose=0)(delayed(func)(item) for item in iterable))
    unknown = sum(dim is None for dim in dims)
    if unknown == 0:
        return out.reshape(dims)
    if unknown == 1:
        # numpy infers a single -1 dimension from the total element count.
        return out.reshape(tuple(-1 if dim is None else dim for dim in dims))
    # Several unknown dimensions cannot be inferred; return the stacked array as-is.
    return out

def _elapsed(since):
    """Return the time since ``since`` (a ``time.perf_counter()`` value) as a timedelta."""
    return datetime.timedelta(seconds=time.perf_counter() - since)


# Benchmark the same N x M workload three ways. Timing uses the monotonic
# time.perf_counter() (datetime.now() can jump with wall-clock changes); the
# result is printed as a timedelta to keep the H:MM:SS.ffffff output format.
# N and M are module globals: do_work reads M.
N, M = 100000, 100
tic = time.perf_counter()
out_memmap = parfor(do_work, range(N), n_jobs=32, use_memmap=True, out_shape=(N,M))
print("[PARALLEL MEMMAP: SINGLE FOR] Time elapsed: ", _elapsed(tic))

tic = time.perf_counter()
out = parfor(do_work, range(N), n_jobs=32, use_memmap=False, out_shape=(N,M))
print("[PARALLEL: SINGLE FOR] Time elapsed: ", _elapsed(tic))

out_seq = np.zeros((N,M))
tic = time.perf_counter()
for i in range(N):
    for j in range(M):
        temp = i+j
        temp = temp**2
        temp = math.sqrt(temp)
        out_seq[i,j] = temp
        # time.sleep(0.005)

print("[SEQUENTIAL] Time elapsed: ", _elapsed(tic))

# A speed-up only counts if all variants compute the same thing; the float
# operations are identical in each path, so exact equality is expected.
np.testing.assert_array_equal(out_memmap, out_seq)
np.testing.assert_array_equal(out, out_seq)


Storing memmap in:  /data/home/rgura001/pyparfor/memmap
[PARALLEL MEMMAP: SINGLE FOR] Time elapsed:  0:00:03.234038
[PARALLEL: SINGLE FOR] Time elapsed:  0:00:09.229952
[SEQUENTIAL] Time elapsed:  0:00:07.633385


### [WORK IN PROGRESS] Nested Loops

In [None]:
# def do_work(i,j, res2):
#     temp = i+j
#     temp = temp**2
#     res2[i,j] = temp
#     # time.sleep(0.005)

# memmap_folder = "/home/rgura001/pyparfor/memmap"
# if not os.path.exists(memmap_folder): os.makedirs(memmap_folder)
# res_nested_par_memmap = np.memmap(memmap_folder+"/res_nested_par_memmap", dtype='float', mode='w+', shape=(N,M))
# tic = datetime.datetime.now()
# out = Parallel(n_jobs=32, verbose=1)(delayed(do_work)(i,j, res_nested_par_memmap) for (i,j) in list(itertools.product(range(N), range(M))))
# print("[PARALLEL MEMMAP: NESTED FOR] Time elapsed: ", datetime.datetime.now() - tic)

# try:
#     shutil.rmtree(memmap_folder)
# except:
#     print('Could not clean-up automatically.')

# get_reusable_executor().shutdown(wait=True)

# def do_work(i,j):
#     out = []
#     temp = i+j
#     temp = temp**2
#     out.append(temp)
#     # time.sleep(0.005)
#     return out

# tic = datetime.datetime.now()
# out = Parallel(n_jobs=32, verbose=1)(delayed(do_work)(i,j) for (i,j) in list(itertools.product(range(N), range(M))))
# res_nested_par = np.reshape(np.array(out), (N,M))
# print("[PARALLEL: NESTED FOR] Time elapsed: ", datetime.datetime.now() - tic)

# get_reusable_executor().shutdown(wait=True)

### [WORK IN PROGRESS] Packaging Neatly

In [12]:
# class PyParFor:
#     def __init__(self, n_jobs=32, use_memmap=True, memmap_folder=None, out_shape=(None,None)):
#         self.n_jobs = n_jobs
#         self.use_memmap = use_memmap
#         self.memmap_folder = memmap_folder
#         self.out_shape = out_shape

#     def __enter__(self):
#         if self.use_memmap:
#             if not self.memmap_folder:
#                 self.memmap_folder = os.getcwd()+"/memmap"
#             if not os.path.exists(self.memmap_folder):
#                 os.makedirs(self.memmap_folder)
#             print("Storing memmap at: ", self.memmap_folder)

#     def __exit__(self, exc_type, exc_val, exc_tb):
#         try:
#             shutil.rmtree(self.memmap_folder)
#         except:
#             print('Could not clean-up automatically.')
        
#         get_reusable_executor().shutdown(wait=True)

#     def __call__(self, func, iterable):
#         if self.use_memmap:
#             out_memmap = np.memmap(self.memmap_folder + "/out_memmap", dtype='float', mode='w+', shape=self.out_shape)
#             Parallel(n_jobs=self.n_jobs, verbose=0)(delayed(func)(item, out_memmap) for item in iterable)
#         else:
#             out = Parallel(n_jobs=self.n_jobs, verbose=0)(delayed(func)(item) for item in iterable)
#             out = np.array(out).reshape(self.out_shape)
        
#         if self.use_memmap:
#             return out_memmap
#         else:
#             return out

In [None]:
# import os, datetime

# from numpy import memmap

# N, M = 1000, 100

# def do_work(i, out_memmap) -> np.ndarray:
#     for j in range(M):
#         temp = i+j
#         temp = temp**2
#         temp = math.sqrt(temp)
#         out_memmap[i,j] = temp

# with PyParFor(n_jobs=32, use_memmap=True, out_shape=(N,M)) as pyparfor:
#     tic = datetime.datetime.now()
#     out = pyparfor(do_work, range(N))
#     print("[PARALLEL: SINGLE FOR] Time elapsed: ", datetime.datetime.now() - tic)