# Python Multiprocessing: Run Code in Concurrently Using the Threading Module

We usually have two types of tasks:
- IO Bound: Waitinng for input and output to be completed 
 - File system operation, file system operation : Threading helps us here!!
 
- CPU Bound:Number crunching: Multiprocessing helps us here!!

In [27]:
import platform
import time
print('Os Version:',platform.version())
print('Os Version:',platform.python_version())

Os Version: #36-Ubuntu SMP Wed Dec 9 09:14:40 UTC 2020
Os Version: 3.9.0+


In [34]:
def do_someting():
    print('Sleeping 1 second')
    time.sleep(1)
    print('Done sleeping...\n')
do_someting()
do_someting()
do_someting()

Sleeping 1 second
Done sleeping...

Sleeping 1 second
Done sleeping...

Sleeping 1 second
Done sleeping...



<img src='https://github.com/iqbalamo93/Datasets/blob/master/Multiprocessing.png?raw=true'>

In [40]:
import multiprocessing as mp
start = time.perf_counter()
p1 = mp.Process(target=do_someting)
p2 = mp.Process(target=do_someting)
p1.start()
p2.start()
finish = time.perf_counter()
print(f'\nFinished in {round(finish-start,2)} seconds',end='\n\n')
text = '''WHAT HAPPEND HERE IS AS P1,P2 STARTED AND THEN CODE MOVED TO NEXT LINE..
IT WAS LIKE: GUY'S YOU CARRY ON YOUR TASK I AM MOVING AHEAD!!!\n'''
print(text)

Sleeping 1 second
Sleeping 1 second
Finished in 0.02 seconds

WHAT HAPPEND HERE IS AS P1,P2 STARTED AND THEN CODE MOVED TO NEXT LINE..
IT WAS LIKE: GUY'S YOU CARRY ON YOUR TASK I AM MOVING AHEAD!!!


Done sleeping...

Done sleeping...



In [38]:
import multiprocessing as mp
start = time.perf_counter()
p1 = mp.Process(target=do_someting)
p2 = mp.Process(target=do_someting)
p1.start()
p2.start()
p1.join()
p2.join()

finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')
text='''WHAT HAPPEND HERE IS AS P1,P2 STARTED AND ENDED:
SYNCHRONOUSLY.JOIN WILL MAKE SURE PROCESS TO FINISH BEFORE MOVING TO NEXT PART!!!'''
print(text)

Sleeping 1 second
Sleeping 1 second
Done sleeping...

Done sleeping...

Finished in 1.04 seconds

WHAT HAPPEND HERE IS AS P1,P2 STARTED AND ENDED:
SYNCHRONOUSLY.JOIN WILL MAKE SURE PROCESS TO FINISH BEFORE MOVING TO NEXT PART!!!


In [41]:
start = time.perf_counter()
processess = []
for _ in range(10):
    p = mp.Process(target=do_someting)
    p.start()
    processess.append(p)
    
for process in processess:
    process.join()
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')
'''Even running 10 times it didn't took 10 seconds, because they were moved to other core.'''

Sleeping 1 secondSleeping 1 second

Sleeping 1 second
Sleeping 1 secondSleeping 1 secondSleeping 1 second
Sleeping 1 second
Sleeping 1 second
Sleeping 1 secondSleeping 1 second



Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...
Done sleeping...


Done sleeping...
Done sleeping...
Done sleeping...



Finished in 1.22 seconds



In [42]:
def do_someting(seconds):
    print(f'Sleeping {seconds} second(s)....')
    time.sleep(seconds)
    print('Done sleeping...\n')

In [43]:
start = time.perf_counter()
processess = []
for _ in range(10):
    p = mp.Process(target=do_someting,args=[1.5])
    p.start()
    processess.append(p)
    
for process in processess:
    process.join()
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Sleeping 1.5 second(s)....Sleeping 1.5 second(s)....

Sleeping 1.5 second(s)....
Sleeping 1.5 second(s)....Sleeping 1.5 second(s)....

Sleeping 1.5 second(s)....
Sleeping 1.5 second(s)....Sleeping 1.5 second(s)....Sleeping 1.5 second(s)....

Sleeping 1.5 second(s)....

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...
Done sleeping...


Done sleeping...

Finished in 1.7 seconds



### Process Pool executor : using concurrent.futures Module

In [67]:
def do_someting(seconds):
    print(f'Sleeping {seconds} second(s)....')
    time.sleep(seconds)
    return 'Done sleeping...\n'

In [68]:
import concurrent.futures 

## Submit method executes function one at a time

- Submit method schedules a function to be executed and returns a <b>future object</b>
- Schedules the callable, fn, to be executed as fn(*args **kwargs) and returns a Future object representing the execution of the callable.

- Submit method gives future object:
- Future Objects :The Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit().
- Methods
        -    cancel()
        -    running()
        -    result()
<a herf='https://docs.python.org/3/library/concurrent.futures.html'>Read Here</a>

In [116]:
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    future_1 = executor.submit(do_someting,1)
    future_2 = executor.submit(do_someting,1)
    future_3 = executor.submit(do_someting,1)
    print(future_1.result())
    print(future_2.result())
    print(future_3.result())
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Sleeping 1 second(s)....Sleeping 1 second(s)....

Sleeping 1 second(s)....
Done sleeping...1
Done sleeping...1
Done sleeping...1
Finished in 1.09 seconds



In [117]:
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    future_states = [executor.submit(do_someting,1) for _ in range(10)] #returnds future state object 
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Sleeping 1 second(s)....Sleeping 1 second(s)....

Sleeping 1 second(s)....
Sleeping 1 second(s)....
Sleeping 1 second(s)....
Sleeping 1 second(s)....
Sleeping 1 second(s)....Sleeping 1 second(s)....

Sleeping 1 second(s)....
Sleeping 1 second(s)....
Finished in 3.08 seconds



## concurrent.futures.as_completed

This gives us an iterator that we can loop over that will yeild the results
of our prcosess as they are completed

In [72]:
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    Future_states = [executor.submit(do_someting,1) for _ in range(10)]
    
    for f in concurrent.futures.as_completed(Future_states):# gives the resulst as thet are completed
        print(f.result())
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Sleeping 1 second(s)....Sleeping 1 second(s)....
Sleeping 1 second(s)....

Sleeping 1 second(s)....
Sleeping 1 second(s)....
Sleeping 1 second(s)....Sleeping 1 second(s)....

Sleeping 1 second(s)....
Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Sleeping 1 second(s)....
Sleeping 1 second(s)....
Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Done sleeping...

Finished in 3.1 seconds



In [73]:
def do_someting(seconds):
    print(f'Sleeping {seconds} second(s)....')
    time.sleep(seconds)
    return f'Done sleeping...{seconds}'

In [74]:
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    secs = [5,4,3,2,1]
    results = [executor.submit(do_someting,sec) for sec in secs]
    
    for f in concurrent.futures.as_completed(results):#results in movement as it's completed
        print(f.result())
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Sleeping 5 second(s)....Sleeping 4 second(s)....

Sleeping 3 second(s)....Sleeping 2 second(s)....

Sleeping 1 second(s)....
Done sleeping...2
Done sleeping...3
Done sleeping...1
Done sleeping...4
Done sleeping...5
Finished in 5.08 seconds



In [61]:
import os
os.cpu_count()
'''AS system is of 4 core, it starte 5,4,3,2 first then started 1.'''

'AS system is of 4 core, it starte 5,4,3,2 first then started 1.'

Submit function submits each function one at a time
but in order to run submit on a entire list then we need to do map !

In [77]:
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    secs = [5,4,3,2,1]
    results = executor.map(do_someting,secs)#Here map returns the results, in order they were started 
    for result in results:
        print(result)
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Sleeping 5 second(s)....
Sleeping 4 second(s)....
Sleeping 3 second(s)....
Sleeping 2 second(s)....
Sleeping 1 second(s)....
Done sleeping...5
Done sleeping...4
Done sleeping...3
Done sleeping...2
Done sleeping...1
Finished in 5.05 seconds



# Image examples

In [80]:
from PIL import Image, ImageFilter

In [83]:
images_path = os.path.join('./','MP_IMAGES')

In [95]:
os.chdir(images_path)
os.makedirs('./Processed')

FileNotFoundError: [Errno 2] No such file or directory: './MP_IMAGES'

In [110]:
img_names = [file for file in os.listdir() if file.endswith('.jpg') ]

In [111]:
size = (1200, 1200)
start = time.perf_counter()
for img_name in img_names:
    img = Image.open(img_name)
    img = img.filter(ImageFilter.GaussianBlur(15))
    img.thumbnail(size)
    img.save(f'Processed/{img_name}')
    print(f'{img_name} was processed...')
    
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

photo-1532009324734-20a7a5813719.jpg was processed...
photo-1513938709626-033611b8cc03.jpg was processed...
photo-1549692520-acc6669e2f0c.jpg was processed...
photo-1564135624576-c5c88640f235.jpg was processed...
photo-1516972810927-80185027ca84.jpg was processed...
photo-1530224264768-7ff8c1789d79.jpg was processed...
photo-1504198453319-5ce911bafcde.jpg was processed...
photo-1516117172878-fd2c41f4a759.jpg was processed...
photo-1530122037265-a5f1f91d3b99.jpg was processed...
photo-1550439062-609e1531270e.jpg was processed...
photo-1522364723953-452d3431c267.jpg was processed...
photo-1493976040374-85c8e12f0c0e.jpg was processed...
photo-1507143550189-fed454f93097.jpg was processed...
photo-1524429656589-6633a470097c.jpg was processed...
photo-1541698444083-023c97d3f4b6.jpg was processed...
Finished in 43.83 seconds



In [114]:
def process_image(img_name):
    img = Image.open(img_name)
    img = img.filter(ImageFilter.GaussianBlur(15))
    img.thumbnail(size)
    img.save(f'processed/{img_name}')
    print(f'{img_name} was processed...')

In [115]:
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(process_image, img_names)
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Finished in 23.57 seconds



## Revision:
#### Process pool executor : concurrent.futures
    - Submit method 
    - submit and as_completed
    - map

In [None]:
# ALSO, IS IT REALLY CPU BOUND NY IO BOUND?

In [118]:
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(process_image, img_names)
finish = time.perf_counter()
print(f'Finished in {round(finish-start,2)} seconds',end='\n\n')

Finished in 19.95 seconds

