In [1]:
import multiprocessing as mp
import numpy as np
from time import time
import math

import utils

In [3]:
# Prepare data
# value range
r = 10
# number of rows
# m = 1000
m = 1000
# number of columns
# n = 10000
n = 100000

np.random.seed(100)
arr = np.random.randint(0, r, size=[m, n])
data = arr.tolist()
print(data[:10])

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [5]:
# Sequential code: Solution Without Parallelization

def how_many_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    #print(mp.current_process(),' ',row)
    count = 0
    for num in row:
       if minimum <= num <= maximum:
          count = count + 1
    return count

# begin timing
start_time = time()

results = []
for row in data:
   results.append(how_many_within_range(row, minimum=4, maximum=8))

# end timing
print(f'Total time: {time()-start_time} seconds')

print(results[:10])


Total time: 3.7401390075683594 seconds
[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]


In [6]:
!lsb_release -a

No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 23.10
Release:	23.10
Codename:	mantic


In [7]:
!uname -a

Linux joaom-Type1ProductConfigId 6.5.0-17-generic #17-Ubuntu SMP PREEMPT_DYNAMIC Thu Jan 11 14:01:59 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux


In [8]:
!cat /proc/cpuinfo

processor	: 0
vendor_id	: GenuineIntel
cpu family	: 6
model		: 140
model name	: 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz
stepping	: 1
microcode	: 0xb4
cpu MHz		: 2544.938
cache size	: 12288 KB
physical id	: 0
siblings	: 8
core id		: 0
cpu cores	: 4
apicid		: 0
initial apicid	: 0
fpu		: yes
fpu_exception	: yes
cpuid level	: 27
wp		: yes
flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb cat_l2 invpcid_single cdp_l2 ssbd ibrs ibpb stibp ibrs_enhanced tpr_shadow flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid rdt_a avx51

In [9]:
# Option 1 pool apply

num_cpus = mp.cpu_count()
print('Num cpus = ', num_cpus)

start_time = time()

pool = mp.Pool(mp.cpu_count())
print('Time to create pool: ',round(time() - start_time,8), 'seconds')

results = [pool.apply(how_many_within_range, args=(row, 4, 8)) for row in data]

pool.close()

print(f'Total time: {time()-start_time} seconds')

print(results[:10])




Num cpus =  8
Time to create pool:  0.12904763 seconds
Total time: 7.681582450866699 seconds
[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]


In [10]:
# Option 2 pool map


def how_many_within_range_rowonly(row, minimum=4, maximum=8):
  # print(mp.current_process(),' ',row) # this will print the process object and the item it is working with
  count = 0
  for num in row:
     if minimum <= num <= maximum:
        count = count + 1
  return count

start_time = time()

pool = mp.Pool(mp.cpu_count())
results = pool.map(how_many_within_range_rowonly, [row for row in data])

pool.close()

print(f'Total time: {time()-start_time} seconds')

print(results[:10])



Total time: 2.5759992599487305 seconds
[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]


In [11]:
# Option 3 pool starmap

start_time = time()

pool = mp.Pool(mp.cpu_count())

results = pool.starmap(how_many_within_range, [(row, 4, 8) for row in data])

pool.close()
print(f'Total time: {time()-start_time} seconds')
print(results[:10])


Total time: 2.5045793056488037 seconds
[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]


In [12]:
# Option 4 pool apply async with function how_many_within_range2 in the same file

start_time = time()

pool = mp.Pool(mp.cpu_count())

results = []

def how_many_within_range2(i, row, minimum, maximum):
   """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
   count = 0
   for num in row:
      if minimum <= num <= maximum:
         count = count + 1
 #  print(str(i) + ' ' + count)
   return (i, count)

def collect_result(result):
    global results
    #print(result)
    results.append(result)

for i, row in enumerate(data):
    pool.apply_async(how_many_within_range2, args=(i, row, 4, 8, ), callback=collect_result)
    
pool.close()
pool.join()

print(f'Total time: {time()-start_time} seconds')

results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print(results_final[:10])
# Option 4 pool apply async with function how_many_within_range2 in the same file


Process ForkPoolWorker-25:
Process ForkPoolWorker-27:
Process ForkPoolWorker-26:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Process ForkPoolWorker-28:
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call las

KeyboardInterrupt: 

Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()


In [13]:
# Option 4 pool apply async with function how_many_within_range2 in another file


pool = mp.Pool(mp.cpu_count())

results = []
def collect_result(result):
    global results
 #   print(result)
    results.append(result)
    
start_time = time()

for i, row in enumerate(data):
    pool.apply_async(utils.how_many_within_range2, args=(i, row, 4, 8, ), callback=collect_result)

pool.close()
pool.join() 

print(f'Total time: {time()-start_time} seconds')

results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print(results_final[:10])

Total time: 2.4002602100372314 seconds
[50038, 50181, 50084, 50103, 49721, 50100, 50345, 50090, 50007, 49888]


In [None]:
# Option 5 Process (Demora muito tempo)

def how_many_within_range3(i, row, minimum, maximum):
   """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
   count = 0
   global results
   for num in row:
      if minimum <= num <= maximum:
         count = count + 1
   results[i] = count
   
start_time = time()
processes = []

for i, row in enumerate(data):
   p = mp.Process(target=how_many_within_range3, args=(i, row, 4, 8, ))
   processes.append(p)
   p.start() 
   
for process in processes:
   process.join()
   
print(f'Total time: {time()-start_time} seconds')
print(results[:10])


In [65]:
# Parallelizing with Process()

def howmany_within_range3(row_start, row_end, minimum, maximum):
   """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
   if row_end > m:
     row_end = m
   results = []
   for row in range(row_start,row_end):
     count = 0
     for num in data[row]:
        if minimum <= num <= maximum:
           count = count + 1
     results.append(count)
   #print(results)

# begin timing
start_time = time()

processes = []

task_size = math.ceil(m / mp.cpu_count())
print(task_size)
for i in range(mp.cpu_count()):
   lower_row_index = i*task_size
   upper_row_index = i*task_size + task_size
   p = mp.Process(target=howmany_within_range3, args=(lower_row_index, upper_row_index, 4, 8, ))
   processes.append(p)
   p.start()

for process in processes:
   process.join()

# end timing
     
print(f'Total time: {time()-start_time} seconds')
print(results[:10])    
     
     
     

12500
Total time: 0.3232150077819824 seconds
[(0, 6), (1, 3), (2, 5), (3, 4), (4, 4), (5, 5), (6, 5), (7, 7), (8, 6), (9, 5)]
