### Import needed packages and scripts


In [1]:
# Importing necessary libraries
# Standard library
import glob
import json
import os
import pickle
import sys
from datetime import datetime, timezone

# Third-party
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Let the notebook cells use the whole browser width
from IPython.display import display, HTML
full_width_style = "<style>.container { width:100% !important; }</style>"
display(HTML(full_width_style))

# Make the project helper scripts (../scripts relative to the working directory) importable
scripts_path = os.getcwd() + "/../scripts/"
sys.path.insert(0, scripts_path)
import auxiliar as aux

### Paths to data and results

In [2]:
# Number of rows (datacheck subruns) handled by each cluster job
n_rows = 6000

# Root path of this script
root = os.getcwd() + "/"
# Objects directory
root_objects = root + "objects/"

# Path prefix of all the night-wise datacheck files (globbed later with "*.h5")
# NOTE(review): absolute, user-specific path; consider making it configurable
root_dchecks = "/fefs/aswg/workspace/abelardo.moralejo/data/datachecks/night_wise/DL1_datacheck_"
# Weather station file
ws_database = root_objects + "WS2003-22_short.h5"

# Some filenames -------------------
# Filename of the datacheck dictionary
fname_datacheck_dict = root_objects + "datacheck_dict.pkl"
# Filename of the total dictionary
fname_total_dict = root_objects + "total_dict.pkl"
# Job list file
fname_job_list = root_objects + "bash_job_list.txt"
# Filename of the relation between datacheck entries and weather-station entries (filled by the jobs)
fname_run_night_relation = root_objects + "ws_run_relation.txt"


# Flags for computing or not different parts
# Compute the datacheck dictionary (otherwise it is loaded from `fname_datacheck_dict`)
compute_datacheck_dict = False
# Send all the bash jobs to the cluster
send_jobs = True

# Create needed folders. `exist_ok` avoids the check-then-create race, and `folder`
# avoids shadowing the builtin `dir`.
for folder in [root_objects, root_objects + "output_slurm/", root + "plots"]:
    os.makedirs(folder, exist_ok=True)

### Extracting dates and parameters of all runs/subruns

In [3]:
%%time
if compute_datacheck_dict:

    run_number   = [] # Run numbers
    srun_number  = [] # Subrun numbers
    timestamps   = [] # Timestamps of each subrun
    time_elapsed = [] # Elapsed time of each subrun
    mean_azimuth = [] # Mean azimuth of each run
    mean_zenith_distance = [] # Mean zenith of each run
    zd_corrected_intensity_at_half_peak_rate = [] # ZD corrected intensity at half peak rate
    zd_corrected_cosmics_rate_at_422_pe = [] # ZD corrected cosmics rate at 422 pe
    zd_corrected_delta_cosmics_rate_at_422_pe = [] # ZD corrected delta cosmics rate at 422 pe
    zd_corrected_cosmics_spectral_index = [] # ZD corrected cosmics spectral index
    light_yield = [] # Light yield

    # All the datachecks for all the nights
    dchecks = np.sort(glob.glob(root_dchecks + "*.h5"))

    # We iterate over all the datachecks
    for i, dcheck in enumerate(dchecks):

        print(f"Analysing... {i:3}/{len(dchecks)}") if i % 20 == 0 else None

        # The datacheck file of the run summary (runwise)
        ds = pd.read_hdf(dcheck, key="runsummary")
        # The datacheck file of the intensity spectrums (subrunwise)
        di = pd.read_hdf(dcheck, key="cosmics_intensity_spectrum")
        
        # Iterating over all the entries of each night, the subruns
        for j in range(len(ds)):

            # Reference run number
            runref = ds["runnumber"].iloc[j]
            
            # Intensity datacheck for only the subruns of the reference run
            di_run = di.query(f"runnumber == {runref}")
            
            # Subrun iteration and storing all the data we are interested in
            for k in range(len(di_run)):

                run_number.append(runref)
                srun_number.append(di_run["subrun"].iloc[k])
                timestamps.append(datetime.fromtimestamp(di_run["time"].iloc[k]))
                time_elapsed.append(di_run["corrected_elapsed_time"].iloc[k])
                mean_azimuth.append(ds["mean_azimuth"].iloc[j])
                mean_zenith_distance.append(np.arccos(di_run["cos_zenith"].iloc[k]))
                zd_corrected_intensity_at_half_peak_rate.append(di_run["ZD_corrected_intensity_at_half_peak_rate"].iloc[k])
                zd_corrected_cosmics_rate_at_422_pe.append(di_run["ZD_corrected_cosmics_rate_at_422_pe"].iloc[k])
                zd_corrected_delta_cosmics_rate_at_422_pe.append(di_run["ZD_corrected_delta_cosmics_rate_at_422_pe"].iloc[k])
                zd_corrected_cosmics_spectral_index.append(di_run["ZD_corrected_cosmics_spectral_index"].iloc[k])
                light_yield.append(di_run["light_yield"].iloc[k])            

    print(f"Analysing... {len(dchecks):3}/{len(dchecks)}\n")

    # Now we are going to sort looking to the timestamps
    _, run_number = aux.sort_based(run_number, timestamps)
    _, srun_number = aux.sort_based(srun_number, timestamps)
    _, time_elapsed = aux.sort_based(time_elapsed, timestamps)
    _, mean_azimuth = aux.sort_based(mean_azimuth, timestamps)
    _, mean_zenith_distance = aux.sort_based(mean_zenith_distance, timestamps)
    _, zd_corrected_intensity_at_half_peak_rate = aux.sort_based(zd_corrected_intensity_at_half_peak_rate, timestamps)
    _, zd_corrected_cosmics_rate_at_422_pe = aux.sort_based(zd_corrected_cosmics_rate_at_422_pe, timestamps)
    _, zd_corrected_delta_cosmics_rate_at_422_pe = aux.sort_based(zd_corrected_delta_cosmics_rate_at_422_pe, timestamps)
    _, zd_corrected_cosmics_spectral_index = aux.sort_based(zd_corrected_cosmics_spectral_index, timestamps)
    timestamps, light_yield = aux.sort_based(light_yield, timestamps)

    # Creating the data dictionary
    dict_dcheck = {
        "run" : np.array(run_number),
        "srun" : np.array(srun_number),
        "time" : np.array(timestamps),
        "telapsed" : np.array(time_elapsed),
        "az" : np.rad2deg(mean_azimuth),
        "zd" : np.rad2deg(mean_zenith_distance),
        "ZD_corrected_intensity_at_half_peak_rate" : np.array(zd_corrected_intensity_at_half_peak_rate),
        "ZD_corrected_cosmics_rate_at_422_pe" : np.array(zd_corrected_cosmics_rate_at_422_pe),
        "ZD_corrected_delta_cosmics_rate_at_422_pe" : np.array(zd_corrected_delta_cosmics_rate_at_422_pe),
        "ZD_corrected_cosmics_spectral_index" : np.array(zd_corrected_cosmics_spectral_index),
        "light_yield" : np.array(light_yield)
    }        

    # Saving the objects in the objects directory
    with open(fname_datacheck_dict, 'wb') as f:
        pickle.dump(dict_dcheck, f, pickle.HIGHEST_PROTOCOL)  
else:
    # To read the file:
    with open(fname_datacheck_dict, 'rb') as f:
            dict_dcheck = pickle.load(f)    

CPU times: user 181 ms, sys: 126 ms, total: 306 ms
Wall time: 325 ms


### Creating the total dictionary run-subrun-wise

In [4]:
# Per-subrun quantities copied from the (flat) datacheck dictionary
subrun_fields = [
    "time",
    "telapsed",
    "az",
    "zd",
    "ZD_corrected_intensity_at_half_peak_rate",
    "ZD_corrected_cosmics_rate_at_422_pe",
    "ZD_corrected_delta_cosmics_rate_at_422_pe",
    "ZD_corrected_cosmics_spectral_index",
    "light_yield",
]

# Nested view of the same data: total_dict[run][subrun] -> {field: value}
total_dict = {run: {} for run in np.unique(dict_dcheck["run"])}

for row, (run, srun) in enumerate(zip(dict_dcheck["run"], dict_dcheck["srun"])):
    total_dict[run][srun] = {field: dict_dcheck[field][row] for field in subrun_fields}

#### Reading the weather-station (WS) table and reducing it to the part we are interested in

In [5]:
# Weather-station (WS) database; its index holds the date of each entry
df_ws = pd.read_hdf(ws_database)

# Time span covered by the datacheck entries
dates_dcheck = dict_dcheck["time"]
mindate, maxdate = np.min(dates_dcheck), np.max(dates_dcheck)

# WS index labels -> naive datetimes (anything after the first "." is discarded)
dates_ws = np.array([datetime.fromisoformat(str(label).split(".")[0]) for label in df_ws.index])

# Last WS date available, taken BEFORE masking (later subruns have no WS counterpart)
maxdate_ws = np.max(dates_ws)

# Keep only night-time WS entries (sun below the horizon) strictly inside the datacheck span.
# NOTE(review): the job results are later used as positional indexes into this masked table
# (`df_ws.iloc`), so the jobs presumably apply the very same masks. Keep them in sync.
mask_dates = (dates_ws > mindate) & (dates_ws < maxdate)
mask_night = df_ws["sun_alt"] < 0
total_mask = mask_dates & mask_night

dates_ws, df_ws = dates_ws[total_mask], df_ws[total_mask]

### Splitting the subruns into batches (one job each) and writing the job list into a txt file

In [7]:
# Split the datacheck entries [0, N) into consecutive batches of `n_rows` entries.
# Each batch becomes one cluster job described by its inclusive "start,end" indexes.
# The end of the last batch may exceed N - 1; the job logic only visits existing entries.
n_subruns     = len(dict_dcheck["run"])
start_indexes = list(range(0, n_subruns, n_rows))
end_indexes   = [start + n_rows - 1 for start in start_indexes]

print(f"With groups of {n_rows} subruns, the number of prepared jobs is {len(start_indexes)}")

# Writing the job list: one job per line, as "start,end"
with open(fname_job_list, "w") as file_job_list:
    for s, e in zip(start_indexes, end_indexes):
        file_job_list.write(f"{s},{e}\n")

With groups of 6000 subruns, the number of prepared jobs is 124


### Launching the jobs to the queue

In [9]:
if send_jobs == True:
    # Creating a file to store the results of the jobs
    file_results = open(fname_run_night_relation, "w")
    file_results.write("# Run - Subrun , WS entry id (date in ISO format)")
    file_results.close()
    
    
    # Launching the jobs
    !sh bash_jobs_indexes_ws_run.sh

Sending job 0,5999 to the queue...

Submitted batch job 31606590
Sending job 6000,11999 to the queue...

Submitted batch job 31606591
Sending job 12000,17999 to the queue...

Submitted batch job 31606592
Sending job 18000,23999 to the queue...

Submitted batch job 31606593
Sending job 24000,29999 to the queue...

Submitted batch job 31606594
Sending job 30000,35999 to the queue...

Submitted batch job 31606595
Sending job 36000,41999 to the queue...

Submitted batch job 31606596
Sending job 42000,47999 to the queue...

Submitted batch job 31606597
Sending job 48000,53999 to the queue...

Submitted batch job 31606598
Sending job 54000,59999 to the queue...

Submitted batch job 31606599
Sending job 60000,65999 to the queue...

Submitted batch job 31606600
Sending job 66000,71999 to the queue...

Submitted batch job 31606601
Sending job 72000,77999 to the queue...

Submitted batch job 31606602
Sending job 78000,83999 to the queue...

Submitted batch job 31606603
Sending job 84000,89999 to

### <span style="color:red;">⚠️ Wait until all the jobs have finished and their results are fully written before continuing ⚠️</span>


### Adding the WS database timestamp/index to the datacheck dictionary

In [None]:
# For every datacheck subrun, find the closest-in-time weather-station (WS) entry and store
# its index label as a string (`ws_entry`) and as a datetime (`ws_timestamp`).
# Subruns later than the last WS record (`maxdate_ws`) get None.
dict_dcheck["ws_entry"]     = []
dict_dcheck["ws_timestamp"] = []

# Sort the WS dates once so the nearest entry can be found by binary search. The search never
# discards candidates, so every subrun is compared against all WS entries.
ws_times  = np.array(dates_ws, dtype="datetime64[us]")
ws_order  = np.argsort(ws_times, kind="stable")
ws_sorted = ws_times[ws_order]

for i, date_dcheck in enumerate(dict_dcheck["time"]):

    if i % 50000 == 0:
        print("Analysinng... {:5}/{}".format(i, len(dict_dcheck["time"])))

    if date_dcheck > maxdate_ws or len(ws_sorted) == 0:
        dict_dcheck["ws_entry"].append(None)
        dict_dcheck["ws_timestamp"].append(None)
        continue

    t = np.datetime64(date_dcheck, "us")
    # The nearest WS date is either the last one before `t` or the first one at/after it.
    # On a tie the earlier one is kept.
    pos = np.searchsorted(ws_sorted, t)
    candidates = [p for p in (pos - 1, pos) if 0 <= p < len(ws_sorted)]
    best = min(candidates, key=lambda p: abs(ws_sorted[p] - t))

    str_id = str(df_ws.index[ws_order[best]])
    dict_dcheck["ws_entry"].append(str_id)
    dict_dcheck["ws_timestamp"].append(datetime.fromisoformat(str_id))

In [None]:
# Visual check: datacheck subrun times vs. the timestamps of their matched WS entries
fig, ax = plt.subplots()
ax.plot(dates_dcheck, "o", label="datacheck subrun")
ax.plot(dict_dcheck["ws_timestamp"], "o", label="closest WS entry")
ax.set_xlabel("Datacheck entry index (time-sorted)")
ax.set_ylabel("Date")
ax.legend();

In [None]:
# Debug check: time differences between the (masked) WS entries and the last datacheck subrun.
# The subrun is referenced explicitly instead of relying on a loop variable leaked from another cell.
dates_ws - dict_dcheck["time"][-1]

In [None]:
# Debug check: WS entry closest in time to the datacheck subrun #110000
probe_time = dict_dcheck["time"][110000]
closest = np.argmin(np.abs(dates_ws - probe_time))
str(df_ws.iloc[closest].name)

In [None]:
# Timestamp of datacheck subrun #110000, to compare with its closest WS entry
subrun_probe = 110_000
dict_dcheck["time"][subrun_probe]

In [None]:
    def nearest_ws_indexes(dates, dates_ws, init, end, results_path):
        """Excerpt of the cluster-job logic: match datacheck dates to their closest WS date.

        For every datacheck index ``j`` with ``init <= j <= end`` (inclusive, clipped to the
        available dates), find the position of the WS date closest to ``dates[j]`` and append
        a line ``"<ws_index>,<j>"`` to ``results_path``.

        Parameters
        ----------
        dates : sequence
            Datacheck timestamps, indexable by position.
        dates_ws : numpy.ndarray
            WS timestamps; ``dates[j] - dates_ws`` must be supported.
        init, end : int
            Inclusive range of datacheck indexes handled by this job.
        results_path : str
            File the results are appended to.
        """
        indexes = []
        ref     = []
        # Only visit this job's own indexes instead of scanning every date
        last = min(end, len(dates) - 1)
        for j in range(max(init, 0), last + 1):

            if j % 200 == 0:
                print(f"{j}/{end}")

            ref.append(j)
            indexes.append(np.argmin(np.abs(dates[j] - dates_ws)))

        print("Writting...")
        with open(results_path, "a") as f:
            for i, r in zip(indexes, ref):
                f.write(f"{i},{r}\n")

In [None]:
# Read back the job results: each non-comment line is "<ws_index>,<dcheck_index>"
# (lines starting with "#" are skipped by np.loadtxt). reshape(-1, 2) also covers
# files with a single result line.
relation = np.loadtxt(fname_run_night_relation, delimiter=",", dtype=int).reshape(-1, 2)

# Order both columns by datacheck index, so that position i corresponds to datacheck entry i.
# A single stable permutation keeps the two columns aligned.
order = np.argsort(relation[:, 1], kind="stable")
_index_ws     = relation[order, 0].tolist()
_index_dcheck = relation[order, 1].tolist()

# Every datacheck entry must appear exactly once. Otherwise some jobs did not finish (or ran
# twice), and the position-based matching done next would be misaligned.
assert _index_dcheck == list(range(len(dates_dcheck))), (
    f"Incomplete or duplicated job results: {len(_index_dcheck)} entries "
    f"for {len(dates_dcheck)} datacheck subruns"
)

In [None]:
# Combine the job results with the datacheck entries. For every datacheck subrun, store the
# positional index of its closest WS entry (None when the subrun is later than the last WS
# record). Then copy the WS quantities of interest into `dict_dcheck` and save it.
index_ws, index_dcheck = [], []

# Iterating over all the dates of the datacheck dictionary
for i in range(len(dates_dcheck)):
    # Dates later than the max date of the weather station have no WS counterpart
    index_ws.append(None if dates_dcheck[i] > maxdate_ws else _index_ws[i])
    index_dcheck.append(_index_dcheck[i])

dict_dcheck["index_ws"] = np.array(index_ws)

# WS columns copied into the datacheck dictionary (None where there is no WS match)
ws_columns = ["temperature", "pressure", "humidity", "tngDust", "tngSeeing", "rain"]
ws_values  = {column: [] for column in ws_columns}

# One entry per datacheck date
for i in range(len(dates_dcheck)):
    if i % 50000 == 0:
        print(f"{i}/{len(dates_dcheck)}")
    if index_ws[i] is not None:
        # Fetch the WS row once instead of once per column
        ws_row = df_ws.iloc[index_ws[i]]
        for column in ws_columns:
            ws_values[column].append(ws_row[column])
    else:
        for column in ws_columns:
            ws_values[column].append(None)

for column in ws_columns:
    dict_dcheck[column] = np.array(ws_values[column])

# Saving the enriched datacheck dictionary in the objects directory (`root_objects` ends with "/")
with open(root_objects + "data_dict.pkl", 'wb') as f:
    pickle.dump(dict_dcheck, f, pickle.HIGHEST_PROTOCOL)