# Emulated parallelism



We are interested in learning how to work *in parallel*. But learning parallel computing is mostly about learning a new way to *think*. Too often, the subject gets bogged down in the syntax of a particular message-passing system, or in hardware difficulties. Parallel programs are also notoriously hard to debug, which makes learning interactively much more difficult.

But it is possible to learn how to think about parallelism without the particular details of actually making it work. Instead, we can think about what each processor in a parallel computer would know about a problem and what it could do about it. In general, this means the following:

* Individual processors require some kind of identifying address. 


* Data is stored locally on each processor, without reference to data on other processors. 


* Data needs to be transferable between processors without interfering with current local data. 


* Therefore, local processors require at least two (maybe three) different kinds of memory. 

    - Obviously, the first is a place to store local data. 
    - Second, we need a *buffer* that can receive messages from other processors. 
    - In general, buffer sizes need to be large enough to handle messages coming from all relevant processors.
    - Possibly, a third: temporary workspace for intermediate calculations, depending on whether we need to keep the old data.
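
Here is a minimal sketch of that memory layout. It uses a hypothetical `ToyProcessor` dataclass and a hypothetical `transfer` function, neither of which is part of this notebook. It shows the requirement that transfers must not interfere with local data: the message is *copied* into the receiver's buffer, so later changes on the sender cannot reach the receiver.

```python
import numpy as np
from dataclasses import dataclass, field

@dataclass
class ToyProcessor:
    """Hypothetical stand-in for one processor's memory (not the notebook's Core)."""
    rank: int                                      # identifying address
    memory: dict = field(default_factory=dict)     # local data
    buffer: dict = field(default_factory=dict)     # incoming messages, keyed by (source, name)
    workspace: dict = field(default_factory=dict)  # optional scratch space

def transfer(src, dst, name):
    # Copy the data so later changes on either side cannot leak across.
    dst.buffer[(src.rank, name)] = np.copy(src.memory[name])

p0, p1 = ToyProcessor(0), ToyProcessor(1)
p0.memory["x"] = np.array([1.0, 2.0, 3.0])
p1.memory["x"] = np.array([10.0, 20.0, 30.0])

transfer(p0, p1, "x")
p0.memory["x"] *= 2               # the sender keeps working on its own data
print(p1.memory["x"])             # [10. 20. 30.]  receiver's local data is untouched
print(p1.buffer[(0, "x")])        # [1. 2. 3.]     message holds the values at send time
```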
   
   
Working in parallel requires breaking a problem up into manageable chunks, distributing the relevant information to different independent processors, sending a command to all the processors, and having them compute the desired result after figuring out what to do between themselves.
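
The same steps can be acted out serially with plain NumPy, with list entries standing in for processors. This is only an illustrative sketch. The names `chunks` and `partials` are not from the notebook. On a real machine, the final combine step would require communication.

```python
import numpy as np

nproc = 4
global_x = np.arange(1, 13)      # a "global" problem: the sum of squares of 1..12

# 1. Break the problem into chunks, one per (pretend) processor.
chunks = np.array_split(global_x, nproc)

# 2-3. Each processor computes using only its own chunk.
partials = [int(np.sum(chunk ** 2)) for chunk in chunks]

# 4. Combine the partial results (on a real machine this step needs communication).
total = sum(partials)
print(partials, total)           # [14, 77, 194, 365] 650
```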

For example, with a matrix, it is a good idea to put different matrix blocks on different processor cores. But on a parallel computer, each core knows nothing about the other cores' data unless they communicate.
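
Here is a sketch in plain NumPy, with dictionary keys standing in for cores. A $4\times 4$ matrix is split into a $2\times 2$ grid of blocks. Each block-times-vector product is purely local. Assembling each piece of $y = Ax$, however, needs results from two different blocks, and that is where communication comes in.

```python
import numpy as np

A = np.arange(16).reshape(4, 4)
x = np.array([1, 0, 2, 1])

# Pretend core (i, j) owns block A_ij and the piece x_j it needs.
A_blocks = {(i, j): A[2*i:2*i+2, 2*j:2*j+2] for i in range(2) for j in range(2)}
x_blocks = {j: x[2*j:2*j+2] for j in range(2)}

# Each block product uses only data owned by one core.
partial = {(i, j): A_blocks[i, j] @ x_blocks[j] for (i, j) in A_blocks}

# Block row i of y needs partial[i, 0] + partial[i, 1], which sit on different cores.
y = np.concatenate([partial[i, 0] + partial[i, 1] for i in range(2)])
print(y)        # [ 7 23 39 55]
print(A @ x)    # the undistributed product gives the same result
```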

The difficulty in Python is that it's hard to keep data hidden away in separate structures (as a real parallel machine would) without also hiding it from the user.

## Faux-`Local` arrays

On a parallel computer, if you simply ask for data contained on another core, you will either get an error message or, worse, get no warning at all and silently end up with a wrong result.

To simulate the idea that you can't interact with non-local data, I've made a derived data type, `Local`, which subclasses `np.ndarray`. It acts like a `numpy` array or a scalar variable, but it also "knows" which processor it lives on. It raises a `NotLocalData` error if you try to combine it with data from another processor.
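
`Local` follows NumPy's usual subclassing recipe: `__new__` attaches the extra attribute, and `__array_finalize__` copies it onto every new view or slice. Here is that recipe on its own. The hypothetical `Tagged` class stands in for `Local`, without the operator overloading or the locality checks.

```python
import numpy as np

class Tagged(np.ndarray):
    """Hypothetical minimal ndarray subclass that carries a processor tag."""

    def __new__(cls, data, proc):
        obj = np.asarray(data).view(cls)   # view() triggers __array_finalize__
        obj.proc = proc                    # then set the tag explicitly
        return obj

    def __array_finalize__(self, obj):
        # Called for views and slices: copy the tag from the parent array, if any.
        if obj is None:
            return
        self.proc = getattr(obj, "proc", None)

t = Tagged([1, 2, 3, 4], proc=2)
print(t.proc, t[1:].proc, t.T.proc)        # 2 2 2
```

If `__array_finalize__` copied some other attribute instead, slices would come back without a `proc`, and anything that reads `self.proc` (such as a custom `__repr__`) would fail.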

In [6]:
import numpy as np

#from eMPI import Local

In [7]:
class NotLocalData(Exception):
    pass

class Local(np.ndarray):

    def __new__(cls, input_array, proc):
        obj = np.asarray(input_array).view(cls)
        obj.proc = proc
        return obj
        
    def __array_finalize__(self, obj):
        # Views and slices inherit the processor tag of their parent array.
        if obj is None: return
        self.proc = getattr(obj, 'proc', None)
    
    def __str__(self):
        # Print just the values, like a plain ndarray; use repr() to see the processor.
        return np.ndarray.__str__(self)
        
    def __repr__(self):
        s = (np.ndarray.__repr__(self))[:-1]
        return f"{s}, proc={self.proc})"
    
    def __getitem__(self,args):
        args = self.check(args)
        return Local(np.ndarray.__getitem__(self, args), self.proc)
    
    def __setitem__(self,args,value):
        args  = self.check(args)
        value = self.check(value)
        np.ndarray.__setitem__(self, args, value)
    
    @property
    def T(self):
        return Local(np.ndarray.transpose(self),self.proc)
    
    def transpose(self):
        return self.T
    
    def trace(self):
        return Local(np.ndarray.trace(self),self.proc)
    
    def __neg__(self):
        return Local(np.ndarray.__neg__(self),self.proc)
    
    def __pos__(self):
        return Local(np.ndarray.__pos__(self),self.proc)
    
    def __abs__(self):
        return Local(np.ndarray.__abs__(self),self.proc)
    
    def __add__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__add__(self,other),self.proc)
    
    def __sub__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__sub__(self,other),self.proc)
    
    def __mul__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__mul__(self,other),self.proc)
    
    def __truediv__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__truediv__(self,other),self.proc)
    
    def __floordiv__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__floordiv__(self,other),self.proc)
    
    def __mod__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__mod__(self,other),self.proc)
    
    def __pow__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__pow__(self,other),self.proc)
    
    def __matmul__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__matmul__(self,other),self.proc)
    
    def __eq__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__eq__(self,other),self.proc)
    
    def __ge__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__ge__(self,other),self.proc)
    
    def __le__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__le__(self,other),self.proc)
    
    def __gt__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__gt__(self,other),self.proc)
    
    def __lt__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__lt__(self,other),self.proc)
    
    def __radd__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__radd__(self,other),self.proc)
    
    def __rsub__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__rsub__(self,other),self.proc)
    
    def __rmul__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__rmul__(self,other),self.proc)
    
    def __rtruediv__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__rtruediv__(self,other),self.proc)
    
    def __rfloordiv__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__rfloordiv__(self,other),self.proc)
    
    def __rmod__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__rmod__(self,other),self.proc)
    
    def __rpow__(self,other):
        other = self.check(other)
        return Local(np.ndarray.__rpow__(self,other),self.proc)
    
    # Python has no reflected comparison methods such as __req__ or __rle__.
    # If a < b is not handled by a, Python tries b > a instead, so the
    # comparison methods defined above already cover both operand orders.
    
    def check(self,other):
        if other is None:
            return other
        
        if isinstance(other,(int, float, complex, np.number)):
            return Local(other,self.proc)
        
        if isinstance(other, slice):
            for e in [other.start, other.stop, other.step]:
                self.check(e)
            return other
        
        if isinstance(other, (list, tuple)):
            return type(other)([self.check(e) for e in other])
        
        if not isinstance(other, Local):
            raise NotLocalData('Data is not local to any core.')
        
        if self.proc != other.proc:
            raise NotLocalData('The data is not local.')
        
        return other


class Core():
    
    def __init__(self,proc):
        self.proc = proc 
        self.memory = {}
        self.buffer = {}
    
    def __str__(self):
        return "Core("+str(self.proc)+')'
    
    def __repr__(self):
        return str(self)
    
    def __getitem__(self,item):
        return self.memory[item]
    
    def __setitem__(self,item,value):
        self.memory[item] = Local(value,self.proc)
        
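    def send(self,other,data):
        # Drop the named array into the receiver's buffer, keyed by (sender, name).
        other.buffer[self.proc,data] = Local(self[data],other.proc)
        
    def receive(self,other,data,out=None):
        # Move a message out of the buffer into local memory (under `out` if given).
        # np.copy ensures the received array no longer shares memory with the sender.
        if out is None: out = data
        self[out] = np.copy(self.buffer.pop((other.proc,data)))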


It works for scalar data:

In [7]:
x = Local(3,0) #second argument takes the processor assignment
y = Local(4,1)
print(x)
print(y)

3
4


We can see more if we ask nicely.

In [8]:
print(repr(x)) # representation method
print(repr(y))

Local(3, proc=0)
Local(4, proc=1)


In [9]:
print(x**2)
print(y**2)

9
16


These are in different "locations":

In [10]:
x**2 + y**2

NotLocalData: The data is not local.

It works the same way for arrays:

In [11]:
x = Local([1,2,3,4],0)
y = Local([5,6,7,8],0)
x+y

Local([ 6,  8, 10, 12], proc=0)

In [12]:
z = Local([9,10,11,12],1)
z

Local([ 9, 10, 11, 12], proc=1)

But not this:

In [13]:
x + z # they are on different processors

NotLocalData: The data is not local.

## Faux-`Core`s

We have a Python object that knows about "local" data. But we don't yet have anything like a processor core that can hold on to multiple pieces of data at the same time.

A `Core` object fills that role. It stores named data in its local `memory`, and it has a `buffer` for incoming messages.

In [16]:
# from eMPI import Core

Let's make two cores and put some data in their memory.

In [17]:
core0 = Core(0)
core0['x'] = [5,6]
core0['A'] = [[1,2],[3,4]]

core1 = Core(1)
core1['x'] = [7,8]
core1['A'] = [[1,2],[3,4]]

In [18]:
print(core0['x'])
print(core1['x'])

[5 6]
[7 8]


In [22]:
core0['x'] #representation

Local([5, 6], proc=0)

In [27]:
core0['A'] @ core0['x'] # [(1*5)+(2*6), (3*5)+(4*6)]: a matrix-vector multiplication

Local([17, 39], proc=0)

In [28]:
core1['A'] @ core1['x'] 

Local([23, 53], proc=1)

I'm going to make a `machine` that can hold a bunch of `Cores`. 

In [15]:
nproc = 8
machine = {p: Core(p) for p in range(nproc)}

As the user, you can technically see everything at the same time.

In [16]:
machine

{0: Core(0),
 1: Core(1),
 2: Core(2),
 3: Core(3),
 4: Core(4),
 5: Core(5),
 6: Core(6),
 7: Core(7)}

Now let's put some local data on the `machine`. 

In [18]:
nlocal = 10
for proc in machine:
    machine[proc]['A'] = np.random.randint(0,9,nlocal)
    machine[proc]['B'] = np.random.randint(0,9,nlocal)

In [61]:
machine[0]['A']

Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=0)

In [62]:
machine[1]['A']

Local([5, 1, 7, 4, 6, 1, 3, 2, 8, 6], proc=1)

## Memory

In this toy example, we can see everything in each local memory.

In [63]:
machine[0].memory

{'A': Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=0),
 'B': Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=0)}

In [64]:
machine[1].memory

{'A': Local([5, 1, 7, 4, 6, 1, 3, 2, 8, 6], proc=1),
 'B': Local([7, 6, 4, 1, 7, 6, 5, 7, 5, 7], proc=1)}

In [48]:
machine[1]['pi']=3.1415926

## `send`

We need a way for processors to send messages to each other.

In the real world, we'd have to contend with the "Two Generals' Problem":

https://en.wikipedia.org/wiki/Two_Generals%27_Problem

But here we'll make it simpler. Any core can send a message to any other core at any time. This is a toy version of *nonblocking communication*: the message is sent right away, rather than copied out of memory first.

In [69]:
machine[0].send(machine[1],'A') # send core 0's 'A' to core 1
machine[0].send(machine[1],'B')
machine[2].send(machine[1],'A') # send core 2's 'A' to core 1
machine[2].send(machine[1],'B')

We shouldn't see anything happen in `Core(1)` yet.

In [67]:
machine[1].memory

{'A': Local([5, 1, 7, 4, 6, 1, 3, 2, 8, 6], proc=1),
 'B': Local([7, 6, 4, 1, 7, 6, 5, 7, 5, 7], proc=1)}

## Buffering 

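In this case, a sent message doesn't go straight into the receiver's memory. It is put into a *buffer*, which is a bit of extra memory where messages sit and wait until the processor is ready to receive them. A true *blocking receive* would make the processor wait until the message arrived. Here, calling `receive` for a message that isn't in the buffer simply raises a `KeyError`.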

In [70]:
machine[1].buffer

{(0, 'A'): Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=1),
 (0, 'B'): Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=1),
 (2, 'A'): Local([8, 7, 0, 4, 8, 0, 0, 3, 1, 3], proc=1),
 (2, 'B'): Local([3, 7, 4, 2, 5, 4, 8, 2, 4, 8], proc=1)}
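
For contrast, a true *blocking receive* makes the receiver wait until the message arrives. The sketch below uses Python threads as stand-in processors and a `queue.Queue` as each processor's buffer. It illustrates the idea only and says nothing about how MPI is implemented. `get_nowait()` behaves like a nonblocking check, while `get()` blocks until the sender's message lands.

```python
import queue
import threading
import time

# Hypothetical setup: one inbox (buffer) per "processor".
inbox = {0: queue.Queue(), 1: queue.Queue()}

def sender():
    time.sleep(0.1)                      # pretend core 0 is busy for a while
    inbox[1].put((0, "A", [4, 5, 1]))    # put() on an unbounded queue returns immediately

# Nonblocking check: nothing has arrived yet, so get_nowait() raises queue.Empty.
try:
    inbox[1].get_nowait()
    arrived_early = True
except queue.Empty:
    arrived_early = False

worker = threading.Thread(target=sender)
worker.start()

# Blocking receive: core 1 waits here until the message lands in its buffer.
source, name, data = inbox[1].get(timeout=5)
worker.join()
print(arrived_early, source, name, data)   # False 0 A [4, 5, 1]
```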

## `receive`

Now we have a method to get the data out of the buffer.

In [71]:
machine[1].receive(machine[0],'A',out='tmp')

Now we can see that the transfer is done:

In [72]:
machine[0]['A']

Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=0)

In [73]:
machine[1]['tmp']

Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=1)

The received message has been removed from the buffer:

In [74]:
machine[1].buffer

{(0, 'B'): Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=1),
 (2, 'A'): Local([8, 7, 0, 4, 8, 0, 0, 3, 1, 3], proc=1),
 (2, 'B'): Local([3, 7, 4, 2, 5, 4, 8, 2, 4, 8], proc=1)}

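If we don't specify `out`, the message is stored under its original name and clobbers whatever the receiver already had there. Here we back up core 1's `B` first, then receive core 0's `B` directly on top of it.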

In [75]:
machine[1]['B_copy'] = machine[1]['B']

In [76]:
machine[1].receive(machine[0],'B')

In [77]:
machine[1].buffer

{(2, 'A'): Local([8, 7, 0, 4, 8, 0, 0, 3, 1, 3], proc=1),
 (2, 'B'): Local([3, 7, 4, 2, 5, 4, 8, 2, 4, 8], proc=1)}

In [78]:
machine[0].memory

{'A': Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=0),
 'B': Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=0)}

In [79]:
machine[1].memory

{'A': Local([5, 1, 7, 4, 6, 1, 3, 2, 8, 6], proc=1),
 'B': Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=1),
 'tmp': Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=1),
 'B_copy': Local([7, 6, 4, 1, 7, 6, 5, 7, 5, 7], proc=1)}

Now we can safely do local operations that irreversibly alter the `A` array.

In [80]:
machine[1]['A'] += machine[1]['tmp']

The `tmp` memory is still around.

In [81]:
machine[1].memory

{'A': Local([ 9,  6,  8, 11,  6,  2, 10,  4,  8, 13], proc=1),
 'B': Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=1),
 'tmp': Local([4, 5, 1, 7, 0, 1, 7, 2, 0, 7], proc=1),
 'B_copy': Local([7, 6, 4, 1, 7, 6, 5, 7, 5, 7], proc=1)}

We might keep it as a work array for later, or free it:

In [82]:
del machine[1].memory['tmp']
del machine[1].memory['B_copy']
machine[1].memory

{'A': Local([ 9,  6,  8, 11,  6,  2, 10,  4,  8, 13], proc=1),
 'B': Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=1)}

### TASK 0:

Implement a system that computes the distributed dot product $x = A\cdot B$ across the cores and returns the result to all of the cores. At no point should the computation rely on global data: everything has to happen through `send` and `receive` acting on local data.
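
Assume the global vectors $A$ and $B$ are the concatenation of the local pieces $A^{(p)}$ and $B^{(p)}$, each of length $n_\text{local}$, stored on cores $p = 0, \dots, P-1$. Then the dot product splits into one purely local term per core:

$$
x = A\cdot B = \sum_{p=0}^{P-1} A^{(p)}\cdot B^{(p)} = \sum_{p=0}^{P-1} \sum_{i=0}^{n_\text{local}-1} A^{(p)}_i\, B^{(p)}_i .
$$

Each core can compute its term $A^{(p)}\cdot B^{(p)}$ without communicating. Only the $P$ local scalars have to be exchanged: they must be summed, and the total sent back to every core.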

In [114]:
for p in machine:
  print('A:', repr(machine[p]['A']))
  print('B:', repr(machine[p]['B']))
  print('A@B:', repr(machine[p]['A']@machine[p]['B']))
  print(30*'-')

A: Local([28, 40,  2, 42,  0,  0, 14, 12,  0, 14], proc=0)
B: Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=0)
A@B: Local(900, proc=0)
------------------------------
A: Local([ 9,  6,  8, 11,  6,  2, 10,  4,  8, 13], proc=1)
B: Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=1)
A@B: Local(307, proc=1)
------------------------------
A: Local([8, 7, 0, 4, 8, 0, 0, 3, 1, 3], proc=2)
B: Local([3, 7, 4, 2, 5, 4, 8, 2, 4, 8], proc=2)
A@B: Local(155, proc=2)
------------------------------
A: Local([1, 7, 4, 8, 6, 4, 0, 1, 6, 6], proc=3)
B: Local([5, 8, 2, 7, 1, 4, 3, 3, 8, 1], proc=3)
A@B: Local(204, proc=3)
------------------------------
A: Local([6, 7, 2, 1, 8, 4, 4, 3, 6, 8], proc=4)
B: Local([4, 2, 8, 7, 3, 7, 0, 2, 1, 5], proc=4)
A@B: Local(165, proc=4)
------------------------------
A: Local([0, 0, 0, 8, 7, 6, 8, 7, 7, 2], proc=5)
B: Local([1, 8, 0, 2, 2, 2, 0, 6, 8, 5], proc=5)
A@B: Local(150, proc=5)
------------------------------
A: Local([0, 5, 5, 3, 0, 8, 0, 8, 3, 1], proc=6)
B: Local([2, 

In [113]:
A = [core['A'] for core in machine.values()]
B = [core['B'] for core in machine.values()]
print(A)
print(B)

[Local([28, 40,  2, 42,  0,  0, 14, 12,  0, 14], proc=0), Local([ 9,  6,  8, 11,  6,  2, 10,  4,  8, 13], proc=1), Local([8, 7, 0, 4, 8, 0, 0, 3, 1, 3], proc=2), Local([1, 7, 4, 8, 6, 4, 0, 1, 6, 6], proc=3), Local([6, 7, 2, 1, 8, 4, 4, 3, 6, 8], proc=4), Local([0, 0, 0, 8, 7, 6, 8, 7, 7, 2], proc=5), Local([0, 5, 5, 3, 0, 8, 0, 8, 3, 1], proc=6), Local([7, 4, 5, 0, 6, 3, 4, 7, 6, 1], proc=7)]
[Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=0), Local([7, 8, 2, 6, 6, 0, 2, 6, 1, 2], proc=1), Local([3, 7, 4, 2, 5, 4, 8, 2, 4, 8], proc=2), Local([5, 8, 2, 7, 1, 4, 3, 3, 8, 1], proc=3), Local([4, 2, 8, 7, 3, 7, 0, 2, 1, 5], proc=4), Local([1, 8, 0, 2, 2, 2, 0, 6, 8, 5], proc=5), Local([2, 0, 1, 6, 1, 6, 8, 5, 5, 1], proc=6), Local([2, 8, 6, 8, 8, 4, 1, 3, 7, 3], proc=7)]


In [104]:
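def dumbProduct(sender, receiver):
    # Ship the sender's 'A' to the receiver, then dot it with the receiver's 'B'.
    # Only two cores take part, so this is not yet the full distributed product.
    sender.send(receiver,'A')
    receiver.receive(sender,'A',out='Atmp')
    receiver['dotprod'] = sum(receiver['Atmp']*receiver['B'])
    del receiver.memory['Atmp']
    return receiver['dotprod']

dumbProduct(machine[0], machine[1])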

Local(900, proc=1)