In [1]:
# import sys
# !{sys.executable} -m pip install dask==2024.6.0 distributed==2024.6.0

In [1]:
import execo
import execo_g5k
import execo_engine
import time
import os
from dask.distributed import Client

# this will be /home/lmascare
HOME_DIR = os.path.expanduser("~")

PATH_TO_SIF_FILE = HOME_DIR + "/bench/docker/images/bench.sif"

SIMULATION_INI = HOME_DIR + "/bench/simulation/setup.ini"
PDI_DEISA_YML = HOME_DIR + "/bench/simulation/io_deisa.yml"
SIM_EXECUTABLE = HOME_DIR + "/bench/simulation/build/main"
DEISA_PATH = HOME_DIR + "/bench/deisa/"

ANALYTICS_PY_FILE = HOME_DIR + "/bench/in-situ/bench_deisa.py"

# must be in the same directory as the script/notebook
SCHEDULER_FILE = HOME_DIR + "/bench/experiment/scheduler.json"

OUTPUT_DIR = HOME_DIR + "/bench/experiment/"

# the total number of nodes needs to be THE SAME as the total number of workers passed in the analytics script
DASK_WORKERS_PER_NODE = 2
TOTAL_DASK_WORKERS = 2


In [2]:
""" 
Get configs from a .ini file.

Arguments:
    config_file (str): Path to the .ini configuration file.
"""
def get_configs(config_file):
    import configparser

    config = configparser.ConfigParser()
    config.read(config_file)

    # Assuming the .ini file has sections and keys
    # Example: [section_name] key_name = value
    configs = {}
    for section in config.sections():
        for key, value in config.items(section):
            configs[key] = value

    return configs

In [3]:
configs = get_configs(SIMULATION_INI)

nb_reserved_nodes = 1

jobs = execo_g5k.oarsub(
    [
        (
            execo_g5k.OarSubmission(f"nodes={nb_reserved_nodes}", walltime=60*60),
            "grenoble",
        )
    ]
)

job_id, site = jobs[0]
print(f"Job {job_id} reserved on site {site}")

nodes = execo_g5k.oar.get_oar_job_nodes(job_id, site)
head_node, nodes = nodes[0], nodes[1:]

print(f"Head node: {head_node}")
print(f"Other nodes: {nodes}")





cores_per_node = execo_g5k.get_host_attributes(head_node)["architecture"]["nb_cores"]
total_simulation_cores = cores_per_node * len(nodes)

print(f"Total simulation cores: {total_simulation_cores}")

Job 2476997 reserved on site grenoble
Head node: Host('dahu-12.grenoble.grid5000.fr')
Other nodes: []
Total simulation cores: 0


In [4]:
import socket

head_node_ip = socket.gethostbyname(head_node.address)

nodes_ips = []
for node in nodes:
    nodes_ips.append(socket.gethostbyname(node.address))
print(f"Head node IP: {head_node_ip}")
print(f"Other nodes IPs: {nodes_ips}")


Head node IP: 172.16.20.12
Other nodes IPs: []


In [5]:
##########################################################
#                   RUNNING SCHEDULER
##########################################################
# Sending the commands for head node

if os.path.exists(SCHEDULER_FILE):
    os.remove(SCHEDULER_FILE)

#redirecting the output to a file
scheduler_cmd = (
    f"dask scheduler "
    # f"--host {head_node.address} "
    f"--scheduler-file {SCHEDULER_FILE} "
    f"> {OUTPUT_DIR}scheduler.e 2>&1; "
    "sync"
)
print(scheduler_cmd)
scheduler_process = execo.SshProcess(
    f'singularity exec {PATH_TO_SIF_FILE} bash -c "{scheduler_cmd}"',
    head_node,
)
scheduler_process.start()

# Wait for the scheduler to start by checking the scheduler file
while not os.path.exists(SCHEDULER_FILE):
    time.sleep(1)

dask scheduler --scheduler-file /home/lmascare/bench/experiment/scheduler.json > /home/lmascare/bench/experiment/scheduler.e 2>&1; sync


In [6]:
##########################################################
#                   RUNNING WORKERS
##########################################################

worker_cmd = (
    f"dask worker "
    # f"tcp://{head_node_ip}:8786 "
    # f"--dashboard-address {head_node.address}:8787 "
    f"--nworkers {DASK_WORKERS_PER_NODE} "
    "--nthreads 1 "
    "--local-directory /tmp "
    f"--scheduler-file {SCHEDULER_FILE} "
    f"> {OUTPUT_DIR}worker.e 2>&1"
)

# for node in nodes:
    # redirecting the output to a file
worker_process = execo.SshProcess(
    f'singularity exec {PATH_TO_SIF_FILE} bash -c "{worker_cmd}"',
    head_node,
)
worker_process.start()

SshProcess('singularity exec /home/lmascare/bench/docker/images/bench.sif bash -c "dask worker --nworkers 2 --nthreads 1 --local-directory /tmp --scheduler-file /home/lmascare/bench/experiment/scheduler.json > /home/lmascare/bench/experiment/worker.e 2>&1"', Host('dahu-12.grenoble.grid5000.fr'))

In [7]:
##########################################################
#                   RUNNING ANALYTICS
##########################################################

# python3 $OPWD/in-situ/bench_deisa.py $NDASKWORKERS $SCHEFILE 2>analytics.e&

py_cmd = (
    'export PYTHONPATH=/home/lmascare/bench/deisa/:$PYTHONPATH; '
    f'python3 {ANALYTICS_PY_FILE} {TOTAL_DASK_WORKERS} {SCHEDULER_FILE} > {OUTPUT_DIR}analytics.e 2>&1'
)
analytics_cmd = (
    # 'hostname'
    f'singularity exec {PATH_TO_SIF_FILE} bash -c "{py_cmd}"'

)
print("Analytics command:")
print(analytics_cmd)

analytics_process = execo.SshProcess(
    analytics_cmd,
    head_node,
)
analytics_process.start()
# analytics_process.wait()

Analytics command:
singularity exec /home/lmascare/bench/docker/images/bench.sif bash -c "export PYTHONPATH=/home/lmascare/bench/deisa/:$PYTHONPATH; python3 /home/lmascare/bench/in-situ/bench_deisa.py 2 /home/lmascare/bench/experiment/scheduler.json > /home/lmascare/bench/experiment/analytics.e 2>&1"


SshProcess('singularity exec /home/lmascare/bench/docker/images/bench.sif bash -c "export PYTHONPATH=/home/lmascare/bench/deisa/:$PYTHONPATH; python3 /home/lmascare/bench/in-situ/bench_deisa.py 2 /home/lmascare/bench/experiment/scheduler.json > /home/lmascare/bench/experiment/analytics.e 2>&1"', Host('dahu-12.grenoble.grid5000.fr'))

In [8]:
##########################################################
#                   RUNNING SIMULATION
##########################################################

# Starting the simulation 32 - 30 for sim and 2 for dask 

mx = int(configs["mx"])
my = int(configs["my"])
mz = int(configs["mz"])

MPI_NP = mx * my * mz

print(f"Running simulation with {MPI_NP} MPI processes")

host_list = ",".join([f"{node.address}:{2}" for node in nodes])
simulation_cmd = (
    f'export PYTHONPATH=/home/lmascare/bench/deisa/:$PYTHONPATH; '
    f'pdirun {SIM_EXECUTABLE} {SIMULATION_INI} {PDI_DEISA_YML} --kokkos-map-device-id-by=mpi_rank > {OUTPUT_DIR}simulation.e 2>&1'
)

# Build the command with two singularity exec calls:
mpi_cmd = (
    'export OMP_NUM_THREADS=2; '
    'export OMP_PROC_BIND=spread; '
    'export OMP_PLACES=threads; '
    f"mpirun -np {MPI_NP} "
    # f"--host {host_list} "
    f'singularity exec {PATH_TO_SIF_FILE} bash -c "{simulation_cmd}"'
)

print("Simulation command:")
print(mpi_cmd)
mpi_process = execo.SshProcess(
    mpi_cmd,
    head_node,
)
mpi_process.start()

time.sleep(20)


Running simulation with 4 MPI processes
Simulation command:
export OMP_NUM_THREADS=2; export OMP_PROC_BIND=spread; export OMP_PLACES=threads; mpirun -np 4 singularity exec /home/lmascare/bench/docker/images/bench.sif bash -c "export PYTHONPATH=/home/lmascare/bench/deisa/:$PYTHONPATH; pdirun /home/lmascare/bench/simulation/build/main /home/lmascare/bench/simulation/setup.ini /home/lmascare/bench/simulation/io_deisa.yml --kokkos-map-device-id-by=mpi_rank > /home/lmascare/bench/experiment/simulation.e 2>&1"
