In [1]:
%pip install matplotlib

Note: you may need to restart the kernel to use updated packages.


In [69]:
%%file test.py
from mpi4py import MPI
import numpy as np

# Initialize MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Gather
# Create a 2D array on each process
sendbuf = np.full((2, 2), rank, dtype='i')
print(sendbuf)

# Initialize the receive buffer on rank 0
recvbuf = None
if rank == 0:
    recvbuf = np.empty([size, 2, 2], dtype='i')

# Gather the 2D arrays from all processes to the recvbuf on rank 0
comm.Gather(sendbuf, recvbuf, root=0)

# Perform element-wise addition on rank 0
if rank == 0:
    sum_array = np.sum(recvbuf, axis=0)
    print("Sum array on rank 0:", sum_array)

Overwriting test.py


In [70]:
!mpirun -n 3 python test.py

[[1 1]
 [1 1]]
[[2 2]
 [2 2]]
[[0 0]
 [0 0]]
Sum array on rank 0: [[3 3]
 [3 3]]


In [5]:
%matplotlib inline

In [6]:
%%file gridding.py

from mpi4py import MPI
import numpy as np
import matplotlib.pyplot as plt
import sys

# Function to transform coordinates u and v to wavelengths
def transform_coordinates(data):
    c = 299792458.0  # Speed of light in meters per second
    data[:,0] = data[:,0]*data[:,6]/c
    data[:,1] = data[:,1]*data[:,6]/c
    return data

# Function to perform the gridding process
def grid_data(local_data, N, delta_x_rad):
    local_F_r = np.zeros((N, N), dtype=np.float64)
    local_F_i = np.zeros((N, N), dtype=np.float64)
    local_W_t = np.zeros((N, N), dtype=np.float64)
    
    delta_u_lambda = 1/(delta_x_rad*N)

    i = np.clip(np.round(local_data[:,0]/delta_u_lambda + N/2).astype(int), 0, N-1)
    
    j = np.clip(np.round(local_data[:,1]/delta_u_lambda + N/2).astype(int), 0, N-1)
    
    # acumulamos todos los valores que hagan match en la pos i,j para cada i,j calculado anteriormente
    np.add.at(local_F_r, (i, j), local_data[:,3] * local_data[:,5])
    np.add.at(local_F_i, (i, j), local_data[:,4] * local_data[:,5])
    np.add.at(local_W_t, (i, j), local_data[:,5])
    
    return local_F_r, local_F_i, local_W_t

# Parse command-line arguments
if len(sys.argv) != 7:
    print("Usage: mpirun -n num_processes python gridding.py -i data_file -d deltax -N image_size")
    sys.exit(1)

if sys.argv[1] != '-i':
    print("Error: You must specify the data file using -i")
    sys.exit(1)

if sys.argv[3] != '-d':
    print("Error: You must specify the value of deltax using -d")
    sys.exit(1)

if sys.argv[5] != '-N':
    print("Error: You must specify the image size using -N")
    sys.exit(1)

input_file = sys.argv[2]
name_file = "./"+sys.argv[2]
delta_x_arcsec = float(sys.argv[4])
delta_x_rad = (np.pi/(180*3600))*delta_x_arcsec
N = int(sys.argv[6])

# Initialize MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# SCATTER PROCESS
data = None
num_rows = None
num_cols = None
if rank == 0:
    data = np.loadtxt("./hltau_completo_uv.csv", delimiter=",")
    num_rows, num_cols = data.shape
    
# Broadcast 'data.shape' desde el proceso 0 a todos los demás procesos
num_rows = comm.bcast(num_rows, root=0)
num_cols = comm.bcast(num_cols, root=0)
    
# Calcula cuántas filas se asignarán a cada proceso
num_rows_per_process = num_rows // size

# Divide las filas de 'data' en partes iguales entre los procesos
local_data = np.empty([num_rows_per_process,num_cols], dtype='float64')
comm.Scatter(data, local_data, root=0)

# GRIDDING PROCESS
local_F_r, local_F_i, local_W_t = grid_data(local_data, N, delta_x_rad)

# Espera a que todos los procesos terminen
comm.Barrier()

# GATHER PROCESS
# Initialize the receive buffers
gathered_M_real = None
gathered_M_imaginaria = None
gathered_M_pesos = None

if rank == 0:
    gathered_M_real = np.empty([size, N, N], dtype = np.float64)
    gathered_M_imaginaria = np.empty([size, N, N], dtype = np.float64)
    gathered_M_pesos = np.empty([size, N, N], dtype = np.float64)

# Gather the 2D arrays from all processes to the recvbuf on rank 0
comm.Gather(local_F_r, gathered_M_real, root=0)
comm.Gather(local_F_i, gathered_M_imaginaria, root=0)
comm.Gather(local_W_t, gathered_M_pesos, root=0)

# Espera a que todos los procesos terminen
comm.Barrier()

# The root process (rank 0) join partials results
if rank == 0:
    
    m_r = np.sum(gathered_M_real, axis=0)
    m_i = np.sum(gathered_M_imaginaria, axis=0)
    m_p = np.sum(gathered_M_pesos, axis=0)
    
    # Normalizamos
    divisor_nonzero = np.where(m_p != 0, m_p, 1)
    m_r = m_r / divisor_nonzero
    m_i = m_i / divisor_nonzero
    
    grid = m_r + 1j * m_i
    dirty_image = np.fft.ifftshift(np.fft.fft2(np.fft.fftshift(grid)))
    
    fig , ax = plt.subplots(1,2)
    # Subtrama izquierda: Datos Grideados
    ax[0].imshow(np.abs(grid), origin="lower", cmap="hot")
    ax[0].set_title("Datos Grideados")

    # Subtrama derecha: Dirty Image
    ax[1].imshow(dirty_image.real, origin="lower", cmap="hot")
    ax[1].set_title("Dirty Image")

    # Ajusta el diseño del gráfico
    plt.tight_layout()

    # Muestra el gráfico en el cuaderno de Jupyter
    plt.show()
    
# Finaliza MPI
MPI.Finalize()

Overwriting gridding.py


In [7]:
!mpirun -n 8 python gridding.py -i hltau_completo_uv.csv -d 0.003 -N 2048

Figure(640x480)
