# Tuto to Read The sources and Visits and Objects in LSSTComCam

- author : Sylvie Dagoret-Campagne
- affiliation : IJCLab/IN2P3/CNRS
- member : DESC, rubin-inkind
- creation date : 2025-05-03
- last update : 2025-05-03

## Note on work done on LSSTComCam Commissioning : https://sitcomtn-149.lsst.io/
## Note Data Product definition Document : https://lse-163.lsst.io/

In [None]:
import sys, os, glob
import matplotlib.pyplot as plt
import lsst.afw.display as afwDisplay
import numpy as np
import pandas as pd
from astropy.time import Time
# %matplotlib widget

In [None]:
import lsst.geom as geom
from lsst.geom import SpherePoint, degrees, Point2D, Point2I
from lsst.skymap import PatchInfo, Index2D

In [None]:
plt.rcParams["figure.figsize"] = (10, 6)
plt.rcParams["axes.labelsize"] = "x-large"
plt.rcParams["axes.titlesize"] = "x-large"
plt.rcParams["xtick.labelsize"] = "x-large"
plt.rcParams["ytick.labelsize"] = "x-large"

In [None]:
import gc

In [None]:
import traceback

In [None]:
# Define butler
from lsst.daf.butler import Butler

In [None]:
!eups list lsst_distrib

In [None]:
def nJy_to_ab_mag(f_njy):
    """Convert flux in nanojanskys to AB magnitude."""
    if f_njy > 0:
        return -2.5 * np.log10(f_njy) + 31.4
    else:
        return 0.0


def ab_mag_to_nJy(mag_ab):
    """Convert AB magnitude to flux in nanojanskys."""
    return 10 ** ((31.4 - mag_ab) / 2.5)

In [None]:
def select_visits_in_patch(df, tractInfo, patchInfo):
    """
    Sélectionne les lignes du DataFrame correspondant à des visites dans un patch donné.

    Paramètres
    ----------
    df : pandas.DataFrame
        Doit contenir les colonnes 'ra' et 'dec' en degrés.
    tractInfo : lsst.skymap.TractInfo
    patchInfo : lsst.skymap.PatchInfo

    Retourne
    --------
    df_filtered : pandas.DataFrame
        Les lignes du DataFrame dont les RA/Dec sont dans le patch spécifié.
    """
    wcs = tractInfo.getWcs()
    bbox = patchInfo.getOuterBBox()

    def is_inside_patch(ra, dec):
        coord = SpherePoint(ra * degrees, dec * degrees)
        if not tractInfo.contains(coord):
            return False
        pix = wcs.skyToPixel(coord)
        return bbox.contains(Point2I(pix))

    mask = df.apply(lambda row: is_inside_patch(row["ra"], row["dec"]), axis=1)
    return df[mask].copy()

In [None]:
# Tested but long
def assign_tract_patch(df, skyMap):
    """
    Attribue (tract, patch) à chaque ligne d’un DataFrame selon ses coordonnées célestes.

    Paramètres
    ----------
    df : pandas.DataFrame
        Doit contenir les colonnes 'ra' et 'dec' (en degrés).
    skyMap : lsst.skymap.SkyMap
        Obtenu via butler.get("skyMap")

    Retour
    ------
    df_result : pandas.DataFrame
        DataFrame original avec deux colonnes supplémentaires :
        'tract' et 'patch' (tuple (x, y) ou None si hors carte).
    """

    def find_tract_patch(ra, dec):
        coord = SpherePoint(ra * degrees, dec * degrees)
        for tractInfo in skyMap:
            if tractInfo.contains(coord):
                wcs = tractInfo.getWcs()
                pix = wcs.skyToPixel(coord)
                point = Point2I(pix)
                for patchInfo in tractInfo:
                    if patchInfo.getOuterBBox().contains(point):
                        return tractInfo.getId(), patchInfo.getSequentialIndex()
        return None, None

    results = df.apply(lambda row: find_tract_patch(row["ra"], row["dec"]), axis=1)
    df[["tract", "patch"]] = pd.DataFrame(results.tolist(), index=df.index)
    return df

In [None]:
# Not tested
def filter_in_patch_vectorized(df, tractInfo, patchInfo):
    """
    Sélection rapide (vectorisée) d’un sous-ensemble RA/Dec inclus dans un patch donné.

    Attention : plus rapide, mais approximatif si les coordonnées sont très en bordure.

    Paramètres
    ----------
    df : pandas.DataFrame avec colonnes 'ra', 'dec'
    tractInfo : TractInfo
    patchInfo : PatchInfo

    Retour
    ------
    df_filtered : pandas.DataFrame
    """
    wcs = tractInfo.getWcs()
    bbox = patchInfo.getOuterBBox()

    # Conversion RA/Dec → radians
    ra_rad = np.deg2rad(df["ra"].values)
    dec_rad = np.deg2rad(df["dec"].values)

    # Conversion → Point2D (skyToPixel produit Point2D, pas Point2I)
    pix_x = []
    pix_y = []
    for ra, dec in zip(ra_rad, dec_rad):
        sp = SpherePoint(ra, dec)
        pix = wcs.skyToPixel(sp)
        pix_x.append(pix.getX())
        pix_y.append(pix.getY())

    pix_x = np.array(pix_x)
    pix_y = np.array(pix_y)

    # BBox (xmin, xmax, ymin, ymax)
    xmin, xmax = bbox.getMinX(), bbox.getMaxX()
    ymin, ymax = bbox.getMinY(), bbox.getMaxY()

    # Masque vectoriel
    mask = (pix_x >= xmin) & (pix_x <= xmax) & (pix_y >= ymin) & (pix_y <= ymax)

    return df[mask].copy()

## RubinTV, Campaigns , quicklook
- RubinTV : https://usdf-rsp.slac.stanford.edu/rubintv/summit-usdf/lsstcam
- https://rubinobs.atlassian.net/wiki/spaces/LSSTCOM/pages/467370016/LSSTCam+Commissioning+Planning
- LSSTCam DM campaign : https://rubinobs.atlassian.net/wiki/spaces/DM/pages/48834013/Campaigns#1.1.2.-LSSTCam-Nightly-Validation-Pipeline
- Check campaign also here  https://rubinobs.atlassian.net/wiki/pages/diffpagesbyversion.action?pageId=48834013&selectedPageVersions=145%2C143
- fov-quicklook : https://usdf-rsp-dev.slac.stanford.edu/fov-quicklook/

## Configuration

In [None]:
FLAG_DUMP_COLLECTIONS = False
FLAG_DUMP_DATASETS = False
FLAG_DUMP_OBJECTSTABLECOLUMNS = False
FLAG_CUT_OBJECTSMAG = True
FLAG_DUMP_VISITTABLEECOLUMNS = False
FLAG_COMPUTE_VISITSTRACTSPATCHS = False

In [None]:
MAGCUT = 20.0

In [None]:
all_bands = ["u", "g", "r", "i", "z", "y"]
all_bands_colors = ["blue", "green", "red", "orange", "yellow", "purple"]

### Choose instrument

In [None]:
# instrument = "LSSTCam"

# We focus here on sky fields oberved by LSSTComCam, so we select this camera
instrument = "LSSTComCam"

### Choose options

### For LSSTCam : RubinTV, Campaigns , quicklook
- RubinTV : https://usdf-rsp.slac.stanford.edu/rubintv/summit-usdf/lsstcam
- https://rubinobs.atlassian.net/wiki/spaces/LSSTCOM/pages/467370016/LSSTCam+Commissioning+Planning
- LSSTCam DM campaign : https://rubinobs.atlassian.net/wiki/spaces/DM/pages/48834013/Campaigns#1.1.2.-LSSTCam-Nightly-Validation-Pipeline
- Check campaign also here  https://rubinobs.atlassian.net/wiki/pages/diffpagesbyversion.action?pageId=48834013&selectedPageVersions=145%2C143
- fov-quicklook : https://usdf-rsp-dev.slac.stanford.edu/fov-quicklook/

### For LSSTComCam check here : 
- Check here the collection available : https://rubinobs.atlassian.net/wiki/spaces/DM/pages/226656354/LSSTComCam+Intermittent+Cumulative+DRP+Runs

In [None]:
if instrument == "LSSTCam":
    repo = "/repo/embargo"
    instrument = "LSSTCam"
    collection_validation = instrument + "/runs/nightlyValidation"
    # collection_quicklook   = instrument + '/runs/quickLookTesting'
    collection_validation = os.path.join(collection_validation, "20250416/d_2025_04_15/DM-50157")
    date_start = 20250415
    date_selection = 20250416
    where_clause = "instrument = '" + f"{instrument}" + "'"
    where_clause_instrument = "instrument = '" + instrument + "'"
    where_clause_date = where_clause + f"and day_obs >= {date_start}"
    skymapName = "lsst_cells_v1"
    BANDSEL = "i"

elif instrument == "LSSTComCam":
    repo = "/repo/main"
    collection_validation = "LSSTComCam/runs/DRP/DP1/w_2025_10/DM-49359"  # work 2025-05-01
    # collection_validation = "LSSTComCam/runs/DRP/DP1/w_2025_17/DM-50530" #updated 2025-05-02
    date_start = 20241024
    date_selection = 20241211
    skymapName = "lsst_cells_v1"
    where_clause = "instrument = '" + instrument + "'"
    where_clause_instrument = "instrument = '" + instrument + "'"
    where_clause_date = where_clause + f"and day_obs >= {date_start}"

    NDET = 9
    TRACTSEL = 5063
    BANDSEL = "i"

In [None]:
collectionStr = collection_validation.replace("/", "_")

## Access to Butler registry

In [None]:
# Initialize the butler repo:
butler = Butler(repo, collections=collection_validation)
registry = butler.registry

## Create a skymap object and Camera

In [None]:
skymap = butler.get("skyMap", skymap=skymapName, collections=collection_validation)

In [None]:
camera = butler.get("camera", collections=collection_validation, instrument=instrument)

## Query for collections in Butler

- remove user collections
- remove calibration products

In [None]:
# mostly setup for LSSTCam
if FLAG_DUMP_COLLECTIONS:
    for _ in sorted(registry.queryCollections(expression=instrument + "/*")):
        if "/calib/" not in _ and "u/" not in _:
            print(_)

## Query for the dataset types in the Butler

- Refer to the Data Product definition Document to know about the definition of datasets
- https://www.lsst.org/about/dm/data-products
- https://lse-163.lsst.io/
- https://docushare.lsst.org/docushare/dsweb/Get/LSE-163

In [None]:
if FLAG_DUMP_DATASETS:
    for datasetType in registry.queryDatasetTypes():
        if registry.queryDatasets(datasetType, collections=collection_validation).any(
            execute=False, exact=False
        ):
            # Limit search results to the data products
            if (
                ("_config" not in datasetType.name)
                and ("_log" not in datasetType.name)
                and ("_metadata" not in datasetType.name)
                and ("_resource_usage" not in datasetType.name)
                and ("Plot" not in datasetType.name)
                and ("Metric" not in datasetType.name)
                and ("metric" not in datasetType.name)
            ):
                if "object" in datasetType.name or "Obj" in datasetType.name:
                    print(datasetType)
                if "source" in datasetType.name or "Source" in datasetType.name:
                    print(datasetType)
                if "visit" in datasetType.name or "Visit" in datasetType.name:
                    print(datasetType)

## Region of interest

In [None]:
lsstcomcam_targets = {}
lsstcomcam_targets["47 Tuc"] = {"field_name": "47 Tuc Globular Cluster", "ra": 6.02, "dec": -72.08}
lsstcomcam_targets["Rubin SV 38 7"] = {"field_name": "Low Ecliptic Latitude Field", "ra": 37.86, "dec": 6.98}
lsstcomcam_targets["Fornax dSph"] = {
    "field_name": "Fornax Dwarf Spheroidal Galaxy",
    "ra": 40.0,
    "dec": -34.45,
}
lsstcomcam_targets["ECDFS"] = {"field_name": "Extended Chandra Deep Field South", "ra": 53.13, "dec": -28.10}
lsstcomcam_targets["EDFS"] = {"field_name": "Euclid Deep Field South", "ra": 59.10, "dec": -48.73}
lsstcomcam_targets["Rubin SV 95 -25"] = {
    "field_name": "Low Galactic Latitude Field",
    "ra": 95.00,
    "dec": -25.0,
}
lsstcomcam_targets["Seagull"] = {"field_name": "Seagull Nebula", "ra": 106.23, "dec": -10.51}

In [None]:
# the_target = lsstcomcam_targets["Seagull"]
# the_target = lsstcomcam_targets["47 Tuc"] # bad
# the_target = lsstcomcam_targets["Fornax dSph"]
# the_target = lsstcomcam_targets["ECDFS"]


# key = "Seagull"
# key = "Fornax dSph"
key = "ECDFS"
# key = "EDFS"
# key = "47 Tuc"
# key = "Rubin SV 38 7"
# key = "Rubin SV 95 -25"

keyfield = "".join(key.split())

the_target = lsstcomcam_targets[key]
target_ra = the_target["ra"]
target_dec = the_target["dec"]
target_title = (
    the_target["field_name"] + f" band  {BANDSEL} " + f" (ra,dec) = ({target_ra:.2f},{target_dec:.2f}) "
)
target_point = SpherePoint(target_ra, target_dec, degrees)

## Get list of tracts from the objectTable_tract

In [None]:
# Find the dimension
print(butler.registry.getDatasetType("objectTable_tract").dimensions)

In [None]:
datasettype = "objectTable_tract"
therefs = butler.registry.queryDatasets(datasettype, collections=collection_validation)
tractsId_list = np.unique([ref.dataId["tract"] for ref in therefs])
tractsId_list = sorted(tractsId_list)
print(tractsId_list)

## Find the Tract and Patch of the region of interest

- tract in tractNbSel
- patch in patchNbSel

In [None]:
tract_info = skymap.findTract(target_point)
patch_info = tract_info.findPatch(target_point)
bbox = patch_info.getOuterBBox()

print("Patch bounding box:", bbox)

print("Tract ID :", tract_info.getId())
tractNbSel = tract_info.getId()

print("Patch Index :", patch_info.getIndex(), " , ", patch_info.getSequentialIndex())  # (x, y)
print("Bounding Box", bbox)

patchNbSel = patch_info.getSequentialIndex()

In [None]:
dataId = {"band": BANDSEL, "tract": tractNbSel, "patch": patchNbSel, "skymap": skymapName}

## The Objects

- Objects are extracted from object detection on deepcoadds
- all bands are included 

In [None]:
print(butler.registry.getDatasetType("objectTable").dimensions)

In [None]:
# cannot add a filter on band
where_clause_skymap = f"skymap = '{skymapName}' AND tract = {tractNbSel} AND patch = {patchNbSel}"
print(where_clause)

In [None]:
dataset_refs = list(
    butler.registry.queryDatasets("objectTable", collections=collection_validation, where=where_clause_skymap)
)
# Récupère un des refs valides
Nrefs = len(dataset_refs)
print(f"Number of objectTables : {Nrefs}")
ref = dataset_refs[0]
t = butler.get(ref)
Nobj = len(t)
# del t
# gc.collect()
print(f"Total Number of objects {Nobj}")

Oui, dans ce type de objectTable, les colonnes comme g_psfFlux, g_kronFlux, g_cModelFlux, etc., sont des flux calibrés (en nJy ou en unité de calibration interne du pipeline). Pour les convertir en magnitudes AB, tu peux utiliser la formule classique :

In [None]:
# Utilise ref.datasetType.name pour lister les colonnes disponibles
if FLAG_DUMP_OBJECTSTABLECOLUMNS:
    t_columns = list(butler.get(ref).columns)
    print(t_columns)

In [None]:
# La constante 31.4 correspond à la conversion AB standard pour un flux exprimé en nanoJanskys (nJy).
# À vérifier selon l’unité exacte utilisée par le pipeline sur ton RSP
# (souvent c’est bien nJy, mais ça peut être autre chose si la calibration a été changée).
# mag = -2.5 * np.log10(flux) + 31.4

## Extract the per-band tables of coordinates

In [None]:
# Check columns definitions in https://lse-163.lsst.io/
all_radecTable = []

for band in all_bands:
    id_name = "objectId"
    id_parentname = "parentObjectId"
    x_name = f"{band}_centroid_x"
    y_name = f"{band}_centroid_y"
    coord_ra_name = "coord_ra"
    coord_dec_name = "coord_dec"
    ra_name = f"{band}_ra"
    dec_name = f"{band}_dec"
    decl_name = f"{band}_decl"
    raerr_name = f"{band}_raErr"
    decerr_name = f"{band}_decErr"
    extendedness_name = f"{band}_extendedness"
    blendness_name = f"{band}_blendedness"
    psfflux_name = f"{band}_psfFlux"
    psfmag_name = f"{band}_psfMag"
    psfflux_free_name = f"{band}_free_psfFlux"
    df = t[
        [
            id_name,
            id_parentname,
            x_name,
            y_name,
            coord_ra_name,
            coord_dec_name,
            ra_name,
            dec_name,
            raerr_name,
            decerr_name,
            decl_name,
            extendedness_name,
            blendness_name,
            psfflux_name,
        ]
    ]

    # select primary objects
    df_sel = (df[df[id_parentname] == 0]).drop([id_parentname], axis=1)
    # compute magnitude AB
    df_sel[psfmag_name] = df_sel[psfflux_name].apply(nJy_to_ab_mag)
    # drop bad magnitudes
    df_sel = df_sel[df_sel[psfmag_name] != 0.0]
    # drop the fluxes
    df_sel = df_sel.drop([psfflux_name], axis=1)

    # select bright objects
    if FLAG_CUT_OBJECTSMAG:
        df_sel = df_sel[df_sel[psfmag_name] < MAGCUT]

    # save table in the list
    all_radecTable.append(df_sel)

In [None]:
for idx, band in enumerate(all_bands):
    df_obj = all_radecTable[idx]
    n = len(df_obj)
    print(f"Number of objects in band {band} : {n}")

## Visits

- Visits allows to make a link between a visit id and an observation time, the (ra,dec) of the telescope, the observation time and observation conditions such airmass, exposure time.
- This allow us to make a selection of visits for which we can select the sources in the Tract-Patch selected

In [None]:
datasettype = "visitTable"

In [None]:
print(butler.registry.getDatasetType(datasettype).dimensions)
print(where_clause_instrument)

In [None]:
dataset_refs = list(
    butler.registry.queryDatasets(
        datasettype, collections=collection_validation, where=where_clause_instrument
    )
)
# Récupère un des refs valides
Nrefs = len(dataset_refs)
print(f"Number of visitTables : {Nrefs}")
ref = dataset_refs[0]
t = butler.get(ref)
Nvis = len(t)
# del t
# gc.collect()
print(f"Total Number of visits {Nvis}")

In [None]:
if FLAG_DUMP_VISITTABLEECOLUMNS:
    t_columns = list(butler.get(ref).columns)
    print(t_columns)

In [None]:
df_visits = t

In [None]:
df_visits.head()

In [None]:
if FLAG_COMPUTE_VISITSTRACTSPATCHS:
    # Takes Very long time to compute.
    # It is better to filter out visits that does not belong to the selected tract patch as shown below
    df_visits_tractspatches = assign_tract_patch(df_visits, skymap)

### select visits in selected Tract and Patch 

- just need visits ra and dec coordinates to filter those visits in the tract-patch.

In [None]:
df_visits_in_patch = select_visits_in_patch(df_visits, tract_info, patch_info)

In [None]:
df_visits_in_patch

### Keep the list in memory

In [None]:
visitLists_sel = df_visits_in_patch.visitId.values

## Get the selected sources

In [None]:
datasettype = "sourceTable_visit"

In [None]:
Nvis_sel = len(visitLists_sel)
print(f"Number of visits {Nvis_sel}")
print(visitLists_sel)

In [None]:
visit_sel = visitLists_sel[0]
butler.registry.expandDataId(instrument=instrument, visit=visit_sel)

In [None]:
def load_sourceTables_for_visits(butler, datasettype, instrument, visit_ids):
    dfs = []
    for visit in visit_ids:
        refs = list(
            butler.registry.queryDatasets(
                datasetType=datasettype, dataId={"instrument": instrument, "visit": visit}, findFirst=True
            )
        )
        if refs:
            df = butler.get(refs[0])
            df["visit"] = visit
            dfs.append(df)
    if dfs:
        return pd.concat(dfs, ignore_index=True)
    else:
        return pd.DataFrame()

In [None]:
# dt = butler.registry.getDatasetType("src")
# print(dt.dimensions)

In [None]:
# crash
# print(f"Fetch the {datasettype} for each visit in the list")
# df_all_sources = load_sourceTables_for_visits(butler, datasettype,instrument, visitLists_sel)

In [None]:
# df_all_sources

### Manage input sources in parquet file

In [None]:
outputdir_sources = f"output_{instrument}_field{keyfield}_t{tractNbSel}_p{patchNbSel}_{datasettype}"
all_inputfiles = glob.glob(f"{outputdir_sources}/*.parquet")

#### Dump columns

In [None]:
inputfile = all_inputfiles[0]
df = pd.read_parquet(inputfile)

In [None]:
print(list(df.columns))

#### Read all sources

In [None]:
# Crash due to too big
df_all_src = pd.concat([pd.read_parquet(f) for f in all_inputfiles], ignore_index=True)

In [None]:
df_all_src

In [None]:
df_src_onevisit = df_all_src[(df_all_src.visit == visit_sel)]

In [None]:
df_src_onevisit

In [None]:
df_src_onevisit[["coord_ra", "ra", "coord_dec", "dec"]].aggregate(["mean", "std"], axis=0)

In [None]:
# fig,(ax1,ax2) = plt.subplots(1,2,figsize=(16,5))
# df_src_onevisit["dra"].hist(ax=ax1,bins=50,range=(-0.001,0.001))
# df_src_onevisit["ddec"].hist(ax=ax2,bins=50,range=(-0.001,0.001))

- coord_ra = ra
- coord_dec = dec

In [None]:
df_src_onevisit[
    ["calibFlux", "psfFlux", "ap12Flux", "ap17Flux", "ap25Flux", "ap35Flux", "calibMag", "calibMagErr"]
]

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(6, 4))

for ib, band in enumerate(all_bands):
    color = all_bands_colors[ib]
    df = df_all_src[df_all_src["band"] == band]
    df["calibMag"].hist(ax=ax, bins=50, histtype="step", color=color, label=band)
ax.legend()
ax.set_yscale("log")
ax.set_xlabel("calibMag")
ax.set_title(f"Magnitude of sources in (tract,patch) = ({tractNbSel},{patchNbSel})")
plt.show()

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(6, 6))
for ib, band in enumerate(all_bands):
    color = all_bands_colors[ib]
    df = df_all_src[df_all_src["band"] == band]
    df.plot.scatter(x="calibMag", y="calibMagErr", ax=ax, marker=".", s=10, c=color, label=band, alpha=0.5)
    # produce a crash
ax.legend()
ax.set_xlabel("calibMag")
ax.set_ylabel("calibMagErr")
ax.set_title(f"Magnitude of sources in (tract,patch) = ({tractNbSel},{patchNbSel})")
ax.set_aspect("auto")
plt.show()