In [1]:
import os
import sys
import time
from datetime import datetime
import pandas as pd
from os import path
import yaml
from experiments import execute_command_on_server_and_clients, concat_dict

from execo import SshProcess
from execo_g5k import oarsub, oardel, OarSubmission, get_current_oar_jobs, get_oar_job_nodes, get_oar_job_info, Deployment, deploy

In [2]:
# --- Reservation / deployment settings (edit these to tune the run) ---
jobname="fl-measure"            # OAR job name; an existing job with this name is reused
nodecount=3                     # 1 server + (nodecount - 1) clients
walltime="02:00:00"             # OAR walltime (HH:MM:SS)
resources_selection="-t exotic -p estats"  # extra oarsub options selecting the Jetson "estats" nodes
site="toulouse"                 # Grid'5000 site hosting the estats cluster
force_redeploy=False            # when True, redeploy even if nodes already run the environment
environment_dsc_file='./images/fl_jetson_image.yaml'  # kadeploy environment description

# Output/log paths are defined further down, in the "Defining hyper parameters" section.

# Reserve a job and deploy the chosen environment

In [3]:
# Reuse a running job named `jobname` if one exists; otherwise submit a new one.
# The loop uses its own `job_site` variable so the configured `site` is only replaced
# when we actually adopt an existing job (later cells use `site` to reach the job).
jobs = get_current_oar_jobs()
jobid = None
waiting_jobs = []
while jobs:
    j, job_site = jobs.pop()
    info = get_oar_job_info(j, job_site)
    if info.get('name') != jobname:
        continue
    if info.get('state') == 'Running':
        jobid = j
        site = job_site  # later cells must talk to the frontend that owns this job
        print("A {} job is already running, using it. jobid is {}".format(jobname, jobid))
        break
    waiting_jobs.append(j)

if not jobid and not waiting_jobs:
    # NOTE(review): jobs are submitted to the 'testing' queue — confirm this is intended.
    jobspec = OarSubmission(resources="/cluster=1/nodes={}".format(nodecount), walltime=walltime,
                            additional_options=resources_selection, job_type="deploy", name=jobname,
                            queue='testing')
    jobid, _ = oarsub([(jobspec, site)]).pop()
    # execo_g5k reports a failed submission as a None job id; stop here rather than
    # letting the following cells fail with an obscure error.
    if jobid is None:
        raise RuntimeError("OAR submission of job {} on {} failed".format(jobname, site))
    print("New job submitted, jobid is {}".format(jobid))
elif not jobid:
    print("One or more {} jobs exist ({}) but are not running.\n"
          " Connect to the frontend to see what is happening, and/or run the cell again.".format(
          jobname, ", ".join([str(j) for j in waiting_jobs])))

New job submitted, jobid is 448524


In [4]:
# List the reserved hosts, ordered by address so that the server/client split
# in the next cell is deterministic.
nodes = sorted(get_oar_job_nodes(jobid, site), key=lambda host: host.address)
nodes

[Host('estats-5.toulouse.grid5000.fr'),
 Host('estats-8.toulouse.grid5000.fr'),
 Host('estats-9.toulouse.grid5000.fr')]

In [5]:
# The first node (lowest address) hosts the FL server; every other node is a client.
server = nodes[0]
clients = nodes[1:]
print(f"Server:{server} \n Clients: {clients}")

Server:Host('estats-5.toulouse.grid5000.fr') 
 Clients: [Host('estats-8.toulouse.grid5000.fr'), Host('estats-9.toulouse.grid5000.fr')]


In [6]:
# Deploy the Jetson environment image on every reserved node.
# NOTE(review): with check_deployed_command=True, execo presumably skips nodes that
# already run the environment; set force_redeploy=True to always redeploy.
deployment = Deployment(hosts=nodes, env_file=os.path.abspath(environment_dsc_file))
deploy_ok, deploy_failed = deploy(
    deployment,
    check_deployed_command=not force_redeploy,
    stdout_handlers=[sys.stdout],
    stderr_handlers=[sys.stderr],
)
print("Deployment status:\n* ok: {}\n* failed: {}".format(deploy_ok, deploy_failed))
# Every node is needed (server + clients); stop early instead of failing later
# with SSH errors on a half-deployed reservation.
if deploy_failed:
    raise RuntimeError("Deployment failed on: {}".format(", ".join(sorted(str(h) for h in deploy_failed))))





toulouse: Connection to toulouse.grid5000.fr closed.toulouse: 

Deployement status:
* ok: {'estats-8.toulouse.grid5000.fr', 'estats-5.toulouse.grid5000.fr', 'estats-9.toulouse.grid5000.fr'}
* failed: set()


In [None]:
from utils_experiment import MyExperiment

# Hyperparameter grid: each key is a dotted Hydra override path (the same form as the
# `client.cid=...` overrides used below) and each value is the list of settings to try.
# NOTE(review): presumably MyExperiment runs one experiment per combination — confirm.
params = {
    # Federated training schedule
    "params.num_rounds": [3, 5, 10],
    # Client-side optimisation
    "client.lr": [1e-1],
    # "client.local_epochs": [1, 3, 5],
    "client.decay_rate": [0.1],
    "client.decay_steps": [10],
    # Model, aggregation strategy and optimiser
    "neuralnet": ["MobileNetV3Small"],
    "strategy": ["fedavg"],
    "optimizer": ["SGD"],
}

# Keys passed to MyExperiment as `key_to_remove`.
# NOTE(review): presumably these are dropped from the recorded experiment summary — confirm.
to_remove = [
    "client.cid",
    "client.dry_run",
    "params.root_data",
    "params.num_classes",
    "tmp_result_folder",
    "energy_file",
    "exp_datetime",
]

Exps = MyExperiment(params=params, nodes=nodes, key_to_remove=to_remove)
Exps.run()

# The sections below are no longer necessary (kept for reference)

# Define hyperparameters

In [8]:
# Paths used by the manual, step-by-step run below.
repository_dir = os.getcwd()
tmp_dir = "/tmp"  # node-local scratch folder for per-run results

jetson_sensor_monitor = repository_dir + "/jetson_monitoring_energy.py"  # energy-monitoring script
exp_csv = repository_dir + "/outputs/experiment_summary.csv"  # one row appended per experiment

# Bare file names, created inside each run's results folder
result_energy_file = "energy.csv"
log_file = "logs.log"

In [9]:
# Timestamp that uniquely names this run's output folders.
exp_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Final results folder (results are copied back here at the end of the run).
# os.makedirs replaces the IPython-only `!mkdir -p $result_folder` shell escape, so the
# cell is plain Python and no longer passes the path through a shell.
result_folder = f"{repository_dir}/outputs/{exp_datetime}/"
os.makedirs(result_folder, exist_ok=True)
# Node-local folder where each node writes its results during training.
tmp_result_folder = f"{tmp_dir}/{exp_datetime}/"

# Hyperparameters/metadata recorded for this experiment (later merged with the YAML config).
hyperparams = {
    "result_folder": result_folder,
    "result_energy_file": result_energy_file,
    "log_file": log_file,
    "tmp_result_folder": tmp_result_folder,
    "exp_datetime": exp_datetime,
    "sleep_duration": 30,  # seconds of idle measurement before and after training
    "timestamps": {},
}

In [10]:
# Record which physical node played which role in this experiment.
hyperparams["server"] = server.address
hyperparams.update({f"client_{cid}": host.address for cid, host in enumerate(clients)})

In [54]:
# Merge the project's config file into the hyperparameters recorded for this run.
# FullLoader is acceptable for this local, trusted file; prefer yaml.safe_load if the
# config could ever come from elsewhere.
path_to_yaml = f"{repository_dir}/config/config_file.yaml"
with open(path_to_yaml) as yaml_file:
    yaml_contents = yaml.load(stream=yaml_file, Loader=yaml.FullLoader)

hyperparams.update(yaml_contents)

# RUN EXPERIMENT

The first step is to get the IP address of the server so that the clients can connect to it. To get it, we run an `SshProcess` on the server:

In [13]:
def get_host_ip(hostname):
    """Return the output of ``hostname -I`` run over SSH on ``hostname``.

    Args:
        hostname: the execo host to query (a ``Host`` object, e.g. ``server``).

    Returns:
        The stripped stdout of ``hostname -I``: one or more space-separated addresses.

    Raises:
        RuntimeError: if the remote command fails. Previously an error *string* was
            returned, which the caller then split and stored as the server IP.
    """
    process = SshProcess("hostname -I", host=hostname)
    process.run()  # blocks until the remote command has finished
    if not process.ok:
        raise RuntimeError(f"Failed to get IP for {hostname}: {process.stderr.strip()}")
    return process.stdout.strip()


ip_address_full = get_host_ip(server)
# `hostname -I` may list several addresses; keep the first one.
ip_address = ip_address_full.split()[0]
print(f"IP address for {server.address} is {ip_address}")
hyperparams["comm"]["host"] = ip_address

IP address for your_host is 172.16.121.5


Now we define the `SshProcess`es for the server and the clients, passing them the hyperparameters of this experiment as defined in the previous section.
Results are first saved locally (in the `/tmp/` folder) to reduce communication overhead, and are copied back to the home directories at the end of training.

In [14]:
# On every node: create the node-local results folder and start with an empty log file.
command = "mkdir -p {0}; echo -n > {0}/logs.log".format(tmp_result_folder)
_ = execute_command_on_server_and_clients(
    nodes,
    command,
    repository_dir + "/outputs/logs.log",
)

In [15]:
# Launch command for the FL server (run as root on the server node). Hydra writes its
# outputs to the node-local tmp folder, and comm.host is the server IP found above.
# Built from adjacent string literals instead of a backslash continuation inside the
# f-string, which embedded runs of spaces and broke on any trailing whitespace.
command = (
    f"cd {repository_dir}; "
    "python3 main_server.py "
    f"hydra.run.dir={tmp_result_folder} "
    f"comm.host={ip_address}"
)
print(command)
server_log = f"{repository_dir}/outputs/logs.log"
run_server = SshProcess(
    command,
    host=server,
    connection_params={'user': 'root'},
    stdout_handlers=[sys.stdout, server_log],
    stderr_handlers=[sys.stderr, server_log],
)

cd /home/tunguyen/jetson-test;     python3 main_server.py     hydra.run.dir=/tmp/2024-01-31_12-48-25/     comm.host=172.16.121.5


In [16]:
# One SshProcess per client, each run as root on its OWN client node. The previous
# version passed host=server, so every client was launched on the server node.
run_clients = []
for cid, host in enumerate(clients):
    command = f"cd {repository_dir}; python3 client.py client.cid={cid} hydra.run.dir={tmp_result_folder} comm.host={ip_address}"
    run_clients.append(SshProcess(
        command,
        host=host,
        connection_params={'user': 'root'},
        stdout_handlers=[sys.stdout, f"{repository_dir}/outputs/logs.log"],
        stderr_handlers=[sys.stderr, f"{repository_dir}/outputs/logs.log"]))

We start the energy monitoring, then the server, and wait a few seconds so that the server is ready before starting the clients.

In [18]:
# start the monitoring
command = f"python3 {jetson_sensor_monitor} --log-dir {tmp_result_folder} --log-csv {result_energy_file}"
jtop_processes = execute_command_on_server_and_clients(nodes, command, background=True)
# sleep
hyperparams["timestamps"]["start_experiment_before_sleep"]=time.time()
time.sleep(hyperparams["sleep_duration"])
hyperparams["timestamps"]["start_experiment"]=time.time()
# start the server and the clients
run_server.start()
time.sleep(5)
for run_client in run_clients:
    run_client.start()

The server disconnects once training is done, so we wait for the server `SshProcess` to finish before processing the results and starting another experiment.

In [19]:
# wait until the training is done
run_server.wait()
# kill the monitoringz
for proc in jtop_processes:
    proc.kill()
# copy the logs back to the home repository
cp_command = f"mkdir -p {result_folder}/server/; cp {tmp_result_folder}/* {result_folder}/server/" #; rm -r {tmp_result_folder}"
execute_command_on_server_and_clients([server], cp_command, f"{repository_dir}/outputs/logs.log")
for cid in range(len(clients)):
    cp_command = f"mkdir -p {result_folder}/client_{cid}/; cp {tmp_result_folder}/* {result_folder}/client_{cid}/" #; rm -r {tmp_result_folder}"
    execute_command_on_server_and_clients([clients[cid]], cp_command, f"{repository_dir}/outputs/logs.log")
# save the timestamps and sleep to make sure the power goes down
hyperparams["timestamps"]["end_experiment"]=time.time()
time.sleep(hyperparams["sleep_duration"])
hyperparams["timestamps"]["end_experiment_after_sleep"]=time.time()
# tmp_result_folder

Connection to estats-5.toulouse.grid5000.fr closed.
Connection to estats-5.toulouse.grid5000.fr closed.
Connection to estats-5.toulouse.grid5000.fr closed.


# Process the results and save them as CSV

In [None]:
def concat_dict(dict_list):
    """Merge every dict found in ``dict_list`` into a single new dict.

    Entries that are not dicts (e.g. plain strings) are ignored. When the same key
    appears in several dicts, the value from the later dict wins.

    NOTE(review): this redefinition shadows the ``concat_dict`` imported from
    ``experiments`` in the first cell — keep only one of them.
    """
    merged = {}
    for entry in filter(lambda item: isinstance(item, dict), dict_list):
        merged.update(entry)
    return merged

# `defaults` (from the YAML config) is expected to be a list mixing plain entries and
# dicts. Its dicts are merged into one dict so that json_normalize can flatten them into
# columns. The isinstance guard makes the cell safe to re-run: calling concat_dict on
# the already-merged dict would iterate its string keys and replace `defaults` with {}.
if isinstance(hyperparams.get("defaults"), list):
    hyperparams["defaults"] = concat_dict(hyperparams["defaults"])

hyperparams_normalized = pd.json_normalize(hyperparams)
hyperparams_normalized

In [None]:
# Append this run's summary row to the cumulative experiment CSV.
# Any existing row with the same `exp_datetime` is dropped first, so re-running this
# cell updates the row instead of duplicating the experiment.
if path.exists(exp_csv):
    df = pd.read_csv(exp_csv)
    if "exp_datetime" in df.columns:
        df = df[df["exp_datetime"] != hyperparams["exp_datetime"]]
    df = pd.concat([df, hyperparams_normalized], ignore_index=True)
else:
    df = hyperparams_normalized.copy()
df.to_csv(exp_csv, index=False)

If something went wrong, stop the running processes:

In [None]:
# Emergency stop: kill the server, client and energy-monitoring processes.
# A plain loop is used instead of a list comprehension run only for its side effects
# (which also displayed a useless list of None as the cell output).
run_server.kill()
for run_client in run_clients:
    run_client.kill()
# The monitors are started before the server, so they exist whenever the server was started.
for proc in jtop_processes:
    proc.kill()

# Delete the OAR job once all the experiments are done

In [None]:
# Release the Grid'5000 reservation once every experiment has finished.
job_to_delete = (jobid, site)
oardel([job_to_delete])