In [2]:
import os
import numpy as np

try:
    os.makedirs("fake_files")
except:
    pass
else:
    for idx in range(100_000):
        fname = "fake_files/file_{}.npy".format(idx)
        if not os.path.exists(fname):
            np.save(fname, np.random.normal(size=100000))  

# Diseño de software para cómputo científico

---
## Unidad 4: Optimización, paralelismo, concurrencia y cómputo distribuido en alto nivel.

**NO BORRAR OUTPUT**

## Concurrencia 

> se refiere a la habilidad de distintas partes de un programa, algoritmo, o problema de ser ejecutado en desorden o en orden parcial, sin afectar el resultado final. 

> Los cálculos (operaciones) pueden ser ejecutados en múltiples procesadores, o ejecutados en procesadores separados físicamente o virtualmente en distintos hilos de ejecución. 

![image.png](attachment:image.png)

## Hilos vs Procesos

![image.png](attachment:image.png)

- **Hilos:**
    - Se ejecutan en el mismo proceso.
    - Comparten memoria.
- **Procesos:**
    - **NO** comparten memoria.
    - Pueden ejecutarse en diferentes procesadores.

## Qué es un sistema operativo?

Es una **colección** de programas que funciona como:

- **Una máquina extendida** (top-down): Es una abstracción sobre el hardware.
- **Un manejador de recursos** (bottom-up): Un proveedor de servicios solicitados por el software.

![image.png](attachment:image.png)

## Si nos centramos en el kernel

Linux tiene estos subsistemas

![image.png](attachment:image.png)

https://www.makelinux.net/kernel_map/

## Llamadas al sistema

- Es el mecanismo usado por una aplicación para solicitar un servicio al sistema operativo.

![image.png](attachment:image.png)

## Interrupción

Es una señal recibida por el procesador de una computadora, para indicarle que debe «interrumpir» el curso de ejecución actual y pasar a ejecutar código específico para tratar esta situación.

![image.png](attachment:image.png)

## Python - Multithreaded Programming

Es **similar** a ejecutar varios programas diferentes al mismo tiempo, pero con los siguientes beneficios:

- Comparten el mismo espacio de datos con el subproceso principal y, por lo tanto, pueden compartir información o comunicarse entre ellos fácilmente.
- No requieren mucha sobrecarga de memoria; son más baratos que los procesos.

**Problema:** EL &Ç@$!! GIL

## Global interpreter lock (GIL)

- Es un mecanismo utilizado en los intérpretes de lenguaje de computadora para sincronizar la ejecución de threads para que solo **un** subproceso nativo pueda ejecutarse a la vez.  

**Pero... WHY?**

- Mayor velocidad de los programas de un solo subproceso (no es necesario adquirir o liberar bloqueos en todas las estructuras de datos por separado).
- Fácil integración de bibliotecas C que generalmente no son seguras para subprocesos,
- Facilidad de implementación.


## Python `threading`

Vamos con un ejemplo secuencial

In [2]:
import numpy as np

In [3]:
%%time

def sts(name, nums, verbose=True):
    mean, std = np.mean(nums), np.std(nums)
    if verbose:
        print("Name {}".format(name))
        print("\t Mean {}".format(mean))
        print("\t Std {}".format(std))
    
for idx in range(10):
    name = "F{}".format(idx)
    nums = np.random.normal(size=1000)
    sts(name, nums, False)

CPU times: user 2.56 ms, sys: 159 µs, total: 2.72 ms
Wall time: 6.23 ms


## Python `threading`

In [4]:
import numpy as np
import threading

class MyThread(threading.Thread):
    def __init__(self, name, nums):
        super().__init__()
        self.name = name
        self.nums = nums
        
    def run(self):
        print("Starting ", self.name)
        self.mean = np.mean(self.nums)
        self.std = np.std(self.nums)

## Python `threading`

In [5]:
%%time
threads = []
for tidx in range(10):
    thread = MyThread(
        name="T{}".format(tidx),
        nums=np.random.normal(size=1000))
    
    thread.start()
    threads.append(thread)
    
print("-" * 10)
for thread in threads:
    thread.join()
    

Starting  T0
Starting  T1
Starting  T2
Starting  T3
Starting  T4
Starting  T5
Starting  T6Starting 
 Starting T7
 T8
Starting  T9
----------
CPU times: user 11.7 ms, sys: 180 µs, total: 11.9 ms
Wall time: 24.7 ms


### Threading que si sirve

In [6]:
%%time
def open_file(idx):
    fname = "fake_files/file_{}.npy".format(idx)
    with open(fname, "rb") as fp:
        nums = np.load(fp)

    
for idx in range(12_000):
    open_file(idx)

CPU times: user 5.79 s, sys: 5.14 s, total: 10.9 s
Wall time: 43.2 s


In [7]:
threads = []

def open_file(idx):
    fname = "fake_files/file_{}.npy".format(idx)
    with open(fname, "rb") as fp:
        nums = np.load(fp)
        
for idx in range(12_000):
    thread = threading.Thread(target=open_file, kwargs={"idx": idx})
    threads.append(thread)

In [8]:
%%time

for thread in threads:
    thread.start()
    
for thread in threads:
    thread.join()

CPU times: user 9.36 s, sys: 7.41 s, total: 16.8 s
Wall time: 12.2 s


## Multiprocessing

Es un paquete que admite la generación de procesos utilizando una API similar al módulo de multithreading.

![image.png](attachment:image.png)

## Multiprocessing

In [9]:
import numpy as np
import multiprocessing as mp

class MyProc(mp.Process):
    def __init__(self, name, nums):
        super().__init__()
        self.name = name
        self.nums = nums
        
    def run(self):
        print("Starting ", self.name)
        self.mean = np.mean(self.nums)
        self.std = np.std(self.nums)

## Multiprocessing

In [10]:
procs = []
for pidx in range(4):
    proc = MyProc(
        name="P{}".format(pidx),
        nums=np.random.normal(size=(100, 10)))
    proc.start()
    procs.append(proc)
    
print("-" * 10)
for proc in procs:
    proc.join()
    print("\t Mean {}".format(proc.mean))
    print("\t Std {}".format(proc.std))

Starting  P0
Starting  P1
----------


AttributeError: 'MyProc' object has no attribute 'mean'

Starting  P3
Starting  P2


## Multiprocessing comunication

In [25]:
import numpy as np
import multiprocessing as mp

class MyProc(mp.Process):
    def __init__(self, name, nums):
        super().__init__()
        self.name = name
        self.nums = nums
        self._q = mp.Queue()
        
    def run(self):
        print("Starting ", self.name)
        self._q.put_nowait({"mean": np.mean(self.nums), 
                     "std": np.std(self.nums)})
    
    def communicate(self):
        if not hasattr(self, "_mean"):
            data = self._q.get_nowait()
            self._mean = data["mean"]
            self._std = data["std"]
    
    @property
    def mean(self):
        self.communicate()
        return self._mean
    
    @property
    def std(self):
        self.communicate()
        return self._std       

## Multiprocessing comunication

In [26]:
procs = []
for pidx in range(8):
    proc = MyProc(
        name="P{}".format(pidx),
        nums=np.random.normal(size=(100, 10)))
    proc.start()
    procs.append(proc)
    
print("-" * 10)
for proc in procs:
    proc.join()
    print("Proc {}".format(proc.name))
    print("\t Mean {}".format(proc.mean))
    print("\t Std {}".format(proc.std))

Starting  P0
Starting  P1
Starting  P2
Starting  P3
Starting  P4
Starting  P5
Starting  P6
Starting  P7
----------
Proc P0
	 Mean 0.016363281686373087
	 Std 1.0004939075598476
Proc P1
	 Mean 0.016228688016542918
	 Std 1.0284006618407977
Proc P2
	 Mean 0.010223332853830584
	 Std 1.0161831654875304
Proc P3
	 Mean -0.07882693242000212
	 Std 1.018865734548018
Proc P4
	 Mean 0.024512645702439183
	 Std 1.0107371157176535
Proc P5
	 Mean -0.03671805151183993
	 Std 1.0250561164605405
Proc P6
	 Mean -0.018028344913451247
	 Std 0.9130342443337405
Proc P7
	 Mean 0.007224276182464615
	 Std 0.9916168553235424


## Joblib

![image-2.png](attachment:image-2.png)

Ayudante vergonzosamente simple para facilitar la escritura de código paralelo legible y depurarlo rápidamente.

```bash
$ pip install joblib
```

## Joblib - Multiprocessing

In [11]:
%%time

import joblib # import Parallel, delayed

def open_file(idx):
    fname = "fake_files/file_{}.npy".format(idx)
    with open(fname, "rb") as fp:
        nums = np.load(fp)
    return nums


with joblib.Parallel(n_jobs=-1) as parallel:
    results = parallel(
        joblib.delayed(open_file)(idx) for idx in range(10_000))

CPU times: user 5.64 s, sys: 5.33 s, total: 11 s
Wall time: 29.6 s


## Joblib - Multi-Threading

In [12]:
%%time

import joblib # import Parallel, delayed

def open_file(idx):
    fname = "fake_files/file_{}.npy".format(idx)
    with open(fname, "rb") as fp:
        nums = np.load(fp)
    return nums


with joblib.Parallel(n_jobs=1, prefer="threads") as parallel:
    results = parallel(
        joblib.delayed(open_file)(idx) for idx in range(10_000))

CPU times: user 6.39 s, sys: 9.06 s, total: 15.4 s
Wall time: 1min 36s


## Referencias

- https://es.wikipedia.org/wiki/Concurrencia_(inform%C3%A1tica)
- http://materias.fi.uba.ar/7508/MOS4/Operating.Systems.4th.Edi.pdf
- https://es.wikipedia.org/wiki/Interrupci%C3%B3n
- https://es.wikipedia.org/wiki/Llamada_al_sistema
- https://www.tutorialspoint.com/python/python_multithreading.htm
- https://docs.python.org/2/library/multiprocessing.html
- https://www.tutorialspoint.com/concurrency_in_python/concurrency_in_python_multiprocessing.htm
- https://joblib.readthedocs.io/en/latest/