## Parallelize functions using Joblib

In [1]:
from joblib import Parallel, delayed
import joblib
import pandas as pd
from tqdm import tqdm

## Download data

1. Go to the following Kaggle link:
https://www.kaggle.com/c/bengaliai-cv19/data?select=train_image_data_0.parquet

2. Download the following parquet file and copy it over to this repo `dataset` folder:
`train_image_data_0.parquet`

## Load parquet file

Make sure you have installed `pyarrow` and `fastparquet` as the backend for pandas:

`conda install pyarrow -n lang`

`conda install fastparquet -n lang`

Otherwise, you might encounter the following error:

`ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.`

In [2]:
df = pd.read_parquet('../dataset/train_image_data_0.parquet')

In [3]:
pd.io.parquet.get_engine('auto')

<pandas.io.parquet.PyArrowImpl at 0x12058b4a8>

## Data description

`(train).parquet`

Each parquet file contains tens of thousands of 137x236 grayscale images. The images have been provided in the parquet format for I/O and space efficiency. Each row in the parquet files contains an image_id column, and the flattened image.

See Kaggle page for more details:
https://www.kaggle.com/c/bengaliai-cv19/data?select=train_image_data_0.parquet

In [4]:
df.head(3)

Unnamed: 0,image_id,0,1,2,3,4,5,6,7,8,...,32322,32323,32324,32325,32326,32327,32328,32329,32330,32331
0,Train_0,254,253,252,253,251,252,253,251,251,...,253,253,253,253,253,253,253,253,253,251
1,Train_1,251,244,238,245,248,246,246,247,251,...,255,255,255,255,255,255,255,255,255,254
2,Train_2,251,250,249,250,249,245,247,252,252,...,254,253,252,252,253,253,253,253,251,249


## Get image ids

In [5]:
image_ids = df.image_id.values
df = df.drop("image_id", axis=1)
img_array = df.values

In [16]:
n = 10000
sampled_image_ids = image_ids[:n]

In [17]:
for i, img_id in tqdm(enumerate(sampled_image_ids), total=len(sampled_image_ids)):
    joblib.dump(img_array[i, :], "../dataset/image_pickles/{}.pkl".format(img_id))





  0%|          | 0/10000 [00:00<?, ?it/s][A[A[A[A



  0%|          | 24/10000 [00:00<00:42, 236.00it/s][A[A[A[A



  0%|          | 41/10000 [00:00<00:47, 209.48it/s][A[A[A[A



  1%|          | 59/10000 [00:00<00:49, 199.35it/s][A[A[A[A



  1%|          | 76/10000 [00:00<00:52, 188.01it/s][A[A[A[A



  1%|▏         | 126/10000 [00:00<00:42, 230.86it/s][A[A[A[A



  2%|▏         | 178/10000 [00:00<00:35, 276.54it/s][A[A[A[A



  2%|▏         | 228/10000 [00:00<00:30, 318.27it/s][A[A[A[A



  3%|▎         | 267/10000 [00:00<00:28, 336.52it/s][A[A[A[A



  3%|▎         | 306/10000 [00:00<00:27, 348.16it/s][A[A[A[A



  3%|▎         | 344/10000 [00:01<00:27, 354.18it/s][A[A[A[A



  4%|▍         | 382/10000 [00:01<00:27, 347.34it/s][A[A[A[A



  4%|▍         | 421/10000 [00:01<00:26, 357.65it/s][A[A[A[A



  5%|▍         | 468/10000 [00:01<00:24, 383.41it/s][A[A[A[A



  5%|▌         | 508/10000 [00:01<00:24, 384.34it/s][A[A

 54%|█████▍    | 5384/10000 [00:12<00:13, 330.87it/s][A[A[A[A



 54%|█████▍    | 5418/10000 [00:12<00:14, 318.55it/s][A[A[A[A



 55%|█████▍    | 5453/10000 [00:12<00:13, 327.19it/s][A[A[A[A



 55%|█████▌    | 5503/10000 [00:13<00:12, 364.44it/s][A[A[A[A



 56%|█████▌    | 5558/10000 [00:13<00:10, 404.84it/s][A[A[A[A



 56%|█████▌    | 5602/10000 [00:13<00:10, 401.84it/s][A[A[A[A



 56%|█████▋    | 5648/10000 [00:13<00:10, 415.84it/s][A[A[A[A



 57%|█████▋    | 5692/10000 [00:13<00:10, 420.02it/s][A[A[A[A



 57%|█████▋    | 5735/10000 [00:13<00:10, 414.59it/s][A[A[A[A



 58%|█████▊    | 5779/10000 [00:13<00:10, 420.74it/s][A[A[A[A



 58%|█████▊    | 5822/10000 [00:13<00:10, 417.52it/s][A[A[A[A



 59%|█████▊    | 5865/10000 [00:13<00:10, 408.99it/s][A[A[A[A



 59%|█████▉    | 5908/10000 [00:14<00:12, 328.94it/s][A[A[A[A



 60%|█████▉    | 5953/10000 [00:14<00:11, 356.53it/s][A[A[A[A



 60%|█████▉    | 5994/10000 [00:14

## Parallelization

In [19]:
def save_img(i, img_id, img_array):
    joblib.dump(img_array[i, :], "../dataset/image_pickles/{}.pkl".format(img_id))

In [24]:
Parallel(n_jobs=4, backend="multiprocessing")(delayed(save_img)(i, img_id, img_array) for i, img_id in tqdm(enumerate(sampled_image_ids), total=len(sampled_image_ids)))







  0%|          | 0/10000 [00:00<?, ?it/s][A[A[A[A[A[A




  1%|          | 108/10000 [01:11<06:10, 26.70it/s][A[A[A[A[A





  0%|          | 8/10000 [00:30<10:24:55,  3.75s/it][A[A[A[A[A[A





  0%|          | 12/10000 [00:30<7:20:30,  2.65s/it][A[A[A[A[A[A





  0%|          | 16/10000 [00:30<5:09:50,  1.86s/it][A[A[A[A[A[A





  0%|          | 20/10000 [00:30<3:38:22,  1.31s/it][A[A[A[A[A[A





  0%|          | 24/10000 [00:30<2:34:24,  1.08it/s][A[A[A[A[A[A





  0%|          | 28/10000 [00:30<1:49:43,  1.51it/s][A[A[A[A[A[A





  0%|          | 32/10000 [00:30<1:18:35,  2.11it/s][A[A[A[A[A[A





  0%|          | 36/10000 [00:31<57:21,  2.89it/s]  [A[A[A[A[A[A





  0%|          | 40/10000 [00:31<41:49,  3.97it/s][A[A[A[A[A[A





  0%|          | 44/10000 [00:31<31:00,  5.35it/s][A[A[A[A[A[A





  0%|          | 48/10000 [00:31<23:39,  7.01it/s][A[A[A[A[A[A





  1%|          | 52/10000 [0

  4%|▍         | 436/10000 [00:46<06:16, 25.43it/s][A[A[A[A[A[A





  4%|▍         | 440/10000 [00:46<06:09, 25.87it/s][A[A[A[A[A[A





  4%|▍         | 444/10000 [00:46<05:46, 27.61it/s][A[A[A[A[A[A





  4%|▍         | 448/10000 [00:46<05:28, 29.11it/s][A[A[A[A[A[A





  5%|▍         | 452/10000 [00:46<05:16, 30.14it/s][A[A[A[A[A[A





  5%|▍         | 456/10000 [00:46<05:18, 29.95it/s][A[A[A[A[A[A





  5%|▍         | 460/10000 [00:47<05:45, 27.61it/s][A[A[A[A[A[A





  5%|▍         | 464/10000 [00:47<05:44, 27.67it/s][A[A[A[A[A[A





  5%|▍         | 468/10000 [00:47<06:07, 25.91it/s][A[A[A[A[A[A





  5%|▍         | 472/10000 [00:47<07:38, 20.76it/s][A[A[A[A[A[A





  5%|▍         | 476/10000 [00:47<07:04, 22.45it/s][A[A[A[A[A[A





  5%|▍         | 480/10000 [00:47<06:47, 23.34it/s][A[A[A[A[A[A





  5%|▍         | 484/10000 [00:48<06:39, 23.83it/s][A[A[A[A[A[A





  5%|▍         | 488/1000

  9%|▊         | 872/10000 [01:00<04:51, 31.31it/s][A[A[A[A[A[A





  9%|▉         | 876/10000 [01:00<04:47, 31.77it/s][A[A[A[A[A[A





  9%|▉         | 880/10000 [01:01<04:48, 31.58it/s][A[A[A[A[A[A





  9%|▉         | 884/10000 [01:01<04:42, 32.23it/s][A[A[A[A[A[A





  9%|▉         | 888/10000 [01:01<04:43, 32.18it/s][A[A[A[A[A[A





  9%|▉         | 892/10000 [01:01<04:44, 32.03it/s][A[A[A[A[A[A





  9%|▉         | 896/10000 [01:01<04:46, 31.83it/s][A[A[A[A[A[A





  9%|▉         | 900/10000 [01:01<04:41, 32.36it/s][A[A[A[A[A[A





  9%|▉         | 904/10000 [01:01<04:41, 32.27it/s][A[A[A[A[A[A





  9%|▉         | 908/10000 [01:01<04:39, 32.57it/s][A[A[A[A[A[A





  9%|▉         | 912/10000 [01:02<04:41, 32.30it/s][A[A[A[A[A[A





  9%|▉         | 916/10000 [01:02<04:34, 33.07it/s][A[A[A[A[A[A





  9%|▉         | 920/10000 [01:02<04:31, 33.50it/s][A[A[A[A[A[A





  9%|▉         | 924/1000

 13%|█▎        | 1304/10000 [01:14<04:25, 32.78it/s][A[A[A[A[A[A





 13%|█▎        | 1308/10000 [01:14<04:24, 32.89it/s][A[A[A[A[A[A





 13%|█▎        | 1312/10000 [01:14<04:41, 30.89it/s][A[A[A[A[A[A





 13%|█▎        | 1316/10000 [01:14<04:58, 29.12it/s][A[A[A[A[A[A





 13%|█▎        | 1320/10000 [01:15<05:32, 26.08it/s][A[A[A[A[A[A





 13%|█▎        | 1324/10000 [01:15<05:12, 27.73it/s][A[A[A[A[A[A





 13%|█▎        | 1328/10000 [01:15<04:51, 29.73it/s][A[A[A[A[A[A





 13%|█▎        | 1332/10000 [01:15<04:47, 30.19it/s][A[A[A[A[A[A





 13%|█▎        | 1336/10000 [01:15<04:46, 30.28it/s][A[A[A[A[A[A





 13%|█▎        | 1340/10000 [01:15<04:33, 31.62it/s][A[A[A[A[A[A





 13%|█▎        | 1344/10000 [01:15<04:32, 31.72it/s][A[A[A[A[A[A





 13%|█▎        | 1348/10000 [01:15<04:40, 30.79it/s][A[A[A[A[A[A





 14%|█▎        | 1352/10000 [01:16<04:35, 31.38it/s][A[A[A[A[A[A





 14%|█▎     

 17%|█▋        | 1732/10000 [01:28<04:50, 28.44it/s][A[A[A[A[A[A





 17%|█▋        | 1736/10000 [01:28<04:49, 28.51it/s][A[A[A[A[A[A





 17%|█▋        | 1740/10000 [01:28<04:55, 27.93it/s][A[A[A[A[A[A





 17%|█▋        | 1744/10000 [01:28<04:40, 29.42it/s][A[A[A[A[A[A





 17%|█▋        | 1748/10000 [01:29<04:28, 30.76it/s][A[A[A[A[A[A





 18%|█▊        | 1752/10000 [01:29<04:18, 31.92it/s][A[A[A[A[A[A





 18%|█▊        | 1756/10000 [01:29<04:09, 33.09it/s][A[A[A[A[A[A





 18%|█▊        | 1760/10000 [01:29<04:23, 31.30it/s][A[A[A[A[A[A





 18%|█▊        | 1764/10000 [01:29<04:20, 31.67it/s][A[A[A[A[A[A





 18%|█▊        | 1768/10000 [01:29<04:16, 32.06it/s][A[A[A[A[A[A





 18%|█▊        | 1772/10000 [01:29<04:16, 32.05it/s][A[A[A[A[A[A





 18%|█▊        | 1776/10000 [01:29<04:20, 31.60it/s][A[A[A[A[A[A





 18%|█▊        | 1780/10000 [01:30<04:30, 30.38it/s][A[A[A[A[A[A





 18%|█▊     

  File "//anaconda/envs/lang/lib/python3.6/site-packages/joblib/pool.py", line 145, in get
    racquire()
  File "//anaconda/envs/lang/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
KeyboardInterrupt
  File "//anaconda/envs/lang/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "//anaconda/envs/lang/lib/python3.6/site-packages/joblib/pool.py", line 147, in get
    return recv()
KeyboardInterrupt
  File "//anaconda/envs/lang/lib/python3.6/site-packages/joblib/pool.py", line 145, in get
    racquire()
  File "//anaconda/envs/lang/lib/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "//anaconda/envs/lang/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
KeyboardInterrupt
  File "//anaconda/envs/lang/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Process ForkPoolWor