<div style='background-color:#f7f7f7; padding-top:30px; padding-left:20px; padding-right:20px; padding-bottom:30px'>
    <center>
        <div style='  display: block;
  font-size: 2em;
  font-weight: bold;  display: block;
  font-size: 2em;
  font-weight: bold;'>MAPD-B - Preprocessing of SEVN data for binary black holes mass distribution analysis
        </div>
    <center>
        </br>
    <i>Tommaso Bertola, Giacomo Di Prima, Giuseppe Viterbo, Marco Zenari</i></center></div>

# Introduction: the computational problem

Our aim for this project is to preprocess the data of multiple SEVN simulations of binary systems.

SEVN is Python program developed by the Astronomy Department at the University of Padua to simulate the evolution of binary systems. The evolution takes into account different physical phenomena which obey the patterns and laws obeserved in the Universe, especially those seen in the stellar tracks.

We will focus our attention specifically to those systems evolving into binary black holes. To study these systems we therefore need to extract from the whole dataset produced by SEVN only some information regarding the initial and final conditions of the binary system evolution.

The final goal is to obtain a simple and handy DataFrame listing only those features.

# Data structure

To better understading the problem, we will briefely describe the dataset we are given.

The dataset consists of a number of folders named after some "hyperparameters" given to SEVN while performing the simulations.
In our case, we are given 60 different folders, whose names are like `sevn_output_Z0.001A1L1`, `sevn_output_Z0.03A5L1`, ... More specifically the hyperparameters are the numbers following the letters `Z` and `L` which will be included in the final output DataFrame for each record.

Inside each folder there are three kinds of files:
* `output_{nthread}.csv`
* `logfile_{nthread}.dat`
* `evolved_{nthread}.dat`

where {nthread} is a number ranging from 0 to 29, corresponding to the thread responsible for the computation of those simulations. 

On average, each of the `output_{nthread}.csv` files occupies 750MB, `logfile_{nthread}.dat` 200MB, and `evolved_{nthread}.dat` 50MB. 

In total, each folder occupies between 26 to 31GB of data for a gross total of around 1.7TB.

## File schema
We describe the schema of the three different kind of files to better explain the following parsing process

### `evolved_{nthread}.dat` schema
These files contain the initial properties of the systems that has been successfully evolved by SEVN.
These are **fixed width** fields files and therefore we used the `pd.read_table` function to parse them.
The fields we are interested in reading are reported below.

|Column Name| Data type | Description|
|:----|:----|:-----|
|name|string `0_1234....`|Unique in each folder|
|Mass_0|float|Initial mass of star 0 (in Solar masses)|
|Mass_1|float|Initial mass of star 1 (in Solar masses)|
|Z_0|float|Metallicity, same as in directory name (both stars have the same)|
|SN_0|string `rapid_gauNS`|Supernova model for star 0|
|SN_1|string `rapid_gauNS`|Supernova model for star 1|
|a|float| Initial semimajor axis of the binary |
|e|float|Initial eccentricity of the binary|
                      
All other fields in the files are discarded in the analysis.

### `output_{nthread}.csv` schema
The output files contains the final conditions of the simulations.
Each row is a different binary system, and each column indicates a different feature of the simulation.
The file is formatted in a tipical **csv format**.
We report in the following table the columns we are interested in. 

|Column Name| Data type | Description|
|:----|:----|:-----|
|name |string `0_1234...`| Unique in each folder|
|Mass_0|double| Mass of object 0 (in Solar masses)|
|Mass_1|double| Mass of object 1 (in Solar masses)|
|RemnantType_0|int|Type of object 0 after evolution|
|RemnantType_1|int|Type of object 1 after evolution|
|Semimajor|float|Semimajor of the binary (in Solar radius)|
|Eccentricity|float| Eccentricity of the binary|
|GWtime|float|Gravitational wave orbital decay time|
|BWorldtime|int|Time elapsed in the simulations|

All other fields in the files are discarded in the analysis.
 

### `logfile_{nthread}.dat` schema
Logfiles are **plain text** files, containing the description of a particular astrophysical event in each row.
Each event is univocally associated to a specific binary system evolution by its `name`, however there could be multiple events regarding the same system.
To recover the relevant information we have to run different regular expressions.
Each regex used specifically captures the type of the event, namely `RLO_BEGIN`, `CE`, or `BSN`. 

An example of the content of these files is given by the following rows

<div style='background-color:#f7f7f7; padding-top:30px; padding-left:20px; padding-right:20px; padding-bottom:30px'>B;0_474492234654248;0;COLLISION;23.667631;0:9.44621:18.0861:3:1:2.61393:1.21104:1:38.822:0.50739:19.1197:10.6772
B;0_474492234654248;0;MERGER;23.667631;0:9.446e+00:1.906e+00:0.000e+00:3:0:18.1224:1:2.614e+00:0.000e+00:0.000e+00:1:0:1.21104:12.0601:38.822:0.50739
S;0_474492234654248;0;NS;25.471686;5:1.46229:4.64623e+12:63.4538:0.98692
S;0_474492234654248;0;SN;25.471686;10.6025:3.06259:1.65091:1.46229:5:2:453.262:345.701:-197.664:-212.483:-187.853
S;0_641394535500269;0;WD;32.367569;3.2735:2.57868:1.29894:1.29894:3
</div>

# Tools
To parse the data we leverage on distributed computing techniques. 
Specifically we extensively use Python Dask library which enable us to manage a cluster of workers hosted on CloudVeneto virtual machines.

## Infrastracture configuration
![network_configuration.png](network_configuration.png)

We will be using 6 different virtual machines, `bhbh-{d,1,2,3,4,5}`. 
The first, `bhbh-d`, serves as a NFS server, allowing all the other VMs `bhbh-{1,2,3,4,5}` to read, write and crunch the archived data as can be seen from the picture above with the green arrows.
Highlighted in orange, all `bhbh-{1..5}` are connected to a Dask cluster, managed solely by `bhbh-1` which performs the action of client, scheduler and worker at the same time.

### Setting up `bhbh-d`
After instantiating the VM `bhbh-d` of flavor `cloudveneto.medium`, we attach the volume `bhbh-v` to `/dev/vdb` which is then mounted on `/mnt/bhbh-v`.
To allow the other VMs to see the new volume, we use the software provided by `nfs-kernel-server`.

The NFS server is configured such that `/mnt/bhbh-v` can be seen by the other VMs and therefore mounted therein.
It is important to note that the configuration of the sharing of the volume is made in `sync` mode.
The reason being the cluster might potentially alter the consistency of the data if `async` mode were used. 

Finally, the actual data were copied from the `demoblack.fisica.unipd.it` server to the mounted volume.
The operation took a few hours as the total size of the data is around 1.7TB.
The operation was carried out via `rsync` utility in order to resume the copying was the network to fail for some unspecified errors.

### Setting up `bhbh-{1,2,3,4,5}`
The VMs for the proper data crunching are of the flavour `cloudveneto.large`, having 8GB of RAM each for a total amount of 32GB<a name="cite_ref-1"></a>[<sup>[1]</sup>](#cite_note-1).
The following operations are equally repeated on all the VMs in order to have equally working systems.

After installing the `nfs-utils` package, we mount the Network volume issuing the following command `mount -t nfs 10.67.22.169:/mnt/bhbh-v /mnt/bhbh`. 
For simplicty sake we use `conda` as a package manager. We install the `dask[complete]` suite of packages.
This set up is enought to allow the cluster to work.

### Particular set up of bhbh-1
The `bhbh-1` has some slight differences in the software installed as it is used for different puropses.
`bhbh-1` hosts the Jupyter server, acting as a client to the cluster, the cluster scheduler and also a cluster worker.
The reason why this set up is used is just for optimizing the memory and computing resources available.
A better configuration would have been achieved by moving the client and the scheduler to another VM but this was not easy to do as cloud resources were limited.
Indeed, one of the main issues we came across during the tests was a lack of RAM in the `bhbh-1`, presumably due to the exchange of data between workers. For this reason we are discussing different approaches in the following.




<a name="cite_note-1"></a>1. [^](#cite_ref-1) 32GB is comparable to the size of one folder.

# Preprocessing on the files

In this section we are going to give a brief summary of the operations that we have to do on the files for our preprocessing. 
As anticipated, the final goal is to obtain a dataframe containing only the useful informations and save it for further analysis.

In order to obtain only one dataframe we need to read, clean and extract informations from each of the three type of files and finally merge all the informations for each binary in a dataframe.


All the preprocessing operations are written inside a function that is then executed in a delayd way with dask. 

For the preprocessing different attemps have been tried, and in the following section we are going to describe the pros and cons of each of them.

Now we take as example one preprocessing function in order to describe the basic operations that we have done. 
The differences in the preprocessing function of each attempt will be highlighted in the corresponding sections.

In [None]:
# this function does the preprocessing
# on three files of the same thread:
# output_{thread}, evolved_{thread}, logfile_{thread}

def preprocessing_bag_of_thread(paths):
    '''
       paths = python list of the paths of the three files considered [output, evoleved, logfile]
    '''
    
    # list of column names and types to read
    
    # output_{}.csv
    output_column_to_read = ['name', 'Mass_0', 'RemnantType_0',
                             'Mass_1', 'RemnantType_1',
                             'Semimajor','Eccentricity',
                             'GWtime','BWorldtime']
    output_column_type = ['string', 'float64', 'int64',
                          'float64', 'int64',
                          'float64', 'float64',
                          'float64', 'float64']

    # evolved_{}.dat
    evolved_column_to_read = ['name', 'Mass_0',
                              'Z_0', 'SN_0',
                              'Mass_1', 'SN_1',
                              'a', 'e']
    evolved_column_type = ['string', 'float64',
                           'float64', 'string',
                           'float64', 'string',
                           'float64', 'float64']
    
    # further columns to remove at the end 
    drop_list = ['RemnantType_0',  'RemnantType_1']
    
   
    #OUTPUT files processing
    
    output = pd.read_csv(paths[0],                              # read the file
                         usecols=output_column_to_read,         # read only some cols
                         dtype=dict(zip(output_column_to_read,  # specify the types
                                        output_column_type))).\ #
                rename(columns={'Mass_0':'Mass_0_out',          # rename columns
                                'Mass_1':'Mass_1_out'})         #

    # mask to select only the black holes binaries, defined by RemnantType
    idxBHBH=(output.RemnantType_0==6) & (output.RemnantType_1==6) & (output.Semimajor.notnull())
    
    # apply the mask
    output=output[idxBHBH]
        
    
    #EVOLVED files processing
    
    #extracting the alpha parameter from the path of the file 
    alpha = float(re.findall(r".+(?<=A)(.*)(?=L)",
                             paths[1])[0])
    
    #read the columns we are interested in from the evolved file
    evolved = pd.read_table(paths[1],                               # read file
                            sep='\s+',                              # separate by spaces
                            usecols=evolved_column_to_read,         # read only some columns
                            dtype=dict(zip(evolved_column_to_read,  # specify the types
                                           evolved_column_type)))   #
    #NB: sep='\s+' is need because there are different number of spaces separareting the columns
    
    #adding the column with the alpha parameter
    evolved['alpha'] = alpha
    
    
    #LOGFILE files processing
    
    logfile = pd.read_csv(paths[2],    # read the file
                          header=None) # there is no header

    
    #Running Regex on the line of the logfile to extrac useful informations
    df_RLO = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);RLO_BEGIN;").\  # searching for string "RLO_BEGIN"
                dropna().\                                                    # dropping nan
                rename(columns={0:'name', 1:'ID'}).\                          # rename columns
                groupby(['name']).\                                           # grouping by name
                size().\                                                      # and counting the number of RLO
                to_frame(name='RLO').\                                        # 
                reset_index()                                                 # to have a nice dataframe

    
    df_CE = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);CE;").\  # searching for string "CE"
                dropna().\                                            # dropping nan
                rename(columns={0:'name', 1:'ID'}).\                  # rename
                groupby(['name']).\                                   # grouping by name
                size().\                                              # 
                to_frame(name='CE').\                                 # and counting the number of CE
                reset_index()                                         # to have a nice dataframe
    

    df_BSN = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);BSN;").\  #searching for string "BSN"
                dropna().\                                              # dropping nan
                rename(columns={0:'name', 1:'ID'}).\                    #rename
                groupby(['name']).\                                     #grouping by name
                size().\                                                #
                to_frame(name='BSN').\                                  #and counting the number of BSN
                reset_index()                                           #to have a nice dataframe

    
    #MERGE
    bhbh = evolved.merge(output, on=['name'], how='inner').\  #innerg join on the name between wvolved and output
                   merge(df_RLO, on=['name'], how='left').\   #left join on the name with df_RLO
                   merge(df_CE,  on=['name'], how='left').\   #left join on the name with df_CE
                   merge(df_BSN, on=['name'], how='left').\   #left join on the name with df_BSN
                   fillna(value=0).\                          #setting nan to zero
                   drop(columns=drop_list)                    #dropping no longer useful columms
    
    
    #Adding some columns with physical meaning
    bhbh['tdelay'] = bhbh['GWtime'] + bhbh['BWorldtime'] #time delay
    
    #defining the max mass of output
    bhbh['Mass_max_out'] = bhbh['Mass_1_out']
    bhbh['Mass_max_out'] = bhbh['Mass_max_out'].\
                            where(cond=(bhbh['Mass_max_out'] > bhbh['Mass_0_out']),
                                  other=bhbh['Mass_0_out'])

    #defining q=m1/m2 with m2>,m1
    bhbh['q'] = bhbh['Mass_1_out']/bhbh['Mass_0_out']
    bhbh['q'] = bhbh['q'].\
                where(cond=(bhbh['Mass_1_out'] < bhbh['Mass_0_out']),
                      other=bhbh['Mass_0_out']/bhbh['Mass_1_out'])
    
    #defining the Chirp mass
    bhbh['Mass_chirp'] = ((bhbh['Mass_0_out'] * bhbh['Mass_1_out'])**(3/5))/((bhbh['Mass_0_out'] + bhbh['Mass_1_out'])**(1/5))
    
    return bhbh # return the pandas DataFrame

# Alternative solutions
Here we present a summary of the different approaches we tried to tackle the problem. 

## Preprocessing with no optimization - the whole dataset all at once
The first attempt consists in loading the whole dataset (1.7TB) into three `dask.dataframe`: respectively all the output.csv, the evolved.dat and the logfile.dat files.

This solution is the easiest to implement code wise but it is higly inefficient since it does not take advantage of the hierarchical structure of the data. 
Because of the cluster's memory limitation, the RAM fills up due to the shuffling operations as soon the merging operations begin.
For this reason, this approach does not manage to process the entire volume of data and we have to serialize the operations over smaller batches.

The code used for this solution differs from the one previously shown only in the way files are read and the columns on hich the merges are carried out.
The files are read all together by leveraging the wildcards capabilities of Dask, instead of using a Pandas DataFrame.
The merges are performed on `name`, `Z_0` and `alpha` fileds as these three values uniquely identified the binary systems in the whole dataset.
Even varying the number of partitions to higher numbers did not help with the high RAM usage.

Note that with the defaults values, Dask would create the following number of partitions for the whole dataset:
* partitions for evolved files : 1800
* partitions for output files : 21099
* partitions for logfile files : 4350

In [None]:
import dask.dataframe as dd  # required to use the dask dataframes

# omissis code

output = dd.read_csv('/mnt/bhbh/fiducial_Hrad_5M/sevn_output_*/0/output_*.csv')   # all files in all directories
logfile = dd.read_csv('/mnt/bhbh/fiducial_Hrad_5M/sevn_output_*/0/logfile_*.dat', header=None)
evolved = dd.read_table('/mnt/bhbh/fiducial_Hrad_5M/sevn_output_*/0/evolved_*.dat', sep='\s+')

# omissis code

bhbh = evolved.merge(output, on=['name','Z_0','alpha'], how='inner').\ # note how the merges are performed
               merge(df_RLO, on=['name','Z_0', 'alpha'], how='left').\
               merge(df_CE,  on=['name','Z_0', 'alpha'], how='left').\
               merge(df_BSN, on=['name','Z_0', 'alpha'], how='left').\
               fillna(value=0).\
               drop(columns=drop_list)

# omissis code

## Folder and thread wise preproccessing
In order to reduce even further the computational burden of the merge operations, we exploit the thread-wise organisation of the data by delaying the whole preprocessing over every triplet of files (output, evolved and logfile) in each folder.
This is done by appending each task to a list and submitting it to the cluster to compute it.
Even this approach fails to process the whole dateset all at once but we noticed it can analyse 300 GB at a time (~10 folders of data).

The differences of the preprocessing code run are shown below. 
More explicitly: data is read folder by folder and thread by thread with dask dataframes.

Notice there is an additional argument `n_part` and method `repartition` in the code.
They are used only for benchmarking purposes and the first time the function was written we used the default number of partitions

In [None]:
def FGpreprocessing_partitions(dir_path: str, n_thread: int,  # notice the explicit signature
                               output_column_to_remove: list,
                               evolved_column_to_remove: list,
                               drop_list: list, n_part: int): 
    
    # file names to read    
    output_str = f'{dir_path}/0/output_{n_thread}.csv'    # now we read only one folder at a time
    evolved_str = f'{dir_path}/0/evolved_{n_thread}.dat'  #
    logfile_str = f'{dir_path}/0/logfile_{n_thread}.dat'  #
    
   # omitted code    

    output = dd.read_csv(output_str).\
                rename(columns={'Mass_0':'Mass_0_out',
                                'Mass_1':'Mass_1_out'}).\
                drop(columns=output_column_to_remove).\
                repartition(npartitions = n_part)            # note the possibility to repartition the output files
                                                             # it is used only for benchmarking purposes
                                                             # it was not used originally
                                                             # default number of partition was used instead 
    
    # the following code is identical to the function above
    # except for the use of dask dataframes instead of pandas dataframe
    '''
    dd.read_csv instead of pd.read_csv
    '''
    # the following code is therefore omitted


In [None]:
# this code is to properly map the function to the direcotries and threads

n_threads = 30 # the number of thread to analyse
bhbh_list=[]   # to store the delayed objects

for dir_name in dir_list:         # scan some directories
    for i in range(n_threads):    # scan the threads
        _ = dask.delayed(FGpreprocessing_partitions)(dir_name, i,              # indexes to point to files
                                                     output_column_to_remove,  # stuff to remove
                                                     evolved_column_to_remove, #
                                                     drop_list)                #
        bhbh_list.append(_)       # save in the list
        
results = dask.compute(*bhbh_list) # trigger the computation
results= dask.compute(*results)    # 

For this approach we managed to carry out some benchmarks which are presented in the next section.

## Repartition by column 'name' the whole dataset 1.7TB - no time wise benefits
As we were able to see, the merging operations are quite RAM intensive to carry out, especially for very large chunks of data.
According to the documentation, these operations can be sped up by using the dataframe `set_index` method, which explicitly tells dask how to address the data.

In this approach we set the column `name` as the index of each dask dataframe.
The dataframe contains every file of a specific kind for a a given folder. 
Then we repartition the dataframes such that each new partition contains only records with the same name.
However, while the merge operations actually get faster, the whole setting index and repartitioning processes take so much time that by the end we do not see any benefits, time wise.
Therefore we do not carry on with this strategy.

## Serialization over the folders and repartion over name - it works but takes 10 hours
Taking inspiration from the previous attempt, we leveraged on the data structure, namely the 60 folders.
We process one folder at a time in a batch like system developed in a for cycle, processing 30GB at once.
This serialization let us avoid processing the whole dataset of 1.7TB at once and reduces the computational weight of operations like `merge`, `reset_index` and `repartition` as they are carried out on fewer data, ~30GB.

The whole task takes about 10 hours to fully run and being saved to `parquet` files.
Due to the time constraints this approach requires, it is was not benchmarked.

The function used has the following form. 
We highlight only the main differences in the structure.

In [None]:
def FG_new(alpha: float, dir_path: str,      # signature with explicit types
           output_column_to_remove: list,    #
           evolved_column_to_remove: list,   #
           drop_list: list):                 #
    
    output_str = f'{dir_path}/0/output_*.csv'   # read all threads at once
    evolved_str = f'{dir_path}/0/evolved_*.dat' #
    logfile_str = f'{dir_path}/0/logfile_*.dat' #

    # omitted code to read the files and drop unnecessary columns
    
    dask_divisions = output.set_index("name").divisions                    # this codes actually performs the best repartition and division splitting
    unique_divisions = list(dict.fromkeys(list(dask_divisions)))
    
    # omitted code
    # actually perform the merges and 
    # the remaining computations
    
    return bhbh

In [None]:
# this code serializes the computation of the while dataset

bhbh_list=[]                                                     # the list to store the delayed objects
for i,directory in enumerate(dir_list):                          # for all direcotries
    bhbh_list.append(client.submit(FG_new,alpha[i], directory,   # submit these computations
                                   output_column_to_remove,
                                   evolved_column_to_remove,
                                   drop_list))

for i in range(len(bhbh_list)):  # for each delayed object
    bhbh_list[i].compute().to_parquet('/mnt/bhbh/folder_by_folder_repartition/direcotry_'+str(i)+'.parquet') # compute and save its result

## Thread wise preproccessing using Dask Bags
Still takeing advantage of the thread-wise structure of the data, we change the paradigm of the task: we construct a Dask bag that contains one list for each triplet of threaded files inside the whole dataset. 
Given one of this lists, each element is the path to a particular kind of file (output, evolved or logfile), with the same thread. 
We can now apply the preprocessing function, which takes the files' paths as arguments, to each element of the bag and compute a single dataframe as a result by casting the whole bag as a dictionary and than to a dataframe.
The benchmarks results are presented in the next section.

In [None]:
def preprocessing_bag_of_thread(paths):
    
    #lists of columns to read for each file and corresponding type
    output_column_to_read = ['name', 'Mass_0', 'RemnantType_0', 'Mass_1', 'RemnantType_1',
                         'Semimajor','Eccentricity','GWtime','BWorldtime']

    output_column_type = ['string', 'float64', 'int64', 'float64', 'int64',
                      'float64', 'float64', 'float64', 'float64']

    evolved_column_to_read = ['name', 'Mass_0', 'Z_0', 'SN_0', 'Mass_1', 'SN_1', 'a', 'e']


    evolved_column_type = ['string', 'float64', 'float64', 'string', 'float64', 
                      'string', 'float64', 'float64']

    drop_list = ['RemnantType_0',  'RemnantType_1']
    
   
    #Preprocessing OUTPUT
    
    #reading the file
    output = pd.read_csv(paths[0], usecols=output_column_to_read, dtype=dict(zip(output_column_to_read, output_column_type))).\
                rename(columns={'Mass_0':'Mass_0_out', 'Mass_1':'Mass_1_out'})
    
    #mask to select only the binaries we are interested in
    idxBHBH=(output.RemnantType_0==6) & (output.RemnantType_1==6) & (output.Semimajor.notnull())
    output=output[idxBHBH]    
    
    
    #preprocessing EVOLVED
      
    #reading the file
    evolved = pd.read_table(paths[1], sep='\s+', usecols=evolved_column_to_read, dtype=dict(zip(evolved_column_to_read, evolved_column_type)))                
    
    #extracting alpha with a regex
    alpha = float(re.findall(r".+(?<=A)(.*)(?=L)", paths[1])[0])
    evolved['alpha'] = alpha
    
    
    #preprocessing LOGFILE
    
    logfile = pd.read_csv(paths[2], header=None)
    
    
    #extracting informations with regex

    df_RLO = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);RLO_BEGIN;").\
                dropna().\
                rename(columns={0:'name', 1:'ID'}).\
                groupby(['name']).\
                size().to_frame(name='RLO').\
                reset_index()

    df_CE = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);CE;").\
                dropna().\
                rename(columns={0:'name', 1:'ID'}).\
                groupby(['name']).\
                size().to_frame(name='CE').\
                reset_index()

    df_BSN = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);BSN;").\
                dropna().\
                rename(columns={0:'name', 1:'ID'}).\
                groupby(['name']).\
                size().to_frame(name='BSN').\
                reset_index()

    
    #MERGE
    bhbh = evolved.merge(output, on=['name'], how='inner').\
                   merge(df_RLO, on=['name'], how='left').\
                   merge(df_CE,  on=['name'], how='left').\
                   merge(df_BSN, on=['name'], how='left').\
                   fillna(value=0).\
                   drop(columns=drop_list)
    
    
    #add some useful columns
    bhbh['tdelay'] = bhbh['GWtime'] + bhbh['BWorldtime']

    bhbh['Mass_max_out'] = bhbh['Mass_1_out']
    bhbh['Mass_max_out'] = bhbh['Mass_max_out'].\
                            where(cond=(bhbh['Mass_max_out'] > bhbh['Mass_0_out']), other=bhbh['Mass_0_out'])

    bhbh['q'] = bhbh['Mass_1_out']/bhbh['Mass_0_out']
    bhbh['q'] = bhbh['q'].\
                where(cond=(bhbh['Mass_1_out'] < bhbh['Mass_0_out']), other=bhbh['Mass_0_out']/bhbh['Mass_1_out'])

    bhbh['Mass_chirp'] = ((bhbh['Mass_0_out'] * bhbh['Mass_1_out'])**(3/5))/((bhbh['Mass_0_out'] + bhbh['Mass_1_out'])**(1/5))
    
    
    return bhbh #a pandas dataframe

# Benchmarks
In order to benchmark our preprocessing functions we decide to vary the number of workers, threads and partitions, testing this variations on a single directory of our dataset for a total of 5 times to obtain meaningfull statistics.
Also, we try to see if there is any benefit in using the `multiprocessing` options for the workers, instead of the default `distributed`. We decide to follow a best-result to best-result approach rather than a grid search strategy on the whole parameter space since this latter approach would be extremly time consuming. 
Firstly we test different couples (`number_of_workers`, `number_of_threads`), letting the partitions be taken automatically. 
Using the couple of paramteres that took the least amount of time, we assess the effect of the `multiprocessing` option for the workers. 
Eventually we analyse the impact of varing the number of partitions that the cluster handles. 
For the data scalability test we evaluate the performance of our best combination of parameters on a varing number of directories. 

## FG_bag

|Parameter type| Tested values| 
|:----|:----|
| Number of Workers for each  VM   | [1, 2, 4]  |
| Number of Threads for each  VM   | [1, 2, 4]  |
| Workers option                   | [`Multiprocessing`, `Distributed`] |
| Number of Partitions             | TOBEDEFINED |
| Number of Directory              | [1, 2, 3, 4]  |


In [None]:
attempt = 5
workers_list = [1, 2, 4]
threads_list = [1, 2, 4]
partitions_list = [1, 12, 24] 
time_list = np.zeros(shape=(attempt, len(partitions_list), len(workers_list), len(threads_list)))
multiproccesing_method = False

for a in range(attempt):
    for d in range(len(partitions_list)):
        for w in range(len(workers_list)):
            for t in range(len(threads_list)):
                print('a:', a, '\t','d:', partitions_list[d], '\t', 'w:', workers_list[w], '\t','t:', threads_list[t])

                #cluster up
                cluster = SSHCluster(
                ["bhbh-1", "bhbh-1", "bhbh-2", "bhbh-3", "bhbh-4", "bhbh-5"],
                connect_options={"client_keys": "/home/ubuntu/private/tbertola_key.pem"},
                worker_options={"n_workers": workers_list[w],
                                "nthreads": threads_list[t]}, # because each bhbh-* has 4 cores
                scheduler_options={"port": 8786, "dashboard_address": ":8787"}
                                    )
                client=Client(cluster)
                
                #Configure workers as multiprocessing instead of distributed (default option)
                if multiproccesing_method == True:
                    dask.config.set({'distributed.worker.multiprocessing-method': 'spawn'})

                #begin time
                time_i = time.time()
                
                #input bag for the 'preprocessing_bag_of_thread'
                bag_2 = db.from_sequence([[dir_ + f'/0/output_{thread}.csv', 
                                          dir_ + f'/0/evolved_{thread}.dat',
                                          dir_ + f'/0/logfile_{thread}.dat'] for dir_ in dir_list[:20] for thread in range(30)],
                                          npartitions=d) #divisions ?

                #Map the preprocessing function to the bag
                bag_of_df = bag_2.map(preprocessing_bag_of_thread)
                
                #TO BE DEFINED
                
                
                #end time
                time_f = time.time()

                #time difference allocation
                time_list[a, d, w, t] = time_f - time_i

                #cluster down
                cluster.close()
            
#Save the result
#np.save(f'Benchmark_bag.npy', time_list)

### Workers and Threads

<img src="Figures/bk_w_thr_old_bag_try.png">

### Partitions

<img src="Figures/bk_divisions_old_bag_try.png" width="600">

### Worker Multiprocessing

### Directory 

<img src="Figures/bk_dir_old_bag_try.png" width="600">

## FG_normal

|Parameter type| Tested values| 
|:----|:----|
| Number of Workers for each  VM   | [1, 2, 4]  |
| Number of Threads for each  VM   | [1, 2, 4]  |
| Workers option                   | [`Multiprocessing`, `Distributed`] |
| Number of Partitions             | [1, 2, 12]  |
| Number of Directory              | [1, 2, 3, 4]  |


In [None]:
attempt = 5
workers_list = [1, 2, 4]
threads_list = [1, 2, 4]
partitions_list = [1, 12, 24]
time_list = np.zeros(shape=(attempt, len(partitions_list), len(workers_list), len(threads_list)))
multiproccesing_method == False

for a in range(attempt):
    for d in range(len(partitions_list)):
        for w in range(len(workers_list)):
            for t in range(len(threads_list)):
                print('a:', a, '\t','d:', partitions_list[d], '\t', 'w:', workers_list[w], '\t','t:', threads_list[t])

                #cluster up
                cluster = SSHCluster(
                ["bhbh-1", "bhbh-1", "bhbh-2", "bhbh-3", "bhbh-4", "bhbh-5"],
                connect_options={"client_keys": "/home/ubuntu/private/tbertola_key.pem"},
                worker_options={"n_workers": workers_list[w],
                                "nthreads": threads_list[t]}, # because each bhbh-* has 4 cores
                scheduler_options={"port": 8786, "dashboard_address": ":8787"}
                                    )
                client=Client(cluster)
                
                #Configure workers as multiprocessing instead of distributed (default option)
                if multiproccesing_method == True:
                    dask.config.set({'distributed.worker.multiprocessing-method': 'spawn'})


                #begin time
                time_i = time.time()

                #function loop
                n_threads_DEMO = 30
                bhbh_list=[]
                for dir_name in dir_list[:1]:
                    for i in range(n_threads_DEMO):
                        _ = dask.delayed(FGpreprocessing_partitions)(dir_name, i, output_column_to_remove, evolved_column_to_remove,
                                                          drop_list, n_part=partitions_list[d])
                        bhbh_list.append(_)

                results = dask.compute(*bhbh_list) 
                results= dask.compute(*results)

                #end time
                time_f = time.time()

                #time difference allocation
                time_list[a, d, w, t] = time_f - time_i

                #cluster down
                cluster.close()
            
#Save the result
#np.save('Benchmark_normal.npy', time_list)

### Worker and Threads

<img src="Figures/bk_w_thr_old.png">

### Worker Multiprocessing

TO BE RUN WITH 4 WORKERS, 4 THREAD 

### Partitions

<img src="Figures/bk_divisions_old.png" width="600">

### Directory 

<img src='Figures/bk_dir_old.png' width="600">

# Possible improvements and limitations

NFS, reading and writing velocity
Distributing Data

<div style='background-color:#f7f7f7; padding-top:30px; padding-left:20px; padding-right:20px; padding-bottom:30px'>
    <center>
        <div style='  display: block;
  font-size: 2em;
  font-weight: bold;  display: block;
  font-size: 2em;
  font-weight: bold;'>MAPD-B - Preprocessing of SEVN data for binary black holes mass distribution analysis
        </div>
    <center>
        </br>
    <i>Tommaso Bertola, Giacomo Di Prima, Giuseppe Viterbo, Marco Zenari</i></center></div>

# Introduction: the computational problem

Our aim for this project is to preprocess the data of multiple SEVN simulations of binary systems.

SEVN is Python program developed by the Astronomy Department at the University of Padua to simulate the evolution of binary systems. The evolution takes into account different physical phenomena which obey the patterns and laws obeserved in the Universe, especially those seen in the stellar tracks.

We will focus our attention specifically to those systems evolving into binary black holes. To study these systems we therefore need to extract from the whole dataset produced by SEVN only some information regarding the initial and final conditions of the binary system evolution.

The final goal is to obtain a simple and handy DataFrame listing only those features.

# Data structure

To better understading the problem, we will briefely describe the dataset we are given.

The dataset consists of a number of folders named after some "hyperparameters" given to SEVN while performing the simulations.
In our case, we are given 60 different folders, whose names are like `sevn_output_Z0.001A1L1`, `sevn_output_Z0.03A5L1`, ... More specifically the hyperparameters are the numbers following the letters `Z` and `L` which will be included in the final output DataFrame for each record.

Inside each folder there are three kinds of files:
* `output_{nthread}.csv`
* `logfile_{nthread}.dat`
* `evolved_{nthread}.dat`

where {nthread} is a number ranging from 0 to 29, corresponding to the thread responsible for the computation of those simulations. 

On average, each of the `output_{nthread}.csv` files occupies 750MB, `logfile_{nthread}.dat` 200MB, and `evolved_{nthread}.dat` 50MB. 

In total, each folder occupies between 26 to 31GB of data for a gross total of around 1.7TB.

## File schema
We describe the schema of the three different kind of files to better explain the following parsing process

### `evolved_{nthread}.dat` schema
These files contain the initial properties of the systems that has been successfully evolved by SEVN.
These are **fixed width** fields files and therefore we used the `pd.read_table` function to parse them.
The fields we are interested in reading are reported below.

|Column Name| Data type | Description|
|:----|:----|:-----|
|name|string `0_1234....`|Unique in each folder|
|Mass_0|float|Initial mass of star 0 (in Solar masses)|
|Mass_1|float|Initial mass of star 1 (in Solar masses)|
|Z_0|float|Metallicity, same as in directory name (both stars have the same)|
|SN_0|string `rapid_gauNS`|Supernova model for star 0|
|SN_1|string `rapid_gauNS`|Supernova model for star 1|
|a|float| Initial semimajor axis of the binary |
|e|float|Initial eccentricity of the binary|
                      
All other fields in the files are discarded in the analysis.

### `output_{nthread}.csv` schema
The output files contains the final conditions of the simulations.
Each row is a different binary system, and each column indicates a different feature of the simulation.
The file is formatted in a tipical **csv format**.
We report in the following table the columns we are interested in. 

|Column Name| Data type | Description|
|:----|:----|:-----|
|name |string `0_1234...`| Unique in each folder|
|Mass_0|double| Mass of object 0 (in Solar masses)|
|Mass_1|double| Mass of object 1 (in Solar masses)|
|RemnantType_0|int|Type of object 0 after evolution|
|RemnantType_1|int|Type of object 1 after evolution|
|Semimajor|float|Semimajor of the binary (in Solar radius)|
|Eccentricity|float| Eccentricity of the binary|
|GWtime|float|Gravitational wave orbital decay time|
|BWorldtime|int|Time elapsed in the simulations|

All other fields in the files are discarded in the analysis.
 

### `logfile_{nthread}.dat` schema
Logfiles are **plain text** files, containing the description of a particular astrophysical event in each row.
Each event is univocally associated to a specific binary system evolution by its `name`, however there could be multiple events regarding the same system.
To recover the relevant information we have to run different regular expressions.
Each regex used specifically captures the type of the event, namely `RLO_BEGIN`, `CE`, or `BSN`. 

An example of the content of these files is given by the following rows

<div style='background-color:#f7f7f7; padding-top:30px; padding-left:20px; padding-right:20px; padding-bottom:30px'>B;0_474492234654248;0;COLLISION;23.667631;0:9.44621:18.0861:3:1:2.61393:1.21104:1:38.822:0.50739:19.1197:10.6772
B;0_474492234654248;0;MERGER;23.667631;0:9.446e+00:1.906e+00:0.000e+00:3:0:18.1224:1:2.614e+00:0.000e+00:0.000e+00:1:0:1.21104:12.0601:38.822:0.50739
S;0_474492234654248;0;NS;25.471686;5:1.46229:4.64623e+12:63.4538:0.98692
S;0_474492234654248;0;SN;25.471686;10.6025:3.06259:1.65091:1.46229:5:2:453.262:345.701:-197.664:-212.483:-187.853
S;0_641394535500269;0;WD;32.367569;3.2735:2.57868:1.29894:1.29894:3
</div>

# Tools
To parse the data we leverage on distributed computing techniques. 
Specifically we extensively use Python Dask library which enable us to manage a cluster of workers hosted on CloudVeneto virtual machines.

## Infrastracture configuration
![network_configuration.png](network_configuration.png)

We will be using 6 different virtual machines, `bhbh-{d,1,2,3,4,5}`. 
The first, `bhbh-d`, serves as a NFS server, allowing all the other VMs `bhbh-{1,2,3,4,5}` to read, write and crunch the archived data as can be seen from the picture above with the green arrows.
Highlighted in orange, all `bhbh-{1..5}` are connected to a Dask cluster, managed solely by `bhbh-1` which performs the action of client, scheduler and worker at the same time.

### Setting up `bhbh-d`
After instantiating the VM `bhbh-d` of flavor `cloudveneto.medium`, we attach the volume `bhbh-v` to `/dev/vdb` which is then mounted on `/mnt/bhbh-v`.
To allow the other VMs to see the new volume, we use the software provided by `nfs-kernel-server`.

The NFS server is configured such that `/mnt/bhbh-v` can be seen by the other VMs and therefore mounted therein.
It is important to note that the configuration of the sharing of the volume is made in `sync` mode.
The reason being the cluster might potentially alter the consistency of the data if `async` mode were used. 

Finally, the actual data were copied from the `demoblack.fisica.unipd.it` server to the mounted volume.
The operation took a few hours as the total size of the data is around 1.7TB.
The operation was carried out via `rsync` utility in order to resume the copying was the network to fail for some unspecified errors.

### Setting up `bhbh-{1,2,3,4,5}`
The VMs for the proper data crunching are of the flavour `cloudveneto.large`, having 8GB of RAM each for a total amount of 32GB<a name="cite_ref-1"></a>[<sup>[1]</sup>](#cite_note-1).
The following operations are equally repeated on all the VMs in order to have equally working systems.

After installing the `nfs-utils` package, we mount the Network volume issuing the following command `mount -t nfs 10.67.22.169:/mnt/bhbh-v /mnt/bhbh`. 
For simplicty sake we use `conda` as a package manager. We install the `dask[complete]` suite of packages.
This set up is enought to allow the cluster to work.

### Particular set up of bhbh-1
The `bhbh-1` has some slight differences in the software installed as it is used for different puropses.
`bhbh-1` hosts the Jupyter server, acting as a client to the cluster, the cluster scheduler and also a cluster worker.
The reason why this set up is used is just for optimizing the memory and computing resources available.
A better configuration would have been achieved by moving the client and the scheduler to another VM but this was not easy to do as cloud resources were limited.
Indeed, one of the main issues we came across during the tests was a lack of RAM in the `bhbh-1`, presumably due to the exchange of data between workers. For this reason we are discussing different approaches in the following.




<a name="cite_note-1"></a>1. [^](#cite_ref-1) 32GB is comparable to the size of one folder.

# Preprocessing on the files

In this section we are going to give a brief summary of the operations that we have to do on the files for our preprocessing. 
As anticipated, the final goal is to obtain a dataframe containing only the useful informations and save it for further analysis.

In order to obtain only one dataframe we need to read, clean and extract informations from each of the three type of files and finally merge all the informations for each binary in a dataframe.


All the preprocessing operations are written inside a function that is then executed in a delayd way with dask. 

For the preprocessing different attemps have been tried, and in the following section we are going to describe the pros and cons of each of them.

Now we take as example one preprocessing function in order to describe the basic operations that we have done. 
The differences in the preprocessing function of each attempt will be highlighted in the corresponding sections.

In [None]:
# this function does the preprocessing
# on three files of the same thread:
# output_{thread}, evolved_{thread}, logfile_{thread}

def preprocessing_bag_of_thread(paths):
    '''
       paths = python list of the paths of the three files considered [output, evoleved, logfile]
    '''
    
    # list of column names and types to read
    
    # output_{}.csv
    output_column_to_read = ['name', 'Mass_0', 'RemnantType_0',
                             'Mass_1', 'RemnantType_1',
                             'Semimajor','Eccentricity',
                             'GWtime','BWorldtime']
    output_column_type = ['string', 'float64', 'int64',
                          'float64', 'int64',
                          'float64', 'float64',
                          'float64', 'float64']

    # evolved_{}.dat
    evolved_column_to_read = ['name', 'Mass_0',
                              'Z_0', 'SN_0',
                              'Mass_1', 'SN_1',
                              'a', 'e']
    evolved_column_type = ['string', 'float64',
                           'float64', 'string',
                           'float64', 'string',
                           'float64', 'float64']
    
    # further columns to remove at the end 
    drop_list = ['RemnantType_0',  'RemnantType_1']
    
   
    #OUTPUT files processing
    
    output = pd.read_csv(paths[0],                              # read the file
                         usecols=output_column_to_read,         # read only some cols
                         dtype=dict(zip(output_column_to_read,  # specify the types
                                        output_column_type))).\ #
                rename(columns={'Mass_0':'Mass_0_out',          # rename columns
                                'Mass_1':'Mass_1_out'})         #

    # mask to select only the black holes binaries, defined by RemnantType
    idxBHBH=(output.RemnantType_0==6) & (output.RemnantType_1==6) & (output.Semimajor.notnull())
    
    # apply the mask
    output=output[idxBHBH]
        
    
    #EVOLVED files processing
    
    #extracting the alpha parameter from the path of the file 
    alpha = float(re.findall(r".+(?<=A)(.*)(?=L)",
                             paths[1])[0])
    
    #read the columns we are interested in from the evolved file
    evolved = pd.read_table(paths[1],                               # read file
                            sep='\s+',                              # separate by spaces
                            usecols=evolved_column_to_read,         # read only some columns
                            dtype=dict(zip(evolved_column_to_read,  # specify the types
                                           evolved_column_type)))   #
    #NB: sep='\s+' is need because there are different number of spaces separareting the columns
    
    #adding the column with the alpha parameter
    evolved['alpha'] = alpha
    
    
    #LOGFILE files processing
    
    logfile = pd.read_csv(paths[2],    # read the file
                          header=None) # there is no header

    
    #Running Regex on the line of the logfile to extrac useful informations
    df_RLO = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);RLO_BEGIN;").\  # searching for string "RLO_BEGIN"
                dropna().\                                                    # dropping nan
                rename(columns={0:'name', 1:'ID'}).\                          # rename columns
                groupby(['name']).\                                           # grouping by name
                size().\                                                      # and counting the number of RLO
                to_frame(name='RLO').\                                        # 
                reset_index()                                                 # to have a nice dataframe

    
    df_CE = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);CE;").\  # searching for string "CE"
                dropna().\                                            # dropping nan
                rename(columns={0:'name', 1:'ID'}).\                  # rename
                groupby(['name']).\                                   # grouping by name
                size().\                                              # 
                to_frame(name='CE').\                                 # and counting the number of CE
                reset_index()                                         # to have a nice dataframe
    

    df_BSN = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);BSN;").\  #searching for string "BSN"
                dropna().\                                              # dropping nan
                rename(columns={0:'name', 1:'ID'}).\                    #rename
                groupby(['name']).\                                     #grouping by name
                size().\                                                #
                to_frame(name='BSN').\                                  #and counting the number of BSN
                reset_index()                                           #to have a nice dataframe

    
    #MERGE
    bhbh = evolved.merge(output, on=['name'], how='inner').\  #innerg join on the name between wvolved and output
                   merge(df_RLO, on=['name'], how='left').\   #left join on the name with df_RLO
                   merge(df_CE,  on=['name'], how='left').\   #left join on the name with df_CE
                   merge(df_BSN, on=['name'], how='left').\   #left join on the name with df_BSN
                   fillna(value=0).\                          #setting nan to zero
                   drop(columns=drop_list)                    #dropping no longer useful columms
    
    
    #Adding some columns with physical meaning
    bhbh['tdelay'] = bhbh['GWtime'] + bhbh['BWorldtime'] #time delay
    
    #defining the max mass of output
    bhbh['Mass_max_out'] = bhbh['Mass_1_out']
    bhbh['Mass_max_out'] = bhbh['Mass_max_out'].\
                            where(cond=(bhbh['Mass_max_out'] > bhbh['Mass_0_out']),
                                  other=bhbh['Mass_0_out'])

    #defining q=m1/m2 with m2>,m1
    bhbh['q'] = bhbh['Mass_1_out']/bhbh['Mass_0_out']
    bhbh['q'] = bhbh['q'].\
                where(cond=(bhbh['Mass_1_out'] < bhbh['Mass_0_out']),
                      other=bhbh['Mass_0_out']/bhbh['Mass_1_out'])
    
    #defining the Chirp mass
    bhbh['Mass_chirp'] = ((bhbh['Mass_0_out'] * bhbh['Mass_1_out'])**(3/5))/((bhbh['Mass_0_out'] + bhbh['Mass_1_out'])**(1/5))
    
    return bhbh # return the pandas DataFrame

# Alternative solutions
Here we present a summary of the different approaches we tried to tackle the problem. 

## Preprocessing with no optimization - the whole dataset all at once
The first attempt consists in loading the whole dataset (1.7TB) into three `dask.dataframe`: respectively all the output.csv, the evolved.dat and the logfile.dat files.

This solution is the easiest to implement code wise but it is higly inefficient since it does not take advantage of the hierarchical structure of the data. 
Because of the cluster's memory limitation, the RAM fills up due to the shuffling operations as soon the merging operations begin.
For this reason, this approach does not manage to process the entire volume of data and we have to serialize the operations over smaller batches.

The code used for this solution differs from the one previously shown only in the way files are read and the columns on hich the merges are carried out.
The files are read all together by leveraging the wildcards capabilities of Dask, instead of using a Pandas DataFrame.
The merges are performed on `name`, `Z_0` and `alpha` fileds as these three values uniquely identified the binary systems in the whole dataset.
Even varying the number of partitions to higher numbers did not help with the high RAM usage.

Note that with the defaults values, Dask would create the following number of partitions for the whole dataset:
* partitions for evolved files : 1800
* partitions for output files : 21099
* partitions for logfile files : 4350

In [None]:
import dask.dataframe as dd  # required to use the dask dataframes

# omissis code

output = dd.read_csv('/mnt/bhbh/fiducial_Hrad_5M/sevn_output_*/0/output_*.csv')   # all files in all directories
logfile = dd.read_csv('/mnt/bhbh/fiducial_Hrad_5M/sevn_output_*/0/logfile_*.dat', header=None)
evolved = dd.read_table('/mnt/bhbh/fiducial_Hrad_5M/sevn_output_*/0/evolved_*.dat', sep='\s+')

# omissis code

bhbh = evolved.merge(output, on=['name','Z_0','alpha'], how='inner').\ # note how the merges are performed
               merge(df_RLO, on=['name','Z_0', 'alpha'], how='left').\
               merge(df_CE,  on=['name','Z_0', 'alpha'], how='left').\
               merge(df_BSN, on=['name','Z_0', 'alpha'], how='left').\
               fillna(value=0).\
               drop(columns=drop_list)

# omissis code

## Folder and thread wise preproccessing
In order to reduce even further the computational burden of the merge operations, we exploit the thread-wise organisation of the data by delaying the whole preprocessing over every triplet of files (output, evolved and logfile) in each folder.
This is done by appending each task to a list and submitting it to the cluster to compute it.
Even this approach fails to process the whole dateset all at once but we noticed it can analyse 300 GB at a time (~10 folders of data).

The differences of the preprocessing code run are shown below. 
More explicitly: data is read folder by folder and thread by thread with dask dataframes.

Notice there is an additional argument `n_part` and method `repartition` in the code.
They are used only for benchmarking purposes and the first time the function was written we used the default number of partitions

In [None]:
def FGpreprocessing_partitions(dir_path: str, n_thread: int,  # notice the explicit signature
                               output_column_to_remove: list,
                               evolved_column_to_remove: list,
                               drop_list: list, n_part: int): 
    
    # file names to read    
    output_str = f'{dir_path}/0/output_{n_thread}.csv'    # now we read only one folder at a time
    evolved_str = f'{dir_path}/0/evolved_{n_thread}.dat'  #
    logfile_str = f'{dir_path}/0/logfile_{n_thread}.dat'  #
    
   # omitted code    

    output = dd.read_csv(output_str).\
                rename(columns={'Mass_0':'Mass_0_out',
                                'Mass_1':'Mass_1_out'}).\
                drop(columns=output_column_to_remove).\
                repartition(npartitions = n_part)            # note the possibility to repartition the output files
                                                             # it is used only for benchmarking purposes
                                                             # it was not used originally
                                                             # default number of partition was used instead 
    
    # the following code is identical to the function above
    # except for the use of dask dataframes instead of pandas dataframe
    '''
    dd.read_csv instead of pd.read_csv
    '''
    # the following code is therefore omitted


In [None]:
# this code is to properly map the function to the direcotries and threads

n_threads = 30 # the number of thread to analyse
bhbh_list=[]   # to store the delayed objects

for dir_name in dir_list:         # scan some directories
    for i in range(n_threads):    # scan the threads
        _ = dask.delayed(FGpreprocessing_partitions)(dir_name, i,              # indexes to point to files
                                                     output_column_to_remove,  # stuff to remove
                                                     evolved_column_to_remove, #
                                                     drop_list)                #
        bhbh_list.append(_)       # save in the list
        
results = dask.compute(*bhbh_list) # trigger the computation
results= dask.compute(*results)    # 

For this approach we managed to carry out some benchmarks which are presented in the next section.

## Repartition by column 'name' the whole dataset 1.7TB - no time wise benefits
As we were able to see, the merging operations are quite RAM intensive to carry out, especially for very large chunks of data.
According to the documentation, these operations can be sped up by using the dataframe `set_index` method, which explicitly tells dask how to address the data.

In this approach we set the column `name` as the index of each dask dataframe.
The dataframe contains every file of a specific kind for a a given folder. 
Then we repartition the dataframes such that each new partition contains only records with the same name.
However, while the merge operations actually get faster, the whole setting index and repartitioning processes take so much time that by the end we do not see any benefits, time wise.
Therefore we do not carry on with this strategy.

## Serialization over the folders and repartion over name - it works but takes 10 hours
Taking inspiration from the previous attempt, we leveraged on the data structure, namely the 60 folders.
We process one folder at a time in a batch like system developed in a for cycle, processing 30GB at once.
This serialization let us avoid processing the whole dataset of 1.7TB at once and reduces the computational weight of operations like `merge`, `reset_index` and `repartition` as they are carried out on fewer data, ~30GB.

The whole task takes about 10 hours to fully run and being saved to `parquet` files.
Due to the time constraints this approach requires, it is was not benchmarked.

The function used has the following form. 
We highlight only the main differences in the structure.

In [None]:
def FG_new(alpha: float, dir_path: str,      # signature with explicit types
           output_column_to_remove: list,    #
           evolved_column_to_remove: list,   #
           drop_list: list):                 #
    
    output_str = f'{dir_path}/0/output_*.csv'   # read all threads at once
    evolved_str = f'{dir_path}/0/evolved_*.dat' #
    logfile_str = f'{dir_path}/0/logfile_*.dat' #

    # omitted code to read the files and drop unnecessary columns
    
    dask_divisions = output.set_index("name").divisions                    # this codes actually performs the best repartition and division splitting
    unique_divisions = list(dict.fromkeys(list(dask_divisions)))
    
    # omitted code
    # actually perform the merges and 
    # the remaining computations
    
    return bhbh

In [None]:
# this code serializes the computation of the while dataset

bhbh_list=[]                                                     # the list to store the delayed objects
for i,directory in enumerate(dir_list):                          # for all direcotries
    bhbh_list.append(client.submit(FG_new,alpha[i], directory,   # submit these computations
                                   output_column_to_remove,
                                   evolved_column_to_remove,
                                   drop_list))

for i in range(len(bhbh_list)):  # for each delayed object
    bhbh_list[i].compute().to_parquet('/mnt/bhbh/folder_by_folder_repartition/direcotry_'+str(i)+'.parquet') # compute and save its result

## Thread wise preproccessing using Dask Bags
Still takeing advantage of the thread-wise structure of the data, we change the paradigm of the task: we construct a Dask bag that contains one list for each triplet of threaded files inside the whole dataset. 
Given one of this lists, each element is the path to a particular kind of file (output, evolved or logfile), with the same thread. 
We can now apply the preprocessing function, which takes the files' paths as arguments, to each element of the bag and compute a single dataframe as a result by casting the whole bag as a dictionary and than to a dataframe.
The benchmarks results are presented in the next section.

In [None]:
def preprocessing_bag_of_thread(paths):
    
    #lists of columns to read for each file and corresponding type
    output_column_to_read = ['name', 'Mass_0', 'RemnantType_0', 'Mass_1', 'RemnantType_1',
                         'Semimajor','Eccentricity','GWtime','BWorldtime']

    output_column_type = ['string', 'float64', 'int64', 'float64', 'int64',
                      'float64', 'float64', 'float64', 'float64']

    evolved_column_to_read = ['name', 'Mass_0', 'Z_0', 'SN_0', 'Mass_1', 'SN_1', 'a', 'e']


    evolved_column_type = ['string', 'float64', 'float64', 'string', 'float64', 
                      'string', 'float64', 'float64']

    drop_list = ['RemnantType_0',  'RemnantType_1']
    
   
    #Preprocessing OUTPUT
    
    #reading the file
    output = pd.read_csv(paths[0], usecols=output_column_to_read, dtype=dict(zip(output_column_to_read, output_column_type))).\
                rename(columns={'Mass_0':'Mass_0_out', 'Mass_1':'Mass_1_out'})
    
    #mask to select only the binaries we are interested in
    idxBHBH=(output.RemnantType_0==6) & (output.RemnantType_1==6) & (output.Semimajor.notnull())
    output=output[idxBHBH]    
    
    
    #preprocessing EVOLVED
      
    #reading the file
    evolved = pd.read_table(paths[1], sep='\s+', usecols=evolved_column_to_read, dtype=dict(zip(evolved_column_to_read, evolved_column_type)))                
    
    #extracting alpha with a regex
    alpha = float(re.findall(r".+(?<=A)(.*)(?=L)", paths[1])[0])
    evolved['alpha'] = alpha
    
    
    #preprocessing LOGFILE
    
    logfile = pd.read_csv(paths[2], header=None)
    
    
    #extracting informations with regex

    df_RLO = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);RLO_BEGIN;").\
                dropna().\
                rename(columns={0:'name', 1:'ID'}).\
                groupby(['name']).\
                size().to_frame(name='RLO').\
                reset_index()

    df_CE = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);CE;").\
                dropna().\
                rename(columns={0:'name', 1:'ID'}).\
                groupby(['name']).\
                size().to_frame(name='CE').\
                reset_index()

    df_BSN = logfile[0].str.extract(r"B;((?:\d*\_)?\d+);(\d+);BSN;").\
                dropna().\
                rename(columns={0:'name', 1:'ID'}).\
                groupby(['name']).\
                size().to_frame(name='BSN').\
                reset_index()

    
    #MERGE
    bhbh = evolved.merge(output, on=['name'], how='inner').\
                   merge(df_RLO, on=['name'], how='left').\
                   merge(df_CE,  on=['name'], how='left').\
                   merge(df_BSN, on=['name'], how='left').\
                   fillna(value=0).\
                   drop(columns=drop_list)
    
    
    #add some useful columns
    bhbh['tdelay'] = bhbh['GWtime'] + bhbh['BWorldtime']

    bhbh['Mass_max_out'] = bhbh['Mass_1_out']
    bhbh['Mass_max_out'] = bhbh['Mass_max_out'].\
                            where(cond=(bhbh['Mass_max_out'] > bhbh['Mass_0_out']), other=bhbh['Mass_0_out'])

    bhbh['q'] = bhbh['Mass_1_out']/bhbh['Mass_0_out']
    bhbh['q'] = bhbh['q'].\
                where(cond=(bhbh['Mass_1_out'] < bhbh['Mass_0_out']), other=bhbh['Mass_0_out']/bhbh['Mass_1_out'])

    bhbh['Mass_chirp'] = ((bhbh['Mass_0_out'] * bhbh['Mass_1_out'])**(3/5))/((bhbh['Mass_0_out'] + bhbh['Mass_1_out'])**(1/5))
    
    
    return bhbh #a pandas dataframe

# Benchmarks
In order to benchmark our preprocessing functions we decide to vary the number of workers, threads and partitions, testing this variations on a single directory of our dataset for a total of 5 times to obtain meaningfull statistics.
Also, we try to see if there is any benefit in using the `multiprocessing` options for the workers, instead of the default `distributed`. We decide to follow a best-result to best-result approach rather than a grid search strategy on the whole parameter space since this latter approach would be extremly time consuming. 
Firstly we test different couples (`number_of_workers`, `number_of_threads`), letting the partitions be taken automatically. 
Using the couple of paramteres that took the least amount of time, we assess the effect of the `multiprocessing` option for the workers. 
Eventually we analyse the impact of varing the number of partitions that the cluster handles. 
For the data scalability test we evaluate the performance of our best combination of parameters on a varing number of directories. 

## FG_bag

|Parameter type| Tested values| 
|:----|:----|
| Number of Workers for each  VM   | [1, 2, 4]  |
| Number of Threads for each  VM   | [1, 2, 4]  |
| Workers option                   | [`Multiprocessing`, `Distributed`] |
| Number of Partitions             | TOBEDEFINED |
| Number of Directory              | [1, 2, 3, 4]  |


In [None]:
attempt = 5
workers_list = [1, 2, 4]
threads_list = [1, 2, 4]
partitions_list = [1, 12, 24] 
time_list = np.zeros(shape=(attempt, len(partitions_list), len(workers_list), len(threads_list)))
multiproccesing_method = False

for a in range(attempt):
    for d in range(len(partitions_list)):
        for w in range(len(workers_list)):
            for t in range(len(threads_list)):
                print('a:', a, '\t','d:', partitions_list[d], '\t', 'w:', workers_list[w], '\t','t:', threads_list[t])

                #cluster up
                cluster = SSHCluster(
                ["bhbh-1", "bhbh-1", "bhbh-2", "bhbh-3", "bhbh-4", "bhbh-5"],
                connect_options={"client_keys": "/home/ubuntu/private/tbertola_key.pem"},
                worker_options={"n_workers": workers_list[w],
                                "nthreads": threads_list[t]}, # because each bhbh-* has 4 cores
                scheduler_options={"port": 8786, "dashboard_address": ":8787"}
                                    )
                client=Client(cluster)
                
                #Configure workers as multiprocessing instead of distributed (default option)
                if multiproccesing_method == True:
                    dask.config.set({'distributed.worker.multiprocessing-method': 'spawn'})

                #begin time
                time_i = time.time()
                
                #input bag for the 'preprocessing_bag_of_thread'
                bag_2 = db.from_sequence([[dir_ + f'/0/output_{thread}.csv', 
                                          dir_ + f'/0/evolved_{thread}.dat',
                                          dir_ + f'/0/logfile_{thread}.dat'] for dir_ in dir_list[:20] for thread in range(30)],
                                          npartitions=d) #divisions ?

                #Map the preprocessing function to the bag
                bag_of_df = bag_2.map(preprocessing_bag_of_thread)
                
                #TO BE DEFINED
                
                
                #end time
                time_f = time.time()

                #time difference allocation
                time_list[a, d, w, t] = time_f - time_i

                #cluster down
                cluster.close()
            
#Save the result
#np.save(f'Benchmark_bag.npy', time_list)

### Workers and Threads

<img src="Figures/bk_w_thr_old_bag_try.png">

### Partitions

<img src="Figures/bk_divisions_old_bag_try.png" width="600">

### Worker Multiprocessing

### Directory 

<img src="Figures/bk_dir_old_bag_try.png" width="600">

## FG_normal

|Parameter type| Tested values| 
|:----|:----|
| Number of Workers for each  VM   | [1, 2, 4]  |
| Number of Threads for each  VM   | [1, 2, 4]  |
| Workers option                   | [`Multiprocessing`, `Distributed`] |
| Number of Partitions             | [1, 2, 12]  |
| Number of Directory              | [1, 2, 3, 4]  |


In [None]:
attempt = 5
workers_list = [1, 2, 4]
threads_list = [1, 2, 4]
partitions_list = [1, 12, 24]
time_list = np.zeros(shape=(attempt, len(partitions_list), len(workers_list), len(threads_list)))
multiproccesing_method == False

for a in range(attempt):
    for d in range(len(partitions_list)):
        for w in range(len(workers_list)):
            for t in range(len(threads_list)):
                print('a:', a, '\t','d:', partitions_list[d], '\t', 'w:', workers_list[w], '\t','t:', threads_list[t])

                #cluster up
                cluster = SSHCluster(
                ["bhbh-1", "bhbh-1", "bhbh-2", "bhbh-3", "bhbh-4", "bhbh-5"],
                connect_options={"client_keys": "/home/ubuntu/private/tbertola_key.pem"},
                worker_options={"n_workers": workers_list[w],
                                "nthreads": threads_list[t]}, # because each bhbh-* has 4 cores
                scheduler_options={"port": 8786, "dashboard_address": ":8787"}
                                    )
                client=Client(cluster)
                
                #Configure workers as multiprocessing instead of distributed (default option)
                if multiproccesing_method == True:
                    dask.config.set({'distributed.worker.multiprocessing-method': 'spawn'})


                #begin time
                time_i = time.time()

                #function loop
                n_threads_DEMO = 30
                bhbh_list=[]
                for dir_name in dir_list[:1]:
                    for i in range(n_threads_DEMO):
                        _ = dask.delayed(FGpreprocessing_partitions)(dir_name, i, output_column_to_remove, evolved_column_to_remove,
                                                          drop_list, n_part=partitions_list[d])
                        bhbh_list.append(_)

                results = dask.compute(*bhbh_list) 
                results= dask.compute(*results)

                #end time
                time_f = time.time()

                #time difference allocation
                time_list[a, d, w, t] = time_f - time_i

                #cluster down
                cluster.close()
            
#Save the result
#np.save('Benchmark_normal.npy', time_list)

### Worker and Threads

<img src="Figures/bk_w_thr_old.png">

### Worker Multiprocessing

TO BE RUN WITH 4 WORKERS, 4 THREAD 

### Partitions

<img src="Figures/bk_divisions_old.png" width="600">

### Directory 

<img src='Figures/bk_dir_old.png' width="600">

# Possible improvements and limitations

NFS, reading and writing velocity
Distributing Data

In [None]:
so