**[Big Data and Cloud Computing]**

# Parallel programming in python3 using the multiprocessing module

Spark and several Python libraries give you tools that can automatically distribute and map threads or processes across processors
and disks (on a single machine or on many), but very often you need more control over
your parallel tasks, either to understand the backend implementation or to implement your own parallel code. In this class, we will use some alternatives to
program in parallel with Python's `multiprocessing` module and take advantage of your multicore machine. Be aware that this module works with ***processes***, not with ***threads***.
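To make the processes-versus-threads distinction concrete, here is a minimal sketch (not part of the original exercises) that appends to a module-level list once from a child process and once from a thread. The child process works on its own memory, so the parent never sees its change, while the thread shares the parent's memory. The names `shared`, `append_item` and `run_demo` are hypothetical, chosen only for this illustration.

```python
import multiprocessing as mp
import threading

# Hypothetical module-level list, used only for this demonstration.
shared = []


def append_item(value):
    """Append `value` to the module-level list of whichever process runs this."""
    shared.append(value)


def run_demo():
    # The child process has its own memory (a copy with fork, a fresh
    # re-import of this module with spawn/forkserver), so its append is
    # lost as far as the parent is concerned.
    p = mp.Process(target=append_item, args=("from process",))
    p.start()
    p.join()

    # The thread runs inside this process and shares its memory.
    t = threading.Thread(target=append_item, args=("from thread",))
    t.start()
    t.join()
    return list(shared)


if __name__ == "__main__":
    print(run_demo())  # ['from thread']
```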

Material for these practical exercises was taken from <a href="https://www.machinelearningplus.com/python/parallel-processing-python/">this
    site</a>.
    
Every piece of code is timed so that you have an idea of how execution time differs among the parallelization choices.
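The notebook measures elapsed time with `time()`. For short intervals, `time.perf_counter()` is usually a better choice: it uses the highest-resolution clock available and, unlike `time()`, it is monotonic, so it is not affected by system clock adjustments. A minimal sketch of a reusable timer (the `timer` helper is hypothetical, not something used later in the notebook):

```python
import time
from contextlib import contextmanager


@contextmanager
def timer(label, record=None):
    """Print (and optionally store in `record`) the wall-clock time spent inside the block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if record is not None:
            record[label] = elapsed
        print(f"{label}: {elapsed:.8f} seconds")


if __name__ == "__main__":
    timings = {}
    with timer("sleep 0.1 s", timings):
        time.sleep(0.1)
```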

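<b>Note:</b> When using the method `pool.apply_async` (and, in general, any `Pool` method) the function to be
invoked may need to be defined in a separate file and imported into the
program; otherwise your code may not work. This happens when worker processes are started with the `spawn` or `forkserver` start methods (`spawn` is the default on Windows and macOS), because each worker must be able to import the function by name.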
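A minimal sketch of the structure recommended by the multiprocessing programming guidelines, so that a script also works with the `spawn` start method: the worker function lives at module top level (so workers can import it by name) and the code that creates the pool is protected by `if __name__ == "__main__":`. The function name `count_in_range` is hypothetical; saved as a `.py` file, this should behave the same under `fork`, `spawn` or `forkserver`.

```python
import multiprocessing as mp


def count_in_range(row, minimum=4, maximum=8):
    """Top-level function: picklable by reference, so any start method can use it."""
    return sum(1 for num in row if minimum <= num <= maximum)


def main():
    data = [[8, 8, 3, 7, 7, 0, 4, 2, 5, 2],
            [2, 2, 1, 0, 8, 4, 0, 9, 6, 2]]
    # Report which start method this platform uses ('fork', 'spawn' or 'forkserver').
    print("start method:", mp.get_start_method())
    with mp.Pool(processes=2) as pool:
        return pool.map(count_in_range, data)


if __name__ == "__main__":
    # Children started with spawn/forkserver re-import this module;
    # the guard keeps them from creating pools of their own.
    print(main())  # [6, 3]
```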


__References__:

- [python3 multiprocessing](https://docs.python.org/3/library/multiprocessing.html)
- [Programming Guidelines for multiprocessing](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming)
- [Timing and profiling using colab (You may find this useful)](https://colab.research.google.com/github/jakevdp/PythonDataScienceHandbook/blob/master/notebooks/01.07-Timing-and-Profiling.ipynb)
- [Differences between pool.apply and others](https://discuss.python.org/t/differences-between-pool-map-pool-apply-and-pool-apply-async/6575)

Let's start with the basics: importing the relevant Python 3 module (remember: we will be working with processes, not with threads).

In [None]:
import multiprocessing as mp

For this exercise we will need some extra modules.

In [None]:
import numpy as np
from time import time

We will also create some synthetic random data (a `numpy` array), but you can use your own data. Notice that I reduced the data dimensions so that you can better follow the sequence of results produced by the operations executed (sequentially and in parallel).

In [None]:
r = 10
m = 5
n = 10
#m = 200
#n = 2000


#MATRIX MxN
np.random.seed(100)
arr = np.random.randint(0, r, size=[m, n])
data = arr.tolist()
print(data[:10])

[[8, 8, 3, 7, 7, 0, 4, 2, 5, 2], [2, 2, 1, 0, 8, 4, 0, 9, 6, 2], [4, 1, 5, 3, 4, 4, 3, 7, 1, 1], [7, 7, 0, 2, 9, 9, 3, 2, 5, 8], [1, 0, 7, 6, 2, 0, 8, 2, 5, 1]]


Now, let's write a very simple sequential program that defines a function to count how many values in a row of the array created above fall within a given interval.

In [None]:
import multiprocessing as mp


def howmany_within_range(row, minimum, maximum):
    print(mp.current_process(),' ',row) #CURRENT PROCESS BEING USED (IT IS THE MAIN PROCESS)
    count = 0
    for num in row:
        if minimum <= num <= maximum:
            count = count + 1
    return count


start_time = time()
#CALCULATE THE NUMBER OF VALUES IN [4, 8] IN EACH ROW
results = []
for row in data:
    results.append(howmany_within_range(row, minimum=4, maximum=8))


print(round(time() - start_time,8),'seconds')
print(results[:10])
#TOTAL TIME = 0.00507522 seconds

<_MainProcess name='MainProcess' parent=None started>   [8, 8, 3, 7, 7, 0, 4, 2, 5, 2]
<_MainProcess name='MainProcess' parent=None started>   [2, 2, 1, 0, 8, 4, 0, 9, 6, 2]
<_MainProcess name='MainProcess' parent=None started>   [4, 1, 5, 3, 4, 4, 3, 7, 1, 1]
<_MainProcess name='MainProcess' parent=None started>   [7, 7, 0, 2, 9, 9, 3, 2, 5, 8]
<_MainProcess name='MainProcess' parent=None started>   [1, 0, 7, 6, 2, 0, 8, 2, 5, 1]
0.00507522 seconds
[6, 3, 5, 4, 4]


Let's check some characteristics of our machine.

In [None]:
!lsb_release -a

No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 22.04.3 LTS
Release:	22.04
Codename:	jammy


In [None]:
!uname -a

Linux c7e2ea1b1080 6.1.58+ #1 SMP PREEMPT_DYNAMIC Sat Nov 18 15:31:17 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux


In [None]:
!cat /proc/cpuinfo

processor	: 0
vendor_id	: GenuineIntel
cpu family	: 6
model		: 79
model name	: Intel(R) Xeon(R) CPU @ 2.20GHz
stepping	: 0
microcode	: 0xffffffff
cpu MHz		: 2199.998
cache size	: 56320 KB
physical id	: 0
siblings	: 2
core id		: 0
cpu cores	: 1
apicid		: 0
initial apicid	: 0
fpu		: yes
fpu_exception	: yes
cpuid level	: 13
wp		: yes
flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt arat md_clear arch_capabilities
bugs		: cpu_meltdown spectre_v1 spectre_v2 spec_store_bypass l1tf mds swapgs taa mmio_stale_data retbleed
bogomips	: 4399.99
clflush size	: 64
cache_alignment	: 64
addres
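The `cpuinfo` output above reports `siblings: 2` and `cpu cores: 1` for processor 0, that is, two logical CPUs (hyper-threads) sharing a single physical core, so the speedups we can expect on this Colab machine are likely limited. The same counts can be queried from Python; a minimal sketch (the helper `cpu_summary` is hypothetical, and `os.sched_getaffinity` exists only on some platforms such as Linux, hence the `hasattr` check):

```python
import multiprocessing as mp
import os


def cpu_summary():
    """Return the logical CPU counts visible to this Python process."""
    summary = {
        "os.cpu_count()": os.cpu_count(),  # logical CPUs in the system (may be None)
        "mp.cpu_count()": mp.cpu_count(),  # same value; raises NotImplementedError if unknown
    }
    if hasattr(os, "sched_getaffinity"):
        # CPUs this process is actually allowed to run on (Linux and some Unix systems)
        summary["usable CPUs"] = len(os.sched_getaffinity(0))
    return summary


if __name__ == "__main__":
    for name, value in cpu_summary().items():
        print(f"{name:>15}: {value}")
```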

In what follows, we will first use the `Pool` class and then the `Process` class.
When using `Pool`, the workers (processes, or threads in a thread pool) are launched as soon as the pool is initialized (this happens in `Pool.__init__()`; there is no need to submit tasks for this to happen) and then wait for tasks. When a task arrives and is executed, the workers **do not exit**; they just go back to waiting for more work.

You can make the pool work differently, though, with the `maxtasksperchild` parameter. As soon as a worker completes that number of tasks, it exits, and a new worker is immediately launched (it does not need to receive a task first; it is launched as soon as the old worker exits). In the source code, this is managed by the `Pool._maintain_pool()` and `Pool._repopulate_pool()` methods. `Pool` can use several different methods to distribute tasks; we will see some of them next.
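A minimal sketch that makes worker reuse visible by returning the PID of the worker that ran each task (the helpers `worker_pid` and `pids_used` are hypothetical). With the default settings, a pool of 2 workers handles all the tasks with at most 2 distinct PIDs; with `maxtasksperchild=1`, every task runs in a freshly launched worker. `chunksize=1` makes each item a separate task.

```python
import multiprocessing as mp
import os


def worker_pid(_):
    """Return the PID of the worker process that executed this task."""
    return os.getpid()


def pids_used(maxtasksperchild=None, n_tasks=6):
    # chunksize=1: every item is sent to a worker as a separate task.
    with mp.Pool(processes=2, maxtasksperchild=maxtasksperchild) as pool:
        return pool.map(worker_pid, range(n_tasks), chunksize=1)


if __name__ == "__main__":
    # Default: the same (at most 2) workers handle all the tasks.
    print("distinct workers (default):", len(set(pids_used())))
    # maxtasksperchild=1: each worker exits after one task and is replaced.
    print("distinct workers (maxtasksperchild=1):", len(set(pids_used(maxtasksperchild=1))))
```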

To parallelize the code we will use several `Pool` methods: `apply`, `map`, `starmap`, `apply_async`, etc.

Let's parallelize our program that counts values within a range.

# **Option #1**: using `pool.apply`

In [None]:
import multiprocessing as mp


num_cpus = mp.cpu_count()
print('Num cpus = ', num_cpus) #USING 2 PROCESSORS


start_time = time()
pool = mp.Pool(mp.cpu_count()) #CREATE A POOL TO PARALLELIZE THE EXECUTION BETWEEN THE 2 PROCESSORS
print('Time to create pool: ',round(time() - start_time,8), 'seconds')
results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data] #USING THE FUNCTION APPLY (BLOCKS UNTIL EACH TASK IS DONE)
pool.close()


print('Total time: ',round(time() - start_time,8), 'seconds')
print(results[:10])
#TOTAL TIME = 0.18138671 seconds


#Q1: Yes. The parallel program is slower than the sequential one because the data is very small: creating the pool
#alone already takes about 6 times longer than the whole sequential program. Moreover, pool.apply blocks until each task
#is finished, so the rows are still processed one at a time, and every call adds the cost of sending the row to a worker
#process and the result back.

Num cpus =  2
<ForkProcess name='ForkPoolWorker-29' parent=197 started daemon>   [8, 8, 3, 7, 7, 0, 4, 2, 5, 2, 2, 2, 1, 0, 8, 4, 0, 9, 6, 2, 4, 1, 5, 3, 4, 4, 3, 7, 1, 1, 7, 7, 0, 2, 9, 9, 3, 2, 5, 8, 1, 0, 7, 6, 2, 0, 8, 2, 5, 1, 8, 1, 5, 4, 2, 8, 3, 5, 0, 9, 3, 6, 3, 4, 7, 6, 3, 9, 0, 4, 4, 5, 7, 6, 6, 2, 4, 2, 7, 1, 6, 6, 0, 7, 2, 3, 5, 4, 2, 4, 3, 7, 9, 0, 0, 5, 9, 6, 6, 5, 6, 4, 7, 3, 9, 2, 3, 8, 7, 1, 5, 9, 3, 0, 6, 2, 3, 4, 8, 9, 8, 5, 2, 7, 5, 9, 0, 9, 8, 6, 2, 0, 5, 3, 2, 3, 6, 4, 1, 3, 1, 4, 8, 8, 2, 2, 7, 2, 1, 2, 7, 1, 0, 5, 3, 5, 2, 6, 1, 1, 5, 9, 2, 5, 6, 4, 6, 7, 9, 7, 3, 0, 2, 5, 1, 1, 0, 3, 6, 7, 3, 6, 4, 8, 6, 5, 0, 0, 5, 1, 3, 2, 3, 1, 0, 6, 5, 2, 0, 0, 9, 1, 5, 2, 3, 6, 1, 4, 3, 1, 4, 4, 9, 5, 6, 3, 4, 3, 7, 7, 2, 4, 0, 2, 0, 6, 8, 1, 5, 6, 4, 6, 5, 0, 8, 8, 5, 9, 3, 2, 8, 7, 1, 4, 6, 0, 7, 3, 9, 1, 2, 7, 7, 6, 4, 3, 3, 3, 4, 7, 7, 4, 1, 1, 8, 7, 8, 8, 8, 7, 9, 0, 6, 0, 7, 4, 5, 1, 5, 5, 3, 4, 3, 6, 5, 0, 0, 6, 1, 9, 8, 0, 3, 9, 2, 6, 5, 4, 5, 7, 3, 0, 7, 8, 2, 0, 

# **Q1: Is your parallel program slower than the sequential? Why?**
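Part of the answer to Q1 is that `pool.apply` blocks until its result is ready, so the list comprehension in Option #1 keeps at most one worker busy at a time; the documentation points to `apply_async` for parallel work. A minimal sketch using a hypothetical `slow_square` task that sleeps, to make the difference visible: submitting every task with `apply_async` first and calling `.get()` afterwards lets the workers run concurrently.

```python
import multiprocessing as mp
import time


def slow_square(x):
    """Hypothetical task: sleep briefly, then return x squared."""
    time.sleep(0.2)
    return x * x


def with_apply(pool, xs):
    # Each call blocks until its task finishes: tasks run one after another.
    return [pool.apply(slow_square, (x,)) for x in xs]


def with_apply_async(pool, xs):
    # Submit everything first, then wait: tasks can run concurrently.
    pending = [pool.apply_async(slow_square, (x,)) for x in xs]
    return [r.get() for r in pending]


if __name__ == "__main__":
    xs = list(range(4))
    with mp.Pool(processes=2) as pool:
        for fn in (with_apply, with_apply_async):
            start = time.perf_counter()
            out = fn(pool, xs)
            print(f"{fn.__name__}: {out} in {time.perf_counter() - start:.2f} s")
```

With 2 workers and 4 tasks of 0.2 s, the blocking version needs at least 0.8 s, while the asynchronous one can finish in roughly half of that.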

Let's parallelize this program using an alternative function.

# **Option #2**: using `pool.map`

In [None]:
import multiprocessing as mp


def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    print(mp.current_process(),' ',row) #CURRENT PROCESS BEING USED
    count = 0
    for num in row:
        if minimum <= num <= maximum:
            count = count + 1
    return count


start_time = time()
pool = mp.Pool(mp.cpu_count()) #CREATE A POOL TO PARALLELIZE THE EXECUTION BETWEEN THE 2 PROCESSORS
print('Time to create pool: ', round(time() - start_time,8), 'seconds')
results = pool.map(howmany_within_range_rowonly, data) #USING THE FUNCTION MAP (EACH ROW IS ONE ARGUMENT)
pool.close()


print(round(time() - start_time,8), 'seconds')
print(results[:10])
#TOTAL TIME = 0.15351677 seconds


#Q2: Option #1 uses "apply" and Option #2 uses "map"; "apply" is slower. pool.apply blocks until its task is finished,
#so the list comprehension submits one row, waits for its result, and only then submits the next row: only one worker
#is busy at a time. pool.map splits the list of rows into chunks and distributes them among all the worker processes at
#once, so the workers run concurrently, and it returns the results in the original order.


#Q3: After increasing the data size we got the following processing times: apply took 5.5 seconds, map took 3.5 seconds.
#map is again faster, and this time the difference between both is much bigger, most likely because the useful work per
#task now outweighs the fixed costs (pool creation, sending data between processes).

<ForkProcess name='ForkPoolWorker-31' parent=197 started daemon><ForkProcess name='ForkPoolWorker-32' parent=197 started daemon>    [4, 6, 6, 6, 0, 9, 3, 5, 9, 6, 7, 8, 2, 7, 5, 7, 4, 7, 8, 2, 7, 8, 5, 2, 5, 6, 1, 0, 6, 7, 3, 8, 7, 9, 7, 3, 5, 4, 2, 7, 2, 4, 6, 3, 0, 4, 8, 0, 5, 5, 2, 6, 3, 6, 6, 9, 7, 0, 5, 3, 7, 0, 3, 8, 9, 9, 7, 9, 6, 4, 9, 4, 3, 3, 8, 6, 0, 9, 5, 6, 2, 3, 0, 2, 5, 4, 9, 6, 7, 8, 0, 9, 0, 5, 6, 8, 0, 6, 6, 8, 7, 3, 2, 6, 7, 4, 0, 4, 3, 4, 6, 5, 1, 9, 9, 7, 1, 4, 6, 7, 2, 7, 1, 5, 7, 8, 3, 1, 1, 1, 1, 8, 3, 1, 0, 2, 3, 0, 7, 8, 3, 3, 3, 9, 1, 6, 1, 4, 9, 5, 5, 2, 7, 7, 1, 6, 0, 5, 0, 7, 7, 4, 3, 2, 2, 2, 7, 6, 2, 0, 5, 0, 4, 8, 8, 5, 7, 6, 8, 8, 2, 2, 2, 6, 5, 2, 6, 9, 6, 0, 9, 3, 2, 4, 9, 0, 1, 3, 6, 4, 3, 6, 9, 6, 3, 0, 9, 2, 7, 3, 2, 1, 1, 4, 5, 7, 2, 0, 2, 6, 8, 7, 3, 1, 5, 7, 6, 5, 6, 7, 6, 3, 9, 4, 0, 7, 6, 1, 7, 3, 6, 5, 7, 8, 8, 6, 9, 7, 6, 9, 8, 5, 5, 1, 4, 0, 1, 8, 0, 3, 6, 8, 9, 1, 2, 3, 5, 7, 6, 9, 2, 9, 3, 4, 0, 2, 9, 7, 9, 1, 3, 7, 7, 8, 6, 0, 0, 8, 4, 

# **Q2: What is the difference between Option #1 and Option #2? In other words, what is the difference between `apply` and `map`? Which one is slower? Why?**

# **Q3: Try increasing the dimension of your data. Do you see any improvement in performance or not? Why?**
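`pool.map` chops the iterable into chunks and sends each chunk to a worker as a single task; the `chunksize` parameter sets the (approximate) chunk size, and when it is omitted CPython picks a default from the input length and the number of workers. Larger chunks mean fewer messages between processes, which matters as the data grows. A minimal sketch that records which worker handled each row (the helpers `tag_row` and `run` are hypothetical): the results always come back in input order, and with a single chunk only one worker does all the work.

```python
import multiprocessing as mp
import os

# The 5 x 10 matrix generated earlier in this notebook.
DATA = [[8, 8, 3, 7, 7, 0, 4, 2, 5, 2], [2, 2, 1, 0, 8, 4, 0, 9, 6, 2],
        [4, 1, 5, 3, 4, 4, 3, 7, 1, 1], [7, 7, 0, 2, 9, 9, 3, 2, 5, 8],
        [1, 0, 7, 6, 2, 0, 8, 2, 5, 1]]


def tag_row(row):
    """Return (PID of the worker, number of values of `row` in [4, 8])."""
    return os.getpid(), sum(1 for num in row if 4 <= num <= 8)


def run(data, chunksize):
    with mp.Pool(processes=2) as pool:
        # map returns the results in the same order as `data`
        return pool.map(tag_row, data, chunksize=chunksize)


if __name__ == "__main__":
    for chunksize in (1, len(DATA)):
        tagged = run(DATA, chunksize)
        counts = [count for _, count in tagged]
        workers = len({pid for pid, _ in tagged})
        print(f"chunksize={chunksize}: counts={counts}, workers used={workers}")
```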

# **Option #3**: using `pool.starmap`

In [None]:
import multiprocessing as mp


start_time = time()
pool = mp.Pool(mp.cpu_count())
results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data]) #USING THE FUNCTION STARMAP
pool.close()


print(round(time() - start_time,8), 'seconds')
print(results[:10])
#TOTAL TIME = 0.12449646 seconds

<ForkProcess name='ForkPoolWorker-38' parent=197 started daemon><ForkProcess name='ForkPoolWorker-37' parent=197 started daemon>      [8, 8, 3, 7, 7, 0, 4, 2, 5, 2][2, 2, 1, 0, 8, 4, 0, 9, 6, 2]

<ForkProcess name='ForkPoolWorker-38' parent=197 started daemon><ForkProcess name='ForkPoolWorker-37' parent=197 started daemon>      [7, 7, 0, 2, 9, 9, 3, 2, 5, 8][4, 1, 5, 3, 4, 4, 3, 7, 1, 1]
<ForkProcess name='ForkPoolWorker-38' parent=197 started daemon>
   [1, 0, 7, 6, 2, 0, 8, 2, 5, 1]
0.12449646 seconds
[6, 3, 5, 4, 4]
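`starmap` unpacks each tuple into positional arguments, so `starmap(f, [(row, 4, 8), ...])` calls `f(row, 4, 8)` for every row. When the extra arguments are the same for every row, an alternative not used in the original notebook is to fix them with `functools.partial` and use plain `map` (a `partial` of a top-level function can be pickled). A sketch comparing both, with a print-free version of `howmany_within_range`:

```python
import multiprocessing as mp
from functools import partial

# The 5 x 10 matrix generated earlier in this notebook.
DATA = [[8, 8, 3, 7, 7, 0, 4, 2, 5, 2], [2, 2, 1, 0, 8, 4, 0, 9, 6, 2],
        [4, 1, 5, 3, 4, 4, 3, 7, 1, 1], [7, 7, 0, 2, 9, 9, 3, 2, 5, 8],
        [1, 0, 7, 6, 2, 0, 8, 2, 5, 1]]


def howmany_within_range(row, minimum, maximum):
    """Count the values of `row` that lie in [minimum, maximum] (no printing)."""
    return sum(1 for num in row if minimum <= num <= maximum)


def with_starmap(data):
    with mp.Pool(processes=2) as pool:
        # Each tuple is unpacked: howmany_within_range(row, 4, 8)
        return pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])


def with_partial(data):
    # partial fixes minimum=4 and maximum=8, leaving a one-argument callable.
    count_4_to_8 = partial(howmany_within_range, minimum=4, maximum=8)
    with mp.Pool(processes=2) as pool:
        return pool.map(count_4_to_8, data)


if __name__ == "__main__":
    print(with_starmap(DATA))  # [6, 3, 5, 4, 4]
    print(with_partial(DATA))  # [6, 3, 5, 4, 4]
```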


# **Option #4**: using `pool.apply_async`

Let's try a slightly different approach, in which tasks are submitted asynchronously and their results are collected as they finish.

In [None]:
import multiprocessing as mp


start_time = time()
pool = mp.Pool(mp.cpu_count())
results = []


def howmany_within_range2(i, row, minimum, maximum):
    count = 0
    for num in row:
        if minimum <= num <= maximum:
            count = count + 1
    return (i, count)


def collect_result(result):
    global results
    results.append(result)


for i, row in enumerate(data):
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8, ), callback=collect_result)


pool.close()
pool.join()
print(round(time() - start_time,8), 'seconds')
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]
print(results_final[:10])


#TOTAL TIME = 0.04562044 seconds

0.05064511 seconds
[6, 3, 5, 4, 4]


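Depending on the environment, this may not run! When worker processes are started with the `spawn` or `forkserver` method (`spawn` is the default on Windows and macOS), each worker must import the function to be invoked by name, and a function defined interactively in a notebook cannot be found that way. In that case, the function needs to be defined in a separate file and imported into the
program, otherwise your code will not work. (In this Colab run the workers were created with `fork`, as the `ForkProcess` names in the outputs above show, which is why the cell above still produced results.) Let's do this anyway.
(One way of doing it is to upload the file with the function `howmany_within_range2` to your Drive and then copy it to this Colab machine. I created a file called `howmany2.py`, uploaded it to my Drive, mounted the Drive, and copied the file to my current directory on the Colab machine, as shown next.)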
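Instead of uploading the file, the module can also be created from the notebook itself. A minimal sketch, assuming `howmany2.py` only needs to contain the `howmany_within_range2` function defined in the previous cell (the actual 169-byte file used here is not shown, so `MODULE_SOURCE` is an assumption, and `write_module` is a hypothetical helper):

```python
import importlib
import sys
from pathlib import Path

# Assumed contents of howmany2.py: the same function as in the previous cell.
MODULE_SOURCE = '''\
def howmany_within_range2(i, row, minimum, maximum):
    count = 0
    for num in row:
        if minimum <= num <= maximum:
            count = count + 1
    return (i, count)
'''


def write_module(directory=".", name="howmany2"):
    """Write the module into `directory`, make that directory importable, and import it."""
    directory = Path(directory).resolve()
    (directory / f"{name}.py").write_text(MODULE_SOURCE)
    if str(directory) not in sys.path:
        sys.path.insert(0, str(directory))
    importlib.invalidate_caches()  # make sure the import system notices the new file
    return importlib.import_module(name)


if __name__ == "__main__":
    howmany2 = write_module()
    print(howmany2.howmany_within_range2(0, [8, 8, 3, 7, 7, 0, 4, 2, 5, 2], 4, 8))  # (0, 6)
```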


In [None]:
!cp /content/drive/MyDrive/howmany2.py .
!ls -la

total 24
drwxr-xr-x 1 root root 4096 Apr 17 15:43 .
drwxr-xr-x 1 root root 4096 Apr 17 14:36 ..
drwxr-xr-x 4 root root 4096 Apr 15 13:25 .config
drwx------ 5 root root 4096 Apr 17 15:31 drive
-rw-r--r-- 1 root root  169 Apr 17 15:45 howmany2.py
drwxr-xr-x 1 root root 4096 Apr 15 13:26 sample_data


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Corrected `pool.apply_async` version, with the function defined in a separate file.

In [None]:
import multiprocessing as mp


pool = mp.Pool(mp.cpu_count())
results = []


import howmany2


def collect_result(result):
    global results
    results.append(result)


start_time = time()
for i, row in enumerate(data):
    pool.apply_async(howmany2.howmany_within_range2, args=(i, row, 4, 8, ), callback=collect_result)


pool.close()
pool.join()
print(round(time() - start_time,8), 'seconds')
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]
print(results_final[:10])



#TOTAL TIME = 0.05367613 seconds

0.05367613 seconds
[6, 3, 5, 4, 4]


In [None]:
#Parallel processing with Pool.apply_async(): same model and computation pattern as the others.
#This cell reproduces a common mistake: the argument tuple (row, 4, 8) leaves out the index i expected by
#howmany_within_range2(i, row, minimum, maximum). Each worker raises a TypeError, apply_async stores it in the
#AsyncResult, and the error only shows up when .get() is called (see the output below).

import multiprocessing as mp

pool = mp.Pool(mp.cpu_count())
results = []

# Step 1: function howmany... is defined in another file
import howmany2

# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    global results
    print(result)
    results.append(result)
#    return results


start_time = time()


# Step 3: Use loop to parallelize (correct version: all 4 arguments plus the callback)
# for i, row in enumerate(data):
#    pool.apply_async(howmany2.howmany_within_range2, args=(i, row, 4, 8, ), callback=collect_result)

# Incorrect version: the index i is missing from args
results_final = [pool.apply_async(howmany_within_range2, args = (row, 4, 8)) for row in data]


# Step 4: Close Pool and wait for all processes to complete
pool.close()
pool.join()  # postpones the execution of next line of code until all processes in the queue are done.


print(round(time() - start_time,8), 'seconds')


# Step 5: Sort results [OPTIONAL]
# results.sort(key=lambda x: x[0])
# results_final = [r for i, r in results]
print(results_final[0].get())  # re-raises, in the parent, the TypeError raised in the worker
#TOTAL TIME = 0.01162577 seconds
#WITH ERRORS

0.01162577 seconds


TypeError: howmany_within_range2() missing 1 required positional argument: 'maximum'
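A minimal sketch of the corrected pattern, keeping the `AsyncResult` objects returned by `apply_async` and also passing an `error_callback`, so that a failing task is reported instead of going unnoticed. The function is redefined locally so that the sketch is self-contained; `report_error` and `count_rows_async` are hypothetical names.

```python
import multiprocessing as mp


def howmany_within_range2(i, row, minimum, maximum):
    """Return (row index, number of values of `row` in [minimum, maximum])."""
    return i, sum(1 for num in row if minimum <= num <= maximum)


def report_error(exc):
    # Runs in the parent process if a task raises an exception.
    print("task failed:", repr(exc))


def count_rows_async(data, minimum=4, maximum=8):
    with mp.Pool(processes=2) as pool:
        pending = [pool.apply_async(howmany_within_range2,
                                    args=(i, row, minimum, maximum),  # all 4 arguments
                                    error_callback=report_error)
                   for i, row in enumerate(data)]
        # .get() waits for each result (and would re-raise a worker exception).
        pairs = [r.get() for r in pending]
    # Results are read in submission order, so no sorting is needed
    # (unlike the callback version, where results arrive as tasks finish).
    return [count for _, count in pairs]


if __name__ == "__main__":
    data = [[8, 8, 3, 7, 7, 0, 4, 2, 5, 2], [2, 2, 1, 0, 8, 4, 0, 9, 6, 2]]
    print(count_rows_async(data))  # [6, 3]
```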

# **Option #5: using `Process()`**

Let's try yet another option, **without `Pool`**: now we create `Process` objects directly.

In [None]:
import multiprocessing as mp


def howmany_within_range3(i, row, minimum, maximum):
    """Stores in results[i] how many numbers of `row` lie within [`minimum`, `maximum`]"""
    count = 0
    global results  # NOTE: each child process works on its own copy of `results`
    for num in row:
        if minimum <= num <= maximum:
            count = count + 1
    results[i] = count  # never seen by the parent; IndexError here because `results` is empty


start_time = time()
processes = []


for i, row in enumerate(data):  # one process per row
    p = mp.Process(target=howmany_within_range3, args=(i, row, 4, 8, ))
    processes.append(p)
    p.start()


for process in processes:
    process.join()


print(round(time() - start_time,8), 'seconds')
print(results[:10])
#TOTAL TIME = 0.22674489 seconds
#WITH ERRORS

Process Process-79:
Traceback (most recent call last):
Process Process-80:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Process Process-81:
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-75-fea0df93eff1>", line 11, in howmany_within_range3
    results[i] = count
Process Process-82:
  File "<ipython-input-75-fea0df93eff1>", line 11, in howmany_within_range3
    results[i] = count
IndexError: list assignment index out of range
Traceback (most recent call last):
IndexError: list assignment index out of range
  File "/usr/lib/python3.10/multiprocessing/process.py",

0.22674489 seconds
[]


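This was not a very good idea, was it? Two things went wrong. First, every child process works on its own copy of `results`, so the assignments made in the children never reach the parent (here the parent's `results` was an empty list, hence the `IndexError` and the empty list printed at the end). Second, one process is created per row, which becomes very expensive for large matrices. Let's try another way: create only one process per CPU and give each one a block of rows.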

In [None]:
import multiprocessing as mp
import math


def howmany_within_range3(row_start, row_end, minimum, maximum):
    # Each process counts the values in its own block of rows [row_start, row_end)
    if row_end > m:
        row_end = m
    results = []  # NOTE: local to this child process; discarded when the process exits
    for row in range(row_start,row_end):
        count = 0
        for num in data[row]:
            if minimum <= num <= maximum:
                count = count + 1
        results.append(count)


start_time = time()
processes = []
task_size = math.ceil(m / mp.cpu_count())  # number of rows per process
print(task_size)


for i in range(mp.cpu_count()):  # one process per CPU
    lower_row_index = i*task_size
    upper_row_index = i*task_size + task_size
    p = mp.Process(target=howmany_within_range3, args=(lower_row_index, upper_row_index, 4, 8, ))
    processes.append(p)
    p.start()


for process in processes:
    process.join()


print(round(time() - start_time,8), 'seconds')
print(results[:10])  # the parent's list: the counts computed by the children are not returned
#TOTAL TIME = 0.06884527 seconds


#Q4:
#pool.apply (Option #1): blocks until each task finishes, so the rows are processed one at a time. Useful only to run a
#single function call in another process; it is the slowest choice for data-parallel work like this one.

#pool.map (Option #2) and pool.starmap (Option #3): split the rows into chunks and distribute them among all the workers,
#returning the results in the original order. Use map for single-argument functions and starmap when each task needs
#several arguments. Good default choices when the problem divides naturally into independent tasks, like the rows here.

#pool.apply_async (Option #4). TOTAL TIME = 0.04562044 seconds
#Submits every row as an independent task without waiting, so the tasks run concurrently on both CPUs. Results arrive
#through a callback (possibly out of order, hence the index i and the sort) or through AsyncResult.get(). Ideal for
#this problem, since it divides into small independent tasks (the rows), and useful when tasks have different sizes or
#while the main program does other work. Worker exceptions only appear when .get() is called or via an error_callback.

#Process, one process per row (Option #5, first attempt). TOTAL TIME = 0.22674489 seconds WITH ERRORS
#Creating one process per task is expensive, and each child works on its own copy of `results`, so the counts never
#reach the parent (here the empty list also caused IndexError). Only reasonable for a few long-running, independent tasks.

#Process, one process per CPU with blocks of rows (Option #5, second attempt). TOTAL TIME = 0.06884527 seconds
#Lowest overhead and full control over how the work is partitioned, but results, synchronization and error handling
#must be managed explicitly (e.g., with a Queue or a shared Array); as written, the counts are not returned to the parent.

3
0.06884527 seconds
[]
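A minimal sketch of one way to fix the second `Process` version so that the counts reach the parent: the parent allocates a shared `multiprocessing.Array` with one integer slot per row and passes it to every child, which writes the counts of its own block of rows. The names `count_block` and `count_with_processes` are hypothetical; a `multiprocessing.Queue` would be another option.

```python
import math
import multiprocessing as mp

# The 5 x 10 matrix generated earlier in this notebook.
DATA = [[8, 8, 3, 7, 7, 0, 4, 2, 5, 2], [2, 2, 1, 0, 8, 4, 0, 9, 6, 2],
        [4, 1, 5, 3, 4, 4, 3, 7, 1, 1], [7, 7, 0, 2, 9, 9, 3, 2, 5, 8],
        [1, 0, 7, 6, 2, 0, 8, 2, 5, 1]]


def count_block(data, row_start, row_end, minimum, maximum, out):
    """Write into out[row] the number of values in [minimum, maximum] for each row of the block."""
    for row in range(row_start, min(row_end, len(data))):
        out[row] = sum(1 for num in data[row] if minimum <= num <= maximum)


def count_with_processes(data, minimum=4, maximum=8, n_procs=None):
    n_procs = n_procs or mp.cpu_count()
    out = mp.Array("i", len(data))  # shared int array, zero-initialised, visible to the children
    task_size = math.ceil(len(data) / n_procs)  # rows per process
    processes = []
    for i in range(n_procs):
        p = mp.Process(target=count_block,
                       args=(data, i * task_size, (i + 1) * task_size, minimum, maximum, out))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    return list(out)


if __name__ == "__main__":
    print(count_with_processes(DATA, n_procs=2))  # [6, 3, 5, 4, 4]
```

Each child writes to different indices, so no extra locking is needed beyond what the synchronized `Array` already does.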


# **Q4: Write a summary about these different forms of running parallel code. In which situations would you use each one of those alternatives?**

Next is a small example of using threads in Python (not with the `threading` module, but with the `multiprocessing.dummy` module, which replicates the `multiprocessing` API on top of threads). More details at: https://stackoverflow.com/questions/2846653/how-can-i-use-threading-in-python. Here, we profile the code using `cProfile`.

(Note: although you may not see much advantage of using threads for these examples, if you try an application that needs to fetch files from the network, you may notice speedups - see example [here](https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python))
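A minimal sketch of why threads help with I/O-bound work, with `time.sleep` standing in for a network request (the `fake_download` function and the URLs are hypothetical; nothing is actually downloaded). While a thread waits in `sleep`, as while it waits on a socket, other threads can run, so several waits overlap. All worker threads of a `multiprocessing.dummy` pool live in the same process, as the returned PIDs show.

```python
import os
import time
from multiprocessing.dummy import Pool as ThreadPool


def fake_download(url):
    """Hypothetical I/O-bound task: pretend to wait 0.2 s for a server."""
    time.sleep(0.2)
    return url, os.getpid()


def sequential(urls):
    return [fake_download(u) for u in urls]


def threaded(urls, n_threads=8):
    with ThreadPool(n_threads) as pool:
        return pool.map(fake_download, urls)


if __name__ == "__main__":
    urls = [f"https://example.com/file{i}" for i in range(8)]
    for fn in (sequential, threaded):
        start = time.perf_counter()
        results = fn(urls)
        elapsed = time.perf_counter() - start
        print(f"{fn.__name__}: {elapsed:.2f} s, processes used: {sorted({pid for _, pid in results})}")
```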

# **Q5: Modify these scripts to run using multiple threads instead of processes (you will need to use another module: `threading`). Compare their performance when varying the matrix size.**
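Q5 asks for a version built on the `threading` module itself. A minimal sketch, assuming the same block partitioning as the second `Process` version: each `threading.Thread` writes the counts of its block of rows into a list created by the main thread. No shared-memory object is needed, because threads share the process's memory, and each thread writes to different indices. Because of CPython's GIL, this pure-Python loop should not be expected to beat the sequential version. The names `count_block` and `count_with_threads` are hypothetical.

```python
import math
import threading

# The 5 x 10 matrix generated earlier in this notebook.
DATA = [[8, 8, 3, 7, 7, 0, 4, 2, 5, 2], [2, 2, 1, 0, 8, 4, 0, 9, 6, 2],
        [4, 1, 5, 3, 4, 4, 3, 7, 1, 1], [7, 7, 0, 2, 9, 9, 3, 2, 5, 8],
        [1, 0, 7, 6, 2, 0, 8, 2, 5, 1]]


def count_block(data, row_start, row_end, minimum, maximum, out):
    """Store in out[row] how many values of data[row] lie in [minimum, maximum]."""
    for row in range(row_start, min(row_end, len(data))):
        out[row] = sum(1 for num in data[row] if minimum <= num <= maximum)


def count_with_threads(data, n_threads=2, minimum=4, maximum=8):
    out = [0] * len(data)  # shared by all threads of this process
    task_size = math.ceil(len(data) / n_threads)  # rows per thread
    threads = [
        threading.Thread(target=count_block,
                         args=(data, i * task_size, (i + 1) * task_size, minimum, maximum, out))
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out


if __name__ == "__main__":
    print(count_with_threads(DATA))  # [6, 3, 5, 4, 4]
```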

In [None]:
import cProfile

from time import time
import numpy as np
import math
import multiprocessing as mp

# Prepare data
# value range
r = 10
# number of rows
#m = 1000
m = 5
# number of columns
#n = 10000
n = 30

np.random.seed(100)
arr = np.random.randint(0, r, size=[m, n])
data = arr.tolist()
# print(data[:10])

def howmany_within_range_rowonly(row, minimum=4, maximum=8):
  # print(mp.current_process(),' ',row) # this will print the process object and the item it is working with
  count = 0
  for num in row:
     if minimum <= num <= maximum:
        count = count + 1
  return count

def mythreads_1():
  # creating cpu_count() threads
  from multiprocessing.dummy import Pool as ThreadPool
  start_time = time()
  pool = ThreadPool(mp.cpu_count())
  results = pool.map(howmany_within_range_rowonly, [row for row in data])
  print("Using cpu_count threads: ",round(time() - start_time,8), 'seconds')
  pool.close()
  #print(results)

def mythreads_2():
  # Other thread version, dividing the work into one block of row indices per thread
  from multiprocessing.dummy import Pool as ThreadPool
  start_time = time()
  task_size = int(math.ceil(m / mp.cpu_count()))
  pool = ThreadPool(mp.cpu_count())
  print(task_size)
  results = []  # one list of counts per block
  for i in range(mp.cpu_count()):
    lower_row_index = i*task_size
    upper_row_index = min(i*task_size + task_size, m)  # the last block may be shorter than task_size
    results.append(pool.map(howmany_within_range_rowonly, [data[j] for j in range(lower_row_index,upper_row_index)]))
  pool.close()
  print("Using cpu_count threads but dividing indices: ",round(time() - start_time,8), 'seconds')

def seq():
  # sequential version
  start_time = time()
  results = []
  for row in data:
    results.append(howmany_within_range_rowonly(row))
  print("Sequential: ",round(time() - start_time,8), 'seconds')
    #print(results)

if __name__=='__main__':
   cProfile.run("mythreads_1()")
   cProfile.run("mythreads_2()")
   cProfile.run("seq()")



Using cpu_count threads:  0.00265789 seconds
         598 function calls in 0.005 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        3    0.000    0.000    0.000    0.000 <frozen importlib._bootstrap>:1053(_handle_fromlist)
        6    0.000    0.000    0.000    0.000 <frozen importlib._bootstrap>:404(parent)
        1    0.000    0.000    0.003    0.003 <ipython-input-7-f4b09b333812>:31(mythreads_1)
        1    0.000    0.000    0.000    0.000 <ipython-input-7-f4b09b333812>:36(<listcomp>)
        1    0.000    0.000    0.005    0.005 <string>:1(<module>)
        1    0.000    0.000    0.002    0.002 __init__.py:122(Pool)
        2    0.000    0.000    0.000    0.000 __init__.py:36(__init__)
        2    0.000    0.000    0.000    0.000 __init__.py:43(start)
        5    0.000    0.000    0.000    0.000 _weakrefset.py:39(_remove)
        5    0.000    0.000    0.000    0.000 _weakrefset.py:86(add)
        2    0.000 

NameError: name 'results' is not defined

In [None]:
# Profiling each thread with yappi
!pip install yappi

Collecting yappi
  Downloading yappi-1.6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (79 kB)
Installing collected packages: yappi
Successfully installed yappi-1.6.0


In [None]:
import yappi

from time import time
import numpy as np
import math
import multiprocessing as mp

# Prepare data
# value range
r = 10
# number of rows
#m = 1000
m = 5
# number of columns
#n = 10000
n = 30

np.random.seed(100)
arr = np.random.randint(0, r, size=[m, n])
data = arr.tolist()
# print(data[:10])

def howmany_within_range_rowonly(row, minimum=4, maximum=8):
  # print(mp.current_process(),' ',row) # this will print the process object and the item it is working with
  count = 0
  for num in row:
     if minimum <= num <= maximum:
        count = count + 1
  return count

def mythreads_1():
  # creating cpu_count() threads
  from multiprocessing.dummy import Pool as ThreadPool
  start_time = time()
  pool = ThreadPool(mp.cpu_count())
  results = pool.map(howmany_within_range_rowonly, [row for row in data])
  print("Using cpu_count threads: ",round(time() - start_time,8), 'seconds')
  pool.close()
  #print(results)

def mythreads_2():
  # Other thread version, dividing the work into one block of row indices per thread
  from multiprocessing.dummy import Pool as ThreadPool
  start_time = time()
  task_size = int(math.ceil(m / mp.cpu_count()))
  pool = ThreadPool(mp.cpu_count())
  print(task_size)
  results = []  # one list of counts per block
  for i in range(mp.cpu_count()):
    lower_row_index = i*task_size
    upper_row_index = min(i*task_size + task_size, m)  # the last block may be shorter than task_size
    results.append(pool.map(howmany_within_range_rowonly, [data[j] for j in range(lower_row_index,upper_row_index)]))
  pool.close()
  print("Using cpu_count threads but dividing indices: ",round(time() - start_time,8), 'seconds')

def seq():
  # sequential version
  start_time = time()
  results = []
  for row in data:
    results.append(howmany_within_range_rowonly(row))
  print("Sequential: ",round(time() - start_time,8), 'seconds')
    #print(results)

yappi.start()
mythreads_1()
yappi.stop()

# retrieve thread stats by their thread id (given by yappi)
threads = yappi.get_thread_stats()
for thread in threads:
    print(
        "\nFunction stats for (%s) (%d)" % (thread.name, thread.id)
    )  # it is the Thread.__class__.__name__
    yappi.get_func_stats(ctx_id=thread.id).print_all()


#Q5:
#OPTION 1 (mythreads_1): uses the ThreadPool class from the multiprocessing.dummy module and a single pool.map call over all rows.
#Results 1 (r = 10, m = 1000, n = 10000)
#Using cpu_count threads:  0.90982914 seconds
#Results 2 (r = 10, m = 5, n = 30)
#Using cpu_count threads:  0.00265789 seconds

#OPTION 2 (mythreads_2): divides the rows manually into one block per thread and calls pool.map once per block.
#Results 1 (r = 10, m = 1000, n = 10000)
#Using cpu_count threads:  2.6013186 seconds
#Results 2 (r = 10, m = 5, n = 30)
#Using cpu_count threads:  0.00370073 seconds

#For larger matrices, option 1 performs much better than option 2, possibly because a single pool.map call lets the ThreadPool
#distribute all the rows at once, whereas option 2 waits for each block to finish before submitting the next one.
#For small matrices both versions finish in a few milliseconds, so the difference is dominated by thread-management overhead and noise.
#In both cases the GIL prevents the threads from running this pure-Python counting loop in parallel.

Using cpu_count threads:  0.00370073 seconds

Function stats for (Thread) (12)

Clock type: CPU
Ordered by: totaltime, desc

name                                  ncall  tsub      ttot      tavg      
..ectors.py:452 EpollSelector.select  13     0.000145  1.206215  0.092786
..0/asyncio/events.py:78 Handle._run  14     0.000056  0.004484  0.000320
..am.py:563 ZMQStream._handle_events  14     0.000316  0.004273  0.000305
..p.py:730 AsyncIOLoop._run_callback  9      0.000031  0.002572  0.000286
..ventloop/zmqstream.py:661 <lambda>  9      0.000031  0.002541  0.000282
..2 _UnixSelectorEventLoop._run_once  12/6   0.000000  0.002290  0.000191
...py:190 AsyncIOLoop._handle_events  5      0.000033  0.001795  0.000359
..py:630 ZMQStream._rebuild_io_state  14     0.000172  0.001720  0.000123
..ream.py:591 ZMQStream._handle_recv  14     0.000139  0.001189  0.000085
..m.py:653 ZMQStream._update_handler  14     0.000152  0.001086  0.000078
..attrsettr.py:42 Socket.__getattr__  28     0.000290  0.00

# **Q6: For what kind of tasks should you use processes and when should you use threads?**
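A minimal sketch for Q6 that runs the same CPU-bound, pure-Python function (a hypothetical `busy_sum` loop) sequentially, with a `multiprocessing.dummy` thread pool and with a `multiprocessing` process pool. In the standard CPython build, the GIL lets only one thread execute Python bytecode at a time, so the thread pool is typically no faster than the sequential loop, while the process pool can use several cores. Actual numbers depend on the machine (the Colab machine above has a single physical core), and pool creation is excluded from the timings.

```python
import multiprocessing as mp
import time
from multiprocessing.dummy import Pool as ThreadPool


def busy_sum(n):
    """CPU-bound task written in pure Python."""
    total = 0
    for k in range(n):
        total += k * k
    return total


def timed(label, fn):
    start = time.perf_counter()
    result = fn()
    print(f"{label:>10}: {time.perf_counter() - start:.3f} s")
    return result


def compare(work=(2_000_000,) * 4, workers=None):
    workers = workers or mp.cpu_count()
    seq = timed("sequential", lambda: [busy_sum(n) for n in work])
    with ThreadPool(workers) as tpool:
        thr = timed("threads", lambda: tpool.map(busy_sum, work))
    with mp.Pool(workers) as ppool:
        proc = timed("processes", lambda: ppool.map(busy_sum, work))
    return seq, thr, proc


if __name__ == "__main__":
    compare()
```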

Below, you can find the results I obtained when running these experiments on my own machine.

Execution times and speedups running on an AMD FX(tm)-8120 Eight-Core Processor (1.4GHz), 16 GBytes RAM, for a matrix with dimension 1000 x 100000.

`#########################################################################`<br>
`Sequential`<br>
`7.727450847625732 seconds`<br>
`[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]`<br>
`#########################################################################`<br>
`pool.apply`<br>
`20.577924489974976 seconds`<br>
`[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]`<br>
**Slowdown: 2.66**<br>
`#########################################################################`<br>
`pool.map`<br>
`4.117670059204102 seconds`<br>
`[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]`<br>
**Speedup: 1.88**<br>
`#########################################################################`<br>
`pool.starmap`<br>
`4.02571177482605 seconds`<br>
`[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]`<br>
**Speedup: 1.92**<br>
`#########################################################################`<br>
`pool.apply_async`<br>
`3.945971965789795 seconds`<br>
`[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]`<br>
**Speedup: 1.96**<br>
`#########################################################################`<br>
`Process` (creating fewer processes - 2nd solution above)<br>
`1.87694931 seconds`<br>
**Speedup: 4.11**<br>



Another run on the same machine, this time with a matrix of dimension 1000 x 500000:

`#########################################################################`<br>
`Sequential`<br>
`38.80920100212097 seconds`<br>
`[250127, 250430, 250285, 249630, 249829, 250269, 250135, 249801, 250431, 249623]`<br>
`#########################################################################`<br>
`pool.apply`<br>
`71.33826541900635 seconds`<br>
`[250127, 250430, 250285, 249630, 249829, 250269, 250135, 249801, 250431, 249623]`<br>
**Slowdown: 1.83**<br>
`#########################################################################`<br>
`pool.map`<br>
`17.257094383239746 seconds`<br>
`[250127, 250430, 250285, 249630, 249829, 250269, 250135, 249801, 250431, 249623]`<br>
**Speedup: 2.25**<br>
`#########################################################################`<br>
`pool.starmap`<br>
`15.533486604690552 seconds`<br>
`[250127, 250430, 250285, 249630, 249829, 250269, 250135, 249801, 250431, 249623]`<br>
**Speedup: 2.50**<br>
`#########################################################################`<br>
`pool.apply_async`<br>
`17.82588529586792 seconds`<br>
`[250127, 250430, 250285, 249630, 249829, 250269, 250135, 249801, 250431, 249623]`<br>
**Speedup: 2.18**<br>
`#########################################################################`<br>
`Process` (creating fewer processes - 2nd solution above)<br>
`12.95342517 seconds`<br>
**Speedup: 2.99**<br>
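The speedups above are the sequential time divided by the parallel time (a slowdown is the inverse ratio). A short sketch recomputing them from the first run's timings, plus the parallel efficiency (speedup divided by the number of workers), assuming one worker per core of the eight-core FX-8120 (`WORKERS = 8` is an assumption, not a value reported above). Recomputing reproduces the listed values to two decimals, except for the `Process` entry, which comes out as 4.12 instead of 4.11.

```python
# Timings (seconds) from the first run above: AMD FX-8120, matrix 1000 x 100000.
SEQUENTIAL = 7.727450847625732
PARALLEL = {
    "pool.apply": 20.577924489974976,
    "pool.map": 4.117670059204102,
    "pool.starmap": 4.02571177482605,
    "pool.apply_async": 3.945971965789795,
    "Process (fewer processes)": 1.87694931,
}
WORKERS = 8  # assumption: one worker per core of the eight-core processor


def speedup(t_seq, t_par):
    """Speedup > 1 means faster than sequential; 1 / speedup is the slowdown."""
    return t_seq / t_par


if __name__ == "__main__":
    for name, t in PARALLEL.items():
        s = speedup(SEQUENTIAL, t)
        label = f"speedup {s:.2f}" if s >= 1 else f"slowdown {1 / s:.2f}"
        print(f"{name:<26} {label:<15} efficiency {s / WORKERS:.2f}")
```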

In [None]:
import yappi

from time import time
import numpy as np
import math
import multiprocessing as mp

def mythreads_1():
  # creating 2 threads
  from multiprocessing.dummy import Pool as ThreadPool
  print(mp.cpu_count())
  pool = ThreadPool(mp.cpu_count())
  start_time = time()
  results = pool.map(sum,[row for row in data])
  print("Using cpu_count threads: ",round(time() - start_time,8), 'seconds')
  #print(results)

def mythreads_2():
  # Other thread version, dividing the work into one block of row indices per thread
  from multiprocessing.dummy import Pool as ThreadPool
  start_time = time()
  task_size = int(math.ceil(m / mp.cpu_count()))
  pool = ThreadPool(mp.cpu_count())
  print(task_size)
  results = []  # one list of counts per block
  for i in range(mp.cpu_count()):
    lower_row_index = i*task_size
    upper_row_index = min(i*task_size + task_size, m)  # the last block may be shorter than task_size
    results.append(pool.map(howmany_within_range_rowonly, [data[j] for j in range(lower_row_index,upper_row_index)]))
  pool.close()
  print("Using cpu_count threads but dividing indices: ",round(time() - start_time,8), 'seconds')

def seq():
  # sequential version
  start_time = time()
  results = []
  for row in data:
    results.append(sum(row))
  print("Sequential: ",round(time() - start_time,8), 'seconds')
    #print(results)

yappi.start()
mythreads_1()
yappi.stop()
seq()

# # retrieve thread stats by their thread id (given by yappi)
# threads = yappi.get_thread_stats()
# for thread in threads:
#     print(
#         "\nFunction stats for (%s) (%d)" % (thread.name, thread.id)
#     )  # it is the Thread.__class__.__name__
#     yappi.get_func_stats(ctx_id=thread.id).print_all()


#Q6:
#Processes:
#Processes are preferable for CPU-bound tasks, which spend most of their time performing computations.
#In the standard CPython build, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so only processes can run pure-Python computations on several cores in parallel.
#Each process has its own memory space, so one process cannot corrupt the memory of another. This is beneficial for tasks requiring isolated execution environments.
#The price is a higher start-up cost and the need to copy (pickle) arguments and results between processes, or to use explicit shared memory.
#Threads:
#Threads are suitable for I/O-bound tasks, where most of the time is spent waiting for I/O operations (network, disk) to complete; a thread blocked on I/O releases the GIL, so other threads can run meanwhile.
#Threads have lower overhead than processes, which makes them cheaper to create and to switch between.
#Threads within the same process share memory, so they can access shared data structures directly, but concurrent updates must be protected (e.g., with a Lock).
#For CPU-bound pure-Python code, threads give little or no speedup; in the experiment below, with very small data, the thread pool (0.00080729 s) was even much slower than the sequential sum (5.25e-06 s) because of thread-management overhead.

2
Using cpu_count threads:  0.00080729 seconds
Sequential:  5.25e-06 seconds
