<a href="https://colab.research.google.com/github/lbrandoli/ProgramacionConcurrente/blob/main/TP_2_PARTE2_MPI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Ejercicio 1 - Hola Mundo con MPI

In [4]:
! pip install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.5.tar.gz (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m27.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.5-cp310-cp310-linux_x86_64.whl size=2746518 sha256=bc2bf623e585ab3ce63df6b217ff42d5f4d2883662bb3fe3eba91e22089625b8
  Stored in directory: /root/.cache/pip/wheels/18/2b/7f/c852523089e9182b45fca50ff56f49a51eeb6284fd25a66713
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.5


In [2]:
%%writefile Ejercicio2.py
from mpi4py import MPI
import numpy as np
import time

# --------------------------------------------
# Formulario
Max_tiempo_sleep =   2#@param {type: "slider", min: 1, max: 10}
Min_tiempo_sleep =   0#@param {type: "slider", min: 0, max: 10}
# --------------------------------------------

# --------------------------------------------
# Constantes de comunicacion
WORK_FLAG = 1
END_WORK_FLAG = 2
# --------------------------------------------

def main():
    comm = MPI.COMM_WORLD # Instanciamos el tipo de comunicador a utilizar.
    id = comm.Get_rank()  # Obtenemos el id como atributo del proceso que se ejecuta.

    # Utilizamos el 0 para definir al procesos Maestro, cualquier otro id sera un esclavo.
    if (id == 0) :
        init() # Llamamos funcion init para eventos que requeriremos inicialmente solo 1 vez.
        numProcesses = comm.Get_size()  # Obtenemos el numero de procesos totales ejecutados.
        numTasks = (numProcesses-1)*4 # Se setea el numero de tareas.
        workTimes = generateTasks(numTasks) # Se generan las tareas, en este caso seran
        print("El jefe crea {} horas de descanso de sus empleados:".format(workTimes.size), flush=True)
        print(workTimes, flush=True)
        initWork(comm, workTimes, numProcesses)
    else:
        doWork(comm)

def generateTasks(numTasks):
    #TODO: Cambiar la semilla del random para que se generen efectivamente diferentes numeros.
    np.random.seed(1000)
    return np.random.randint(low=Min_tiempo_sleep, high=Max_tiempo_sleep, size=numTasks)

def init():
  print()
  print("Version MPI4py utilizada: {}".format(MPI.Get_version()), flush=True)
  print()
  print( "------------------------------------", flush=True)
  print( "Sistema de trabajo Suizo:", flush=True)
  print( "------------------------------------", flush=True)
  print()

def initWork(comm, workTimes, numProcesses):
    totalWork = workTimes.size
    workcount = 0
    recvcount = 0

    print("Jefe enviando las tareas iniciales:", flush=True)
    for id in range(1, numProcesses):
        if workcount < totalWork:
            work=workTimes[workcount]
            comm.send(work, dest=id, tag=WORK_FLAG) # Envia mensaje de iniciar trabajo con el dato correspondiente del array.
            workcount += 1
            print("Jefe envia trabajo y {} hs de descanso al empleado {}.".format(work, id), flush=True)
    print( "------------------------------------", flush=True)

    # Mientras haya trabajo, se recibe el resultado de los empleados y se sigue enviando MAS trabajo.
    while (workcount < totalWork) :
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat) # Recivimos resultados de los empleados.
        recvcount += 1
        workerId = stat.Get_source() # Obtenemos el identificador del empleado.
        print("Jefe recibe trabajo completado {} del empleado {}.".format(workTime, workerId), flush=True)
        #send next work
        work=workTimes[workcount]
        comm.send(work, dest=workerId, tag=WORK_FLAG)
        workcount += 1
        print("Jefe envia nuevo trabajo y {} hs de descanso al empleado {}.".format(work, workerId), flush=True)

    while (recvcount < totalWork):
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1
        workerId = stat.Get_source()
        print("Jefe recibe trabajo completado {} del empleado {}.".format(workTime, workerId), flush=True)

    for id in range(1, numProcesses):
        comm.send(0, dest=id, tag=END_WORK_FLAG)


def doWork(comm):
    while(True):
        stat = MPI.Status() # Obtiene el estado actual del empleado.
        waitTime = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) # Obtiene lo enviado por el Jefe.
        print("Soy el empleado con id {}, toca descanzo por {} hs.".format(comm.Get_rank(), waitTime), flush=True)

        if (stat.Get_tag() == END_WORK_FLAG):
            print("Marca tarjeta el empleado {}.".format(comm.Get_rank()), flush=True)
            return
        time.sleep(waitTime)
        comm.send(waitTime, dest=0)

main()


Writing Ejercicio2.py


In [76]:
# --------------------------------------------
#@title Parámetros de ejecución { vertical-output: true }
NRO =   2#@param {type: "number"}
# --------------------------------------------

! mpirun --oversubscribe --allow-run-as-root -np $NRO python Ejercicio2.py



Version MPI4py utilizada: (3, 1)

------------------------------------
Sistema de trabajo Suizo:
------------------------------------

El jefe crea 4 horas de descanso de sus empleados:
[1 1 1 0]
Jefe enviando las tareas iniciales:
Jefe envia trabajo y 1 hs de descanso al empleado 1.
------------------------------------
Soy el empleado con id 1, toca descanzo por 1 hs.
Jefe recibe trabajo completado 1 del empleado 1.
Soy el empleado con id 1, toca descanzo por 1 hs.
Jefe envia nuevo trabajo y 1 hs de descanso al empleado 1.
Jefe recibe trabajo completado 1 del empleado 1.
Jefe envia nuevo trabajo y 1 hs de descanso al empleado 1.
Soy el empleado con id 1, toca descanzo por 1 hs.
Jefe recibe trabajo completado 1 del empleado 1.
Jefe envia nuevo trabajo y 0 hs de descanso al empleado 1.
Soy el empleado con id 1, toca descanzo por 0 hs.
Jefe recibe trabajo completado 0 del empleado 1.
Soy el empleado con id 1, toca descanzo por 0 hs.
Marca tarjeta el empleado 1.


Preguntas del ejecicio

a) ¿Que funcion realiza la instancia maestra?¿Que funcion realiza las instancias esclavas?

La instancia maestra genera una lista de tareas y las distribuye entre los procesos esclavos. Cada proceso esclavo realiza su tarea y devuelve el resultado al proceso maestro.

b) ¿Cuantas de esas instancias ejecuta la funcion main(), initWork() y doWork()?

La funcion main() se ejecuta en la instancia maestra, mientras que la funcion dowork() se ejecuta en las instancias esclavas. La funcion initWork() se llama desde la funcion main(), pero no se ejecuta en una instancia separada, sino que se llama desde la funcion main().

c)¿Como se diferencian los mensajes de trabajo o de finalizacion?

Se utilizan dos constantes de comunicacion para diferenciar los mensajes de trabajo, estan constantes son WORK_FLAG y END_WORK_FLAG.
El proceso maestro envia mensajes de trabajo a los procesos esclavos utilizando la constante WORK_FLAG. El mensaje contiene el tiempo de espera correspondiente a la tarea asignada. Los procesos esclavos envian el resultado de su tarea al proceso maestro utilizando la misma constante WORK_FLAG.
Por otro lado, el proceso maestro envia mensajes de finalizacion a los procesos esclavos utilizando la constante END_WORK_FLAG. Cuando un proceso esclavo recibe un mensaje con esta constante, sabe que debe finalizar su trabajo y terminar su ejecucion.

d)¿Como implementaria la funcion BLAS axpy() con este patron?¿Seria eficiente?Tips:Pide solo el planteo, no la implementacion.

La funcion axpy de BLAS es una operacion vectorial que calcula el producto de un vector por un escalar y lo suma a otro vector, asi que si utilizamos el patron maestro-esclavo, se podria dividir el vector en subvectores y distribuirlos entre los procesos esclavos. Cada proceso esclavo calcularia el producto de su subvector por el escalar y lo sumaria al subvector correspondiente. Luego, los resultados parciales se enviarian al proceso maestro para que los ume y obtenga el resultado final.

d)¿Que sucede cuando solo ejecuta con una sola instancia?

Cuando se ejecuta con una sola instancia el proceso maestro y el proceso esclavo se ejecutan en la misma instancia, en este caso, el proceso maestro generara una lista de tareas aleatorias y las asignara al mismo proceso que se esta ejecutando.

# Ejercicio 2 - Contar palabras

In [89]:
%%writefile mpi_tp4.py

from mpi4py import MPI
import numpy as np
from IPython.display import display
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

count = 5000
sendbuf = None

def main():
    if rank == 0:
        sendbuf = np.random.randn(size, count)
        display("Máximo de matriz calculado con metodo: ", sendbuf.max())
        initWork(comm, sendbuf, size) #En esta funcion trabaja el proceso raiz
    else:
        doWork(comm)    #Aqui trabajan todos los procesos distintos al proceso raiz

def initWork(comm, matrix, cantProcess):
    rowCount = 1    #Los procesos que lanzaremos empezaran a calcular el maximo desde la fila 1 y no desde la fila 0
    recvCount = 1   #La cantidad de procesos que esperamos tambien empieza en 1 ya que el proceso 0 no se espera
    maxArray = []   #Aqui se almacenaran los valores maximos de cada fila

    maxArray.append(max(matrix[0])) #El proceso 0 calcula el valor maximo de la fila 0

    #En este for vamos lanzando los procesos y les asignamos cual es la fila en la que tienen que encontrar el maximo
    #Empieza en el proceso 1 ya que el proceso 0 es el raiz
    for id in range(1, cantProcess):
        if rowCount < len(matrix):
            comm.send(matrix[rowCount], dest = id) #Al proceso con id <id> se le asigna la fila <rowCount> para buscar su maximo
            rowCount += 1

    #En este while vamos a esperar a que todos los procesos devuelvan el valor maximo que calcularon de cada fila
    while recvCount < cantProcess:
        stat = MPI.Status()
        maxOfRow = comm.recv(source=MPI.ANY_SOURCE, status=stat)    #En esta variable vamos a almacenar el valor que devuelve cada proceso al calcular el maximo de cada fila
        maxArray.append(maxOfRow)
        recvCount += 1

    print("Maximo de matriz calculado con MPI: ", max(maxArray))

def doWork(comm):
    stat = MPI.Status()
    row = comm.recv(source=0, status=stat) #Aqui recibimos la informacion que nos manda el proceso raiz
    comm.send(max(row), dest = 0)   #Aqui enviamos al proceso raiz el valor maximo calculado en la fila

main()

Overwriting mpi_tp4.py


In [90]:
# --------------------------------------------
# Formulario
NRO =   4#@param {type: "slider", min: 4, max: 10}
# --------------------------------------------

! mpirun --oversubscribe --allow-run-as-root -np $NRO python mpi_tp4.py

Máximo de matriz calculado con metodo:  4.449900798145721
Maximo de matriz calculado con MPI:  4.449900798145721
