<a href="https://colab.research.google.com/github/yuvalofek/Multiprocessing/blob/main/Multiprocessing_Matrix_Multimplication.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Multiprocessing for Matrix Multiplication

An investigation of multi and single core multiprocessing using concurrent futures ThreadPoolExecuter, ProcessPoolExecutor, as well as the threading  and multiprocessing libraries. 


The plan is to create functions using each one of the above tools, potentially using the same tool multiple times in different ways, and testing to see which ends up being faster. 


In [172]:
import concurrent.futures as cp
import threading
import multiprocessing

import random
import numpy as np
import time

from pprint import pprint
from collections import defaultdict
from tqdm import tqdm

## Dot Product:
The first idea I had was to try to first do dot products, and then build them up into matrix multiplication. 

A traditional dot product is:
\begin{equation}
  \text{dot}(x,y) = y^Tx
\end{equation}

Or:

\begin{equation}
  \text{dot}(x,y) = \sum _{i=0}^N{x_iy_i^T}
\end{equation}

I create a function:
\begin{equation}
  \mathsf{F}(x, y^T) \rightarrow dot(x,y)
\end{equation}


### Functions created
I ended up creating a number of versions of the dot product function, mainly under these following umbrellas:
* Using only standard python, to use as a baseline
* Using concurrent.futures ThreadPoolExecuter
* Using concurrent.futures ProcessPoolExecuter
* Using Threading Thread
* Using Multiprocessing Pool

In [173]:
dot_product_functions = {}

#### Pure Python

In [174]:
def dot_prod_nothing(x,y, nothing=None):
  """
  Dot product of two vectors (without transpose) using Python
  params:
  x, y - vectors of same length
  nothing - (unused)
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  for i in range(len(x)):
    ip += x[i]*y[i]
  return ip

dot_product_functions['Python'] = dot_prod_nothing

In [175]:
def mult(x,y):
  return x*y

#### Concurrent.futures

In [176]:
def dot_prod_map_th(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using ThreadPoolExecutor and map
  params:
  x, y - vectors of same length
  max_workers - maximum workers
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  ip = 0
  with cp.ThreadPoolExecutor(max_workers=max_workers) as ex:
    r = ex.map(mult, x, y)
    for l in r:
      ip+=l
  return ip

dot_product_functions['ThreadPool_map'] = dot_prod_map_th

In [177]:
def dot_prod_map_mult(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using ProcessPoolExecutor and map
  params:
  x, y - vectors of same length
  max_workers - maximum workers
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  ip = 0
  with cp.ProcessPoolExecutor(max_workers=max_workers) as ex:
    r = ex.map(mult, x, y)
    for l in r:
      ip+=l
  return ip

dot_product_functions['ProcessPool_map'] = dot_prod_map_mult

In [178]:
def dot_prod_sub_th(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using ThreadPoolExecutor and submit
  params:
  x, y - vectors of same length
  max_workers - maximum workers
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  with cp.ThreadPoolExecutor(max_workers=max_workers) as ex:
    for i in range(len(x)):      
      ip+= ex.submit(mult, x[i], y[i]).result()
  return ip

dot_product_functions['ThreadPool_submit'] = dot_prod_sub_th

In [179]:
def dot_prod_sub_mult(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using ProcessPoolExecutor and submit
  params:
  x, y - vectors of same length
  max_workers - maximum workers
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  ip = 0
  with cp.ProcessPoolExecutor(max_workers=max_workers) as ex:
    for i in range(len(x)):      
      ip+= ex.submit(mult, x[i], y[i]).result()
  return ip

dot_product_functions['ProcessPool_submit'] = dot_prod_sub_mult

In [180]:
# dot product with execute instead
def dot_prod_ac(x,y, max_workers=None):

  """
  Dot product of two vectors (without transpose) using ThreadPoolExecutor, submit, and as_completed
  params:
  x, y - vectors of same length
  max_workers - maximum workers
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  with cp.ThreadPoolExecutor(max_workers=max_workers) as ex:
      mults = [ex.submit(mult, x[i], y[i]) for i in range(len(x))]
  for multiples in cp.as_completed(mults):
    ip+=multiples.result()
  return ip

dot_product_functions['ThreadPool_submit_ac'] = dot_prod_ac

In [181]:
# dot product with execute instead
def dot_prod_ac_mult(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose)using ProcessdPoolExecutor, submit, and as_completed
  params:
  x, y - vectors of same length
  max_workers - maximum workers
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  ip = 0
  with cp.ProcessPoolExecutor(max_workers=max_workers) as ex:
      mults = [ex.submit(mult, x[i], y[i]) for i in range(len(x))]
  for multiples in cp.as_completed(mults):
    ip+=multiples.result()
  return ip

dot_product_functions['ProcessPool_submit_ac'] = dot_prod_ac_mult

#### Threading Library

In [182]:
# threading only
def dot_prod_thread(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using the threading Thread
  params:
  x, y - vectors of same length
  max_workers - maximum workers for threading (unused)
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  threads = []
  results = []
  for i in range(len(x)):
    t = threading.Thread(target=mult_r, args = (x[i],y[i], results))
    t.start()
    threads.append(t)
  
  for t in threads:
    t.join()
  for r in results:
    ip+= r
  return ip

def mult_r(x,y, results):
  results.append(x*y)


dot_product_functions['Threading_threads'] = dot_prod_thread

#### Multiprocessing Library

In [183]:
def dot_prod_multiprocess_apply(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using the Pool apply
  params:
  x, y - vectors of same length
  max_workers - maximum workers for threading (unused)
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  r = []
  with multiprocessing.Pool() as p:
    for i in range(len(x)):
      r.append(p.apply(mult, (x[i], y[i])))
    for l in r:
      ip+=l

  return ip

dot_product_functions['Multiprocessing_lib_apply'] = dot_prod_multiprocess_apply

In [184]:
def dot_prod_multiprocess_apply_async(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using the Pool apply_async
  params:
  x, y - vectors of same length
  max_workers - maximum workers for threading (unused)
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  r = []
  with multiprocessing.Pool() as p:
    for i in range(len(x)):
      r.append(p.apply_async(mult, (x[i], y[i])).get())
    for l in r:
      ip+=l

  return ip

dot_product_functions['Multiprocessing_lib_apply_async'] = dot_prod_multiprocess_apply_async

In [185]:
def dot_prod_multiprocess_starmap(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using the Pool starmap
  params:
  x, y - vectors of same length
  max_workers - maximum workers for threading (unused)
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  with multiprocessing.Pool() as p:
    r = p.starmap(mult, zip(x, y))
    for l in r:
      ip+=l
  return ip


dot_product_functions['Multiprocessing_lib_starmap'] = dot_prod_multiprocess_starmap

In [186]:
def dot_prod_multiprocess_starmap_async(x,y, max_workers=None):
  """
  Dot product of two vectors (without transpose) using the Pool starmap_async
  params:
  x, y - vectors of same length
  max_workers - maximum workers for threading (unused)
  """
  # check that the vector sizes are the same
  if len(x) != len(y):
    raise ValueError(f'Length of vectors {x} & {y} is not equal')
  
  # thread the multiplications and add sequentially
  ip = 0
  with multiprocessing.Pool() as p:
    r = p.starmap_async(mult, zip(x, y))
    for l in r:
      ip+=l
  return ip


dot_product_functions['Multiprocessing_lib_starmap_async'] = dot_prod_multiprocess_starmap

### Testing
First we do a small test to see that everything is going well, then we do a stress test with a large input value

In [187]:
# Contrived test case:
x = [1,2,3, 4]
y = x[::-1]
results = {name:dot_product(x,y) for name, dot_product in dot_product_functions.items()}

for name, result in results.items():
  print(name, ': ', result)

# 1*4 + 2*3 + 3*2 +4*1 = 4 + 6 + 6 + 4 = 20

Python :  20
ThreadPool_map :  20
ProcessPool_map :  20
ThreadPool_submit :  20
ProcessPool_submit :  20
ThreadPool_submit_ac :  20
ProcessPool_submit_ac :  20
Threading_threads :  20
Multiprocessing_lib_apply :  20
Multiprocessing_lib_apply_async :  20
Multiprocessing_lib_starmap :  20
Multiprocessing_lib_starmap_async :  20


In [189]:
# Random test case:
MAX_SIZE = 100
MAX_STDV = 10
n = 5000
x_stdv, y_stdv = random.randint(1,MAX_STDV), random.randint(1,MAX_STDV)
x = (x_stdv*np.random.randn(n))
y = (y_stdv*np.random.randn(n))
print(f'Vector size: {n}')

results = defaultdict(list)
durations = defaultdict(list)
mx_workers = [None]
mx_workers.extend(range(1, 10))
for i in tqdm(mx_workers):  
  for name, dot_prod in dot_product_functions.items():
    start_time = time.time()
    results[name].append(dot_prod(x,y, i))
    durations[name].append(time.time() - start_time)

## Numpy for comparison
start_time = time.time()
results['numpy'] = [np.dot(x,y)]
durations['numpy'].append(time.time() - start_time)



print('')
print('Average Results')
for name, result in results.items():
  print(name, ': ', sum(result)/len(result))

print('')
print('Average Duration')
for name, duration in durations.items():
  print(name, ': ', sum(duration)/len(duration))


Vector size: 5000


100%|██████████| 10/10 [02:04<00:00, 12.49s/it]


Average Results
Python :  -44.219752776722835
ThreadPool_map :  -44.219752776722835
ProcessPool_map :  -44.219752776722835
ThreadPool_submit :  -44.219752776722835
ProcessPool_submit :  -44.219752776722835
ThreadPool_submit_ac :  -44.21975277672318
ProcessPool_submit_ac :  -44.21975277672312
Threading_threads :  -44.219752776722835
Multiprocessing_lib_apply :  -44.219752776722835
Multiprocessing_lib_apply_async :  -44.219752776722835
Multiprocessing_lib_starmap :  -44.219752776722835
Multiprocessing_lib_starmap_async :  -44.219752776722835
numpy :  -44.219752776723446

Average Duration
Python :  0.004215073585510254
ThreadPool_map :  0.19429244995117187
ProcessPool_map :  2.314366412162781
ThreadPool_submit :  0.2800591468811035
ProcessPool_submit :  3.242601180076599
ThreadPool_submit_ac :  0.22586772441864014
ProcessPool_submit_ac :  2.19613721370697
Threading_threads :  0.6056812763214111
Multiprocessing_lib_apply :  1.551779580116272
Multiprocessing_lib_apply_async :  1.5413208007




### Conclusions:
Given the strain test test, we see that using Multiprocessing starmap was the fastest out of the multiprocessing options, but that using nothing at all was (significantly) faster (and numpy is of course even faster than that)! We opt to use the python version because of this for the matrix multiplication.

This is likely telling us that multiprocessing multiplication like this is not benefitial, maybe because of the additional calls and overhead costs.

Another interesting thing we see is that the as_completed inner product doesn't match the other inner_products exactly.  

## Matrix multiplication

Matrix multiplication is of the form:
\begin{equation}
  \mathsf{M}_{n \text{x} m}(\mathbb{R}) * \mathsf{M}_{m \text{x} l}(\mathbb{R}) \rightarrow \mathsf{M}_{n \text{x} l}(\mathbb{R})
\end{equation}

Therefore we do:
1. Check that the inputs are matrices
2. Check that the inner matrix dimenstions $m$ match for the two matrices
3. Preform matrix multiplication

We use the 'normal' python version as our 'inner product component' in the following functions and only consider creating functions for the top 5 performing methods from the inner product functions:
1. ThreadPool Map
2. ThreadPool Submit
3. ThreadPool Submit as_completed
4. Multiprocessing Starmap
5. Multiprocessing Starmap_async

We don't create the submit as_completed function. This is due to the need to know which inner product belongs where, which isn't suitable for the as_completed method

In [203]:
def is_matrix(X):
  ## Check if the input is a matrix
  lengths = map(len, X)
  try:
    # first length
    first = next(lengths)
  except StopIteration:
    return True
  return all(first == x for x in lengths)

In [204]:
# Testing the is_matrix funciton:
is_matrix([[1, 2], [3], [1,3]]) # False

False

In [262]:
mat_mul_functions = {}

In [263]:
def mat_mul_nothing(X, Y):
  # check that the inputs are valid matrices
  if not is_matrix(X):
    raise ValueError(f'Input {X} is not a matrix')
  if not is_matrix(Y):
    raise ValueError(f'Input {Y} is not a matrix')
  
  # check that the dimensions of the matrices are valid for multiplication
  # and save lengths
  n, m, l = len(X), len(X[0]), len(Y[0])
  if m != len(Y):
    raise ValueError('Dimensions of X and Y do not match')

  # thread each of the inner products
  output = [[0 for j in range(l)] for i in range(n) ]
  for i in range(n):
    for j in range(l):
      x =  X[i]
      y = [row[j] for row in Y]    
      output[i][j] = dot_prod_nothing(x, y)
        
  return output

mat_mul_functions['Python'] = mat_mul_nothing

In [264]:
# Checking the list transpose we found here: https://stackoverflow.com/questions/6473679/transpose-list-of-lists
np.array(list(map(list, zip(*Y)))) == np.array(Y).T

array([[ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True]])

In [265]:
def mat_mul_submit(X, Y, max_workers=None):
  # check that the inputs are valid matrices
  if not is_matrix(X):
    raise ValueError(f'Input X is not a matrix')
  if not is_matrix(Y):
    raise ValueError(f'Input Y is not a matrix')
  
  # check that the dimensions of the matrices are valid for multiplication
  n, m, l = len(X), len(X[0]), len(Y[0])
  if m != len(Y):
    raise ValueError('Dimensions of X and Y do not match')

  # thread each of the inner products
  output = [[0 for j in range(l)] for i in range(n) ]
  with cp.ThreadPoolExecutor(max_workers=max_workers) as ex:
    for i in range(n):
      for j in range(l):
        x =  X[i]
        y = [row[j] for row in Y]    
        output[i][j] = ex.submit(dot_prod_nothing, x, y).result()
  return output

mat_mul_functions['ThreadPool_submit'] = mat_mul_submit

In [266]:
def mat_mul_map(X, Y, max_workers=None):
  # check that the inputs are valid matrices
  if not is_matrix(X):
    raise ValueError(f'Input X is not a matrix')
  if not is_matrix(Y):
    raise ValueError(f'Input Y is not a matrix')
  
  # check that the dimensions of the matrices are valid for multiplication
  n, m, l = len(X), len(X[0]), len(Y[0])
  if m != len(Y):
    raise ValueError('Dimensions of X and Y do not match')

  output = [[] for i in range(n) ]
  # thread each of the inner products
  Y = list(map(list, zip(*Y)))
  with cp.ThreadPoolExecutor(max_workers=max_workers) as ex:
    for i in range(n):
      output[i] = list(ex.map(dot_prod_nothing, [X[i]]*n, Y))
  return output

mat_mul_functions['ThreadPool_map'] = mat_mul_map

In [267]:
def mat_mul_multi_starmap(X, Y, max_workers=None):
  # check that the inputs are valid matrices
  if not is_matrix(X):
    raise ValueError(f'Input X is not a matrix')
  if not is_matrix(Y):
    raise ValueError(f'Input Y is not a matrix')
  
  # check that the dimensions of the matrices are valid for multiplication
  n, m, l = len(X), len(X[0]), len(Y[0])
  if m != len(Y):
    raise ValueError('Dimensions of X and Y do not match')

  output = [[] for i in range(n) ]
  # thread each of the inner products
  Y = list(map(list, zip(*Y)))
  with multiprocessing.Pool() as p:
    for i in range(n):
      output[i] = list(p.starmap(dot_prod_nothing, zip([X[i]]*n, Y)))
  return output

mat_mul_functions['Multiprocessing_starmap'] = mat_mul_multi_starmap

In [268]:
def mat_mul_multi_starmap_async(X, Y, max_workers=None):
  # check that the inputs are valid matrices
  if not is_matrix(X):
    raise ValueError(f'Input X is not a matrix')
  if not is_matrix(Y):
    raise ValueError(f'Input Y is not a matrix')
  
  # check that the dimensions of the matrices are valid for multiplication
  n, m, l = len(X), len(X[0]), len(Y[0])
  if m != len(Y):
    raise ValueError('Dimensions of X and Y do not match')

  output = [[] for i in range(n) ]
  # thread each of the inner products
  Y = list(map(list, zip(*Y)))
  with multiprocessing.Pool() as p:
    for i in range(n):
      output[i] = p.starmap_async(dot_prod_nothing, zip([X[i]]*n, Y)).get()
  return output

mat_mul_functions['Multiprocessing_starmap_async'] = mat_mul_multi_starmap_async

In [297]:
 == True

False

In [307]:
# Random test case
MAX_SIZE = 10
MAX_STDV = 10
N = 300
n, m, l = N, N, N 
x_stdv, y_stdv = random.randint(1,MAX_STDV), random.randint(1,MAX_STDV)
X = (x_stdv*np.random.randn(n, m))
Y = (y_stdv*np.random.randn(m, l))

X = X.tolist()
Y = Y.tolist()




results = defaultdict(list)
durations = defaultdict(list)
mx_workers = [None]
mx_workers.extend(range(1, 10))

for i in tqdm(mx_workers):
  for name, mat_mul in mat_mul_functions.items():
    start_time = time.time()
    results[name].append(mat_mul(X, Y))
    durations[name].append(time.time() - start_time)

avg = []
print('')
check = True
old_val = None
for name, result in results.items():
  mean_result = np.mean(np.stack([np.array(r) for r in result]), axis=0)
  try:
    old_val == None
    old_val =mean_result
  except:
    check = check & (mean_result == old_val).all()

  print(name, mean_result.shape)
  print('Result matches previous: ', check)

## Numpy for comparison
start_time = time.time()
results['numpy'] = [np.matmul(X,Y)]
durations['numpy'] = [time.time() - start_time]


print('')
print('Average Duration')
for name, duration in durations.items():
  print(name, ': ', sum(duration)/len(duration))

100%|██████████| 10/10 [07:24<00:00, 44.46s/it]



Python (300, 300)
Result matches previous:  True
ThreadPool_submit (300, 300)
Result matches previous:  True
ThreadPool_map (300, 300)
Result matches previous:  True
Multiprocessing_starmap (300, 300)
Result matches previous:  True
Multiprocessing_starmap_async (300, 300)
Result matches previous:  True

Average Duration
Python :  5.79459581375122
ThreadPool_submit :  18.671090173721314
ThreadPool_map :  7.166719651222229
Multiprocessing_starmap :  6.4087567806243895
Multiprocessing_starmap_async :  6.418905210494995
numpy :  0.019414186477661133


### Conclusions:

Once again we see that threading didn't help at all and that numpy is the way to go for any matrix multiplication operation. We see how significant the speed-up numpy gives us and how much time we would be waisting if we didn't use it. 

It makes sense that the python only version was faster than threading looking back, because matrix multiplication involves a bunch of very simple operations and maybe the overhead was too much. 