In [39]:
import numpy as np
import time
import os
from netCDF4 import Dataset

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed


In [40]:
##task functions

##writing an image to file (I/O-bound)
def write_img(job_id, ny=4000, nx=4000, outdir='data'):
    """Write a random image to a netCDF file (I/O-bound task).

    Sleeps 3 or 4 seconds first to mimic access latency, then writes an
    ``ny`` x ``nx`` array of standard-normal noise to
    ``{outdir}/file{job_id:04d}.nc`` as variable ``img`` with dimensions
    ('y', 'x').

    Parameters
    ----------
    job_id : int
        Job number; used in the progress messages and in the output file name.
    ny, nx : int, optional
        Image size (default 4000 x 4000).
    outdir : str, optional
        Output directory, created if it does not exist (default ``'data'``).

    Returns
    -------
    None
    """
    print(f'job {job_id} start...', flush=True)

    ##fresh OS-seeded generator per call, so no random state is shared between
    ##threads or copied into forked worker processes
    rng = np.random.default_rng()

    ##mimic some delay in access (upper bound is exclusive: sleeps 3 or 4 s)
    time.sleep(rng.integers(3, 5))

    img = rng.normal(0, 1, (ny, nx))

    ##create the directory in-process: no shell spawned per job, portable,
    ##and exist_ok makes concurrent calls from several threads safe
    os.makedirs(outdir, exist_ok=True)
    with Dataset(os.path.join(outdir, f'file{job_id:04d}.nc'), 'w') as f:
        f.createDimension('x', nx)
        f.createDimension('y', ny)
        f.createVariable('img', float, ('y', 'x'))
        f['img'][:] = img

    print(f'job {job_id} finish', flush=True)


##smoothing an image (CPU-bound)
def smooth_img(job_id, ny=2000, nx=2000):
    """Smooth a random image with a 3x3 box filter using explicit Python loops (CPU-bound task).

    The double loop is deliberately *not* vectorized. It keeps the interpreter
    busy (holding the GIL), which lets the notebook compare serial, threaded
    and multi-process execution. The smoothed image is discarded; only the
    compute time matters here.

    Parameters
    ----------
    job_id : int
        Job number; used in the progress messages.
    ny, nx : int, optional
        Image size (default 2000 x 2000).

    Returns
    -------
    None
    """
    print(f'job {job_id} start...', flush=True)

    ##fresh OS-seeded generator per call: with fork-based process pools the
    ##global np.random state is copied into every worker, so the workers would
    ##otherwise start from identical random streams
    rng = np.random.default_rng()
    img = rng.normal(0, 1, (ny, nx))

    ##border pixels stay 0; each interior pixel becomes the mean of its 3x3 neighbourhood
    new_img = np.zeros(img.shape)
    for j in range(1, ny-1):
        for i in range(1, nx-1):
            new_img[j, i] = (img[j-1, i-1] + img[j-1, i] + img[j-1, i+1] +
                             img[j,   i-1] + img[j,   i] + img[j,   i+1] +
                             img[j+1, i-1] + img[j+1, i] + img[j+1, i+1]) / 9

    print(f'job {job_id} finish', flush=True)


In [30]:
%%time

##running the CPU-bound tasks serially
for job_id in range(10):
    smooth_img(job_id)


job 0 start...
job 0 finish
job 1 start...
job 1 finish
job 2 start...
job 2 finish
job 3 start...
job 3 finish
job 4 start...
job 4 finish
job 5 start...
job 5 finish
job 6 start...
job 6 finish
job 7 start...
job 7 finish
job 8 start...
job 8 finish
job 9 start...
job 9 finish
CPU times: user 1min 27s, sys: 153 ms, total: 1min 27s
Wall time: 1min 27s


In [34]:
%%time

##use multi-thread to speed up the CPU-bound tasks?
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for job_id in range(10):
        future = executor.submit(smooth_img, job_id)
        futures.append(future)


job 0 start...
job 1 start...
job 2 start...
job 3 start...
job 4 start...
job 4 finish
job 5 start...
job 1 finish
job 6 start...
job 3 finish
job 7 start...
job 0 finish
job 8 start...
job 2 finish
job 9 start...
job 5 finish
job 6 finish
job 7 finish
job 8 finish
job 9 finish
CPU times: user 1min 29s, sys: 751 ms, total: 1min 30s
Wall time: 1min 28s


In [35]:
%%time

##use multi-process to speed up the CPU-bound tasks
with ProcessPoolExecutor(max_workers=5) as executor:
    futures = []
    for job_id in range(10):
        future = executor.submit(smooth_img, job_id)
        futures.append(future)


job 3 start...job 0 start...job 2 start...job 1 start...job 4 start...




job 3 finish
job 5 start...
job 2 finish
job 4 finish
job 6 start...
job 7 start...
job 0 finish
job 8 start...
job 1 finish
job 9 start...
job 5 finish
job 8 finish
job 6 finish
job 7 finish
job 9 finish
CPU times: user 22 ms, sys: 33.2 ms, total: 55.2 ms
Wall time: 19.2 s


In [36]:
%%time

##run the IO-bound tasks serially
for job_id in range(10):
    write_img(job_id)


job 0 start...
job 0 finish
job 1 start...
job 1 finish
job 2 start...
job 2 finish
job 3 start...
job 3 finish
job 4 start...
job 4 finish
job 5 start...
job 5 finish
job 6 start...
job 6 finish
job 7 start...
job 7 finish
job 8 start...
job 8 finish
job 9 start...
job 9 finish
CPU times: user 4.23 s, sys: 1.77 s, total: 6 s
Wall time: 39.4 s


In [42]:
%%time

##use multi-thread to speed up the IO-bound tasks
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = []
    for job_id in range(20):
        future = executor.submit(write_img, job_id)
        futures.append(future)



job 1 start...
job 2 start...
job 3 start...
job 4 start...
job 5 start...
job 6 start...
job 7 start...
job 8 start...
job 9 start...
job 0 finish
job 10 start...
job 5 finish
job 11 start...
job 7 finish
job 12 start...
job 9 finish
job 13 start...
job 1 finish
job 14 start...
job 15 start...
job 2 finish
job 16 start...
job 4 finish
job 17 start...
job 6 finish
job 18 start...
job 8 finish
job 19 start...
job 10 finish
job 15 finish
job 17 finish
job 12 finish
job 11 finish
job 14 finish
job 13 finish
job 16 finish
job 18 finish
job 19 finish
CPU times: user 8.6 s, sys: 3.23 s, total: 11.8 s
Wall time: 15 s


In [31]:
##to handle errors and make sure all jobs finished
##NOTE: `futures` is whatever list the most recently *executed* executor cell left behind,
##which is not necessarily the cell directly above (check the execution counts)
n_failed = 0
for future in as_completed(futures):
    try:
        future.result()
    except Exception as e:
        n_failed += 1
        ##repr includes the exception type; str(e) alone can be empty
        print(f'Error: {e!r}')
print(f'{len(futures) - n_failed}/{len(futures)} jobs finished without error')
