In [None]:
import dask
from dask.distributed import Client, progress
from osirisprocess import OsirisProcess
import glob
import pandas as pd
import numpy as np
from itertools import compress

In [None]:
# Create one dask Client per node [node1-node4]; each address below is the
# dask endpoint that was set up on the corresponding node.
node_addresses = ("NODE1:8786", "NODE2:8786", "NODE3:8786", "NODE4:8786")
client0, client1, client2, client3 = [Client(address) for address in node_addresses]

In [None]:
# Restart every client before any work is submitted.
# `clients` is reused by later cells to split the simulations, so its order
# (client0 .. client3) matters.
clients = [client0, client1, client2, client3]
for client in clients:
    # restart() is called only for its side effect, so a plain loop is used
    # instead of building a throw-away list.
    client.restart()

In [None]:
# Directory that holds the OSIRIS simulation data.
basepath = "/OSIRIS_SIMULATION_DATA_PATH"
# Common prefix of every simulation directory of the scan: all simulations in
# basepath share the same shape and differ only in the scanned antenna
# length/radius, so their names all start the same way.
basename = f"{basepath}/SCAN_NAME"

# Task sent to each dask worker: it receives the path of one simulation and
# processes it.
def run(path):
    """Process the simulation stored at ``path`` with OsirisProcess.

    The imports happen inside the function, after ``sys.path`` has been
    extended, so that the ``osirisprocess`` module can be located where the
    task executes.

    As written, only the HDF5 -> Zarr conversion is active and the function
    returns None. The commented-out lines are alternative processing steps:
    comment or uncomment them depending on what is needed.
    """
    import sys
    # Directory that contains the OsirisProcess module
    sys.path.insert(0, 'OSIRIS_PROCESS_MODULE_PARENT_DIRECTORY_PATH')
    from osirisprocess import OsirisProcess
    import pandas as pd  # only needed by the optional pd.read_pickle step below

    # NOTE(review): the meaning of the second argument (1) is defined by
    # OsirisProcess and is not visible here.
    simulation = OsirisProcess(path, 1)
    simulation.convert_h5_to_zarr()  # CONVERT FROM HDF5 TO ZARR
    #simulation.load_fields_zarr()
    #simulation.convert_to_cyl()
    #max_values = simulation.find_max_values()
    #df = pd.read_pickle("datasets/SCAN_FILE.pckl")
    #name = path.split(basepath)[1]
    #max_values = {path: df.loc[name].to_dict()}
    #simulation.set_max_values(max_values)
    #return max_values

# Collect every simulation path of the scan, in sorted order.
# sorted() already returns a list, so no extra copy is needed.
paths = sorted(glob.glob(f"{basename}*"))
# Deal the paths out round-robin, one sub-list per dask client. The number of
# sub-lists follows len(clients) so that it always matches the slice step.
split_paths = [paths[start::len(clients)] for start in range(len(clients))]
# Number of simulations assigned to each client.
lengths = np.array([len(client_paths) for client_paths in split_paths])

In [None]:
# Main cell: processes the simulations in batches, one simulation per client
# running in parallel.
#
# For every batch index, clients[k] receives split_paths[k][batch_index] (when
# that client still has a path left). The results are then collected in client
# order, and finally every client is restarted before the next batch.

results = []
for batch_index in range(lengths.max()):
    # Submit the whole batch first so the simulations run concurrently.
    pending = []
    for client_index, (client, client_paths) in enumerate(zip(clients, split_paths)):
        if batch_index < len(client_paths):
            simulation_path = client_paths[batch_index]
            print(simulation_path)
            pending.append((client_index, client.submit(run, simulation_path)))

    # Wait for each submitted task. A failed task is recorded as an
    # {"c<client>_<batch>": "error"} placeholder instead of aborting the scan.
    for client_index, future in pending:
        try:
            results.append(future.result())
        except Exception:
            # Exception (not a bare except) so the cell can still be interrupted.
            results.append({f"c{client_index}_{batch_index}": "error"})

    # Restart every client between batches.
    for client in clients:
        client.restart()

In [None]:
# Run this cell only when `run` returns the max values. It converts the
# collected results to a pandas DataFrame ordered by scan parameter and stores
# it in a file.

# NOTE(review): the last three results are dropped, exactly as in the original
# cell; the reason is not visible here -- confirm that this is still wanted.
kept_results = results[:-3]
# Skip entries that carry no max values: None (when `run` returns nothing) and
# the {"c<client>_<batch>": "error"} placeholders recorded for failed tasks.
# Either would otherwise crash the name extraction below.
kept_results = [
    entry for entry in kept_results
    if isinstance(entry, dict) and entry
    and not any(isinstance(value, str) and value == "error" for value in entry.values())
]

# The first key of each result is taken as the simulation path; the part after
# basepath is used as the simulation name.
key_names = [next(iter(entry)).split(basepath)[1] for entry in kept_results]


def sort_key(name):
    """Return the scan parameter encoded at the end of a simulation name.

    THE SPLIT MARKER SHOULD SEPARATE THE COMMON FIRST PART OF ALL SIMULATION
    DIRECTORIES WITHIN THE SCAN AND THE SCAN PARAMETER IN THE LAST PART.
    """
    return float(name.split("SPLIT_MARKER")[-1])


# Unique simulation names (first occurrence kept), ordered by scan parameter.
index_names = sorted(dict.fromkeys(key_names), key=sort_key)
# One DataFrame per result, built from the value stored under its first key.
results_df = [pd.DataFrame(next(iter(entry.values()))) for entry in kept_results]
final_df = pd.concat(results_df, keys=key_names)

# Re-assemble the frames in scan-parameter order and save them.
sorted_df_list = [final_df.loc[name] for name in index_names]
sorted_df = pd.concat(sorted_df_list, keys=index_names, sort=False)
sorted_df.to_pickle("datasets/SCAN_FILE.pckl")