# RAIL - Fundamentals

RAIL is LSST-DESC software for running the different algorithms used to calculate photometric redshifts. Its main goal is to minimize the impact that different infrastructures can have on different algorithms. To do that, it unifies them in modular code that supports the different inputs each algorithm needs and standardizes the output, so that their results can be compared more fairly.

RAIL uses four main libraries at its core: <br>
_tables_io_: for manipulating data in files such as HDF5, FITS, etc. <br>
_qp_: used to parametrize the PDFs of the data for metric calculations. <br>
_ceci_: builds pipelines and produces a .yaml file with the steps and configurations, such as threads. <br>
_pzflow_: creates a flow for data creation. <br>

#### Core.
This is where the main functions manage the data and files that the program creates. It is based on the behavioral Chain of Responsibility pattern (https://refactoring.guru/pt-br/design-patterns/chain-of-responsibility): the code defines a flow in which a request is processed by a handler class that decides whether or not to pass it forward, according to what is defined. For example, what BPZ does is create a request class (e.g. Inform_BPZ_lite) that holds all the inputs/configurations and is handled by its handler class (BPZ_lite).
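
To make the pattern concrete, here is a minimal sketch of a chain of responsibility in plain Python. The class names (`Handler`, `InformHandler`, `EstimateHandler`) and the request dictionaries are hypothetical and only illustrate the idea; they are not RAIL's actual classes.

```python
from typing import Optional


class Handler:
    """Base handler: processes a request or passes it to the next handler."""

    def __init__(self, successor: Optional["Handler"] = None):
        self.successor = successor

    def can_handle(self, request: dict) -> bool:
        raise NotImplementedError

    def process(self, request: dict) -> str:
        raise NotImplementedError

    def handle(self, request: dict) -> Optional[str]:
        if self.can_handle(request):
            return self.process(request)
        if self.successor is not None:
            return self.successor.handle(request)
        return None  # nobody in the chain accepted the request


class InformHandler(Handler):
    def can_handle(self, request):
        return request.get("kind") == "inform"

    def process(self, request):
        return f"trained a model from {request['data']}"


class EstimateHandler(Handler):
    def can_handle(self, request):
        return request.get("kind") == "estimate"

    def process(self, request):
        return f"estimated redshifts for {request['data']}"


# The request travels along the chain until a handler accepts it
chain = InformHandler(successor=EstimateHandler())
inform_result = chain.handle({"kind": "inform", "data": "train.hdf5"})
estimate_result = chain.handle({"kind": "estimate", "data": "test.hdf5"})
unknown_result = chain.handle({"kind": "plot", "data": "test.hdf5"})
```

Each handler only decides whether the request is its own; anything else is passed forward, and a request nobody accepts simply returns `None`.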

#### Creation.
Contains all the support for data creation, such as degraders, data flow creation, column remapping, etc. It creates .hdf5 files with the data being manipulated.

#### Estimation.
This is where the codes are defined and executed.  <br>
inform: this is where the priors for template fitting are informed and the machine learning codes are trained. <br>
estimate: where the algorithm is executed through the `.estimate()` function. <br>
The code is wrapped as a RAIL stage so that it can be run in a controlled way. Estimation code can be stored in a yaml file to be run as a ceci module.


#### Evaluation.
This step contains the metrics for evaluating the performance of the estimation codes.
<br>
------
For installation instructions check the official documentation: https://lsstdescrail.readthedocs.io/en/latest/source/installation.html <br>

It is important to point out that, as RAIL is still being developed, it may be necessary to update your rail package once in a while after it is installed. <br> 
First, update the cloned rail repository: _git pull origin_ <br>
Then, run: pip install packageName --upgrade

For Rail versions check: https://github.com/LSSTDESC/RAIL/releases

## Imports, setup and some sample data

In [None]:
import os
import numpy as np
import pandas as pd
import qp
import tables_io

import rail

# Classes used throughout this notebook
from rail.core.data import TableHandle, PqHandle, ModelHandle
from rail.core.stage import RailStage
from rail.core.utilStages import ColumnMapper, TableConverter

from rail.estimation.algos.bpz_lite import Inform_BPZ_lite, BPZ_lite
from rail.evaluation.evaluator import Evaluator

#for rail versions
help(rail)

In [None]:
CURR_DIR = os.getcwd()
RAIL_DIR = os.path.join(os.path.dirname(rail.__file__), '..')
CURR_DIR, RAIL_DIR

### Reading some sample data

In [None]:
data_columns = ["coadd_objects_id","ra","dec","mag_g","magerr_g","mag_i","magerr_i","mag_r","magerr_r","mag_u","magerr_u","mag_y","magerr_y","mag_z","magerr_z","z_true"]

file_path = '../../../../DATA/dp0_train_random.csv'
full_data = pd.read_csv(file_path, usecols=data_columns)
full_data.head()

#### Splitting into train and test data

In [None]:
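size = len(full_data) // 2

# Keep the original index while sampling so the sampled rows can be dropped from full_data
train_sample = full_data.sample(n=size)
test_sample = full_data.drop(train_sample.index).reset_index(drop=True)
train_sample = train_sample.reset_index(drop=True)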
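
The split relies on `drop` removing exactly the rows that `sample` picked, which only holds if the sample still carries the original index labels. A minimal sketch on a toy DataFrame (the name `toy` and its values are hypothetical) compares the two cases:

```python
import pandas as pd

toy = pd.DataFrame({"value": range(10)})

# Keep the original index labels so that drop() removes exactly the sampled rows
train = toy.sample(n=5, random_state=0)
test = toy.drop(train.index)
overlap_ok = set(train["value"]) & set(test["value"])

# With ignore_index=True the sample is relabelled 0..4, so drop() removes rows 0..4 instead
train_bad = toy.sample(n=5, random_state=0, ignore_index=True)
test_bad = toy.drop(train_bad.index)
test_bad_values = sorted(test_bad["value"])
```

In the second case the test set is always the last five rows of `toy`, whatever was sampled, so the train and test sets can share rows.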

In [None]:
train_sample.head()

In [None]:
test_sample.head()

---

##  RAIL 

RAIL has a lot of classes and relies on Object-Oriented Programming (OOP), so things can get complicated very fast. For now we are going to focus on understanding a little about the three base ones: **RailStage, DataStore and DataHandle**

**Image:** This diagram represents some of the classes and their hierarchy.

![title](RAILclasses.png)


## DataStore

The DataStore class stores all the data being processed, each item associated with a key. For example, for a file named 'test_sample.hdf5' containing the sample that we are going to use to test an algorithm, we add it to the data store under the key 'test_sample' and specify which class (DataHandle) is going to be used to read it, in this case TableHandle -> Hdf5Handle. 
<br>

Another important thing to know is that the DataStore class acts as a [singleton class](https://refactoring.guru/design-patterns/singleton), which is basically a class that has only one instance in the application. That is important because RAIL keeps all the data and handles as it runs, so that each stage can access and read what the previous stages stored. Because of that, if we try to create another instance, the call serves as a DataStore factory rather than as the DataStore class itself.  
<br>
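
As an illustration of the singleton idea, here is a minimal sketch of a dict-like class that only ever has one instance. `SingleStore` is a hypothetical name and this is not how RAIL implements its DataStore; it only shows the behaviour the pattern gives.

```python
class SingleStore(dict):
    """Hypothetical dict-like store that only ever has one instance."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Create the instance on the first call and reuse it afterwards
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


first = SingleStore()
first["test_sample"] = "handle for test_sample.hdf5"

second = SingleStore()  # does not create a new store
same_object = first is second
shared_value = second["test_sample"]
```

Because both names point to the same object, anything stored through one is visible through the other.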

We can access the data store through the attribute `data_store`. By default it does not allow the stored data to be overwritten, so if we want to change the value of a key we have to manually set the property `allow_overwrite` to True.

In [None]:
DS = RailStage.data_store
DS.__class__.allow_overwrite = True
DS
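
The overwrite guard can be pictured with a small sketch. `GuardedStore` is a hypothetical dict subclass written for this note, under the assumption that the flag is a class-level attribute (which is what setting it through `DS.__class__` suggests); it is not RAIL's implementation.

```python
class GuardedStore(dict):
    """Hypothetical store that refuses to replace an existing key by default."""

    allow_overwrite = False  # class-level switch, shared by all instances

    def __setitem__(self, key, value):
        if key in self and not self.allow_overwrite:
            raise ValueError(f"key '{key}' is already stored and overwriting is disabled")
        super().__setitem__(key, value)


store = GuardedStore()
store["input"] = "first table"

try:
    store["input"] = "second table"
    blocked = False
except ValueError:
    blocked = True

# Setting the flag on the class mirrors `DS.__class__.allow_overwrite = True`
store.__class__.allow_overwrite = True
store["input"] = "second table"
```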

To help us understand it better, we are going to keep monitoring how the DataStore stores the data and how the memory behaves as it does.

In [None]:
import sys

sys.getsizeof(DS), 'bytes'

We can manually add data to the data store with `add_data`, or pass a file and store it in the DS with `read_file`.

In [None]:
DS.add_data(key="input", data=train_sample, handle_class=PqHandle)
## DS.read_file(key="name", path=file_path, handle_class=Handler)
DS

This is how we can access the data: `read` uses the handle class that we passed when adding it.

In [None]:
sys.getsizeof(DS), 'bytes'

In [None]:
DS.read("input").head()

#### Memory vs. files

As we can see, as soon as we added the data to the DS the reported size increased by 200 bytes. Note that `sys.getsizeof` only measures the DS object itself, not the data its handles refer to, so this number does not reflect the size of the sample. In RAIL the data can be held in memory as a table-like object such as a pandas DataFrame, an OrderedDict, etc., or the flow can work from files. Several steps and configurations differ depending on whether or not the data is brought into memory.
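
To put the byte counts reported by `sys.getsizeof` in context, the sketch below uses a plain Python dict as a stand-in for the DS (an assumption made only for illustration) and stores a large list in it. The container grows by a small, fixed amount, while the payload itself is far larger.

```python
import sys

store = {}
size_empty = sys.getsizeof(store)

# A large payload stored under a single key
payload = list(range(100_000))
store["input"] = payload
size_with_data = sys.getsizeof(store)

growth = size_with_data - size_empty   # bytes added to the dict itself
payload_size = sys.getsizeof(payload)  # shallow size of the list alone
```

The growth of the container reflects only the new key slot, so it cannot be used to judge how much memory the stored data occupies.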

## DataHandle

All the stages inherit from RailStage and follow the [Delegate Pattern](https://en.wikipedia.org/wiki/Delegation_pattern), as can be seen in the class diagram above, and a RailStage can be seen as a 
[CeciStage](https://github.com/LSSTDESC/ceci/blob/d1d5686aefab18bc53e3d4d8a05af42d19e28a91/ceci/stage.py#L24). When we declare a stage we use the method `make_stage(**args)`, which returns the object itself as a stage configured with the given parameters. 

To understand how the returned object works we can borrow the explanation of a [delegate](https://docs.microsoft.com/pt-br/dotnet/csharp/programming-guide/delegates/using-delegates) from the C# language. Basically, we can think of a delegate as a method that points to an abstract class and to the method of that class that is going to be executed. It is therefore a class that behaves like a method and can be executed. In Python this method is declared as `__call__`, and the returned object can be executed as `obj()`, which runs the method defined in the class. For a RailStage, it runs the algorithm.
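
A minimal sketch of a callable object built this way is shown below. `ToyStage` is a hypothetical stand-in, not RAIL's `RailStage`; it only illustrates a `make_stage` class method that returns a configured instance and a `__call__` that delegates to `run`.

```python
class ToyStage:
    """Hypothetical stand-in for a stage; not RAIL's actual implementation."""

    def __init__(self, name, columns):
        self.name = name
        self.columns = columns

    @classmethod
    def make_stage(cls, **kwargs):
        # Return a configured instance of the class
        return cls(**kwargs)

    def run(self, data):
        # Rename the keys of a dict of columns according to self.columns
        return {self.columns.get(key, key): values for key, values in data.items()}

    def __call__(self, data):
        # Calling the object delegates to run()
        return self.run(data)


remapper = ToyStage.make_stage(name="toy_remapper", columns={"z_true": "redshift"})
result = remapper({"z_true": [0.1, 0.5], "ra": [10.0, 11.0]})
```

Here `remapper(...)` and `remapper.run(...)` do the same work; the object simply behaves like a function.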

In [None]:
##help(ColumnMapper)

In [None]:
columns_remmap = {
"coadd_objects_id": "id",
"ra": "coord_ra",
"dec": "coord_dec",
"mag_g": "mag_g_lsst",
"magerr_g": "mag_err_g_lsst",
"mag_i": "mag_i_lsst",
"magerr_i": "mag_err_i_lsst",
"mag_r": "mag_r_lsst",
"magerr_r": "mag_err_r_lsst",
"mag_u": "mag_u_lsst",
"magerr_u": "mag_err_u_lsst",
"mag_y": "mag_y_lsst",
"magerr_y": "mag_err_y_lsst",
"mag_z": "mag_z_lsst",
"magerr_z": "mag_err_z_lsst",
"z_true": "redshift"
}

col_remapper_train = ColumnMapper.make_stage(name='col_remapper_train', columns=columns_remmap)
print(f"Returned class -> {type(col_remapper_train)}")

In [None]:
sys.getsizeof(DS), 'bytes'

We can see the configuration of the returned object with `returned_obj.config.to_dict()`

In [None]:
##col_remapper_train.config.to_dict()

Basically, we can execute it in two ways.
1. When the data has been added manually to the DS, with `col_remapper_train.run()`
2. Passing the data as a parameter and invoking the object as `col_remapper_train(dataAsTableLike)`

In this case we are going to call the method `run`.

In [None]:
col_remapper_train.run()
print(f"\nRunning in parallel -> {col_remapper_train.is_parallel()}")
DS

We can see that it stores the output in the DS under a key made of `output_` followed by the stage name. Let's check the output.

In [None]:
col_remapper_train.get_data("output").head() 
## or through the DS as 
##DS.read("output_col_remapper_train")
##DS["output_col_remapper_train"].data

In [None]:
sys.getsizeof(DS), 'bytes'

____
**OBSERVATION**

Passing the input in `make_stage` does not work: <br>
`ColumnMapper.make_stage(name='col_remapper_train_2', columns=columns_remmap, input='test')`

In my test, setting `input` to the name of a PqHandle stored in the DS did not work. The first thing `run` does is call `get_data` <br>
`data = self.get_data('input', allow_missing=True)` <br>
which searches the DS for a key named 'input'.

To change that, it would be necessary to call the method <br>
`self.set_data(self.config.input, data)` <br> beforehand and, if it is not set, then search by the key 'input'.
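
The lookup order suggested here (configured key first, then the key 'input') can be sketched with plain dictionaries. The function `resolve_input` and the dictionaries are hypothetical and only illustrate the fallback logic; this is not code from RAIL.

```python
def resolve_input(store, config):
    """Return the data for the configured key, falling back to the key 'input'."""
    key = config.get("input")
    if key is not None and key in store:
        return store[key]
    return store.get("input")


toy_store = {"input": "default table", "test": "test table"}

configured = resolve_input(toy_store, {"input": "test"})  # uses the configured key
fallback = resolve_input(toy_store, {})                   # no key configured
```
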
___

Basically, all the algorithms expect a TableHandle as input.<br>
`inputs = [('input', <class 'rail.core.data.TableHandle'>)]`<br>
As the output of the column remapper is already a TableHandle we do not need to specify it, but if the data is not already in the correct format, it may be helpful to use the TableConverter class. 

Eg:

     table_conv_train = TableConverter.make_stage(name='table_conv_train', output_format='numpyDict')
     table_conv_train.run()


The output of the inform stage is a ModelHandle<br>
`outputs = [('model', <class 'rail.core.data.ModelHandle'>)]`
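
As a rough picture of what a table conversion such as `output_format='numpyDict'` produces, the sketch below turns a pandas DataFrame into a dictionary of NumPy arrays by hand. It assumes that 'numpyDict' means a mapping from column names to NumPy arrays, and it uses only pandas and NumPy rather than the TableConverter itself; the toy values are hypothetical.

```python
import numpy as np
import pandas as pd

table = pd.DataFrame({"mag_g_lsst": [24.1, 23.7], "redshift": [0.3, 1.2]})

# One NumPy array per column, keyed by column name
numpy_dict = {name: table[name].to_numpy() for name in table.columns}

column_names = list(numpy_dict)
all_arrays = all(isinstance(values, np.ndarray) for values in numpy_dict.values())
```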

**Image:** basic flow of inputs and outputs. 

![title](SimpleRailBPZflow.png)


In [None]:
##help(Inform_BPZ_lite)

In [None]:
DS.add_data(key="input", data=col_remapper_train.get_data("output"), handle_class=PqHandle)
DS

In [None]:
bpz_columns_file = os.path.join(CURR_DIR, 'configs/bpz.columns')

inform_bpz = Inform_BPZ_lite.make_stage(
    name='inform_bpzlite', 
    ##input="inprogress_output_col_remapper_train.pq", same question as above
    model='trained_BPZ_output.pkl', 
    hdf5_groupname='', 
    columns_file=bpz_columns_file,
    prior_band="mag_r_lsst"
)
inform_bpz.config.to_dict()

Compute the best-fit prior parameters.

In [None]:
%%time
inform_bpz.run()
## or inform_bpz.inform(data)

In [None]:
DS

___

## POSTERIOR -> Estimate 1 


For posteriors

     inputs = [('model', <class 'rail.core.data.ModelHandle'>), ('input', <class 'rail.core.data.TableHandle'>)]
     outputs = [('output', <class 'rail.core.data.QPHandle'>)]

In [None]:
##help(BPZ_lite)

In [None]:
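# The estimator reads the photometry from the 'input' key, so store the test sample there
# with its columns renamed; the trained model is passed to make_stage below.
DS.add_data(key="input", data=test_sample.rename(columns=columns_remmap), handle_class=PqHandle)
DS, DS.read("input").head()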

In [None]:
estimate_bpz = BPZ_lite.make_stage(
    name='estimate_bpz', 
    hdf5_groupname='', 
    columns_file=bpz_columns_file, 
    ##input="inprogress_output_table_conv_train.hdf5", 
    model=inform_bpz.get_handle('model')
)
estimate_bpz.config.to_dict()

In [None]:
estimate_bpz.run()  ## reads the 'input' key from the DS
## or estimate_bpz.estimate(data), passing the data directly as 'input'

In [None]:
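# Assumption: the PDFs come from estimate_bpz and the truth is the remapped test sample
bpz_estimated = estimate_bpz.get_handle('output')
test_data = DS.add_data(key="test_data", data=test_sample.rename(columns=columns_remmap), handle_class=PqHandle)
test_data_orig = test_data.data

evaluator = Evaluator.make_stage(name='bpz_eval', truth=test_data_orig)
result_dict = evaluator.evaluate(bpz_estimated, test_data_orig)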

In [None]:
results_tables = tables_io.convertObj(result_dict.data, tables_io.types.PD_DATAFRAME)
results_tables.head()

## CECI pipeline -> understand this pipeline yaml

In [None]:
import ceci
pipe = ceci.Pipeline.interactive()
stages = [
    # remap the catalog columns
    col_remapper_train,
    # inform the estimators
    inform_bpz,
    # estimate posteriors
    estimate_bpz,
    # evaluator
    evaluator
]
for stage in stages:
    pipe.add_stage(stage)

In [None]:
# catalog_file must point to the input catalog; it is not defined earlier in this notebook
pipe.initialize(dict(input=catalog_file), dict(output_dir='.', log_dir='.', resume=False), None)

In [None]:
pipe.save('tmp_goldenspike.yml')

In [None]:
pr = ceci.Pipeline.read('tmp_goldenspike.yml')

In [None]:
pr.run()