<!--

    Gaia Data Processing and Analysis Consortium (DPAC) 
    Co-ordination Unit 9 Work Package 930
    
    (c) 2005-2025 Gaia DPAC
    
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
    -->
    
The bulk of Gaia XP spectra at Gaia DR3 are provided in a parametric "continuous" representation (as opposed to the conventional sampled form, i.e. fluxes in wavelength bins) in table `gaiadr3.xp_continuous_mean_spectrum`. Utilities for handling this form, including conversion to sampled form and plotting, are provided in the bespoke Python package [GaiaXPy](https://gaia-dpci.github.io/GaiaXPy-website/), which is available on this platform. A small subset of the XP spectra is also provided in sampled form in table `gaiadr3.xp_sampled_mean_spectrum`, but these are for illustrative purposes only: users are strongly encouraged to familiarise themselves with, and work with, the continuous representation, not least in order to handle correctly the statistical uncertainties inherent in the data.
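
As a rough illustration of why the continuous form helps with uncertainties: a sampled spectrum is a linear function of the coefficients, so the coefficient covariance propagates to the samples through the same linear map. The sketch below uses a made-up three-function polynomial basis and made-up numbers (`grid`, `design_matrix`, `coeffs` and `coeff_cov` are assumptions for illustration only, not the Gaia XP bases or real data); GaiaXPy performs the real conversion.

```python
import numpy as np

# hypothetical sampling grid and toy basis functions (NOT the Gaia XP bases)
grid = np.linspace(-1.0, 1.0, 5)
design_matrix = np.vstack([np.ones_like(grid), grid, grid**2])  # shape (3, 5)

# made-up coefficients and coefficient covariance
coeffs = np.array([10.0, 2.0, -1.0])
coeff_cov = np.diag([0.04, 0.01, 0.01])

# sampled fluxes are a linear combination of the basis functions
sampled_flux = coeffs @ design_matrix

# linear propagation of the coefficient covariance to the samples
sampled_cov = design_matrix.T @ coeff_cov @ design_matrix
sampled_err = np.sqrt(np.diag(sampled_cov))
```

Note that `sampled_cov` is not diagonal even though `coeff_cov` is: sampled fluxes are correlated with one another, which is one reason to work with the coefficients and their covariance directly.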

To access GaiaXPy on this platform simply import the package as follows:

    import gaiaxpy

then all classes and utility functions etc. will be available.


In [1]:
%pyspark

# standard platform set-up
import gaiadmpsetup

# utility code set-up
from gaiaxpy import plot_spectra, convert

# XP products are available in Gaia DR3, so set the default database context accordingly for convenience
spark.sql('USE gaiadr3')

# grab an example spectrum from the table
continuous_df = spark.sql('SELECT * FROM gaiadr3.xp_continuous_mean_spectrum WHERE source_id = 5853498713190525696')
# ... this source identifier corresponds to Proxima Cen (= Alpha Cen C, spectral type M5.5V i.e. a mid-M dwarf)

# convert to a Pandas dataframe for GaiaXPy
continuous_spectrum = continuous_df.toPandas()

# convert to sampled form:
sampled_spectrum, sampling = convert(continuous_spectrum, save_file = False)
    
# plot to sanity check:
plot_spectra(sampled_spectrum, sampling = sampling, multi=False, show_plot=True, output_path=None, legend=True)



In [2]:
%pyspark

from gaiaxpy import calibrate

# GaiaXPy provides classes and methods to create an externally calibrated single spectrum from the internal XP continuous representation:
calibrated_spectrum, sampling = calibrate(continuous_spectrum, save_file = False)

# plot it
plot_spectra(calibrated_spectrum, sampling = sampling, legend = False)


The code in the following cells illustrates a workflow where we trawl through a large set of XP spectra looking for spectra similar to a high signal-to-noise template example. The implementation takes advantage of the end-user programmability of the distributed query execution engine to return results in a reasonable time. The approach is to look for spectra having the same spectral shape, as expressed in the coefficients of the continuous representation.
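
The idea can be sketched on a toy problem before looking at the full implementation. The example below uses made-up three-element coefficient vectors and diagonal covariances, and a hypothetical helper `mahalanobis` that solves a linear system with `np.linalg.solve` instead of forming an explicit inverse; it is an illustration under those assumptions, not the code used in the cells that follow.

```python
import numpy as np

def mahalanobis(x1, cov1, x2, cov2):
    """Distance between two coefficient vectors given their covariance matrices."""
    delta = x1 - x2
    # solve (cov1 + cov2) y = delta rather than inverting the combined covariance
    y = np.linalg.solve(cov1 + cov2, delta)
    return float(np.sqrt(delta @ y))

# made-up template coefficients and covariance
template = np.array([100.0, 20.0, -5.0])
template_cov = np.diag([1.0, 0.25, 0.25])

# a candidate with the same shape but one magnitude fainter
delta_mag = 1.0
flux_ratio = 10.0 ** (-0.4 * delta_mag)        # candidate flux / template flux
candidate = template * flux_ratio
candidate_cov = template_cov * flux_ratio**2

# rescale the candidate to the flux level of the template before comparing
candidate_scaled = candidate / flux_ratio
candidate_cov_scaled = candidate_cov / flux_ratio**2

d_same_shape = mahalanobis(template, template_cov, candidate_scaled, candidate_cov_scaled)

# a candidate at the same flux level whose second coefficient differs
other = template + np.array([0.0, 2.0, 0.0])
d_other_shape = mahalanobis(template, template_cov, other, template_cov)
```

After rescaling, the fainter copy of the template sits at essentially zero distance, while the candidate with a different shape does not: the statistic responds to shape, not to overall brightness.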

In [4]:
%pyspark

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import DataFrame
#from scipy.stats import chi

# static constants for use when reconstructing a covariance matrix from the flattened upper-triangular correlation matrix stored in xp_continuous_mean_spectrum
NUM_XP_COEFFS = 55
# these indexes address the strictly lower triangle of the 2d correlation matrix, in numpy's row-major order ...
lower_index = np.tril_indices(NUM_XP_COEFFS,-1)
# ... whereas the stored correlation vector holds the strictly upper triangle in column-major order. Swapping the row
# and column index arrays reflects the lower-triangle index across the diagonal and yields exactly that ordering
upper_index = (lower_index[1], lower_index[0])
# correlation matrix, empty apart from unity on the diagonal (off-diagonal elements to be filled in on a case-by-case basis)
correlation_matrix = np.diag(np.ones(NUM_XP_COEFFS))

def make_correlation_matrix(correlation_vector : np.ndarray) -> np.ndarray:
    '''
    Returns the fully populated 2d correlation matrix given the flattened, 1d upper-triangular off-diagonal
    elements of the same as persisted in the table of XP continuous representation spectra.
    '''
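    # work on a copy of the identity template so that each call returns an independent array
    # (writing into the shared module-level matrix would make successive results alias one another)
    matrix = correlation_matrix.copy()
    
    # copy in unique, off-diagonal elements from the flattened correlation vector into the 2d-indexed positions
    matrix[upper_index] = correlation_vector
    matrix[lower_index] = correlation_vector
    
    # give back the complete correlation matrix
    return matrix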

def make_covariance_matrix(complete_correlation_matrix : np.ndarray, coefficient_error_vector : np.ndarray) -> np.ndarray:
    '''
    Creates the fully reconstructed 2d covariance matrix of an XP continuous representation spectrum given the
    complete 2d correlation matrix and the vector of formal errors on the coefficients. Note that Gaia DPAC CU5 scale 
    predicted coefficient errors by the standard deviation as a post-hoc correction to the formal (sqrt) variances.
    GaiaXPy actually reverses this scaling in computation of the covariance matrix to return the latter as exactly
    that produced as a result of the least-squares solution for the coefficients. Such a de-scaling of the errors
    is not applied here: we assume that the uncertainties are best represented with this scaling intact.
    '''
    # 2d matrix with the errors on the diagonal
    error_matrix = np.diag(coefficient_error_vector)
    
    # from the standard relationship between covariance and correlation: cov_ij = sigma_i * rho_ij * sigma_j
    return error_matrix @ complete_correlation_matrix @ error_matrix
    
def xp_mahalanobis_distance(coeff_vector_1 : np.ndarray, covariance_1 : np.ndarray,
        coeff_vector_2 : np.ndarray, error_vector_2 : np.ndarray, correlation_vector_2 : np.ndarray) -> float:
    '''
    Computes the Mahalanobis distance between two XP spectra given template coefficients and fully populated
    covariance for the first, and the data record of the second candidate spectrum, i.e. coefficients, errors and 
    upper-triangular part of the correlation matrix in the continuous representation. The second set of
    coefficients and errors should be scaled to the same flux level as the template spectrum. This means that
    the distance returned quantifies how close the candidate SED shape is to that of the template regardless of any
    difference in overall brightness.
    
    This function uses plain matrix inversion of the combined covariance matrix. This can be numerically unstable
    and result in a negative squared distance resulting in turn in a return value of not-a-number. It is up to 
    the calling application to handle this condition.
    '''
    # second covariance matrix
    corr2 = make_correlation_matrix(correlation_vector_2)
    covariance_2 = make_covariance_matrix(corr2, error_vector_2)
    
    # form covariance of the coefficient difference vector as sum of individual covariance 
    cocovar = covariance_1 + covariance_2
    
    # inverse of combined, scaled covariance
    cocovar_inv = np.linalg.inv(cocovar)

    # use these in the computation of the square of the Mahalanobis distance, e.g. see source linked from
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.mahalanobis.html
    delta = coeff_vector_1 - coeff_vector_2
    d = np.sqrt(np.dot(np.dot(delta, cocovar_inv), delta))
    
    # for identical spectra the resulting Mahalanobis distance follows a chi distribution with NUM_XP_COEFFS
    # degrees of freedom (M. Weiler, personal communication)
    return d
    
def find_similar_continuous_spectra(data_frame : DataFrame, template_df : DataFrame) -> DataFrame:
    '''
    Given data frames defining a large set of XP spectra in continuous representation, 
    and a single template example also in continuous representation, search the former for cases
    similar to the latter. The data frame of the set of spectra being
    searched is annotated with a dissimilarity statistic: the greater the value the more
    dissimilar is the candidate spectrum to the template given. By definition this statistic
    will be zero for the template spectrum if it is present in the set of candidates.
    
    Parameters:
    -----------
    data_frame : DataFrame()
        the data frame encapsulating the set of XP continuous representation spectra to be searched
    template_df : DataFrame()
        the template, also in XP continuous representation encapsulated in a data frame.
        
    return : DataFrame()
        a new data frame annotated with a dissimilarity (increasingly positive) statistic where 
        zero indicates a perfect match.
    '''
    
    # convenience reference to template as a Row object:
    template_row = template_df.collect()[0]
    
    # extract the template arrays 
    template_bp_coefficients = np.array(template_row['bp_coefficients']).reshape(-1)
    template_bp_coefficient_errors = np.array(template_row['bp_coefficient_errors']).reshape(-1)
    template_bp_correlations = np.array(template_row['bp_coefficient_correlations']).reshape(-1)
    template_rp_coefficients = np.array(template_row['rp_coefficients']).reshape(-1)
    template_rp_coefficient_errors = np.array(template_row['rp_coefficient_errors']).reshape(-1)
    template_rp_correlations = np.array(template_row['rp_coefficient_correlations']).reshape(-1)
    template_gmag = template_row['phot_g_mean_mag']

    # precompute the required vectors and matrices for the template
    bp_correl_mat = make_correlation_matrix(template_bp_correlations)
    template_bp_covariance = make_covariance_matrix(bp_correl_mat, template_bp_coefficient_errors)
    rp_correl_mat = make_correlation_matrix(template_rp_correlations)
    template_rp_covariance = make_covariance_matrix(rp_correl_mat, template_rp_coefficient_errors)
    
    # define a vectorised Pandas UDF against the template for comparison of other spectra against it
    @pandas_udf('float')
    def xp_is_similar(bp_coeffs:pd.Series, bp_coeff_errors:pd.Series, bp_correlations:pd.Series, 
                      rp_coeffs:pd.Series, rp_coeff_errors:pd.Series, rp_correlations:pd.Series,
                      gmags:pd.Series) -> pd.Series:
        '''
        Create a similarity metric for the XP continuous representation coefficients with respect 
        to those of a predefined template. Similarity is based on a combined BP and RP Mahalanobis
        distance between the coefficient sets with full accounting for covariance. Input arguments
        are series of the BP and RP coefficients, errors and correlation vectors as stored in table
        xp_continuous_mean_spectrum. The returned object is a corresponding series of floats of the
        quadrature sum of the BP and RP Mahalanobis distances between each candidate spectrum and
        the static template defined above.
        '''
        
        # initialise the results series
        results = pd.Series(np.full(bp_coeffs.size, 0.0))

        # iterate over the data series
        for i in range(bp_coeffs.size):
            
            # normalise the candidate flux coefficients and uncertainties to that of the template
            norm = 10.0**(0.4 * (template_gmag - gmags.iloc[i]))
            bp_coeffs_norm = bp_coeffs.iloc[i] / norm
            bp_coeff_errors_norm = bp_coeff_errors.iloc[i] / norm
            rp_coeffs_norm = rp_coeffs.iloc[i] / norm
            rp_coeff_errors_norm = rp_coeff_errors.iloc[i] / norm
            
            # Mahalanobis distances for the individual BP and RP coefficient sets normalised to the flux level of the template
            bp_mdist = xp_mahalanobis_distance(template_bp_coefficients, template_bp_covariance, bp_coeffs_norm, bp_coeff_errors_norm, bp_correlations.iloc[i])
            rp_mdist = xp_mahalanobis_distance(template_rp_coefficients, template_rp_covariance, rp_coeffs_norm, rp_coeff_errors_norm, rp_correlations.iloc[i])
            
            # combined Mahalanobis distance
            results.iloc[i] = np.sqrt(bp_mdist * bp_mdist + rp_mdist * rp_mdist)
            
        return results

    # add in the similarity statistic
    data_frame = data_frame.withColumn('xp_similar', xp_is_similar(
        data_frame.bp_coefficients, data_frame.bp_coefficient_errors, data_frame.bp_coefficient_correlations, 
        data_frame.rp_coefficients, data_frame.rp_coefficient_errors, data_frame.rp_coefficient_correlations, data_frame.phot_g_mean_mag))
        
    # give back the full set filtering out any nulls (which may result from NaN individual Mahalanobis distances)
    return data_frame.filter(data_frame.xp_similar.isNotNull())
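
The index bookkeeping used to rebuild the matrices is easy to get wrong, so it can be worth checking on a small case. The sketch below repeats the same construction for a made-up 4 x 4 correlation matrix, assuming the column-by-column ordering of the upper triangle described in the comments above; the numbers and the names `full`, `vector` and `errors` are purely illustrative.

```python
import numpy as np

n = 4
lower = np.tril_indices(n, -1)      # strictly lower triangle, row-major order
upper = (lower[1], lower[0])        # reflected: strictly upper triangle, column-major order

# a known symmetric correlation matrix (made-up values)
full = np.array([[1.0, 0.1, 0.2, 0.4],
                 [0.1, 1.0, 0.3, 0.5],
                 [0.2, 0.3, 1.0, 0.6],
                 [0.4, 0.5, 0.6, 1.0]])

# flatten the strictly upper triangle column by column: (0,1), (0,2), (1,2), (0,3), (1,3), (2,3)
vector = np.array([full[i, j] for j in range(n) for i in range(j)])

# rebuild the full matrix from the flattened vector
rebuilt = np.eye(n)
rebuilt[upper] = vector
rebuilt[lower] = vector

# covariance from correlation and errors: cov_ij = sigma_i * rho_ij * sigma_j
errors = np.array([1.0, 2.0, 3.0, 4.0])
covariance = np.diag(errors) @ rebuilt @ np.diag(errors)
```

Here `rebuilt` reproduces `full` exactly, and `covariance` is symmetric and equal to `np.outer(errors, errors) * full`, which is a convenient cross-check on any covariance reconstruction.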



In [5]:
%pyspark

# select a high s/n template for the search, e.g. Proxima Cen
sid = 5853498713190525696

# define the template data frame
template_df = spark.sql('SELECT xp.*, g.phot_g_mean_mag FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id = xp.source_id WHERE g.source_id = %d'%(sid))

# define a query over the entire dataset, restricting to low reddening for simplicity
query = 'SELECT xp.*, g.phot_g_mean_mag  ' + \
        'FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id = xp.source_id ' + \
        'WHERE g.ag_gspphot < 0.1 AND MOD(g.random_index, 20) = 0'
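# TEST: give the template only in the df (phot_g_mean_mag is still needed for the flux normalisation)
#query = 'SELECT xp.*, g.phot_g_mean_mag FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id = xp.source_id WHERE g.source_id = %d'%(sid)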

# sanity check the formatted query
#print(query)

# define a data frame via the query
df = spark.sql(query)

# annotate the candidates with the similarity statistic (the trawl itself only runs when results are collected)
similar_df = find_similar_continuous_spectra(df, template_df)

# results quick-look
#similar_df.show()
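
Because the full trawl is expensive, it can help to exercise per-batch logic locally first. The body of a Series-to-Series Pandas UDF is an ordinary function over Pandas series, so a simplified stand-in can be checked without Spark. The sketch below uses a hypothetical function `euclidean_to_template` with a plain Euclidean distance and made-up three-element coefficient arrays; it is not the Mahalanobis statistic defined above, only a way of checking the row-by-row plumbing.

```python
import numpy as np
import pandas as pd

def euclidean_to_template(coeffs: pd.Series, template: np.ndarray) -> pd.Series:
    """Series-to-Series function: one distance per row of array-valued input."""
    return coeffs.apply(lambda c: float(np.linalg.norm(np.asarray(c) - template)))

# made-up template and a small batch of candidate coefficient arrays
template = np.array([1.0, 2.0, 3.0])
batch = pd.Series([template.copy(), template + 1.0, template * 2.0])

# one distance per candidate; the first candidate is the template itself
distances = euclidean_to_template(batch, template)
```

The first element of `distances` is zero, as expected when the template appears among the candidates, which mirrors the test query commented out above.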


In [6]:
%pyspark

# converting to a Pandas data frame collects the results to the driver interpreter process, so this cell
# is where the trawl is actually executed on the Spark worker cluster. Be prepared to sit back and wait...
top3_pdf = similar_df.sort(similar_df.xp_similar.asc()).limit(3).toPandas()

# sanity check as required
# z.show(top3_pdf)
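
To get a feel for the size of distance to expect from a genuine match, recall the comment in the distance function: for identical spectra the single-band distance follows a chi distribution with `NUM_XP_COEFFS` degrees of freedom. The sketch below simulates that reference distribution with NumPy alone, under the idealised assumption that the whitened coefficient differences are independent unit Gaussians; the sample size and seed are arbitrary choices.

```python
import numpy as np

NUM_XP_COEFFS = 55
rng = np.random.default_rng(12345)

# idealised case: whitened coefficient differences are independent unit Gaussians,
# so the Mahalanobis distance is simply the Euclidean norm of each simulated vector
whitened = rng.standard_normal((20000, NUM_XP_COEFFS))
distances = np.linalg.norm(whitened, axis=1)

# summary statistics of the simulated reference distribution
median_distance = np.median(distances)
p99_distance = np.percentile(distances, 99)
```

Single-band distances far above `p99_distance` are then unlikely to come from spectra of identical shape. If the BP and RP distances were independent, their quadrature sum would correspond to twice as many degrees of freedom; that independence is an assumption here, not something established above.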

In [7]:
%pyspark

# convert to sampled form:
sampled_spectra, sampling = convert(top3_pdf, save_file = False)
    
# plot to sanity check:
plot_spectra(sampled_spectra, sampling = sampling, multi=True, show_plot=True, output_path=None, legend=True)



In [8]:
%pyspark

# externally calibrate the spectra
calibrated_spectra, sampling = calibrate(top3_pdf, save_file = False)

# plot the spectra
plot_spectra(calibrated_spectra, sampling = sampling, legend = True)



* [Gaia DR3 XP spectra online documentation](https://gea.esac.esa.int/archive/documentation/GDR3/Data_processing/chap_cu5pho/cu5pho_sec_specProcessing/cu5pho_ssec_specInternCal.html#SSS3)
* [Gaia DR3 spectroscopic data model](https://gea.esac.esa.int/archive/documentation/GDR3/Gaia_archive/chap_datamodel/sec_dm_spectroscopic_tables/)
* [GaiaXPy website](https://gaia-dpci.github.io/GaiaXPy-website/)
* [GaiaXPy API documentation](https://gaiaxpy.readthedocs.io/en/latest/gaiaxpy.html)
* [Vectorised User Defined Functions for PySpark](https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html)
