## Multiprocessing

$\textbf{Borrowed from}$ 

$ \textit{codebasics}$<br>
https://www.youtube.com/playlist?list=PLeo1K3hjS3uub3PRhdoCTY8BxMKSW7RjN

In [None]:
import time
import multiprocessing

### Snippet 1  <br>$\textit{Hello World}$

In [None]:
arr = [i for i in range(5)]
def getSquare(x):
    for i in x:
        time.sleep(2)
        print('', end='s ')
    print()
    
    
def getCube(x):
    for i in x:
        time.sleep(3)
        print('', end='c ')
    print()
    
p1 = multiprocessing.Process(target = getSquare, args = (arr,))
p2 = multiprocessing.Process(target = getCube, args = (arr,))

p1.start()
p2.start()


p1.join()
p2.join()

print('Done')

### Snippet 2 <br>$\textit{Shortcoming of global variables}$

In [None]:
square_result = []
arr = [i for i in range(5)]
def getSquare(x):
    global square_result
    for i in x:
        square_result.append(i*i)
    
    
    
p1 = multiprocessing.Process(target = getSquare, args = (arr,))

p1.start()
p1.join()

print(square_result)
#Global variables are not shared amongst processes

### Snippet 3 <br>$\textit{Array}$

In [None]:
#Array as shared variable
square_result = multiprocessing.Array('i',5) #Here i stands for integer. For double it is d
arr = [i for i in range(5)]

def getSquare(x, square_result):
    for idx, item in enumerate(x):
        square_result[idx] = item**2
    
    
    
p1 = multiprocessing.Process(target = getSquare, args = (arr, square_result))
p1.start()
p1.join()


print(square_result[:])

### Snippet 4 <br>$\textit{Value}$

In [None]:
#Value as shared Variable
val = multiprocessing.Value('d',0.5)
arr = [i for i in range(5)]

def getSquare(x, val):
    for item in x:
        val.value+=(item**2)
        print(val)
    
    
    
p1 = multiprocessing.Process(target = getSquare, args = (arr, val))
p1.start()
p1.join()


print(val.value)

### Snippet 5 <br>$\textit{Queue}$

In [None]:
#Queue as shared Variable
# q = multiprocessing.Queue()
arr = [i for i in range(5)]

def getSquare(x, q):
    for item in x:
#         time.sleep(2)
        q.put('s')
    
def getCube(x, q):
    for item in x:
#         time.sleep(3)
        q.put('c')
        
#The time intervals have been chosen and are not random
    
p1 = multiprocessing.Process(target = getSquare, args = (arr, q))
p2 = multiprocessing.Process(target = getCube, args = (arr, q))
p1.start()
p2.start()

p1.join()
p2.join()



while q.empty() is False:
    print(q.get(),end=' ')

### Snippet 6  <br>$\textit{Locks}$

In [None]:
#Without lock
def deposit(balance):
    for i in range(100):
        time.sleep(0.01)
        balance.value+=1
        
def withdraw(balance):
    for i in range(100):
        time.sleep(0.02)
        balance.value-=1
        


for i in range(20):
    balance = multiprocessing.Value('i',200)
    d = multiprocessing.Process(target = deposit, args = (balance,))
    w = multiprocessing.Process(target = withdraw, args = (balance,))
    d.start()
    w.start()
    d.join()
    w.join()
    
    print(balance.value, end=' ')

In [None]:
#With lock
def deposit(balance, lock):
    for i in range(100):
        time.sleep(0.01)
        lock.acquire()
        balance.value+=1
        lock.release()
        
def withdraw(balance, lock):
    for i in range(100):
        time.sleep(0.01)
        lock.acquire()
        balance.value-=1
        lock.release()
        

for i in range(20):
    balance = multiprocessing.Value('i',200)
    lock = multiprocessing.Lock()
    d = multiprocessing.Process(target = deposit, args = (balance, lock))
    w = multiprocessing.Process(target = withdraw, args = (balance, lock))
    d.start()
    w.start()
    w.join()
    d.join()
    print(balance.value,end=' ')

### Snippet 6 <br>$\textit{Divide among cores}$

In [None]:
from multiprocessing import Pool

In [None]:
def f(n):
    return n**2

p = Pool()

a = time.time()
result = p.map(f, range(5)) #This alone will divide the work among all available cores equally
p.close()
p.join()
b = time.time() - a
print(result)

In [None]:
def f(n):
    sum = 0
    for i in range(10000):
        sum+=(i*i)
    

p = Pool()

a = time.time()
result = p.map(f, range(10000)) #This alone will divide the work among all available cores equally
p.close()
p.join()
b = time.time() - a
print('Time taken: {}'.format(b))


In [None]:
a = time.time()
for i in range(10000):
    sum = 0
    for j in range(10000):
        sum+=(j*j)

b = time.time()- a
print('Time taken: {}'.format(b))

Number of cores

In [None]:
multiprocessing.cpu_count()

In [24]:
import concurrent.futures
import torch
import torch.nn as nn
import torch.optim as optimizer

In [2]:
class mymodel(nn.Module):
    def __init__(self):
        super(mymodel,self).__init__()
        self.weight = nn.Parameter(torch.randn(3,2))
        self.bias = nn.Parameter(torch.randn(2))
        
    def forward(self, X):
        return torch.matmul(X,self.weight)+self.bias

In [37]:
def doTrain(a1,b1):    
#     a1 = model()
    optim = optimizer.Adam(lr=1e-3, params = a1.parameters())
    optim.zero_grad()
#     out = a1(X)
    loss = b1.sum()
    loss.backward()
    optim.step()
    
    
    return list(a1.parameters())

In [None]:
X = torch.ones(1,3, requires_grad = True)
a1 = mymodel()
b1 = a1(X)

updatedParams = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(doTrain, a1, b1) for _ in range(5)]
    
    for f in concurrent.futures.as_completed(results):
        updatedParams.append(f.result())


# with concurrent.futures.ProcessPoolExecutor() as executor:
#     result = executor.map()

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/home/shouvik/Desktop/DeepLearning/dl/lib/python3.6/site-packages/torch/multiprocessing/reductions.py", line 136, in reduce_tensor
    raise RuntimeError("Cowardly refusing to serialize non-leaf tensor which requires_grad, "
RuntimeError: Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries.  If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump

In [36]:
updatedParams

[[Parameter containing:
  tensor([[ 0.8683, -0.0678],
          [ 1.2524, -0.7526],
          [-0.5902,  0.4066]], requires_grad=True),
  Parameter containing:
  tensor([-1.5302,  0.9537], requires_grad=True)],
 [Parameter containing:
  tensor([[ 0.8683, -0.0678],
          [ 1.2524, -0.7526],
          [-0.5902,  0.4066]], requires_grad=True),
  Parameter containing:
  tensor([-1.5302,  0.9537], requires_grad=True)],
 [Parameter containing:
  tensor([[ 0.8683, -0.0678],
          [ 1.2524, -0.7526],
          [-0.5902,  0.4066]], requires_grad=True),
  Parameter containing:
  tensor([-1.5302,  0.9537], requires_grad=True)],
 [Parameter containing:
  tensor([[ 0.8683, -0.0678],
          [ 1.2524, -0.7526],
          [-0.5902,  0.4066]], requires_grad=True),
  Parameter containing:
  tensor([-1.5302,  0.9537], requires_grad=True)],
 [Parameter containing:
  tensor([[ 0.8683, -0.0678],
          [ 1.2524, -0.7526],
          [-0.5902,  0.4066]], requires_grad=True),
  Parameter containi