# Description

In a lot of kernels I see that preprocessing and postprocessing are written in the regular single-process way. Often it takes only a few extra lines of code to implement multiprocessing and get a nice performance boost.

When we do deep learning we want it to run as fast as possible with parallel processes on a GPU. So why not also use parallel processes for the pre- or postprocessing parts that we implement ourselves?

To show you that it is quite easy I will use the preprocessing code from this very nice [kernel](https://www.kaggle.com/iafoss/image-preprocessing-128x128). We will take a look at the performance difference when running it with multiprocessing.

Note that you should be able to just modify this kernel to run your own pre- or postprocessing code.

And if you like the kernel then I would appreciate an upvote.

In [1]:
import cv2
import gc
import io
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
from tqdm import tqdm_notebook as tqdm
import zipfile
import warnings
warnings.filterwarnings("ignore")

To implement support for multi-processing we need to import the multiprocessing module.

In [2]:
# To support Multiprocessing
import multiprocessing

Next comes the image preprocessing code from the referenced kernel.

In [3]:
HEIGHT = 137
WIDTH = 236
SIZE = 128

TRAIN = ['/kaggle/input/bengaliai-cv19/train_image_data_0.parquet',
         '/kaggle/input/bengaliai-cv19/train_image_data_1.parquet',
         '/kaggle/input/bengaliai-cv19/train_image_data_2.parquet',
         '/kaggle/input/bengaliai-cv19/train_image_data_3.parquet']

In [4]:
def bbox(img):
    rows = np.any(img, axis=1)
    cols = np.any(img, axis=0)
    rmin, rmax = np.where(rows)[0][[0, -1]]
    cmin, cmax = np.where(cols)[0][[0, -1]]
    
    return rmin, rmax, cmin, cmax

def crop_resize(img0, size = SIZE, pad = 16):
    # Crop a box around pixels larger than the threshold
    # Some images contain lines at the sides
    ymin,ymax,xmin,xmax = bbox(img0[5:-5,5:-5] > 80)
    
    # Cropping may cut too much, so we need to add it back
    xmin = xmin - 13 if (xmin > 13) else 0
    ymin = ymin - 10 if (ymin > 10) else 0
    xmax = xmax + 13 if (xmax < WIDTH - 13) else WIDTH
    ymax = ymax + 10 if (ymax < HEIGHT - 10) else HEIGHT
    img = img0[ymin:ymax,xmin:xmax]
    
    # Remove low intensity pixels as noise
    img[img < 28] = 0
    lx, ly = xmax-xmin,ymax-ymin
    l = max(lx,ly) + pad
    
    # Make sure that the aspect ratio is kept in rescaling
    img = np.pad(img, [((l-ly)//2,), ((l-lx)//2,)], mode = 'constant')
    
    return cv2.resize(img, (size, size))

# Single process
First let's run the preprocessing in the standard single-process way and see what the processing time is.

In [5]:
# Start timer
start_time = time.time()

with zipfile.ZipFile('train_single.zip', 'w') as img_out:
    for fname in TRAIN:
        # Read parquet file into pandas.
        df = pd.read_parquet(fname)
        
        # Invert the input
        data = 255 - df.iloc[:, 1:].values.reshape(-1, HEIGHT, WIDTH).astype(np.uint8)
        for idx in tqdm(range(len(df))):
            name = df.iloc[idx,0]
            
            # Normalize each image by its max val
            img = (data[idx]*(255.0/data[idx].max())).astype(np.uint8)
            img = crop_resize(img)
        
            img = cv2.imencode('.png',img)[1]
            img_out.writestr(name + '.png', img)
            
# Total time
print('Processing time standard: {0} [sec]'.format(time.time() - start_time))

Processing time standard: 424.8512601852417 [sec]


We can see that looping over the training input files and processing the images one by one took roughly 425 seconds in this run. Note that this can vary with each run.

# Multiprocess Version 1
Let's explore a first version of multiprocessing. Here we will loop over the train input files and process all the images in parallel processes. As an end result all the processed images will be added to the final zip file.

To set up effective multiprocessing code it is also important to take into account how many CPU cores are available, how much memory is available and how fast the disk subsystem is. As a general guideline you can take the number of CPU cores as the number of processes that can run in parallel. If the CPU supports Hyper-Threading then you might get a small benefit by doubling the number of processes. However, it does not always work. Some experimentation could be required ;-)
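
As a starting point for that experimentation, here is a minimal sketch of how you could pick a few process counts to time. The helper names `usable_cpu_count` and `candidate_process_counts` are my own and hypothetical, and the "half, equal, double" set is just an assumption based on the guideline above, not a rule.

```python
import os


def usable_cpu_count():
    """Return the number of CPUs this process is allowed to use."""
    # os.sched_getaffinity only exists on some platforms (e.g. Linux).
    # It respects CPU affinity masks, which os.cpu_count() ignores.
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Fallback for other platforms; os.cpu_count() may return None.
    return os.cpu_count() or 1


def candidate_process_counts(cores):
    """Process counts worth timing: half, equal to and double the cores."""
    return sorted({max(1, cores // 2), cores, cores * 2})


if __name__ == "__main__":
    cores = usable_cpu_count()
    print("usable cores:", cores)
    print("process counts to try:", candidate_process_counts(cores))
```

You could then run the multiprocessing cell once per candidate value and keep the fastest one.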

To use multiprocessing I define a function 'process_image_multi_v1'. It performs the complete processing for 1 image in combination with the earlier defined preprocessing functions.

In [6]:
def process_image_multi_v1(data, name):
    # Reshape data
    data = data.reshape(HEIGHT, WIDTH)
    
    # Invert the input and normalize the image by its max val
    data = 255 - data.astype(np.uint8)
    img = (data*(255.0/data.max())).astype(np.uint8)
    
    # Crop, Resize and encode as PNG file
    img = crop_resize(img)
    img = cv2.imencode('.png',img)[1]
    
    return name, img

Verify and get the CPU count. We will use the cpu_count to specify the number of parallel processes.

In [7]:
# CPU core count
cpu_count = multiprocessing.cpu_count()
print(cpu_count)

4


Next we set up the multiprocessing pool. With pool.starmap we can pass multiple arguments per task to the function 'process_image_multi_v1': each (data, name) pair is unpacked into its two parameters. Once starmap has returned the results for a file, each name and image is written to the output zip file.
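
To see what `pool.starmap` does in isolation, here is a small toy sketch using only the standard library. The worker `tag_and_double` and its inputs are hypothetical stand-ins for the image function and the parquet data.

```python
import multiprocessing


def tag_and_double(value, name):
    """Toy worker: receives one (value, name) pair as two arguments."""
    return name, value * 2


if __name__ == "__main__":
    values = [1, 2, 3]
    names = ["a", "b", "c"]

    # starmap unpacks each tuple from zip() into the worker's arguments
    # and returns all results as a list, in the order of the inputs.
    with multiprocessing.Pool(processes=2) as pool:
        results = pool.starmap(tag_and_double, zip(values, names))

    for name, doubled in results:
        print(name, doubled)
```

The worker has to be defined at module level so that it can be pickled and sent to the worker processes.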

In [8]:
# Start Multi Processing
start_multi_time_v1 = time.time()

try:
    # Setup multiprocessing pool
    pool = multiprocessing.Pool(processes = cpu_count)

    with zipfile.ZipFile('train_multi.zip', 'w') as img_out:
        for fname in TRAIN:
            # Read Parquet file
            df = pd.read_parquet(fname)
            
            # Prep the input for pool.starmap.
            data = df.iloc[:, 1:].values
            names = df.image_id.tolist()

            for name, img in pool.starmap(process_image_multi_v1, zip(data, names)):
                img_out.writestr(name + '.png', img)

finally:
    pool.close()
    pool.join()

# Total time
print('Processing time multi v1: {0} [sec]'.format(time.time() - start_multi_time_v1))

Processing time multi v1: 389.6807367801666 [sec]


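We can see that the processing time drops from roughly 425 to roughly 390 seconds, about 8% less, because the images are processed in parallel. The gain stays modest because reading the parquet files and writing the zip file still happen in the main process.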
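
One thing to keep in mind is that `pool.starmap` blocks until every result is ready, so writing to the zip file only starts after all images of a file are processed. A possible alternative, which I did not time in this kernel, is `pool.imap`, which yields results lazily. The sketch below shows the idea with toy text data. The names `encode_item` and `write_streaming` and the `chunksize` of 64 are assumptions for illustration only.

```python
import multiprocessing
import os
import tempfile
import zipfile


def encode_item(item):
    """Toy worker standing in for the image processing step."""
    name, payload = item
    return name, payload.upper().encode("utf-8")


def write_streaming(items, zip_path, processes=2, chunksize=64):
    """Write each result to the zip file as soon as it is available."""
    with multiprocessing.Pool(processes=processes) as pool, \
            zipfile.ZipFile(zip_path, "w") as out:
        # imap yields results lazily and in input order, so the parent
        # can write while the workers are still busy. chunksize sends
        # the tasks in batches to reduce communication overhead.
        for name, data in pool.imap(encode_item, items, chunksize=chunksize):
            out.writestr(name + ".txt", data)


if __name__ == "__main__":
    items = [("item_{}".format(i), "payload {}".format(i)) for i in range(1000)]
    with tempfile.TemporaryDirectory() as tmp_dir:
        zip_path = os.path.join(tmp_dir, "demo_streaming.zip")
        write_streaming(items, zip_path)
        with zipfile.ZipFile(zip_path) as check:
            print(len(check.namelist()), "entries written")
```

Note that `imap` passes a single argument to the worker, so the (name, payload) pair is unpacked inside the function instead of by `starmap`.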

# Multiprocess Version 2
Let's also explore a second option. This time we won't write to the final output .zip file but we will write each image directly to an output directory. This might save a little overhead for each process.
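
The idea can be sketched with a toy example: each worker writes its own file and returns only a short path, so no large result has to be sent back to the parent process. The worker `save_item`, the `.bin` extension and the temporary directory are hypothetical and only there for illustration.

```python
import multiprocessing
import os
import tempfile


def save_item(payload, name, out_dir):
    """Toy worker: writes its own output file and returns only the path."""
    path = os.path.join(out_dir, name + ".bin")
    with open(path, "wb") as handle:
        handle.write(payload)
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        # One (payload, name, out_dir) tuple per task.
        tasks = [(bytes([i]) * 1024, "item_{}".format(i), out_dir)
                 for i in range(8)]
        with multiprocessing.Pool(processes=2) as pool:
            paths = pool.starmap(save_item, tasks)
        print(len(os.listdir(out_dir)), "files written")
```

The output directory has to exist before the workers start writing to it.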

In [9]:
def process_image_multi_v2(data, name):
    # Reshape data
    data = data.reshape(HEIGHT, WIDTH)
    
    # Invert the input and normalize the image by its max val
    data = 255 - data.astype(np.uint8)
    img = (data*(255.0/data.max())).astype(np.uint8)
    
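    # Crop and resize
    img = crop_resize(img)
    
    # Save image; cv2.imwrite encodes it as PNG based on the file extension,
    # so the image must not be encoded with cv2.imencode first
    cv2.imwrite('./subdir/' + name + '.png', img)
    
# Make sure the output directory exists; cv2.imwrite does not create it
import os
os.makedirs('./subdir', exist_ok=True)

# Start Multi Processing
start_multi_time_v2 = time.time()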

try:
    pool = multiprocessing.Pool(processes = cpu_count)

    # Loop over the train input files.
    for fname in TRAIN:
        # Read Parquet file
        df = pd.read_parquet(fname)

        # Prep the input for pool.starmap.
        data = df.iloc[:, 1:].values
        names = df.image_id.tolist()
        
        # Multi Process Images
        pool.starmap(process_image_multi_v2, zip(data, names))

finally:
    pool.close()
    pool.join()

# Total time
print('Processing time multi v2: {0} [sec]'.format(time.time() - start_multi_time_v2))

Processing time multi v2: 323.2609407901764 [sec]


As you can see, we achieved a nice performance gain: the total processing time drops to roughly 323 seconds, about 24% less than the single-process run. Maybe you would expect a bigger gain. However, we have to take into account the time required for loading the train input files. This takes some amount of time and still happens in a single process.
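
A rough way to reason about this is Amdahl's law, which I am bringing in here as a simplified model: it assumes the run splits cleanly into a serial part and a perfectly parallel part and it ignores the cost of sending data between processes. The function names below are hypothetical. The timings are the ones printed in this kernel.

```python
def amdahl_speedup(parallel_fraction, processes):
    """Speedup predicted by Amdahl's law."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / processes)


def implied_parallel_fraction(speedup, processes):
    """Invert Amdahl's law; processes must be larger than 1."""
    return (1.0 - 1.0 / speedup) / (1.0 - 1.0 / processes)


if __name__ == "__main__":
    single = 424.8512601852417    # seconds, single-process run
    multi_v2 = 323.2609407901764  # seconds, multiprocess version 2
    speedup = single / multi_v2
    fraction = implied_parallel_fraction(speedup, processes=4)
    print("observed speedup: {:.2f}x".format(speedup))
    print("implied parallel fraction: {:.0%}".format(fraction))
```

With these timings the speedup is about 1.3x on 4 processes. Under this model that means only about a third of the single-process run time was effectively parallelized.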

I hope I showed a little of the benefits of using multiprocessing. With some tweaking you can just replace the existing preprocessing code and put in your own code. 

If you have any questions or remarks I'm always happy to help.