**Author**: Andy Tzanidakis \
Last updated: May 05, 2024

## Overview



### VizieR Query


### Crossmatch to ZTF

### Compute Time-Series Features with `TAPE`

### Exercise

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

%matplotlib inline
%config InlineBackend.figure_format = "retina"
# Global matplotlib styling. `rcParams` and `mpl.rcParams` refer to the same
# settings object, so the four key/value settings are applied in one update.
import matplotlib as mpl
from matplotlib import rcParams

rcParams.update({
    'savefig.dpi': 550,
    'font.size': 20,
    'text.usetex': True,  # text is rendered through LaTeX
    'axes.linewidth': 2,
})
plt.rc('font', family='serif')

import lsdb
import tape
from tape import Ensemble, ColumnMapper

import dask

# Dask temporary directory and shuffle compression, set in a single call
dask.config.set({
    "temporary-directory": '/epyc/ssd/users/atzanida/tmp',
    "dataframe.shuffle-compression": 'Snappy',
})

from dask.distributed import Client

## VizieR Querying

In [None]:
## VizieR and Aladin querying
from pyvo import registry  # version >=1.4.1 
from mocpy import MOC
from ipyaladin import Aladin

In [None]:
# VizieR catalogue name (Gaia DR3 part 6) and the registry identifier built from it
CATALOGUE = "I/360"
catalogue_ivoid = "ivo://CDS.VizieR/" + CATALOGUE

# Query the registry and keep the first matching resource
voresource = registry.search(ivoid=catalogue_ivoid)[0]

# Tables exposed by the resource; their names are reused in later cells
tables = voresource.get_tables()
tables_names = list(tables.keys())

In [None]:
print (f"Available table names: {tables_names}")

In [None]:
# Let's read quickly the table description...
voresource.describe(verbose=True)

In [None]:
# Name of the VizieR table to query; both variables hold the same string
table_name_1 = first_table_name = "I/360/goldf"

In [None]:
# Open the resource's TAP service once and reuse the handle for the query,
# rather than looking the service up a second time.
tap_service = voresource.get_service("tap")

# Synchronous ADQL query: at most 2,000,000 rows with DE_ICRS > -30
adql_query = (
    'SELECT TOP 2000000 * '
    f'FROM "{first_table_name}" WHERE (DE_ICRS > -30)'
)
tap_records = tap_service.run_sync(adql_query)

In [None]:
# Materialise the TAP query result as a table object
table0 = tap_records.to_table()

# Convert to pandas dataframe
table_df = table0.to_pandas()

In [None]:
# Keep only M-type stars: rows whose spectral-type string contains "M".
# na=False counts a missing spectral type as a non-match, so the mask stays
# strictly boolean (a NaN inside the mask would make the row selection raise).
table_df = table_df[table_df['SpType'].str.contains('M', na=False)]

In [None]:
table_df.head(1)

In [None]:
# Dask client with six workers, one thread each
client = Client(
    n_workers=6,
    threads_per_worker=1,
    memory_limit='auto',
)

In [None]:
client

In [None]:
%%time
# Build an LSDB object catalog from the filtered VizieR table
hips_object = lsdb.from_dataframe(
    table_df,
    ra_column="RA_ICRS",
    dec_column="DE_ICRS",
    catalog_name="golden",
    catalog_type="object",
)

In [None]:
hips_object

In [None]:
%%time
hips_object.head(1)

## Load Additional Catalogs

In [None]:
# ZTF source table
ztf_sources = lsdb.read_hipscat("/epyc/data3/hipscat/catalogs/ztf_axs/ztf_zource")

# ZTF object table, restricted to the PS1 object id, the r/g observation
# counts and the sky position
ztf_object_columns = ['ps1_objid', 'nobs_r', 'nobs_g', 'ra', 'dec']
ztf = lsdb.read_hipscat(
    "/epyc/data3/hipscat/catalogs/ztf_axs/ztf_dr14",
    columns=ztf_object_columns,
)

In [None]:
ztf

## Crossmatch

In [None]:
# Crossmatch the golden-sample catalog against the ZTF object table:
# one neighbour per object, 1 arcsec search radius
xmatch_golden_ztf_object = hips_object.crossmatch(
    ztf,
    radius_arcsec=1,
    n_neighbors=1,
    require_right_margin=False,
)

In [None]:
xmatch_golden_ztf_object.head(1)

In [None]:
%%time
# Compute the full crossmatch result (the whole table, not a head/preview);
# the computed table is reused below to build a new catalog and for plotting
xmatch_golden_ztf_object_comp = xmatch_golden_ztf_object.compute()

In [None]:
%%time
# Rebuild an LSDB catalog from the computed crossmatch, this time positioned
# on the ZTF coordinates
hips_object_v2 = lsdb.from_dataframe(
    xmatch_golden_ztf_object_comp,
    ra_column="ra_ztf_dr14",
    dec_column="dec_ztf_dr14",
    catalog_name="golden",
    catalog_type="object",
)

In [None]:
xmatch_golden_ztf_object_comp.keys()

In [None]:
# Effective temperature vs. luminosity for the crossmatched sample
plt.figure(figsize=(4, 5))
plt.scatter(
    xmatch_golden_ztf_object_comp['Teff-P_golden'],
    # base-10 logarithm, so the plotted values agree with the log10 axis label
    # (np.log would be the natural logarithm)
    np.log10(xmatch_golden_ztf_object_comp['Lum-F_golden']),
    s=1,
    color='#28282B',
)
plt.xlim(4700, 3200)  # reversed limits: temperature decreases to the right
plt.ylim(-6, -1)
plt.minorticks_on()
plt.xlabel(r"$T_{\rm eff}$ [K]")
plt.ylabel(r"$\log_{10} L/L_\odot$")


In [None]:
%%time
# Join the ZTF source table onto the crossmatched objects via the PS1 object id
_sources = hips_object_v2.join(
    ztf_sources,
    left_on="ps1_objid_ztf_dr14",
    right_on="ps1_objid",
)

## TAPE

In [None]:
# Initialize an Ensemble that reuses the Dask client created earlier
ens = Ensemble(client=client)
# Display information about the client the Ensemble is attached to
ens.client_info()

In [None]:
# ColumnMapper establishes which table columns map to time-series quantities.
# The joined source table carries the `_ztf_zource` suffix on its source
# columns (the same suffixed names are passed to `ens.batch` below), so the
# mapper uses the suffixed names rather than the bare ZTF column names.
colmap = ColumnMapper(
    id_col='_hipscat_index',
    time_col='mjd_ztf_zource',
    flux_col='mag_ztf_zource',
    err_col='magerr_ztf_zource',
    band_col='band_ztf_zource',
)

ens.from_dask_dataframe(
    source_frame=_sources._ddf,
    object_frame=hips_object_v2._ddf,
    column_mapper=colmap,
    sync_tables=False,  # avoid doing an initial sync
    sorted=True,  # the input data is already sorted by the chosen index
    sort=False,
)

## Calculating Time-Series Features

In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel
%pip install cesium

In [None]:
from cesium import featurize

In [None]:
# Names of the cesium features requested for each light curve
features_to_use = [
    "amplitude",
    "percent_beyond_1_std",
    "maximum",
    "median",
    "median_absolute_deviation",
    "percent_close_to_median",
    "minimum",
    "skew",
    "std",
    "weighted_average",
    "flux_percentile_ratio_mid20",
    "flux_percentile_ratio_mid35",
    "flux_percentile_ratio_mid50",
    "flux_percentile_ratio_mid65",
    "flux_percentile_ratio_mid80",
    "stetson_j",
    "stetson_k",
]

In [None]:
def compute_features(time, mag, magerr, flag, band, custom_cols=features_to_use):
    """Compute cesium features for the r-band detections of one light curve.

    A detection is used only if it passes every cut: ``flag == 0``,
    ``band == 'r'``, ``mag`` and ``magerr`` are not NaN, and both are below 99.

    Parameters
    ----------
    time, mag, magerr, flag, band : array-like
        Per-detection values of equal length. Each must support element-wise
        comparison and boolean-mask indexing (e.g. NumPy arrays).
    custom_cols : list of str, optional
        Feature names handed to cesium; also used as the index of the result.
        Defaults to the notebook-level ``features_to_use`` list.

    Returns
    -------
    pandas.Series
        One value per entry of ``custom_cols``. When no detection survives
        the cuts, every value is 0.0.
    """
    keep = (
        (flag == 0)
        & (~np.isnan(flag))
        & (band == 'r')
        & (~np.isnan(mag))
        & (~np.isnan(magerr))
        & (magerr < 99)
        & (mag < 99)
    )

    # Drop flagged / invalid data points
    time_, mag_, magerr_ = time[keep], mag[keep], magerr[keep]

    if len(time_) == 0:
        # Nothing left to featurize. Size the all-zero result from custom_cols
        # (not from the global feature list) so a caller-supplied list of a
        # different length does not raise a length-mismatch error.
        return pd.Series(np.zeros(len(custom_cols)), index=custom_cols)

    fset_cesium = featurize.featurize_time_series(
        times=time_,
        values=mag_,
        errors=magerr_,
        features_to_use=custom_cols,
    )

    # The first row of the returned feature table holds this light curve's values
    return pd.Series(fset_cesium.values[0], index=custom_cols)



In [None]:
# Empty float DataFrame with one column per requested feature; passed as the
# `meta` argument of ens.batch below
my_meta = pd.DataFrame(columns=features_to_use, dtype=float)

In [None]:
_sources

In [None]:
%%time
# Apply compute_features through the Ensemble, feeding it the time, magnitude,
# magnitude-error, catalog-flag and band columns of the joined source table.
# The result is only evaluated when calc_.compute() is called later.
calc_ = ens.batch(
    compute_features,
    'mjd_ztf_zource',
    'mag_ztf_zource',
    'magerr_ztf_zource',
    'catflags_ztf_zource',
    'band_ztf_zource',
    use_map=True,
    meta=my_meta,
)

In [None]:
%%time
#ens.object.join(calc_).update_ensemble()

In [None]:
client

In [None]:
# Disable Dask's "dataframe.convert-string" option before computing.
# NOTE(review): this runs after the catalogs and the batch graph were already
# built; it probably belongs with the other dask.config.set calls in the
# first cell — confirm it still takes effect when set this late.
dask.config.set({"dataframe.convert-string": False})

In [62]:
%%time
# Trigger the actual computation of the per-object features.
# NOTE(review): the recorded run of this cell ended in a KilledWorker error on
# a `perform_join_on` task; presumably the workers ran out of memory during
# the join — inspect the worker logs and memory limits before re-running.
obj_features = calc_.compute()

KilledWorker: Attempted to run task 'perform_join_on-0d672da9-d883-486a-b64f-ba49306503fe' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:42437. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.