<img align='left' src = '../../images/linea.png' width=150 style='padding: 20px'> 

# Report HATS
## HATS import on DP0.1 Truth Random Sample catalog

Performance report on LINCC libraries.

Contacts: Luigi Silva ([luigi.silva@linea.org.br](mailto:luigi.silva@linea.org.br)); Julia Gschwend ([julia@linea.org.br](mailto:julia@linea.org.br)).

Last check: 08/11/2024

#### Acknowledgments

'_This notebook used computational resources from the Associação Laboratório Interinstitucional de e-Astronomia (LIneA) with financial support from the INCT of e-Universe (Process No. 465376/2014-2)._'

'_This notebook uses libraries from the LSST Interdisciplinary Network for Collaboration and Computing (LINCC) Frameworks project, such as the hats, hats_import, and lsdb libraries. The LINCC Frameworks project is supported by Schmidt Sciences. It is also based on work supported by the National Science Foundation under Grant No. AST-2003196. Additionally, it receives support from the DIRAC Institute at the Department of Astronomy of the University of Washington. The DIRAC Institute is supported by gifts from the Charles and Lisa Simonyi Fund for Arts and Sciences and the Washington Research Foundation._'

# Imports

Let us import the packages that we will need.

In [None]:
########################### GENERAL ##########################
import os
import gc
import re
import sys
import glob
import time
import math
import getpass
import warnings
import tables_io
import subprocess
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
############################ DASK ############################
import dask
from dask import dataframe as dd
from dask import delayed
from dask.distributed import Client, performance_report, wait
import dask_jobqueue
from dask_jobqueue import SLURMCluster
########################## HATS ###########################
import hats
from hats.inspection.visualize_catalog import plot_pixels
from hats.pixel_math import HealpixPixel
########################## HATS IMPORT ###########################
import hats_import
from hats_import.catalog.file_readers import ParquetReader, FitsReader
from hats_import.margin_cache.margin_cache_arguments import MarginCacheArguments
from hats_import.pipeline import ImportArguments, pipeline_with_client
############################ LSDB ############################
import lsdb
######################## VISUALIZATION #######################
### BOKEH
import bokeh
from bokeh.io import output_notebook, show
from bokeh.models import ColorBar, LinearColorMapper
from bokeh.palettes import Viridis256

### HOLOVIEWS
import holoviews as hv
from holoviews import opts
from holoviews.operation.datashader import rasterize, dynspread

### GEOVIEWS
import geoviews as gv
import geoviews.feature as gf
from cartopy import crs

### DATASHADER
import datashader as ds

### MATPLOTLIB
import matplotlib.pyplot as plt
########################## ASTRONOMY #########################
from astropy.io import fits
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.units.quantity import Quantity

Configuring the plots to be displayed inline.

In [None]:
hv.extension('bokeh')
gv.extension('bokeh')
output_notebook()
%matplotlib inline

Printing the versions of the libraries.

In [None]:
### Getting hats version manually, because it has no __version__ attribute.
hats_version = "unknown"  # Fallback in case hats is not found in the conda environment
result = subprocess.run(
    ["conda", "run", "-p", "/lustre/t0/scratch/users/luigi.silva/hats_env_081124", "conda", "list", "hats"],
    stdout=subprocess.PIPE, text=True
)
for line in result.stdout.splitlines():
    if line.startswith("hats "):
        hats_version = line.split()[1]
        break
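A possibly simpler alternative to calling `conda run`, assuming `hats` was installed as a regular Python distribution in the environment of the running kernel, is to read the installed version from the package metadata with the standard-library `importlib.metadata`. Unlike the cell above, this looks at the kernel's own environment rather than a given conda prefix. The helper name `get_installed_version` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version


def get_installed_version(package_name, default="unknown"):
    """Return the installed version of a distribution, or `default` if it is not installed."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return default


# Works for packages that expose no __version__ attribute, as long as their metadata is installed.
hats_version_from_metadata = get_installed_version("hats")
print(f"hats version (from package metadata): {hats_version_from_metadata}")
```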

### Printing the versions.
print(f'python version: {sys.version}')
print(f'numpy version: {np.__version__}')
print(f'dask version: {dask.__version__}')
print(f'dask_jobqueue version: {dask_jobqueue.__version__}')
print(f'hats version: {hats_version}')
print(f'hats_import version: {hats_import.__version__}')
print(f'lsdb version: {lsdb.__version__}')

# Configurations

## Running configurations

Set the configuration for this run.

In [None]:
# DO YOU WANT TO GENERATE A RANDOM SAMPLE OF THE INPUT CATALOG?
# IF YOU CHOOSE TRUE HERE, THE RANDOM SAMPLE WILL BE USED FOR HATS IMPORT, INSTEAD OF THE FULL ORIGINAL CATALOG.
# IF YOU CHOOSE FALSE HERE, THE FULL INPUT CATALOG WILL BE USED FOR HATS IMPORT.
generate_random_sample = False

# DO YOU WANT TO RUN THE HATS IMPORT PIPELINE? 
run_the_pipeline = False

# DO YOU WANT TO LOAD THE INPUT CATALOG AND ITS INFORMATION? (IF YOU CHOOSE "TRUE" FOR ANY OF THE ABOVE OPTIONS, YOU MUST CHOOSE "TRUE" HERE.)
load_the_input_catalog = True

# DO YOU WANT TO SAVE THE OUTPUTS AND ERRORS OF ALL DASK SLURMCluster JOBS?
save_the_dask_jobs_info = True

# DO YOU WANT TO SAVE THE GENERAL INFORMATION ABOUT THIS RUN (MAIN LIBRARY VERSIONS, INPUT FILE SIZES, JOB SCONTROL INFO, OUTPUT FILE SIZES)?
save_the_info = True

# DO YOU WANT TO SHOW THE INFO INLINE?
show_info_inline = True

# ONE OF THE FINAL ANALYSIS STEPS IS TO COMPUTE THE SIZES OF THE CATALOGS DIRECTLY FROM DISK. THIS MAY TAKE A WHILE. DO YOU WANT TO DO THIS COMPUTATION?
compute_sizes_from_disk = True

# DO YOU WANT TO CLOSE THE CLIENT AND THE CLUSTER AT THE END?
close_the_cluster = True
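The comment on `load_the_input_catalog` states a dependency between the flags. A minimal sketch of a guard that fails early when the flags are inconsistent, instead of failing later with a `NameError`; the function name `check_run_flags` is hypothetical and not part of the original notebook.

```python
def check_run_flags(generate_random_sample, run_the_pipeline, load_the_input_catalog):
    """Raise ValueError if a step that needs the input catalog is enabled without loading it."""
    needs_input = generate_random_sample or run_the_pipeline
    if needs_input and not load_the_input_catalog:
        raise ValueError(
            "load_the_input_catalog must be True when generate_random_sample "
            "or run_the_pipeline is True."
        )


# Example with the values chosen above: no error is raised.
check_run_flags(generate_random_sample=False, run_the_pipeline=False, load_the_input_catalog=True)
```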

If you choose not to run the pipeline, give the path to an existing HATS catalog.

In [None]:
CATALOG_HATS_DIR = Path('/lustre/t1/cl/lsst/pz_project/test_data/dp01_truth_random_sample_hats')

## Catalogs paths configurations

Defining the INPUT catalog path.

In [None]:
#input_catalog_path_user_defined = '/lustre/t1/cl/lsst/dp01/primary/catalogs/truth/'
input_catalog_path_user_defined = '/lustre/t1/cl/lsst/pz_project/test_data/dp01_truth_random_sample'

In [None]:
if load_the_input_catalog==True:
    if generate_random_sample==True:
        full_catalog_path = input_catalog_path_user_defined
    else:
        input_catalog_path = input_catalog_path_user_defined

Defining the INPUT catalog files to be used. This can be a list or contain a wildcard, e.g. ```files_*.parquet```.

In [None]:
catalog_files_user_defined = '*.parquet'
#catalog_files_user_defined = 'truth_tract507*.parquet'

In [None]:
if load_the_input_catalog==True:
    if generate_random_sample==True:
        full_catalog_files = catalog_files_user_defined
    else:
        input_catalog_files = catalog_files_user_defined

Defining the OUTPUT catalog path and name.

In [None]:
# Paths for the final hats catalog.
output_hats_catalog_path_user_defined = '/lustre/t1/cl/lsst/pz_project/test_data'
output_hats_catalog_name_user_defined = f'dp01_truth_random_sample_hats'

# Path for the generated random sample.
output_random_samples = '/lustre/t1/cl/lsst/pz_project/test_data/dp01_truth_random_sample'

In [None]:
if generate_random_sample==True:
    os.makedirs(output_random_samples, exist_ok=True)
    
if run_the_pipeline==True:
    hats_catalog_path = output_hats_catalog_path_user_defined
    hats_catalog_name = output_hats_catalog_name_user_defined

Defining the USER base path for saving logs, graphs, and other information about the run.

In [None]:
if save_the_dask_jobs_info or save_the_info:
    user = getpass.getuser()
    user_base_path = f'/lustre/t0/scratch/users/{user}/report_hats/DP01-truth-random-sample'

### Creating directories

In [None]:
if save_the_dask_jobs_info or save_the_info:
    os.makedirs(user_base_path, exist_ok=True)

    current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')

    run_path = os.path.join(user_base_path, f'run_hats_{current_date}')
    os.makedirs(run_path, exist_ok=True)

    logs_dir = os.path.join(run_path, 'logs')
    os.makedirs(logs_dir, exist_ok=True)

    dask_logs_dir = os.path.join(logs_dir, 'dask_logs')
    os.makedirs(dask_logs_dir, exist_ok=True)

## Cluster configurations

Do you want to customize extra dask parameters?

In [None]:
extra_dask_configs=False

If you choose ```True```, see the explanation of the parameters and customize them below.

**Explanation of Parameters**

* ```distributed.worker.memory.target```: fraction of the worker's memory limit, measured on *managed* memory (the task results that Dask keeps track of), above which the worker starts spilling the least recently used data to disk.

* ```distributed.worker.memory.spill```: fraction of the memory limit, measured on the total *process* memory reported by the operating system, above which the worker spills data to disk. This is Dask's own spill-to-disk mechanism, not the operating system's swap.

* ```distributed.worker.memory.pause```: when process memory reaches the specified fraction, the worker is paused: it stops starting new tasks until memory usage drops again.

* ```distributed.worker.memory.terminate```: when process memory reaches the specified fraction, the nanny process terminates and restarts the worker. This prevents the node from running out of memory, at the cost of losing the data held by that worker.

* ```distributed.worker.memory.recent-to-old```: this key does not appear in Dask's default configuration, so setting it most likely has no effect. The related documented setting is ```distributed.worker.memory.recent-to-old-time``` (a duration, ```30s``` by default): for how long an increase in process memory that Dask cannot attribute to stored data is treated as "recent" before it is counted as unmanaged "old" memory in Dask's non-critical memory measures.

In [None]:
if extra_dask_configs==True:
    # Additional Dask configurations (fractions of each worker's memory limit)
    dask_config = {
        "distributed.worker.memory.target": 0.75,         # Spill least recently used data above 75% managed memory
        "distributed.worker.memory.spill": 0.85,          # Spill to disk above 85% process memory
        "distributed.worker.memory.pause": 0.92,          # Pause the worker at 92%
        "distributed.worker.memory.terminate": 0.98,      # Restart the worker at 98%
        "distributed.worker.memory.recent-to-old": 0.2    # Not in Dask's default configuration; likely ignored
    }

    # Applying the Dask configurations (in the notebook's own process)
    dask.config.set(dask_config)
else:
    print("Running Dask with the default memory configuration.")
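`dask.config.set` changes the configuration of the notebook's own Python process. Workers launched by `SLURMCluster` run as separate processes on the compute nodes and, as far as we understand, read their configuration from their own environment (Dask config files and `DASK_*` environment variables) rather than from the notebook's in-memory settings. One way to pass the settings along, assuming a recent dask-jobqueue version that accepts the `job_script_prologue` argument (older versions called it `env_extra`), is to export them as environment variables, where nested keys are joined with double underscores. The helper `dask_config_to_exports` is a hypothetical sketch.

```python
def dask_config_to_exports(config):
    """Convert {"a.b.c-d": value} into shell lines like 'export DASK_A__B__C_D="value"'.

    Dask maps environment variables named DASK_<SECTION>__<KEY> onto nested
    configuration keys and treats "-" and "_" in key names as equivalent.
    """
    lines = []
    for key, value in config.items():
        parts = [part.upper().replace("-", "_") for part in key.split(".")]
        lines.append(f'export DASK_{"__".join(parts)}="{value}"')
    return lines


worker_memory_config = {
    "distributed.worker.memory.target": 0.75,
    "distributed.worker.memory.spill": 0.85,
}
for line in dask_config_to_exports(worker_memory_config):
    print(line)
```

The resulting lines could then be passed as `SLURMCluster(..., job_script_prologue=dask_config_to_exports(dask_config))`; inspecting `cluster.job_script()` before scaling shows whether they ended up in the generated SLURM script.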

Defining the configurations for the cluster.

In [None]:
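interface="ib0"           # Network interface used by the Dask scheduler and workers (InfiniBand)
queue='cpu_small'         # SLURM partition
cores=48                  # Cores per SLURM job (one node per job)
processes=2               # Dask worker processes per job
memory='114GB'            # Memory per SLURM job, split evenly among the processes
walltime='04:00:00'       # Maximum run time of each job

if save_the_dask_jobs_info:
    job_extra_directives=[
        '--propagate',
        f'--output={dask_logs_dir}/dask_job_%j_{current_date}.out',
        f'--error={dask_logs_dir}/dask_job_%j_{current_date}.err'
    ]
else:
    job_extra_directives=[
        '--propagate',
        '--output=/dev/null',
        '--error=/dev/null'
    ]

number_of_nodes=20        # Number of SLURM jobs (nodes) to request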
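dask-jobqueue divides each job's `cores` and `memory` evenly among its `processes`, so each SLURM job here runs 2 Dask workers. A small sketch of the resulting totals and per-worker resources; the helper `worker_resources` is hypothetical, and its unit table assumes decimal units (1 GB = 10^9 bytes, the convention Dask uses for `GB`) with a two-letter suffix such as `GB`.

```python
DECIMAL_UNITS = {"KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}


def worker_resources(cores, processes, memory, number_of_jobs):
    """Summarize the Dask workers created by `number_of_jobs` jobs of the given size.

    `memory` must be a string such as '114GB' (decimal KB/MB/GB/TB suffix only).
    """
    amount, unit = float(memory[:-2]), memory[-2:].upper()
    memory_bytes = amount * DECIMAL_UNITS[unit]
    return {
        "total_workers": number_of_jobs * processes,
        "threads_per_worker": cores // processes,
        "memory_per_worker_GB": memory_bytes / processes / 10**9,
        "total_threads": number_of_jobs * cores,
    }


print(worker_resources(cores=48, processes=2, memory="114GB", number_of_jobs=20))
# {'total_workers': 40, 'threads_per_worker': 24, 'memory_per_worker_GB': 57.0, 'total_threads': 960}
```

The 40 workers match the `number_of_nodes*processes` value passed to `wait_for_workers` below, and the memory fractions discussed above apply to each worker's limit of about 57 GB.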

Starting the cluster.

In [None]:
current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')

# Configuring the SLURMCluster.
cluster = SLURMCluster(
    interface=interface,         # Network interface (InfiniBand)
    queue=queue,                 # Name of the queue (SLURM partition)
    cores=cores,                 # Number of cores per job
    processes=processes,         # Number of Dask worker processes per job
    memory=memory,               # Memory per job
    walltime=walltime,           # Maximum execution time
    job_extra_directives=job_extra_directives,
)

# Scaling the cluster to `number_of_nodes` SLURM jobs (one node each)
cluster.scale(jobs=number_of_nodes)

# Defining the Dask client
client = Client(cluster)

# Waiting until all workers (jobs x processes per job) have started
cluster.wait_for_workers(n_workers=number_of_nodes*processes)
# Running garbage collection on every worker
client.run(lambda: gc.collect())

Showing information about the cluster.

In [None]:
cluster_info = client.cluster
cluster_info

Saving the requested resources.

In [None]:
if save_the_info == True:

    # Memory settings to report in a separate section (None if not set)
    memory_params = {
        "distributed.worker.memory.target": None,
        "distributed.worker.memory.spill": None,
        "distributed.worker.memory.pause": None,
        "distributed.worker.memory.terminate": None,
        "distributed.worker.memory.recent-to-old": None,
        "distributed.worker.memory.recent-to-old-time": None
    }

    # Resources requested for the SLURMCluster
    requested_resources = {
        "interface": interface,
        "queue": queue,
        "cores": cores,
        "processes": processes,
        "memory": memory,
        "walltime": walltime,
        "job_extra_directives": job_extra_directives,
        "number_of_nodes": number_of_nodes
    }

    # Getting the Dask configuration of this (client) process
    dask_config = dask.config.config

    # Overwrite the memory parameters with the values found in the Dask configuration
    for param in memory_params:
        memory_params[param] = dask.config.get(param, default=memory_params[param])

    # Preparing sections
    output = []

    # Requested resources section
    output.append("# Requested resources")
    for key, value in requested_resources.items():
        output.append(f"{key}={value}")

    # Memory configuration section
    output.append("\n# Dask memory configuration:")
    for key, value in memory_params.items():
        output.append(f'"{key}": {value}')

    # Section with all Dask configurations
    output.append("\n# Dask all configurations:")
    for section, config in dask_config.items():
        if isinstance(config, dict):
            output.append(f"[{section}]")
            for key, value in config.items():
                output.append(f"{key}: {value}")
        else:
            output.append(f"{section}: {config}")

    # Saving the result to a file
    with open(f'{logs_dir}/requested_resources_info.txt', 'w') as f:
        f.write("\n".join(output))

    print(f"Information saved in {logs_dir}/requested_resources_info.txt")

## Reading the input catalog

Getting the name of the catalog files.

In [None]:
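if load_the_input_catalog==True:
    # Accept either a single glob pattern (str) or a list of file names/patterns.
    def list_catalog_files(catalog_path, catalog_files):
        patterns = [catalog_files] if isinstance(catalog_files, str) else list(catalog_files)
        return sorted(f for pattern in patterns for f in glob.glob(os.path.join(catalog_path, pattern)))
    if generate_random_sample==True:
        full_total_files = list_catalog_files(full_catalog_path, full_catalog_files)
    else:
        input_total_files = list_catalog_files(input_catalog_path, input_catalog_files)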

Getting the type of files in the catalog.

In [None]:
if load_the_input_catalog==True:
    if generate_random_sample==True:
        if full_total_files:
            _, full_file_extension = os.path.splitext(full_total_files[0])
            print("Full catalog files extension: ", full_file_extension)
        else:
            print("The list of files of the full catalog is empty.")
    else:
        if input_total_files:
            _, input_file_extension = os.path.splitext(input_total_files[0])
            print("Input files extension: ", input_file_extension)
        else:
            print("The list of input files is empty.")
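The cell above infers the file type from the first file only, which assumes that every file shares one extension. A minimal sketch of a stricter check; the helper name `common_extension` is hypothetical, and comparing extensions case-insensitively is an assumption (the original comparison is case-sensitive).

```python
import os


def common_extension(file_paths):
    """Return the single lowercase extension shared by all paths, else raise ValueError."""
    extensions = {os.path.splitext(path)[1].lower() for path in file_paths}
    if len(extensions) != 1:
        raise ValueError(f"Expected exactly one file extension, found: {sorted(extensions)}")
    return extensions.pop()


print(common_extension(["truth_tract1.parquet", "truth_tract2.parquet"]))  # .parquet
```

An empty list also raises, which covers the "list of files is empty" case above.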

Reading the catalog with dask.

In [None]:
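def read_fits_to_df(filename):
    """Read the binary table in HDU 1 of a FITS file into a pandas DataFrame."""
    with fits.open(filename) as hdul:
        data = hdul[1].data
        # FITS tables are big-endian; convert to native byte order for pandas.
        # ndarray.newbyteorder() was removed in NumPy 2.0, so swap the dtype through a view.
        if not data.dtype.isnative:
            data = data.byteswap().view(data.dtype.newbyteorder())
    return pd.DataFrame(data)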
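FITS binary tables are stored in big-endian byte order, while pandas does not reliably handle non-native byte order, which is why `read_fits_to_df` swaps the bytes. The same conversion can be checked on a small synthetic structured array, without any FITS file. Since `ndarray.newbyteorder()` was removed in NumPy 2.0, this sketch flips the byte order of the dtype with `dtype.newbyteorder()` and reinterprets the swapped bytes through a view. The helper name `to_native_byteorder` is hypothetical.

```python
import numpy as np


def to_native_byteorder(array):
    """Return `array` in native byte order, byte-swapping it if its dtype is not native."""
    if array.dtype.isnative:
        return array
    # Swap the bytes of the data, then reinterpret them with the byte-flipped dtype.
    return array.byteswap().view(array.dtype.newbyteorder())


# Synthetic big-endian table, similar in layout to a FITS binary table.
big_endian = np.array([(1, 2.5), (2, -1.0)], dtype=[("id", ">i8"), ("ra", ">f8")])
native = to_native_byteorder(big_endian)
print(native.dtype)  # [('id', '<i8'), ('ra', '<f8')] on a little-endian machine
print(native["ra"].tolist())  # [2.5, -1.0]
```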

In [None]:
if load_the_input_catalog==True:
    if generate_random_sample==True:
        if full_file_extension in ['.parquet', '.parq', '.pq']:
            ddf_full = dd.read_parquet(full_total_files)
        elif full_file_extension in ['.fits', '.fit', '.fts']:
            delayed_dfs = [delayed(read_fits_to_df)(file) for file in full_total_files]
            ddf_full = dd.from_delayed(delayed_dfs)
        else:
            raise ValueError(f'Unsupported file extension: {full_file_extension}')
    else:
        if input_file_extension in ['.parquet', '.parq', '.pq']:
            ddf_input = dd.read_parquet(input_total_files)
        elif input_file_extension in ['.fits', '.fit', '.fts']:
            delayed_dfs = [delayed(read_fits_to_df)(file) for file in input_total_files]
            ddf_input = dd.from_delayed(delayed_dfs)
        else:
            raise ValueError(f'Unsupported file extension: {input_file_extension}')

Is your id column in the index of your dataframe?

In [None]:
id_column_is_the_index = False

## Generating the random sample

Defining the column and the value used to cut the data, the fraction of rows to sample, and the random seed.

In [None]:
if generate_random_sample==True:
    column_for_cut = 'truth_type'
    value_for_cut = 1 
    fraction = 0.005
    seed = 42
    #seed = np.random.randint(0, 10000)

Generating the random sample.

In [None]:
if generate_random_sample==True:
    random_sample_performance_report_path = os.path.join(output_random_samples, f'performance_report_{column_for_cut}_{value_for_cut}_{fraction}.html')

    with performance_report(filename=random_sample_performance_report_path):
        # REMOVE NAN VALUES IN THE COLUMN FOR CUT
        ddf_filtered = ddf_full.dropna(subset=[column_for_cut])

        # APPLY THE CUT
        ddf_filtered = ddf_filtered[ddf_filtered[column_for_cut] <= value_for_cut]

        # MAKING THE RANDOM SAMPLE
        ddf_sampled = ddf_filtered.sample(frac=fraction, random_state=seed)

        # OUTPUT PATHS
        output_file_name_parquet = f'random_sample_{column_for_cut}_{value_for_cut}_{fraction}.parquet'
        output_path_parquet = os.path.join(output_random_samples, output_file_name_parquet)
        
        output_file_name_csv = f'random_sample_{column_for_cut}_{value_for_cut}_{fraction}.csv'
        output_path_csv = os.path.join(output_random_samples, output_file_name_csv)
        
        output_file_name_hdf5 = f'random_sample_{column_for_cut}_{value_for_cut}_{fraction}.hdf5'
        output_path_hdf5 = os.path.join(output_random_samples, output_file_name_hdf5)

        # COMPUTING THE DATAFRAME
        ddf_computed = ddf_sampled.compute()
        if id_column_is_the_index==True:
            ddf_computed = ddf_computed.reset_index()
        else:
            ddf_computed = ddf_computed.reset_index(drop=True)
    
        # SAVING THE DATA TO PARQUET
        ddf_computed.to_parquet(output_path_parquet, index=False)

        # SAVING THE DATA TO CSV
        ddf_computed.to_csv(output_path_csv, index=False)

        # SAVING THE DATA TO HDF5
        tables_io.write(ddf_computed, output_path_hdf5)
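The filter-then-sample steps above can be tried on a small in-memory pandas DataFrame before running them on the full catalog with Dask. A minimal sketch with synthetic data (only the column name `truth_type` is reused; the values are made up): it drops missing values, keeps rows with `truth_type <= 1`, and samples a fixed fraction with a seed, so repeated runs select the same rows. The helper `filter_and_sample` is hypothetical. Note that Dask's `DataFrame.sample(frac=...)` samples each partition separately, so on the full catalog the sampled row count is approximately, not exactly, `fraction` times the number of filtered rows.

```python
import numpy as np
import pandas as pd


def filter_and_sample(df, column_for_cut, value_for_cut, fraction, seed):
    """Drop NaNs in `column_for_cut`, keep rows <= `value_for_cut`, then sample a fraction."""
    filtered = df.dropna(subset=[column_for_cut])
    filtered = filtered[filtered[column_for_cut] <= value_for_cut]
    return filtered.sample(frac=fraction, random_state=seed)


# Synthetic example: 1000 rows with truth_type values 1, 2 or NaN.
rng = np.random.default_rng(0)
toy = pd.DataFrame({
    "id": np.arange(1000),
    "truth_type": rng.choice([1.0, 2.0, np.nan], size=1000),
})
sample_a = filter_and_sample(toy, "truth_type", 1, fraction=0.1, seed=42)
sample_b = filter_and_sample(toy, "truth_type", 1, fraction=0.1, seed=42)
print(len(sample_a), sample_a["id"].equals(sample_b["id"]))
```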

Saving the provenance info.

In [None]:
%%time
if generate_random_sample==True:
    random_provenance_file = os.path.join(output_random_samples, 'provenance.txt')

    with open(random_provenance_file, 'w') as f:
        f.write(f'Full input catalog path: {full_catalog_path} \n')
        f.write(f'Full input catalog files: {full_catalog_files} \n \n')

        f.write(f'Column for cut: {column_for_cut} \n')
        f.write(f'Value for cut (less than or equal to): {value_for_cut} \n')
        f.write(f'Random sample fraction: {fraction*100} % \n')
        f.write(f'Random sample seed: {seed} \n \n')

        # len() on a Dask DataFrame reads through the whole catalog, so this line may take a while
        f.write(f'Number of rows in the original catalog: {len(ddf_full)} \n')
        f.write(f'Number of columns in the original catalog: {len(ddf_full.columns)} \n')
        f.write(f'Columns in the original catalog: {ddf_full.columns.to_list()} \n \n')

        f.write(f'Number of rows in the random sample: {len(ddf_computed)} \n')
        f.write(f'Number of columns in the random sample: {len(ddf_computed.columns)} \n')
        f.write(f'Columns in the random sample: {ddf_computed.columns.to_list()} \n \n')

    print(f'File saved as: {random_provenance_file} \n')

### Reading the generated random sample

Let us point the input paths to the generated random sample.

In [None]:
if generate_random_sample==True:
    input_catalog_path = output_random_samples
    input_catalog_files = output_file_name_parquet
    input_total_files = [output_path_parquet]

Let us check the type of the input files.

In [None]:
if generate_random_sample==True:
    if input_total_files:
        _, input_file_extension = os.path.splitext(input_total_files[0])
        print("Input files extension:", input_file_extension)
    else:
        print("The list of input files is empty.")

Now, let us load the random sample catalog.

In [None]:
if generate_random_sample==True:
    if input_file_extension in ['.parquet', '.parq', '.pq']:
        ddf_input = dd.read_parquet(input_total_files)
    elif input_file_extension in ['.fits', '.fit', '.fts']:
        delayed_dfs = [delayed(read_fits_to_df)(file) for file in input_total_files]
        ddf_input = dd.from_delayed(delayed_dfs)

## Getting the columns of the input catalog for HATS import

Printing which catalog we are using.

In [None]:
if generate_random_sample==True:
    print("Using the random sample as input to HATS import. \n")
else:
    print("Using the full input catalog as input to HATS import. \n")

Getting the name of the columns.

In [None]:
if load_the_input_catalog==True:
    input_catalog_columns = ddf_input.columns
    input_catalog_columns_list = input_catalog_columns.to_list()
    
    if generate_random_sample==True:
        if show_info_inline==True:
            print("Input catalog columns: \n", input_catalog_columns_list)
    else:
        if id_column_is_the_index==True:
            # The index name is available from the metadata, without computing any data
            input_catalog_columns_list.append(ddf_input.index.name)
        if show_info_inline==True:
            print("Input catalog columns: \n", input_catalog_columns_list)

Select the columns you want to use for the HATS conversion. By default, all catalog columns are used.

In [None]:
if load_the_input_catalog==True:
    selected_catalog_columns = input_catalog_columns_list

Getting all the columns whose names contain the strings 'id', 'ra' or 'dec' (case-insensitive).

In [None]:
if load_the_input_catalog==True:
    input_id_column_list = [column for column in selected_catalog_columns if re.search(r"id", column, re.IGNORECASE)]
    input_ra_column_list = [column for column in selected_catalog_columns if re.search(r"ra", column, re.IGNORECASE)]
    input_dec_column_list = [column for column in selected_catalog_columns if re.search(r"dec", column, re.IGNORECASE)]

    print(input_id_column_list, '\n')
    print(input_ra_column_list, '\n')
    print(input_dec_column_list, '\n')
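A plain substring search for `ra` also matches unrelated names such as `parallax` or `radius`. A minimal sketch of a stricter matcher that only accepts the token as a whole underscore-separated part of the column name; the helper `columns_with_token` and the example column names are hypothetical, not taken from the DP0.1 schema.

```python
import re


def columns_with_token(columns, token):
    """Return columns where `token` appears as a whole '_'-separated part (case-insensitive)."""
    pattern = re.compile(rf"(^|_){re.escape(token)}($|_)", re.IGNORECASE)
    return [column for column in columns if pattern.search(column)]


example_columns = ["id", "ra", "dec", "parallax", "radius", "flux_r", "truth_type", "ra_err"]
print(columns_with_token(example_columns, "ra"))   # ['ra', 'ra_err']
print(columns_with_token(example_columns, "dec"))  # ['dec']
```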

Defining the names of the ID, RA and DEC columns of the input catalog.

In [None]:
if load_the_input_catalog==True:
    input_id_column_name = 'id'
    input_ra_column_name = 'ra'
    input_dec_column_name = 'dec'

# Saving library and job information

Saving the library version information.

In [None]:
if save_the_info==True:
    current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')
    with open(f'{logs_dir}/main_lib_versions_{current_date}.txt', 'w') as f:
        f.write(f'python version: {sys.version} \n')
        f.write(f'numpy version: {np.__version__} \n')
        f.write(f'dask version: {dask.__version__} \n')
        f.write(f'dask_jobqueue version: {dask_jobqueue.__version__} \n')
        f.write(f'hats version: {hats_version} \n')
        f.write(f'hats_import version: {hats_import.__version__} \n')
        f.write(f'lsdb version: {lsdb.__version__} \n')
    print(f'File saved as: {logs_dir}/main_lib_versions_{current_date}.txt \n')

Defining functions to get information about the jobs running in the cluster.

In [None]:
# Function to collect information about a job using the scontrol show job command
def get_scontrol_job_info(job_id):
    # Remove any bracketed job-array range (e.g. `[1-10%2]`) from job_id
    clean_job_id = re.sub(r'\[.*?\]', '', job_id)
    
    # Execute scontrol show job
    result = subprocess.run(['scontrol', 'show', 'job', clean_job_id], stdout=subprocess.PIPE)
    job_info = result.stdout.decode('utf-8')
    
    job_dict = {}
    
    # Process the info line by line
    for line in job_info.splitlines():
        items = line.split()
        for item in items:
            if "=" in item:
                key, value = item.split("=", 1)
                job_dict[key] = value
    
    return job_dict

# Function to collect information about all jobs of the user
def get_all_jobs_info_MINE():
    # Gets the current username (getpass also works when $USER is not set)
    user = getpass.getuser()
    
    # Captures the list of running jobs for the user
    result = subprocess.run(['squeue', '-u', user, '-h', '-o', '%i'], stdout=subprocess.PIPE)
    job_ids = result.stdout.decode('utf-8').splitlines()

    # Collects information for each job
    jobs_info = []
    for job_id in job_ids:
        # Removes bracketed job-array ranges from job_id before passing it to scontrol
        clean_job_id = re.sub(r'\[.*?\]', '', job_id)
        try:
            job_info = get_scontrol_job_info(clean_job_id)
            jobs_info.append(job_info)
        except Exception as e:
            print(f"Error processing job {job_id}: {e}")
    
    # Converts the list of dictionaries into a Pandas DataFrame
    df = pd.DataFrame(jobs_info)
    
    return df


# Function to collect information about all jobs that do not belong to the current user
def get_all_jobs_info_NOT_MINE():
    current_user = getpass.getuser()
    
    # Captures the list of running jobs
    result = subprocess.run(['squeue', '-h', '-o', '%i %u'], stdout=subprocess.PIPE)
    job_lines = result.stdout.decode('utf-8').splitlines()
    
    # Filters jobs from other users
    jobs_info = []
    for line in job_lines:
        job_id, user = line.split()
        
        # Ignores jobs belonging to the current user
        if user != current_user:
            # Removes bracketed job-array ranges from job_id before passing it to scontrol
            clean_job_id = re.sub(r'\[.*?\]', '', job_id)
            try:
                job_info = get_scontrol_job_info(clean_job_id)
                jobs_info.append(job_info)
            except Exception as e:
                print(f"Error processing job {job_id}: {e}")
    
    # Converts to DataFrame
    df = pd.DataFrame(jobs_info)
    return df
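The parsing in `get_scontrol_job_info` splits each line on whitespace and each item on the first `=` only, so values that themselves contain `=` (such as `TRES`) are kept whole. A self-contained sketch of the same parsing, applied to a short illustrative text in the `key=value` layout of `scontrol show job` (not real output from this cluster), followed by one possible way to split the `TRES` value further. The helper name `parse_scontrol_output` is hypothetical.

```python
def parse_scontrol_output(text):
    """Parse whitespace-separated key=value pairs into a dict, splitting on the first '='."""
    job_dict = {}
    for line in text.splitlines():
        for item in line.split():
            if "=" in item:
                key, value = item.split("=", 1)
                job_dict[key] = value
    return job_dict


example_output = """JobId=123456 JobName=dask-worker
   NumNodes=1 NumCPUs=48 NumTasks=1 CPUs/Task=1
   TRES=cpu=48,mem=114G,node=1"""
example_info = parse_scontrol_output(example_output)
print(example_info["TRES"])  # cpu=48,mem=114G,node=1

# The TRES value is itself a comma-separated list of resource=amount pairs.
tres = dict(item.split("=", 1) for item in example_info["TRES"].split(","))
print(tres)  # {'cpu': '48', 'mem': '114G', 'node': '1'}
```

Note that values containing spaces would be split across items by this approach, in the sketch and in the original function alike.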

Getting my jobs.

In [None]:
# Collects information about all jobs of the current user and saves it in a DataFrame
df_jobs_MINE = get_all_jobs_info_MINE()

if show_info_inline==True:
    print(df_jobs_MINE[['JobId','NodeList','NumNodes','NumCPUs','NumTasks','CPUs/Task','TRES']])

Getting other users' jobs.

In [None]:
# Collects information about the jobs of other users and saves it in a DataFrame
df_jobs_NOT_MINE = get_all_jobs_info_NOT_MINE()

if len(df_jobs_NOT_MINE)!=0:
    if show_info_inline==True:
        print(df_jobs_NOT_MINE[['JobId','NodeList','NumNodes','NumCPUs','NumTasks','CPUs/Task','TRES']])
else:
    df_jobs_NOT_MINE_EMPTY_MSG = pd.DataFrame({"EMPTY": ["There are no other jobs running in the cluster."]})
    print("There are no other jobs running in the cluster.")

Saving the job data to CSV files.

In [None]:
if save_the_info==True:
    current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')
    
    file_name_MINE = f'{logs_dir}/jobs_info_MINE_{current_date}.csv'
    file_name_NOT_MINE = f'{logs_dir}/jobs_info_NOT_MINE_{current_date}.csv'
    
    df_jobs_MINE.to_csv(file_name_MINE, index=False)
    if len(df_jobs_NOT_MINE)!=0:
        df_jobs_NOT_MINE.to_csv(file_name_NOT_MINE, index=False)
    else:
        df_jobs_NOT_MINE_EMPTY_MSG.to_csv(file_name_NOT_MINE, index=False)
        
    print(f'Files saved as: \n')
    print(f'{file_name_MINE} \n')
    print(f'{file_name_NOT_MINE} \n')

# Preview of the input files and catalog

Preview of some filenames.

In [None]:
if load_the_input_catalog==True:
    if show_info_inline==True:
        for item in input_total_files[0:5]:
            print(item)

First rows of the catalog.

In [None]:
if load_the_input_catalog==True:
    if show_info_inline==True:
        print(ddf_input.head(10))

## Summarize pixels and sizes for input catalog files

Size on disk data: check the balance of your data.

* num partitions: number of input files

* min size_on_disk: smallest file size (in GB)

* max size_on_disk: largest file size (in GB)

* size_on_disk ratio: max/min

* total size_on_disk: sum of all catalog file sizes (in GB)

Source: https://hats.readthedocs.io/en/stable/notebooks/catalog_size_inspection.html

In [None]:
if load_the_input_catalog == True:
    file_info_list = []

    for file_path in input_total_files:
        try:
            file_size = os.stat(file_path).st_size  # Size in bytes
            file_size_gb = file_size / (1024 ** 3)  # Converting bytes to GB (1 GB = 1024**3 bytes here)
            file_info_list.append({"file": file_path, "size_on_disk": file_size, "gbs": file_size_gb})
        except FileNotFoundError:
            print(f"File not found: {file_path}")

    input_info_frame = pd.DataFrame(file_info_list)

    num_partitions = len(input_info_frame)
    min_size_on_disk = input_info_frame["gbs"].min() if not input_info_frame.empty else 0
    max_size_on_disk = input_info_frame["gbs"].max() if not input_info_frame.empty else 0
    total_size_on_disk = input_info_frame["gbs"].sum()
    size_on_disk_ratio = max_size_on_disk / min_size_on_disk if min_size_on_disk > 0 else float('inf')

    if show_info_inline == True:
        print(f"num partitions: {num_partitions}")
        print(f"min size_on_disk: {min_size_on_disk:.6f} GB")
        print(f"max size_on_disk: {max_size_on_disk:.6f} GB")
        print(f"total size_on_disk: {total_size_on_disk:.6f} GB")
        print(f"size_on_disk_ratio: {size_on_disk_ratio:.6f}")

    if save_the_info == True:
        current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')
        with open(f"{logs_dir}/input_summarize_pixels_{current_date}.txt", "w") as summary_file:
            summary_file.write(f"num partitions: {num_partitions}\n")
            summary_file.write(f"min size_on_disk: {min_size_on_disk:.6f} GB\n")
            summary_file.write(f"max size_on_disk: {max_size_on_disk:.6f} GB\n")
            summary_file.write(f"total size_on_disk: {total_size_on_disk:.6f} GB\n")
            summary_file.write(f"size_on_disk_ratio: {size_on_disk_ratio:.6f}\n")

        with open(f"{logs_dir}/input_files_paths_{current_date}.txt", "w") as paths_file:
            for file_path in input_total_files:
                paths_file.write(f"{file_path}\n")

## File size distribution for input catalog files
Below we look at histograms of file sizes.

In our initial testing, we find that there’s a “sweet spot” file size of 100MB-1GB. Files that are smaller create more overhead for individual reads. Files that are much larger may create slow-downs when cross-matching between catalogs. Files that are much larger can create out-of-memory issues for dask when loading from disk.

The majority of your files should be in the “sweet spot”, and no files in the “too-big” category.

Source: https://hats.readthedocs.io/en/stable/notebooks/catalog_size_inspection.html

In [None]:
def process_file_size_info(info_frame, type_of_files, bins, labels, logs_dir, save=False, show=False):
    """Plot a histogram of file sizes (in GB) and report how many files fall in each size category."""
    # Count the files in each size category (bins in GB) once, for both saving and showing.
    hist = np.histogram(info_frame["gbs"], bins=bins)[0]
    pcts = hist / len(info_frame)

    # Draw on a fresh figure so that earlier plots are not overlaid.
    plt.figure()
    plt.hist(info_frame["gbs"], edgecolor='black')
    plt.xlabel("File size (GB)")
    plt.ylabel("Number of files")

    if save:
        current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')
        plt.savefig(f"{logs_dir}/{type_of_files}_file_size_histogram_{current_date}.png")

        with open(f"{logs_dir}/{type_of_files}_file_size_distribution_{current_date}.txt", "w") as file:
            for i, label in enumerate(labels):
                file.write(f"{label} \t: {hist[i]} \t({pcts[i]*100:.1f} %)\n")

    if show:
        plt.show()
        for i, label in enumerate(labels):
            print(f"{label} \t: {hist[i]} \t({pcts[i]*100:.1f} %)")
    else:
        # Close the figure when it is not displayed, so it does not leak into later plots.
        plt.close()

In [None]:
if load_the_input_catalog == True:
    type_of_files = 'input'
    bins = [0, 0.5, 1, 2, 100]
    labels = ["small-ish", "sweet-spot", "big-ish", "too-big"]

    process_file_size_info(
        info_frame=input_info_frame,
        type_of_files=type_of_files,
        bins=bins,
        labels=labels,
        # Only pass the logs directory when the results are saved (logs_dir itself is left unchanged)
        logs_dir=logs_dir if save_the_info else None,
        save=save_the_info,
        show=show_info_inline
    )
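`np.histogram` treats every bin as half-open `[left, right)` except the last one, which also includes its right edge, and it silently ignores values outside `[bins[0], bins[-1]]`. With `bins = [0, 0.5, 1, 2, 100]`, a file of exactly 1 GB is counted as big-ish, and a file larger than 100 GB would not be counted at all. A minimal sketch, with hypothetical names, that keeps the same boundaries but makes the last category open-ended:

```python
import bisect


def count_size_categories(sizes_gb, lower_edges, labels):
    """Count sizes per category; `lower_edges` are lower bounds, the last category is open-ended."""
    counts = dict.fromkeys(labels, 0)
    for size in sizes_gb:
        # bisect_right finds the last lower bound <= size, i.e. a [left, right) bin.
        index = bisect.bisect_right(lower_edges, size) - 1
        if index >= 0:  # negative sizes fall below the first edge and are skipped
            counts[labels[index]] += 1
    return counts


size_edges = [0, 0.5, 1, 2]  # same boundaries as `bins`, without the 100 GB cap
size_labels = ["small-ish", "sweet-spot", "big-ish", "too-big"]
print(count_size_categories([0.2, 0.7, 1.0, 1.5, 150.0], size_edges, size_labels))
# {'small-ish': 1, 'sweet-spot': 1, 'big-ish': 2, 'too-big': 1}
```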

# Converting the catalog to HATS format

Generating the HATS catalog.

In [None]:
if run_the_pipeline==True:
    current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')

    ################################## INPUT CONFIGS #################################
    ### Directory and name of the input files. The name can be a list or contain a wildcard, e.g. files_*.parquet.
    CATALOG_DIR = Path(input_catalog_path)
    CATALOG_FILES = input_catalog_files
    ### Columns to be selected in the input files. The id, ra and dec columns are essential.
    CATALOG_SELECTED_COLUMNS = selected_catalog_columns
    CATALOG_SORT_COLUMN = input_id_column_name
    CATALOG_RA_COLUMN = input_ra_column_name
    CATALOG_DEC_COLUMN = input_dec_column_name
    ### Type of the files we will read.
    FILE_TYPE = input_file_extension
    ###########################################################################################

    ################################# OUTPUT CONFIGS #################################
    ### Name of the HATS catalog to be saved.
    CATALOG_HATS_NAME = hats_catalog_name

    ### Output directory for the catalog and logs.
    HATS_DIR = Path(hats_catalog_path)
    LOGS_DIR = Path(logs_dir)

    CATALOG_HATS_DIR = HATS_DIR / CATALOG_HATS_NAME

    ### Path to dask performance report.
    PERFORMANCE_REPORT_NAME = f'dask_performance_report_{current_date}.html'
    PERFORMANCE_DIR = LOGS_DIR / PERFORMANCE_REPORT_NAME
    ###########################################################################################

    ############################### EXECUTING THE PIPELINE ######################################
    with performance_report(filename=PERFORMANCE_DIR):
        if isinstance(CATALOG_FILES, list)==True:
            CATALOG_PATHS = [CATALOG_DIR / file for file in CATALOG_FILES]
        elif isinstance(CATALOG_FILES, str)==True:
            CATALOG_PATHS = list(CATALOG_DIR.glob(CATALOG_FILES))
        else:
            raise Exception('The type of names of catalogs files (CATALOG_FILES) is not supported. Supported types are list and str.')
    
        if FILE_TYPE in ['.parquet', '.parq', '.pq']:
            catalog_args = ImportArguments(
                sort_columns=CATALOG_SORT_COLUMN,
                ra_column=CATALOG_RA_COLUMN,
                dec_column=CATALOG_DEC_COLUMN,
                input_file_list=CATALOG_PATHS,
                file_reader=ParquetReader(column_names=CATALOG_SELECTED_COLUMNS),
                output_artifact_name=CATALOG_HATS_NAME,
                output_path=HATS_DIR,
            )
            pipeline_with_client(catalog_args, client)
        elif FILE_TYPE in ['.fits', '.fit', '.fts']:
            catalog_args = ImportArguments(
                sort_columns=CATALOG_SORT_COLUMN,
                ra_column=CATALOG_RA_COLUMN,
                dec_column=CATALOG_DEC_COLUMN,
                input_file_list=CATALOG_PATHS,
                file_reader=FitsReader(column_names=CATALOG_SELECTED_COLUMNS),
                output_artifact_name=CATALOG_HATS_NAME,
                output_path=HATS_DIR,
            )
            pipeline_with_client(catalog_args, client)
        else:
            raise Exception('Input catalog type not supported yet.')
###########################################################################################
else:
    print('You selected not to run the pipeline.') 

# Analysing the outputs

## Pixels plot

Plotting the pixels.

In [None]:
loaded_hats_catalog = hats.read_hats(CATALOG_HATS_DIR)
plot_pixels(loaded_hats_catalog)

if save_the_info:
    current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')
    plt.savefig(f"{logs_dir}/pixels_plot_{current_date}.png")

## Comparing sizes

Computing the number of rows in the input catalog.

In [None]:
current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')

In [None]:
%%time
if compute_sizes_from_disk and load_the_input_catalog:
    # len() on a Dask DataFrame triggers a full pass over the input files.
    input_total_rows_computed = len(ddf_input)

    if show_info_inline:
        print(f"Input catalog path: {input_catalog_path}\n")
        print(f"Total number of rows: {input_total_rows_computed}\n")
        print(f"Total number of columns: {len(input_catalog_columns_list)}\n\n")

    if save_the_info:
        with open(f'{logs_dir}/total_len_of_files_{current_date}.txt', 'a') as f:
            f.write(f"Input catalog path: {input_catalog_path}\n")
            f.write(f"Total number of rows: {input_total_rows_computed}\n")
            f.write(f"Total number of columns: {len(input_catalog_columns_list)}\n\n")

Computing the number of rows in the HATS catalog.

In [None]:
%%time
if compute_sizes_from_disk:
    loaded_hats_catalog_from_disk_lsdb = lsdb.read_hats(CATALOG_HATS_DIR)
    loaded_hats_catalog_from_disk_hats = hats.read_hats(CATALOG_HATS_DIR)

    # Column names come from the lsdb catalog; the total row count is read
    # from the HATS catalog metadata.
    hats_catalog_total_columns = loaded_hats_catalog_from_disk_lsdb.columns.to_list()
    hats_catalog_total_rows = loaded_hats_catalog_from_disk_hats.catalog_info.total_rows

    if show_info_inline:
        print(f"HATS catalog path: {CATALOG_HATS_DIR}\n")
        print(f"Total number of rows: {hats_catalog_total_rows}\n")
        print(f"Total number of columns: {len(hats_catalog_total_columns)}\n\n")

    if save_the_info:
        with open(f'{logs_dir}/total_len_of_files_{current_date}.txt', 'a') as f:
            f.write(f"HATS catalog path: {CATALOG_HATS_DIR}\n")
            f.write(f"Total number of rows: {hats_catalog_total_rows}\n")
            f.write(f"Total number of columns: {len(hats_catalog_total_columns)}\n\n")
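
The cells above compute `input_total_rows_computed` and `hats_catalog_total_rows` but only print them. A minimal explicit comparison could look like the sketch below. The helper name `check_row_counts` is hypothetical, and the check assumes the import keeps every input row (no filtering), so the two counts are expected to match exactly.

```python
def check_row_counts(input_rows: int, output_rows: int) -> int:
    """Return output_rows - input_rows, raising ValueError if they differ.

    Assumes the import keeps every input row, so any non-zero difference
    points to a problem while reading or writing the catalog.
    """
    difference = int(output_rows) - int(input_rows)
    if difference != 0:
        raise ValueError(
            f"Row count mismatch: input has {input_rows} rows, "
            f"HATS catalog has {output_rows} rows (difference {difference})."
        )
    return difference


# In the notebook, when both counts were computed:
# check_row_counts(input_total_rows_computed, hats_catalog_total_rows)
print(check_row_counts(1_000, 1_000))  # 0
```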

Comparing the columns.

In [None]:
%%time
if compute_sizes_from_disk:
    input_columns_set = set(input_catalog_columns_list)
    loaded_columns_set = set(hats_catalog_total_columns)

    # Sorted so that the output is deterministic between runs.
    missing_columns = sorted(input_columns_set - loaded_columns_set)
    extra_columns = sorted(loaded_columns_set - input_columns_set)

    if show_info_inline:
        print(f"missing_columns = {missing_columns}\n")
        print(f"extra_columns = {extra_columns}\n\n")
    if save_the_info:
        with open(f'{logs_dir}/total_len_of_files_{current_date}.txt', 'a') as f:
            f.write(f"missing_columns = {missing_columns}\n")
            f.write(f"extra_columns = {extra_columns}\n\n")
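
Not every difference reported above is a problem. Recent `hats_import` versions add a spatial-index column to the output (named `_healpix_29` in the versions we are aware of; this is an assumption to check against the installed version), and if `selected_catalog_columns` is a subset of the input columns, the unselected ones will appear as "missing" by design. The sketch below separates expected from unexpected differences. The function name `compare_columns` and its `selected` and `expected_extra` parameters are hypothetical.

```python
def compare_columns(input_columns, output_columns, selected=None, expected_extra=("_healpix_29",)):
    """Split column differences into expected and unexpected ones.

    input_columns: columns of the input catalog.
    output_columns: columns of the HATS catalog.
    selected: columns requested from the reader (None means all input columns).
    expected_extra: columns the import is assumed to add by itself.
    """
    input_set = set(input_columns)
    output_set = set(output_columns)
    wanted = input_set if selected is None else set(selected)

    extra = output_set - input_set
    return {
        # Requested but absent from the output: worth investigating.
        "missing": sorted(wanted - output_set),
        # Present in the input but never requested: absent by design.
        "not_selected": sorted(input_set - wanted),
        "expected_extra": sorted(extra & set(expected_extra)),
        "unexpected_extra": sorted(extra - set(expected_extra)),
    }


print(compare_columns(
    ["id", "ra", "dec", "mag_r"],
    ["id", "ra", "dec", "_healpix_29"],
    selected=["id", "ra", "dec"],
))
```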

## Summarize pixels and sizes
* "healpix orders: distinct healpix orders represented in the partitions

* num partitions: total number of partition files

Size on disk data - using the file sizes fetched above, check the balance of your data. If your rows are fixed-width (e.g. no nested arrays, and few NaNs), the ratio here should be similar to the ratio above. If they’re very different, and you experience problems when parallelizing operations on your data, you may consider re-structuring the data representation.

* min size_on_disk: smallest file (in GB)

* max size_on_disk: largest file size (in GB)

* size_on_disk ratio: max/min

* total size_on_disk: sum of all parquet catalog files (actual catalog size may vary due to other metadata files)"

Source: https://hats.readthedocs.io/en/stable/notebooks/catalog_size_inspection.html
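
The quoted guidance compares the size-on-disk ratio with a row-count ratio. That row-count summary ("the ratio above") belongs to the HATS documentation page and is not reproduced in this notebook. Assuming a table with one row per partition that holds both a row count and a file size, the comparison can be sketched as follows. The column names `num_rows` and `size_on_disk`, the function name, and the toy values are illustrative assumptions; how per-partition row counts are obtained (for example from Parquet metadata) is left open.

```python
import pandas as pd


def ratio_comparison(partitions: pd.DataFrame) -> dict:
    """Compare the max/min ratio of row counts with that of file sizes.

    Expects one row per partition with columns "num_rows" and
    "size_on_disk" (bytes). For fixed-width rows the two ratios should be
    similar, i.e. bytes per row should be roughly constant (ratio near 1).
    """
    rows = partitions["num_rows"]
    sizes = partitions["size_on_disk"]
    bytes_per_row = sizes / rows
    return {
        "rows_ratio": float(rows.max() / rows.min()),
        "size_ratio": float(sizes.max() / sizes.min()),
        "bytes_per_row_ratio": float(bytes_per_row.max() / bytes_per_row.min()),
    }


# Toy partitions (not real catalog values): 100 bytes per row everywhere.
toy = pd.DataFrame({"num_rows": [1_000, 4_000, 10_000],
                    "size_on_disk": [100_000, 400_000, 1_000_000]})
print(ratio_comparison(toy))
```

A `bytes_per_row_ratio` far from 1 suggests variable-width rows (nested arrays, many NaNs), which is the situation the quote warns about.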

In [None]:
### Path to the HATS catalog to inspect (by default, the catalog generated above).
catalog_dir = CATALOG_HATS_DIR

### ----------------
### You probably won't have to change anything from here.

catalog = hats.read_hats(catalog_dir)

output_info_frame = catalog.partition_info.as_dataframe()

# Size on disk (in bytes) of the data file of each partition.
for index, partition in output_info_frame.iterrows():
    file_name = hats.io.paths.pixel_catalog_file(
        catalog_dir, HealpixPixel(partition["Norder"], partition["Npix"])
    )
    output_info_frame.loc[index, "size_on_disk"] = os.path.getsize(file_name)

output_info_frame = output_info_frame.astype(int)
# Convert bytes to GB (1 GB = 1024**3 bytes).
output_info_frame["gbs"] = output_info_frame["size_on_disk"] / (1024 * 1024 * 1024)
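
The `size_on_disk` values above cover only the per-partition data files, so, as the quoted note says, their sum can differ from the full catalog footprint because of metadata files. A minimal way to measure the whole catalog directory with `pathlib` is sketched below. Splitting files by the `.parquet` suffix is an assumption about file naming rather than a guarantee of the HATS layout (a `_metadata` file, for instance, has no suffix and is counted under "other"); the function name `directory_sizes` is hypothetical.

```python
from pathlib import Path
import tempfile


def directory_sizes(root) -> dict:
    """Sum file sizes (bytes) under root, split into *.parquet and other files."""
    parquet_bytes = 0
    other_bytes = 0
    for path in Path(root).rglob("*"):
        if not path.is_file():
            continue
        size = path.stat().st_size
        if path.suffix == ".parquet":
            parquet_bytes += size
        else:
            other_bytes += size
    return {"parquet": parquet_bytes, "other": other_bytes, "total": parquet_bytes + other_bytes}


# Self-contained demo on a throwaway directory with fake files (not a real catalog).
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "Norder=0").mkdir()
    (Path(tmp) / "Norder=0" / "Npix=1.parquet").write_bytes(b"x" * 300)
    (Path(tmp) / "_metadata").write_bytes(b"y" * 50)
    print(directory_sizes(tmp))  # {'parquet': 300, 'other': 50, 'total': 350}
```

In the notebook, `directory_sizes(CATALOG_HATS_DIR)["total"] / 1024**3` would give the whole-directory size in the same GB units used above.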

In [None]:
gbs = output_info_frame["gbs"]
# Build the summary once so that the saved file and the inline output match
# (same fields, same precision).
summary_lines = [
    f'healpix orders: {output_info_frame["Norder"].unique()}',
    f'num partitions: {len(output_info_frame["Npix"])}',
    "------",
    f'min size_on_disk: {gbs.min():.8f}',
    f'max size_on_disk: {gbs.max():.8f}',
    f'size_on_disk ratio: {gbs.max() / gbs.min():.8f}',
    f'total size_on_disk: {gbs.sum():.8f}',
]

if save_the_info:
    current_date = datetime.now().strftime('%Y-%m-%d_%H-%M')
    with open(f"{logs_dir}/output_summarize_pixels_{current_date}.txt", "w") as file:
        file.write("\n".join(summary_lines) + "\n")
if show_info_inline:
    print("\n".join(summary_lines))

In [None]:
if show_info_inline:
    print(output_info_frame[output_info_frame['gbs'] == output_info_frame['gbs'].min()])

In [None]:
if show_info_inline:
    print(output_info_frame[output_info_frame['gbs'] == output_info_frame['gbs'].max()])

## File size distribution
"Below we look at histograms of file sizes.

In our initial testing, we find that there’s a “sweet spot” file size of 100MB-1GB. Files that are smaller create more overhead for individual reads. Files that are much larger may create slow-downs when cross-matching between catalogs. Files that are much larger can create out-of-memory issues for dask when loading from disk.

The majority of your files should be in the “sweet spot”, and no files in the “too-big” category."

Source: https://hats.readthedocs.io/en/stable/notebooks/catalog_size_inspection.html

In [None]:
type_of_files = 'output'
bins = [0, 0.5, 1, 2, 100]
labels = ["small-ish", "sweet-spot", "big-ish", "too-big"]

# logs_dir is only used when save=True, so it is passed as-is instead of
# being overwritten with None.

process_file_size_info(
    info_frame=output_info_frame,
    type_of_files=type_of_files,
    bins=bins,
    labels=labels,
    logs_dir=logs_dir,
    save=save_the_info,
    show=show_info_inline
)

# Closing the cluster

In [None]:
if close_the_cluster:
    client.close()
    cluster.close()