# QP Ensemble to Nested Pandas Dataframes to LSDB HATS to QP Ensemble

## Modules

In [None]:
import convert_ens
import numpy as np
import qp
import pickle
import pandas as pd
import lsdb 
import nested_pandas as npd
from nested_pandas.nestedframe import NestedFrame
from rail.core.stage import RailStage
from rail.core.data import QPHandle, TableHandle, QPOrTableHandle, ModelHandle

%matplotlib inline 

## Rail Stage Setup

In [11]:
DS = RailStage.data_store
DS.__class__.allow_overwrite = True

## QP Ensemble to Nested Pandas Dataframes
##### Converts a qp.Ensemble into a single pandas.DataFrame where each row corresponds to an object and each column contains nested ensemble data in dictionary form, suitable for HATS processing. 
##### It uses convert_ens to generate multiple NestedFrame representations of the ensemble (e.g., histogram, quantiles, interpolated), then flattens and organizes this data into a structured, object-wise DataFrame for easy analysis or storage.

In [None]:
def ens_to_df(ens, algo):
    """
    Convert a qp.Ensemble into a single pandas DataFrame using nested-pandas format.
    """
    # Get the converted ensembles using the new convert_ens function
    converted_ensembles = convert_ens.convert_ens(ens, algo)
    
    # Create a base DataFrame with one row per object
    n_objects = len(next(iter(converted_ensembles.values())))
    df = pd.DataFrame(index=range(n_objects))
    
    # For each ensemble type, store the nested data in a way that HATS can handle
    for ensemble_name, nested_frame in converted_ensembles.items():
        series_data = []
        
        # The nested_frame should have nested columns containing the actual data
        if nested_frame.nested_columns:
            nested_col_name = nested_frame.nested_columns[0]
            
            # For each object (row in the base frame)
            for obj_id in range(n_objects):
                if obj_id < len(nested_frame):
                    # Get the nested data for this object
                    nested_data = nested_frame[nested_col_name].iloc[obj_id]
                    
                    # Convert the nested DataFrame to a dictionary
                    if hasattr(nested_data, 'to_dict'):
                        obj_dict = nested_data.to_dict('list')
                    elif isinstance(nested_data, dict):
                        obj_dict = nested_data
                    else:
                        # Try to convert to dict some other way
                        obj_dict = dict(nested_data) if nested_data is not None else {}
                    
                    series_data.append(obj_dict)
                else:
                    series_data.append({})
        else:
            # If no nested columns, create empty dicts
            series_data = [{}] * n_objects
        
        df[ensemble_name] = series_data
    
    return df

## Nested Pandas Dataframes to LSDB HATS
##### Converts a pandas.DataFrame into a HATS-compatible LSDB catalog and saves it to disk. 
##### It initializes the catalog using spatial coordinates (coord_ra, coord_dec) and a specified name, sets multipole orders (lowest_order=2, highest_order=5), and exports the catalog in HATS format. 
##### The function returns the resulting catalog object for further use.

In [15]:
def df_to_hats(df, name):
    '''
    Convert a pandas DataFrame into a HATS-compatible LSDB catalog and save it to disk.
    '''
    catalog = lsdb.from_dataframe(
    df,
    ra_column="coord_ra",
    dec_column="coord_dec",
    catalog_name=name,
    catalog_type="object",
    lowest_order=2,
    highest_order=5,
    threshold=len(df))
    catalog.to_hats(name, overwrite=True)
    from_dataframe_catalog = lsdb.open_catalog(name)
    return catalog

## LSDB HATS to QP Ensemble
##### Loads a HATS-formatted LSDB catalog from a given file path and reconstructs a dictionary of qp.Ensemble objects from its contents. 
##### Based on the key suffix (e.g., interp, mixmod, norm, hist, or quantile lengths like 99, 20, 5), it parses the nested data fields and rebuilds each ensemble using the appropriate qp generator. 
##### Returns a dictionary mapping each key to its corresponding ensemble.

In [16]:
def hats_to_qp(file_path):
    '''
    Load a HATS-formatted LSDB catalog and reconstruct qp.Ensemble objects from its contents.
    '''
    from_dataframe_catalog = lsdb.open_catalog(file_path)
    df = from_dataframe_catalog.compute()
    ensembles = {}
    for key in df.keys():
        parts = key.split('_')
        data_dict = {}
        if parts[-1] == 'interp':
            yvals = []
            for i in df[key]:
                yvals.append(i['yvals'])
            data_dict['yvals'] = yvals
            data_dict['xvals'] = i['xvals']
            ensembles[key] = qp.Ensemble(qp.interp_gen, data=data_dict)
        if parts[-1] == 'mixmod':
            weights = []
            stds = []
            means = []
            for i in df[key]:
                weights.append(i['weights'])
                stds.append(i['stds'])
                means.append(i['means'])
            data_dict['means'] = np.array(means)
            data_dict['stds'] = np.array(stds)
            data_dict['weights'] = np.array(weights)
            ensembles[key] = qp.Ensemble(qp.mixmod_gen, data=data_dict)
        if parts[-1] == 'norm':
            loc = []
            scale = []
            for i in df[key]:
                loc.append(i['loc'])
                scale.append(i['scale'])
            data_dict['loc'] = np.array(loc)
            data_dict['scale'] = np.array(scale)
            ensembles[key] = qp.Ensemble(qp.stats.norm, data=data_dict)
        if parts[-1] == 'hist':
            pdfs = []
            for i in df[key]:
                pdfs.append(i['pdfs'])
            data_dict['pdfs'] = np.array(pdfs)
            data_dict['bins'] = np.linspace(0, 3, 301)
            ensembles[key] = qp.Ensemble(qp.hist_gen, data=data_dict)
        if parts[-1] == '99' or parts[-1] == '20' or parts[-1] == '5':
            locs = []
            for i in df[key]:
                locs.append(i['locs'])
            data_dict['locs'] = np.array(locs)
            data_dict['quants'] = np.array(i['quants'])
            ensembles[key] = qp.Ensemble(qp.quant_gen, data=data_dict)
    return ensembles    

# DP1 Example

##### 4 Band Data

In [17]:
# Change file path
bpz_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/output_estimate_bpz.hdf5'
cmnn_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/output_estimate_cmnn.hdf5'
dnf_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/output_estimate_dnf.hdf5'
fzboost_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/output_estimate_fzboost.hdf5'
knn_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/output_estimate_knn.hdf5'
lephare_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/output_estimate_lephare.hdf5'
tpz_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/output_estimate_tpz.hdf5'
outputFiles_4bands = {'bpz_file': bpz_file, 'cmnn_file': cmnn_file, 'dnf_file': dnf_file,
                      'fzboost_file': fzboost_file, 'knn_file': knn_file, 'lephare_file': lephare_file, 
                      'tpz_file': tpz_file}
algos = ['bpz', 'cmnn', 'dnf', 'fzboost', 'knn', 'lephare', 'tpz']
df = pd.read_parquet('/Users/sarahpelesky/Downloads/dp1_ecdfs_edfs_sv95_sv37_v1.parquet')
df_subset = pd.read_parquet('/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/sarah_data/4bands/dp1_v29.0.0_gold_Rubin_SV_38_7_photoz_cat.parquet')
new_df = pd.merge(df_subset, df, on='objectId', how='left', suffixes=('_subset', '_full'))

#### BPZ Example

In [18]:
output_data = DS.read_file("output_data", QPHandle, bpz_file)
df = ens_to_df(output_data.data, 'bpz')
df['coord_ra'] = new_df['coord_ra']
df['coord_dec'] = new_df['coord_dec']
df



here
Successfully created bpz_ens_interp_to_quant_99
here
Successfully created bpz_ens_interp_to_quant_20
here
Successfully created bpz_ens_interp_to_quant_5


  res = sqrt(self.stats(*args, **kwds))


Done converting.


Unnamed: 0,bpz_ens_interp,bpz_ens_interp_to_hist,bpz_ens_interp_to_quant_99,bpz_ens_interp_to_quant_20,bpz_ens_interp_to_quant_5,bpz_ens_interp_to_norm,coord_ra,coord_dec
0,"{'yvals': [0.0, 0.05513663978250978, 0.1068442...","{'pdfs': [0.02756831989125489, 0.0809904364631...","{'locs': [0.037283232580966996, 0.061513217592...","{'locs': [0.04591148831496429, 0.0615132175920...","{'locs': [0.05313888364411205, 0.0615132175920...","{'loc': [0.42742514450305613], 'scale': [0.358...",37.691623,7.384259
1,"{'yvals': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","{'pdfs': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0...","{'locs': [0.6537138438429267, 0.68143610109248...","{'locs': [0.6673293784597863, 0.68143610109248...","{'locs': [0.6761759467206362, 0.68143610109248...","{'loc': [0.9007582468804702], 'scale': [0.1842...",37.825903,7.384505
...,...,...,...,...,...,...,...,...
169032,"{'yvals': [0.0, 0.05950714207418124, 0.1165122...","{'pdfs': [0.02975357103709062, 0.0880097171943...","{'locs': [0.034157510332403385, 0.060015429903...","{'locs': [0.041979860352284334, 0.060015429903...","{'locs': [0.04960739182574118, 0.0600154299039...","{'loc': [0.5970112416211045], 'scale': [0.4137...",37.408291,7.891881
169033,"{'yvals': [0.0, 0.29417214903714967, 0.5238156...","{'pdfs': [0.14708607451857483, 0.4089939230504...","{'locs': [0.01411907719545967, 0.0271376218298...","{'locs': [0.017641523020463352, 0.027137621829...","{'locs': [0.02061429183617046, 0.0271376218298...","{'loc': [0.45371880592382235], 'scale': [0.440...",37.404037,7.901374


In [19]:
cat = df_to_hats(df, 'from_dataframe')
cat

Unnamed: 0_level_0,bpz_ens_interp,bpz_ens_interp_to_hist,bpz_ens_interp_to_quant_99,bpz_ens_interp_to_quant_20,bpz_ens_interp_to_quant_5,bpz_ens_interp_to_norm,coord_ra,coord_dec
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
"Order: 3, Pixel: 2","struct<xvals: list<item: double>, yvals: list<...","struct<bins: list<item: double>, pdfs: list<it...","struct<locs: list<item: double>, quants: list<...","struct<locs: list<item: double>, quants: list<...","struct<locs: list<item: double>, quants: list<...","struct<loc: list<item: double>, scale: list<it...",double[pyarrow],double[pyarrow]
"Order: 5, Pixel: 4471",...,...,...,...,...,...,...,...


In [11]:
d = hats_to_qp('from_dataframe')
d



{'bpz_ens_interp': Ensemble(the_class=interp,shape=(169034, 301)),
 'bpz_ens_interp_to_hist': Ensemble(the_class=hist,shape=(169034, 300)),
 'bpz_ens_interp_to_quant_99': Ensemble(the_class=quant,shape=(169034, 101)),
 'bpz_ens_interp_to_quant_20': Ensemble(the_class=quant,shape=(169034, 22)),
 'bpz_ens_interp_to_quant_5': Ensemble(the_class=quant,shape=(169034, 7)),
 'bpz_ens_interp_to_norm': Ensemble(the_class=norm,shape=(169034, 1))}

##### 6 Band Data

In [8]:
# Change file path
bpz_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_bpz.hdf5'
cmnn_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_cmnn.hdf5'
dnf_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_dnf.hdf5'
fzboost_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_fzboost.hdf5'
gpz_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_gpz.hdf5'
knn_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_knn.hdf5'
lephare_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_lephare.hdf5'
tpz_file = '/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/output_estimate_tpz.hdf5'
outputFiles_4bands = {'bpz_file': bpz_file, 'cmnn_file': cmnn_file, 'dnf_file': dnf_file,
                      'fzboost_file': fzboost_file, 'gpz_file': gpz_file, 'knn_file': knn_file, 'lephare_file': lephare_file, 
                      'tpz_file': tpz_file}
algos = ['bpz', 'cmnn', 'dnf', 'fzboost', 'gpz', 'knn', 'lephare', 'tpz']
df = pd.read_parquet('/Users/sarahpelesky/Downloads/dp1_ecdfs_edfs_sv95_sv37_v1.parquet')
df_subset = pd.read_parquet('/Users/sarahpelesky/Desktop/LINCC Internship/RAIL/Projects (RAIL)/nersc_results/dp1_6band/dp1_v29.0.0_gold_ecdfs_edfs_sv95_photoz_cat.parquet')
new_df = pd.merge(df_subset, df, on='objectId', how='left', suffixes=('_subset', '_full'))

In [9]:
output_data = DS.read_file("output_data", QPHandle, bpz_file)
n_objects = len(output_data.data)
print(n_objects)
#df = ens_to_df(output_data.data, 'bpz')
#df['coord_ra'] = new_df['coord_ra']
#df['coord_dec'] = new_df['coord_dec']
#df

375610
