# Gaia CV Hunter v2: Scaled Anomaly Detection Pipeline

**Goal:** Find rare cataclysmic variables (CVs), especially short-period dwarf novae, using ML anomaly detection on Gaia DR3 variability statistics.

**Key features:**
- Larger sample size (2000-5000 sources) with smart blue/faint filtering
- Dual ML models: Isolation Forest + One-Class SVM
- **Top 10+ anomaly analysis** with automated TESS light curve extraction
- **Automated multi-survey cross-matching:**
  - VSX (Variable Star Index) - known variable star classifications
  - ROSAT - X-ray detections (accretion indicator)
  - TESS - high-cadence light curves
  - SIMBAD - existing astronomical classifications
  - Gaia DR3 `vari_classifier_result` - automated variability classes
  - ZTF - dense optical light curves for short-period systems
  - GALEX - UV excess from hot accretion disks
- Prioritized candidate output with multi-wavelength evidence scoring

**Author:** Landon Mutch
**Repository:** https://github.com/toadlyBroodle/science

## Cell 1: Install Dependencies

In [None]:
# Cell 1: Install required packages
!pip install astroquery astropy scikit-learn pandas matplotlib seaborn --quiet
print("Dependencies installed!")

## Cell 2: Imports and Configuration

In [None]:
# Cell 2: Imports and configuration
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Astropy/Astroquery
from astropy.coordinates import SkyCoord
import astropy.units as u
from astroquery.gaia import Gaia
from astroquery.vizier import Vizier
from astroquery.simbad import Simbad as SimbadClass
from astroquery.mast import Tesscut, Catalogs

# ML
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# Configuration
CONFIG = {
    'sample_size': 5000,           # Target number of sources
    'bp_rp_max': 0.8,              # Blue color cut (CVs are blue)
    'g_mag_min': 14.0,             # Faint sources (avoid saturated)
    'g_mag_max': 18.5,             # Not too faint (need good S/N)
    'anomaly_percentile': 2,       # Top N% as anomalies
    # Quantitative thresholds (score-based, not count-based)
    'anomaly_score_min': 0.4,      # Min combined anomaly score [0-1] to cross-match
    'require_consensus': True,      # Require BOTH models to flag as anomaly
    'priority_score_min': 10.0,    # Min priority score for TESS LC extraction & deep investigation
    # Search radii
    'tess_search_radius': 21,      # arcsec (TESS pixel = 21")
    'rosat_search_radius': 30,     # arcsec
    'simbad_search_radius': 5,     # arcsec
    'ztf_search_radius': 3,        # arcsec
    'galex_search_radius': 5,      # arcsec
}

print("Configuration:")
for k, v in CONFIG.items():
    print(f"  {k}: {v}")
print("\nImports complete!")

## Cell 3: Query Gaia DR3 with Smart Filtering

Target: Blue, faint variables with good variability statistics

In [None]:
# Cell 3: Query Gaia DR3 for CV candidates
print("="*70)
print("QUERYING GAIA DR3 FOR VARIABLE STAR CANDIDATES")
print("="*70)

# Query focuses on:
# - Blue sources (BP-RP < 0.8) where CVs and WDs live
# - Moderate magnitudes (14 < G < 18.5) for good S/N
# - Sources with variability statistics
# - High amplitude or unusual skewness/kurtosis

query = f"""
SELECT TOP {CONFIG['sample_size']}
    vs.source_id,
    gs.ra, gs.dec,
    gs.phot_g_mean_mag,
    gs.bp_rp,
    gs.parallax,
    gs.parallax_error,
    gs.pmra, gs.pmdec,
    vs.mean_mag_g_fov,
    vs.std_dev_mag_g_fov,
    vs.range_mag_g_fov,
    vs.skewness_mag_g_fov,
    vs.kurtosis_mag_g_fov,
    vs.num_selected_g_fov,
    vs.mean_obs_time_g_fov,
    vs.time_duration_g_fov
FROM gaiadr3.vari_summary AS vs
JOIN gaiadr3.gaia_source AS gs ON vs.source_id = gs.source_id
WHERE gs.bp_rp IS NOT NULL
    AND gs.bp_rp < {CONFIG['bp_rp_max']}
    AND gs.phot_g_mean_mag > {CONFIG['g_mag_min']}
    AND gs.phot_g_mean_mag < {CONFIG['g_mag_max']}
    AND vs.std_dev_mag_g_fov > 0.05
    AND vs.num_selected_g_fov > 10
    AND gs.parallax IS NOT NULL
    AND gs.parallax > 0.1
ORDER BY vs.range_mag_g_fov DESC, vs.source_id ASC
"""

print(f"\nQuery parameters:")
print(f"  Max sample: {CONFIG['sample_size']} (deterministic by range_mag desc, source_id)")
print(f"  Color cut: BP-RP < {CONFIG['bp_rp_max']} (blue sources)")
print(f"  Magnitude range: {CONFIG['g_mag_min']} < G < {CONFIG['g_mag_max']}")
print(f"  Variability: std_dev > 0.05 mag, >10 observations")

print("\nSubmitting async query to Gaia Archive...")
print("(This may take 1-3 minutes)")

try:
    job = Gaia.launch_job_async(query)
    results = job.get_results()
    df = results.to_pandas()
    n_total = len(df)
    print(f"\n✓ Retrieved {n_total} sources from Gaia DR3")
    
    # Deterministic truncation if needed (already ordered by range DESC, source_id ASC)
    if CONFIG['sample_size'] > 0 and n_total > CONFIG['sample_size']:
        df = df.head(CONFIG['sample_size'])
        print(f"  Truncated to top {CONFIG['sample_size']} by variability range (deterministic)")
    print(f"  Working sample: {len(df)} sources")
    
    # Quick stats
    print(f"\nData summary:")
    print(f"  G magnitude: {df['phot_g_mean_mag'].min():.1f} - {df['phot_g_mean_mag'].max():.1f}")
    print(f"  BP-RP color: {df['bp_rp'].min():.2f} - {df['bp_rp'].max():.2f}")
    print(f"  Amplitude range: {df['range_mag_g_fov'].min():.2f} - {df['range_mag_g_fov'].max():.2f} mag")
    
except Exception as e:
    print(f"\nError: {e}")
    print("\nTry reducing sample_size or running again.")

## Cell 4: Feature Engineering

In [None]:
# Cell 4: Feature engineering for anomaly detection
print("="*70)
print("FEATURE ENGINEERING")
print("="*70)

# Calculate derived features
df['abs_mag_g'] = df['phot_g_mean_mag'] + 5 * np.log10(df['parallax'] / 100)
df['amplitude'] = df['range_mag_g_fov']
df['rel_std'] = df['std_dev_mag_g_fov'] / df['mean_mag_g_fov'].abs()
df['proper_motion'] = np.sqrt(df['pmra']**2 + df['pmdec']**2)
df['distance_pc'] = 1000 / df['parallax']

# Features for ML
feature_cols = [
    'amplitude',
    'std_dev_mag_g_fov',
    'skewness_mag_g_fov',
    'kurtosis_mag_g_fov',
    'bp_rp',
    'abs_mag_g',
    'rel_std',
    'proper_motion'
]

# Remove rows with NaN in features
df_clean = df.dropna(subset=feature_cols).copy()
print(f"\nSources after cleaning: {len(df_clean)} (dropped {len(df) - len(df_clean)} with NaN)")

# Prepare feature matrix
X = df_clean[feature_cols].values

# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

print(f"\nFeatures used ({len(feature_cols)}):")
for i, col in enumerate(feature_cols):
    print(f"  {i+1}. {col}: mean={X[:, i].mean():.3f}, std={X[:, i].std():.3f}")

print("\n✓ Features prepared and scaled")

## Cell 5: Dual ML Anomaly Detection

Using both Isolation Forest and One-Class SVM for robustness

In [None]:
# Cell 5: Dual ML anomaly detection
print("="*70)
print("ANOMALY DETECTION: ISOLATION FOREST + ONE-CLASS SVM")
print("="*70)

contamination = CONFIG['anomaly_percentile'] / 100

# Model 1: Isolation Forest
print("\n1. Training Isolation Forest...")
iso_forest = IsolationForest(
    contamination=contamination,
    random_state=42,
    n_estimators=200,
    max_samples='auto'
)
iso_labels = iso_forest.fit_predict(X_scaled)
iso_scores = -iso_forest.score_samples(X_scaled)  # Higher = more anomalous

n_iso_anomalies = (iso_labels == -1).sum()
print(f"   Isolation Forest anomalies: {n_iso_anomalies}")

# Model 2: One-Class SVM
print("\n2. Training One-Class SVM...")
oc_svm = OneClassSVM(
    nu=contamination,
    kernel='rbf',
    gamma='scale'
)
svm_labels = oc_svm.fit_predict(X_scaled)
svm_scores = -oc_svm.score_samples(X_scaled)  # Higher = more anomalous

n_svm_anomalies = (svm_labels == -1).sum()
print(f"   One-Class SVM anomalies: {n_svm_anomalies}")

# Combined scoring
# Normalize scores to [0, 1] and average
iso_norm = (iso_scores - iso_scores.min()) / (iso_scores.max() - iso_scores.min())
svm_norm = (svm_scores - svm_scores.min()) / (svm_scores.max() - svm_scores.min())
combined_score = (iso_norm + svm_norm) / 2

# Add scores to dataframe
df_clean['iso_score'] = iso_scores
df_clean['svm_score'] = svm_scores
df_clean['combined_score'] = combined_score
df_clean['iso_anomaly'] = iso_labels == -1
df_clean['svm_anomaly'] = svm_labels == -1
df_clean['both_anomaly'] = (iso_labels == -1) & (svm_labels == -1)

n_both = df_clean['both_anomaly'].sum()
print(f"\n3. Consensus anomalies (flagged by BOTH models): {n_both}")

# Rank by combined score
df_clean = df_clean.sort_values('combined_score', ascending=False)

print("\n✓ Anomaly detection complete")

## Cell 6: Visualize Anomalies

In [None]:
# Cell 6: Visualize anomalies
print("="*70)
print("ANOMALY VISUALIZATION")
print("="*70)

fig, axes = plt.subplots(2, 2, figsize=(14, 12))

# Select candidates by quantitative threshold, not fixed count
score_threshold = CONFIG['anomaly_score_min']
if CONFIG['require_consensus']:
    top_candidates = df_clean[
        (df_clean['combined_score'] >= score_threshold) & df_clean['both_anomaly']
    ].copy()
else:
    top_candidates = df_clean[
        df_clean['combined_score'] >= score_threshold
    ].copy()

n_selected = len(top_candidates)
print(f"\nCandidates selected: {n_selected}")
print(f"  Score threshold: combined_score >= {score_threshold}")
print(f"  Consensus required: {CONFIG['require_consensus']}")
if n_selected > 0:
    print(f"  Score range: {top_candidates['combined_score'].min():.3f} - {top_candidates['combined_score'].max():.3f}")

# 1. Color-Magnitude Diagram
ax1 = axes[0, 0]
ax1.scatter(df_clean['bp_rp'], df_clean['abs_mag_g'],
            c=df_clean['combined_score'], cmap='viridis',
            s=5, alpha=0.5)
ax1.scatter(top_candidates['bp_rp'], top_candidates['abs_mag_g'],
            c='red', s=50, marker='*', label=f'{n_selected} anomalies (score>={score_threshold})', zorder=10)
ax1.invert_yaxis()
ax1.set_xlabel('BP-RP Color')
ax1.set_ylabel('Absolute G Magnitude')
ax1.set_title('Color-Magnitude Diagram')
ax1.legend()

# 2. Amplitude vs Skewness
ax2 = axes[0, 1]
ax2.scatter(df_clean['amplitude'], df_clean['skewness_mag_g_fov'],
            c=df_clean['combined_score'], cmap='viridis',
            s=5, alpha=0.5)
ax2.scatter(top_candidates['amplitude'], top_candidates['skewness_mag_g_fov'],
            c='red', s=50, marker='*', zorder=10)
ax2.set_xlabel('Amplitude (mag)')
ax2.set_ylabel('Skewness')
ax2.set_title('Amplitude vs Skewness')
ax2.axhline(0, color='gray', linestyle='--', alpha=0.5)

# 3. Skewness vs Kurtosis
ax3 = axes[1, 0]
ax3.scatter(df_clean['skewness_mag_g_fov'], df_clean['kurtosis_mag_g_fov'],
            c=df_clean['combined_score'], cmap='viridis',
            s=5, alpha=0.5)
ax3.scatter(top_candidates['skewness_mag_g_fov'], top_candidates['kurtosis_mag_g_fov'],
            c='red', s=50, marker='*', zorder=10)
ax3.set_xlabel('Skewness')
ax3.set_ylabel('Kurtosis')
ax3.set_title('Light Curve Shape: Skewness vs Kurtosis')
ax3.axhline(3, color='gray', linestyle='--', alpha=0.5, label='Gaussian')
ax3.axvline(0, color='gray', linestyle='--', alpha=0.5)

# 4. Score distribution with threshold
ax4 = axes[1, 1]
ax4.hist(df_clean['combined_score'], bins=50, alpha=0.7, label='All sources')
ax4.axvline(score_threshold, color='red', linestyle='--', lw=2,
            label=f'Threshold = {score_threshold} ({n_selected} selected)')
# Also show consensus-only count
n_consensus = df_clean['both_anomaly'].sum()
ax4.axvline(df_clean[df_clean['both_anomaly']]['combined_score'].min(), color='orange',
            linestyle=':', lw=1.5, label=f'Both-model consensus ({n_consensus} total)')
ax4.set_xlabel('Combined Anomaly Score')
ax4.set_ylabel('Count')
ax4.set_title('Anomaly Score Distribution')
ax4.legend(fontsize=8)

plt.tight_layout()
plt.savefig('anomaly_detection_v2.png', dpi=150, bbox_inches='tight')
plt.show()

print(f"\n✓ Saved to anomaly_detection_v2.png")
print(f"  {n_selected} candidates passed threshold (vs fixed top-N approach)")

## Cell 7: Cross-match with VSX (Variable Star Index)

In [None]:
# Cell 7: Cross-match top candidates with VSX
print("="*70)
print("CROSS-MATCHING WITH VSX (Variable Star Index)")
print("="*70)

# top_candidates already defined by score threshold in Cell 6
print(f"\nChecking {len(top_candidates)} anomaly candidates against VSX...")
print(f"  (selected by combined_score >= {CONFIG['anomaly_score_min']}, consensus={CONFIG['require_consensus']})")

# VSX catalog in VizieR
vsx_catalog = 'B/vsx/vsx'
Vizier.ROW_LIMIT = 5

vsx_names = []
vsx_types = []
vsx_periods = []
in_vsx_flags = []

for i, (idx, row) in enumerate(top_candidates.iterrows()):
    coord = SkyCoord(ra=row['ra']*u.deg, dec=row['dec']*u.deg, frame='icrs')
    if (i + 1) % 10 == 0 or i == 0 or i + 1 == len(top_candidates):
        print(f"  [{i+1}/{len(top_candidates)}]...", flush=True)

    try:
        result = Vizier.query_region(coord, radius=5*u.arcsec, catalog=vsx_catalog)

        if result and len(result) > 0 and len(result[0]) > 0:
            vsx = result[0][0]
            vsx_names.append(vsx['Name'] if 'Name' in vsx.colnames else 'Unknown')
            vsx_types.append(vsx['Type'] if 'Type' in vsx.colnames else 'VAR')
            vsx_periods.append(float(vsx['Period']) if 'Period' in vsx.colnames and vsx['Period'] else None)
            in_vsx_flags.append(True)
        else:
            vsx_names.append(None)
            vsx_types.append(None)
            vsx_periods.append(None)
            in_vsx_flags.append(False)
    except Exception as e:
        vsx_names.append(None)
        vsx_types.append(None)
        vsx_periods.append(None)
        in_vsx_flags.append(False)

# Assign directly
top_candidates['vsx_name'] = vsx_names
top_candidates['vsx_type'] = vsx_types
top_candidates['vsx_period'] = vsx_periods
top_candidates['in_vsx'] = in_vsx_flags

n_in_vsx = top_candidates['in_vsx'].sum()
n_not_in_vsx = len(top_candidates) - n_in_vsx
print(f"\n✓ VSX cross-match complete:")
print(f"   In VSX: {n_in_vsx}")
print(f"   NOT in VSX (potentially uncatalogued): {n_not_in_vsx}")

# Show VSX types found
if n_in_vsx > 0:
    print(f"\nVSX types found:")
    type_counts = top_candidates[top_candidates['in_vsx']]['vsx_type'].value_counts()
    for vtype, count in type_counts.items():
        print(f"   {vtype}: {count}")

## Cell 8: Cross-match with ROSAT (X-ray)

In [None]:
# Cell 8: Cross-match with ROSAT X-ray catalog
print("="*70)
print("CROSS-MATCHING WITH ROSAT (X-ray Survey)")
print("="*70)

print(f"\nChecking {len(top_candidates)} candidates for X-ray counterparts...")
print("(X-ray detection is strong evidence for CVs)")

# ROSAT catalogs
rosat_catalogs = [
    'IX/10A/1rxs',    # ROSAT All-Sky Survey Bright Source Catalog
    'IX/29/rass_fsc'  # ROSAT All-Sky Survey Faint Source Catalog
]

has_xray_flags = []
xray_rates = []

for i, (idx, row) in enumerate(top_candidates.iterrows()):
    coord = SkyCoord(ra=row['ra']*u.deg, dec=row['dec']*u.deg, frame='icrs')
    found_xray = False
    xray_flux = None

    for cat in rosat_catalogs:
        try:
            result = Vizier.query_region(coord, radius=CONFIG['rosat_search_radius']*u.arcsec, catalog=cat)
            if result and len(result) > 0 and len(result[0]) > 0:
                found_xray = True
                if 'Count' in result[0].colnames:
                    xray_flux = float(result[0][0]['Count'])
                break
        except:
            pass

    has_xray_flags.append(found_xray)
    xray_rates.append(xray_flux)
    if (i + 1) % 10 == 0 or i + 1 == len(top_candidates):
        print(f"  [{i+1}/{len(top_candidates)}]...", flush=True)

# Assign directly
top_candidates['has_xray'] = has_xray_flags
top_candidates['xray_count_rate'] = xray_rates

n_xray = top_candidates['has_xray'].sum()
print(f"\n✓ ROSAT cross-match complete:")
print(f"   X-ray detections: {n_xray} / {len(top_candidates)}")

if n_xray > 0:
    print(f"\n   ⭐ X-ray sources are HIGH PRIORITY CV candidates!")

## Cell 9: Check TESS Coverage

In [None]:
# Cell 9: Check TESS coverage for top candidates
from concurrent.futures import ThreadPoolExecutor, as_completed

print("="*70)
print("CHECKING TESS COVERAGE")
print("="*70)

n_cands = len(top_candidates)
print(f"\nQuerying TIC for {n_cands} candidates...")

BATCH = 5

def _query_tic(args):
    idx, ra, dec = args
    try:
        coord = SkyCoord(ra=ra*u.deg, dec=dec*u.deg, frame="icrs")
        tic_result = Catalogs.query_region(coord, radius=CONFIG["tess_search_radius"]*u.arcsec, catalog="TIC")
        if tic_result and len(tic_result) > 0:
            tic = tic_result[0]
            teff = float(tic["Teff"]) if tic["Teff"] and not np.ma.is_masked(tic["Teff"]) else None
            rad = float(tic["rad"]) if tic["rad"] and not np.ma.is_masked(tic["rad"]) else None
            return idx, int(tic["ID"]), teff, rad
    except:
        pass
    return idx, None, None, None

tic_args = [(idx, row["ra"], row["dec"]) for idx, row in top_candidates.iterrows()]
tic_map = {}
done = 0

for b0 in range(0, len(tic_args), BATCH):
    batch = tic_args[b0:b0 + BATCH]
    with ThreadPoolExecutor(max_workers=BATCH) as pool:
        futs = {pool.submit(_query_tic, a): a[0] for a in batch}
        for f in as_completed(futs, timeout=60):
            try:
                idx, tic_id, teff, rad = f.result(timeout=30)
                if tic_id is not None:
                    tic_map[idx] = {"tic_id": tic_id, "teff": teff, "rad": rad}
            except:
                pass
    done = min(b0 + BATCH, n_cands)
    if done % 10 == 0 or done == n_cands:
        print(f"  {done}/{n_cands} ({len(tic_map)} matched)", flush=True)

# Assign results - skip slow Tesscut.get_sectors(), sector info discovered at cutout time
tic_ids, tess_teffs, tess_rads = [], [], []
for idx in top_candidates.index:
    tm = tic_map.get(idx, {})
    tic_ids.append(tm.get("tic_id"))
    tess_teffs.append(tm.get("teff"))
    tess_rads.append(tm.get("rad"))

top_candidates["tic_id"] = tic_ids
top_candidates["tess_teff"] = tess_teffs
top_candidates["tess_rad"] = tess_rads
top_candidates["n_tess_sectors"] = [1 if tid else 0 for tid in tic_ids]  # proxy; real count at cutout time
top_candidates["tess_sectors"] = [[] for _ in tic_ids]

n_with_tic = sum(1 for t in tic_ids if t is not None)
print(f"\n\u2713 TIC cross-match complete:")
print(f"   TIC matches: {n_with_tic} / {n_cands}")
print(f"   (Sector counts determined during light curve extraction)")


## Cell 10: Cross-match with SIMBAD

Check for any existing astronomical classifications in SIMBAD. Sources NOT in SIMBAD are potentially novel discoveries.

In [None]:
# Cell 10: Cross-match with SIMBAD for existing classifications
print("="*70)
print("CROSS-MATCHING WITH SIMBAD")
print("="*70)

n_cands = len(top_candidates)
print(f"\nChecking {n_cands} candidates via SIMBAD TAP batch query...")

# --- Approach: single SIMBAD TAP cross-match query ---
from astropy.table import Table as AstropyTable
import tempfile, os

upload_tbl = AstropyTable()
upload_tbl["source_id"] = [int(s) for s in top_candidates["source_id"]]
upload_tbl["ra"] = top_candidates["ra"].values
upload_tbl["dec"] = top_candidates["dec"].values

radius_deg = CONFIG["simbad_search_radius"] / 3600.0

simbad_tap_query = f"""
SELECT u.source_id, b.main_id, b.otype, b.otypes,
       DISTANCE(POINT('ICRS', b.ra, b.dec), POINT('ICRS', u.ra, u.dec)) AS sep
FROM TAP_UPLOAD.cands AS u
LEFT JOIN basic AS b
  ON 1=CONTAINS(POINT('ICRS', b.ra, b.dec), CIRCLE('ICRS', u.ra, u.dec, {radius_deg:.6f}))
ORDER BY u.source_id, sep
"""

simbad_results = {}
tap_success = False

try:
    from astroquery.simbad import Simbad as SimbadQ
    print("  Submitting TAP query...", flush=True)
    result = SimbadQ.query_tap(simbad_tap_query, cands=upload_tbl)
    if result is not None and len(result) > 0:
        tap_success = True
        # Take closest match per source_id
        seen = set()
        for row in result:
            sid = int(row["source_id"])
            if sid in seen:
                continue
            seen.add(sid)
            mid = str(row["main_id"]).strip() if row["main_id"] and not np.ma.is_masked(row["main_id"]) else None
            otype = str(row["otype"]).strip() if row["otype"] and not np.ma.is_masked(row["otype"]) else None
            otypes = str(row["otypes"]).strip() if row["otypes"] and not np.ma.is_masked(row["otypes"]) else None
            if mid:
                simbad_results[sid] = (True, mid, otype, otypes)
        print(f"  TAP returned {len(result)} rows, {len(simbad_results)} unique matches", flush=True)
except Exception as e:
    print(f"  TAP query failed: {e}", flush=True)

# --- Fallback: sequential queries if TAP failed ---
if not tap_success:
    print("  Falling back to sequential queries...", flush=True)
    custom_simbad = SimbadClass()
    custom_simbad.add_votable_fields("otype", "otypes")
    custom_simbad.TIMEOUT = 10

    def _simbad_val(table, col_variants):
        cols_lower = {c.lower(): c for c in table.colnames}
        for name in col_variants:
            actual = cols_lower.get(name.lower())
            if actual:
                val = table[0][actual]
                return str(val).strip() if not np.ma.is_masked(val) else None
        return None

    for i, (idx, row) in enumerate(top_candidates.iterrows()):
        sid = int(row["source_id"])
        if sid in simbad_results:
            continue
        try:
            result = custom_simbad.query_object(f"Gaia DR3 {sid}")
            if result is None:
                coord = SkyCoord(ra=row["ra"]*u.deg, dec=row["dec"]*u.deg, frame="icrs")
                result = custom_simbad.query_region(coord, radius=CONFIG["simbad_search_radius"]*u.arcsec)
            if result is not None and len(result) > 0:
                name = _simbad_val(result, ["MAIN_ID", "main_id"])
                otype = _simbad_val(result, ["OTYPE", "otype", "main_type"])
                otypes = _simbad_val(result, ["OTYPES", "otypes", "all_types"])
                if name:
                    simbad_results[sid] = (True, name, otype, otypes)
        except:
            pass
        if (i + 1) % 10 == 0 or i + 1 == n_cands:
            print(f"    {i+1}/{n_cands}", flush=True)

# --- Assign results ---
simbad_names, simbad_otypes, simbad_all_otypes, in_simbad_flags = [], [], [], []
for idx, row in top_candidates.iterrows():
    sid = int(row["source_id"])
    if sid in simbad_results:
        found, name, otype, otypes = simbad_results[sid]
        simbad_names.append(name)
        simbad_otypes.append(otype)
        simbad_all_otypes.append(otypes)
        in_simbad_flags.append(True)
    else:
        simbad_names.append(None)
        simbad_otypes.append(None)
        simbad_all_otypes.append(None)
        in_simbad_flags.append(False)

top_candidates["simbad_name"] = simbad_names
top_candidates["simbad_otype"] = simbad_otypes
top_candidates["simbad_otypes"] = simbad_all_otypes
top_candidates["in_simbad"] = in_simbad_flags

n_in_simbad = sum(in_simbad_flags)
n_not_simbad = n_cands - n_in_simbad

print(f"\n\u2713 SIMBAD cross-match complete:")
print(f"   Found in SIMBAD: {n_in_simbad} / {n_cands}")
print(f"   NOT in SIMBAD: {n_not_simbad}")

if n_in_simbad > 0:
    print(f"\nSIMBAD object types:")
    type_counts = top_candidates[top_candidates["in_simbad"]]["simbad_otype"].value_counts()
    for otype, count in type_counts.items():
        print(f"   {otype}: {count}")
    cv_types = ["CV", "CV*", "No*", "DN*", "DQ*", "AM*", "NL*", "XB*",
                "CataclyV*", "Nova", "DwarfNova", "AMHer", "NovaLike"]
    cv_matches = top_candidates[top_candidates["simbad_otype"].isin(cv_types)]
    if len(cv_matches) > 0:
        print(f"\n   \u2b50 {len(cv_matches)} sources have CV-related SIMBAD types!")

if n_not_simbad > 0:
    print(f"\n   \u2b50 {n_not_simbad} sources NOT in SIMBAD - potentially novel discoveries!")


## Cell 11: Gaia DR3 Variability Classifier

Query `gaiadr3.vari_classifier_result` to check if Gaia's automated pipeline already classified these sources.

In [None]:
# Cell 11: Cross-match with Gaia DR3 variability classifier
print("="*70)
print("CROSS-MATCHING WITH GAIA DR3 VARIABILITY CLASSIFIER")
print("="*70)

print(f"\nQuerying gaiadr3.vari_classifier_result for {len(top_candidates)} candidates...")

source_ids = top_candidates['source_id'].astype(int).tolist()

# Query in batches
batch_size = 30
vari_lookup = {}  # source_id -> (class, score)

for i in range(0, len(source_ids), batch_size):
    batch = source_ids[i:i+batch_size]
    id_list = ','.join(str(sid) for sid in batch)
    print(f"  Batch {i//batch_size + 1}: querying {len(batch)} sources...", end=' ', flush=True)
    
    vari_query = f"""
    SELECT source_id, best_class_name, best_class_score
    FROM gaiadr3.vari_classifier_result
    WHERE source_id IN ({id_list})
    """
    
    try:
        job = Gaia.launch_job(vari_query)
        result = job.get_results()
        if result and len(result) > 0:
            for row in result:
                vari_lookup[int(row['source_id'])] = (
                    str(row['best_class_name']),
                    float(row['best_class_score'])
                )
            print(f"{len(result)} classified")
        else:
            print("0 classified")
    except Exception as e:
        print(f"error: {str(e)[:50]}")

# Assign directly to avoid merge issues
top_candidates['gaia_vari_class'] = top_candidates['source_id'].apply(
    lambda sid: vari_lookup.get(int(sid), (None, None))[0])
top_candidates['gaia_vari_score'] = top_candidates['source_id'].apply(
    lambda sid: vari_lookup.get(int(sid), (None, None))[1])

n_classified = top_candidates['gaia_vari_class'].notna().sum()
print(f"\n✓ Gaia DR3 variability classifier results:")
print(f"   Classified by Gaia: {n_classified} / {len(top_candidates)}")

if n_classified > 0:
    print(f"\nGaia variability classifications:")
    class_counts = top_candidates['gaia_vari_class'].dropna().value_counts()
    for cls, count in class_counts.items():
        print(f"   {cls}: {count}")

## Cell 12: Cross-match with ZTF

ZTF provides dense optical light curves with better cadence than Gaia, ideal for resolving short-period orbital modulations and outburst profiles.

In [None]:
# Cell 12: Cross-match with ZTF (Zwicky Transient Facility)
print("="*70)
print("CROSS-MATCHING WITH ZTF (Zwicky Transient Facility)")
print("="*70)

print(f"\nChecking {len(top_candidates)} candidates in ZTF catalogs...")
print("(ZTF provides dense optical light curves ideal for short-period systems)")

# ZTF-related VizieR catalogs (periodic variables + variable candidates)
ztf_catalogs = [
    'J/ApJS/249/18',   # ZTF Catalog of Periodic Variable Stars (Chen+ 2020)
    'J/AJ/159/198',    # CVs in ZTF Year 1 (Szkody+ 2020)
    'J/MNRAS/499/5782', # ZTF DR1 variable source candidates (Ofek+ 2020)
]

Vizier.ROW_LIMIT = 5
first_match_logged = False

in_ztf_flags = []
ztf_nobs_list = []
ztf_period_list = []
ztf_catalog_list = []
ztf_mag_list = []

for i, (idx, row) in enumerate(top_candidates.iterrows()):
    coord = SkyCoord(ra=row['ra']*u.deg, dec=row['dec']*u.deg, frame='icrs')
    gaia_id = int(row['source_id'])
    print(f"  [{i+1:2d}/{len(top_candidates)}] Gaia DR3 {gaia_id}...", end=' ', flush=True)

    found = False
    nobs, period, mag, cat_name = 0, None, None, None

    for cat in ztf_catalogs:
        try:
            result = Vizier(columns=['**'], row_limit=5, timeout=15).query_region(
                coord, radius=CONFIG['ztf_search_radius']*u.arcsec, catalog=cat)
            if result and len(result) > 0 and len(result[0]) > 0:
                table = result[0]
                found = True
                cat_name = cat

                # Log column names on first match for debugging
                if not first_match_logged:
                    print(f"\n    [DEBUG] Matched catalog {cat}, columns: {table.colnames}")
                    first_match_logged = True

                r0 = table[0]
                # Extract observation counts
                # Chen+2020 uses Ng, Nr; others may use nobs, Nobs, etc.
                for col in table.colnames:
                    cl = col.lower()
                    if cl in ('ng', 'nr', 'nobs', 'nepochs', 'numobs', 'ndet', 'n'):
                        try:
                            val = r0[col]
                            if not np.ma.is_masked(val):
                                nobs += int(val)
                        except: pass
                    elif 'nobs' in cl or 'nepoch' in cl or 'ngood' in cl or 'ndet' in cl:
                        try:
                            val = r0[col]
                            if not np.ma.is_masked(val):
                                nobs += int(val)
                        except: pass

                # Extract period - Chen+2020 uses Per-g, Per-r
                for col in table.colnames:
                    cl = col.lower()
                    if cl in ('per', 'period', 'p', 'per-g', 'per-r') or cl.startswith('per'):
                        try:
                            val = r0[col]
                            if not np.ma.is_masked(val) and float(val) > 0:
                                period = float(val)
                                break
                        except: pass

                # Extract magnitude
                for col in table.colnames:
                    cl = col.lower()
                    if cl in ('gmag', 'rmag', 'magg', 'magr', 'mag'):
                        try:
                            val = r0[col]
                            if not np.ma.is_masked(val):
                                mag = float(val)
                                break
                        except: pass

                parts = []
                if nobs > 0: parts.append(f"{nobs} obs")
                if period: parts.append(f"P={period:.4f}d")
                if mag: parts.append(f"mag={mag:.1f}")
                print(f"ZTF [{cat.split('/')[-1]}]: {', '.join(parts) if parts else 'matched'}")
                break
        except Exception as e:
            continue

    if not found:
        print("not in ZTF catalogs")

    in_ztf_flags.append(found)
    ztf_nobs_list.append(nobs)
    ztf_period_list.append(period)
    ztf_catalog_list.append(cat_name)
    ztf_mag_list.append(mag)

# Assign directly
top_candidates['in_ztf'] = in_ztf_flags
top_candidates['ztf_nobs'] = ztf_nobs_list
top_candidates['ztf_period'] = ztf_period_list
top_candidates['ztf_catalog'] = ztf_catalog_list
top_candidates['ztf_mag'] = ztf_mag_list

# Drop old split columns if they exist from prior runs
for old_col in ['ztf_nobs_g', 'ztf_nobs_r', 'ztf_med_mag_g', 'ztf_med_mag_r']:
    if old_col in top_candidates.columns:
        top_candidates.drop(columns=[old_col], inplace=True)

n_in_ztf = top_candidates['in_ztf'].sum()
print(f"\n✓ ZTF cross-match complete:")
print(f"   Found in ZTF catalogs: {n_in_ztf} / {len(top_candidates)}")

if n_in_ztf > 0:
    with_period = top_candidates['ztf_period'].notna().sum()
    with_nobs = (top_candidates['ztf_nobs'] > 0).sum()
    print(f"   With ZTF periods: {with_period}")
    print(f"   With observation counts: {with_nobs}")
    for _, r in top_candidates[top_candidates['in_ztf']].iterrows():
        parts = []
        if r['ztf_nobs'] > 0: parts.append(f"{r['ztf_nobs']} obs")
        if pd.notna(r['ztf_period']): parts.append(f"P={r['ztf_period']:.5f}d ({r['ztf_period']*24*60:.1f}min)")
        print(f"   Gaia DR3 {int(r['source_id'])}: {', '.join(parts) if parts else 'matched'}")

print(f"\n   NOTE: VizieR ZTF catalogs only contain periodic variables (Chen+ 2020)")
print(f"   and known CVs (Szkody+ 2020). Most dwarf novae show aperiodic outbursts")
print(f"   and won't appear in these catalogs.")
print(f"   For dense ZTF light curves, request forced photometry from IRSA:")
print(f"   https://irsa.ipac.caltech.edu/cgi-bin/ZTF/nph_light_curves")


## Cell 13: Cross-match with GALEX (UV)

UV excess from a hot accretion disk is a strong CV indicator. GALEX FUV/NUV detections are especially diagnostic.

In [None]:
# Cell 13: Cross-match with GALEX for UV emission
print("="*70)
print("CROSS-MATCHING WITH GALEX (UV Survey)")
print("="*70)

print(f"\nChecking {len(top_candidates)} candidates for UV counterparts...")
print("(UV excess indicates hot accretion disk - strong CV indicator)")

galex_catalog = 'II/335/galex_ais'
Vizier.ROW_LIMIT = 3

in_galex_flags = []
galex_fuv_list = []
galex_nuv_list = []
galex_fuv_nuv_list = []

for i, (idx, row) in enumerate(top_candidates.iterrows()):
    coord = SkyCoord(ra=row['ra']*u.deg, dec=row['dec']*u.deg, frame='icrs')
    print(f"  [{i+1:2d}/{len(top_candidates)}] Gaia DR3 {int(row['source_id'])}...", end=' ', flush=True)
    
    found = False
    fuv, nuv, fuv_nuv = None, None, None
    
    try:
        result = Vizier.query_region(coord, radius=CONFIG['galex_search_radius']*u.arcsec,
                                     catalog=galex_catalog)
        
        if result and len(result) > 0 and len(result[0]) > 0:
            galex = result[0][0]
            found = True
            fuv = float(galex['FUVmag']) if 'FUVmag' in galex.colnames and not np.ma.is_masked(galex['FUVmag']) else None
            nuv = float(galex['NUVmag']) if 'NUVmag' in galex.colnames and not np.ma.is_masked(galex['NUVmag']) else None
            fuv_nuv = (fuv - nuv) if (fuv is not None and nuv is not None) else None
            
            parts = []
            if fuv is not None: parts.append(f"FUV={fuv:.1f}")
            if nuv is not None: parts.append(f"NUV={nuv:.1f}")
            print(' '.join(parts) if parts else "detected (no mags)")
        else:
            print("not in GALEX")
    except Exception as e:
        print(f"error: {str(e)[:40]}")
    
    in_galex_flags.append(found)
    galex_fuv_list.append(fuv)
    galex_nuv_list.append(nuv)
    galex_fuv_nuv_list.append(fuv_nuv)

# Assign directly to avoid merge issues
top_candidates['in_galex'] = in_galex_flags
top_candidates['galex_fuv'] = galex_fuv_list
top_candidates['galex_nuv'] = galex_nuv_list
top_candidates['galex_fuv_nuv'] = galex_fuv_nuv_list

n_galex = top_candidates['in_galex'].sum()
n_fuv = top_candidates['galex_fuv'].notna().sum()
n_nuv = top_candidates['galex_nuv'].notna().sum()

print(f"\n✓ GALEX cross-match complete:")
print(f"   Any UV detection: {n_galex} / {len(top_candidates)}")
print(f"   FUV detections: {n_fuv}")
print(f"   NUV detections: {n_nuv}")

if n_fuv > 0:
    print(f"\n   ⭐ FUV-detected sources are HIGH PRIORITY CV candidates!")
    fuv_sources = top_candidates[top_candidates['galex_fuv'].notna()]
    for _, r in fuv_sources.iterrows():
        fuv_str = f"FUV={r['galex_fuv']:.1f}" if pd.notna(r['galex_fuv']) else ""
        nuv_str = f"NUV={r['galex_nuv']:.1f}" if pd.notna(r['galex_nuv']) else ""
        print(f"      Gaia DR3 {int(r['source_id'])}: {fuv_str} {nuv_str}")

## Cell 14: Multi-Wavelength Priority Scoring

Scoring incorporates evidence from all cross-matches: anomaly score, X-ray, UV, SIMBAD, Gaia variability class, ZTF coverage, TESS coverage, period, and light curve shape.

In [None]:
# Cell 14: Calculate priority score and create final table
print("="*70)
print("MULTI-WAVELENGTH PRIORITY SCORING")
print("="*70)

def calculate_priority(row):
    score = row['combined_score'] * 5  # Base anomaly score

    # X-ray detection is strong CV indicator (ROSAT)
    if row.get('has_xray', False):
        score += 3

    # GALEX UV detection - hot accretion disk indicator
    if pd.notna(row.get('galex_fuv')):
        score += 3  # FUV detection is very strong
    elif pd.notna(row.get('galex_nuv')):
        score += 1.5  # NUV only is still interesting

    # SIMBAD classification
    if not row.get('in_simbad', True):
        score += 5  # Not in SIMBAD - potentially novel discovery (high priority)
    else:
        simbad_otype = str(row.get('simbad_otype', ''))
        if any(t in simbad_otype for t in ['CV', 'No*', 'DN', 'DQ', 'AM', 'NL']):
            score += 1  # Confirmed CV-related

    # Gaia variability classification
    gaia_class = str(row.get('gaia_vari_class', ''))
    if gaia_class and gaia_class not in ['None', 'nan', '']:
        if any(c in gaia_class for c in ['CV', 'SYST', 'ECL']):
            score += 1.5

    # Not catalogued or generic in VSX
    if not row.get('in_vsx', True):
        score += 4  # Not catalogued as variable - potentially novel
    elif row.get('vsx_type') in ['VAR', 'VAR:', None, '']:
        score += 2.5  # Generic/uncertain type - needs investigation

    # ZTF coverage for follow-up potential
    ztf_obs = row.get('ztf_nobs', 0) or 0
    if ztf_obs > 100:
        score += 1.5
    elif ztf_obs > 50:
        score += 1
    elif ztf_obs > 0:
        score += 0.5

    # ZTF period agreement with VSX
    ztf_period = row.get('ztf_period')
    vsx_period = row.get('vsx_period')
    if pd.notna(ztf_period) and pd.notna(vsx_period) and vsx_period > 0:
        if abs(ztf_period - vsx_period) / vsx_period < 0.1:
            score += 1  # Period agreement bonus

    # TESS coverage for validation
    if row.get('tic_id') is not None:
        score += 2  # TIC match = TESS observable

    # Short period (potential CV)
    period = row.get('vsx_period')
    if period and period < 0.1:  # < 2.4 hours
        score += 2

    # Negative skewness (outburst-like)
    if row['skewness_mag_g_fov'] < -1:
        score += 1

    return score

top_candidates['priority_score'] = top_candidates.apply(calculate_priority, axis=1)
top_candidates = top_candidates.sort_values('priority_score', ascending=False)

print("\nPriority scoring criteria:")
print("  +5 x anomaly_score (base ML score)")
print("  +3 if X-ray detection (ROSAT)")
print("  +3 if FUV detection (GALEX), +1.5 if NUV only")
print("  +5 if not in SIMBAD (novel discovery), +1 if CV-related SIMBAD type")
print("  +1.5 if Gaia vari class matches CV/SYST/ECL")
print("  +4 if not in VSX, +2.5 if generic VAR type")
print("  +0.5-1.5 based on ZTF observation count")
print("  +1 if ZTF period agrees with VSX period")
print("  +2 if TIC match (TESS observable)")
print("  +2 if period < 2.4 hours")
print("  +1 if negative skewness (outburst-like)")

# Filter by priority score threshold
priority_threshold = CONFIG['priority_score_min']
high_priority = top_candidates[top_candidates['priority_score'] >= priority_threshold]
n_hp = len(high_priority)

print(f"\n  Candidates above priority threshold ({priority_threshold}): {n_hp} / {len(top_candidates)}")
print(f"  Score range: {top_candidates['priority_score'].min():.1f} - {top_candidates['priority_score'].max():.1f}")

print("\n" + "="*70)
print(f"PRIORITIZED CANDIDATES (priority_score >= {priority_threshold}): {n_hp} found")
print("="*70)

for i, (idx, row) in enumerate(high_priority.iterrows()):
    print(f"\n{'='*60}")
    print(f"RANK #{i+1} | Priority Score: {row['priority_score']:.2f}")
    print(f"{'='*60}")
    print(f"  Gaia DR3 {int(row['source_id'])}")
    print(f"  Position: RA={row['ra']:.5f}, Dec={row['dec']:.5f}")
    print(f"  G={row['phot_g_mean_mag']:.2f}, BP-RP={row['bp_rp']:.2f}")
    print(f"  Amplitude: {row['amplitude']:.2f} mag")
    print(f"  Skewness: {row['skewness_mag_g_fov']:.2f}, Kurtosis: {row['kurtosis_mag_g_fov']:.2f}")

    # VSX
    vsx_str = f"Yes - {row['vsx_type']}" if row.get('in_vsx') else 'NOT CATALOGUED'
    if row.get('vsx_period'):
        vsx_str += f" (P={row['vsx_period']:.4f}d = {row['vsx_period']*24*60:.1f} min)"
    print(f"  VSX: {vsx_str}")

    # SIMBAD
    if row.get('in_simbad'):
        print(f"  SIMBAD: {row['simbad_otype']} ({row['simbad_name']})")
    else:
        print(f"  SIMBAD: NOT FOUND")

    # Gaia variability
    gaia_class = row.get('gaia_vari_class')
    if pd.notna(gaia_class) and str(gaia_class) not in ['None', 'nan', '']:
        print(f"  Gaia Vari: {gaia_class} (score={row.get('gaia_vari_score', 0):.2f})")
    else:
        print(f"  Gaia Vari: Not classified")

    # X-ray (ROSAT)
    xray_str = 'YES' if row.get('has_xray') else 'No'
    if row.get('has_xray') and pd.notna(row.get('xray_count_rate')):
        xray_str += f" (rate={row['xray_count_rate']:.3f} ct/s)"
    print(f"  X-ray: {xray_str}")

    # GALEX UV
    fuv_str = f"FUV={row['galex_fuv']:.1f}" if pd.notna(row.get('galex_fuv')) else "No FUV"
    nuv_str = f"NUV={row['galex_nuv']:.1f}" if pd.notna(row.get('galex_nuv')) else "No NUV"
    print(f"  GALEX: {fuv_str}, {nuv_str}")

    # ZTF
    ztf_nobs = int(row.get('ztf_nobs', 0) or 0)
    ztf_per = row.get('ztf_period')
    if row.get('in_ztf'):
        ztf_parts = [f"{ztf_nobs} obs" if ztf_nobs > 0 else "matched"]
        if pd.notna(ztf_per):
            ztf_parts.append(f"P={ztf_per:.5f}d")
        print(f"  ZTF: {', '.join(ztf_parts)}")
    else:
        print(f"  ZTF: No")

    # TESS
    tic_str = f"TIC {int(row['tic_id'])}" if pd.notna(row.get('tic_id')) else 'N/A'
    print(f"  TESS: {tic_str}, {int(row.get('n_tess_sectors', 0))} sectors")

## Cell 15: Save Results

In [None]:
# Cell 15: Save results to files
print("="*70)
print("SAVING RESULTS")
print("="*70)

timestamp = datetime.now().strftime('%Y%m%d_%H%M')

# Save full candidate table
output_file = f'cv_candidates_{timestamp}.csv'
top_candidates.to_csv(output_file, index=False)
print(f"\n✓ Saved {len(top_candidates)} candidates to {output_file}")

# Save high-priority subset
high_priority = top_candidates[top_candidates['priority_score'] >= CONFIG['priority_score_min']]
hp_file = f'cv_candidates_high_priority_{timestamp}.csv'
high_priority.to_csv(hp_file, index=False)
print(f"✓ Saved {len(high_priority)} high-priority candidates to {hp_file}")

# Summary stats
print(f"\n" + "="*70)
print("MULTI-WAVELENGTH CROSS-MATCH SUMMARY")
print("="*70)
print(f"\nTotal sources queried: {len(df)}")
print(f"Sources after cleaning: {len(df_clean)}")
print(f"Top candidates analyzed: {len(top_candidates)}")

print(f"\nCross-match results:")
print(f"  VSX matches:        {top_candidates['in_vsx'].sum()} / {len(top_candidates)}")
print(f"  SIMBAD matches:     {top_candidates['in_simbad'].sum()} / {len(top_candidates)}")
gaia_vari_n = top_candidates['gaia_vari_class'].notna().sum() if 'gaia_vari_class' in top_candidates.columns else 0
print(f"  Gaia vari classes:  {gaia_vari_n} / {len(top_candidates)}")
print(f"  ROSAT X-ray:        {top_candidates['has_xray'].sum()} / {len(top_candidates)}")
galex_n = top_candidates['in_galex'].sum() if 'in_galex' in top_candidates.columns else 0
print(f"  GALEX UV:           {galex_n} / {len(top_candidates)}")
ztf_n = top_candidates['in_ztf'].sum() if 'in_ztf' in top_candidates.columns else 0
print(f"  ZTF coverage:       {ztf_n} / {len(top_candidates)}")
print(f"  TESS coverage:      {(top_candidates['n_tess_sectors'] > 0).sum()} / {len(top_candidates)}")

# Top 5 with evidence summary
best = top_candidates[top_candidates['priority_score'] >= CONFIG['priority_score_min']].head(10)
print(f"\nTOP 5 CANDIDATES TO INVESTIGATE:")
for i, (idx, row) in enumerate(best.iterrows()):
    flags = []
    if row.get('has_xray'): flags.append('X-ray')
    if pd.notna(row.get('galex_fuv')): flags.append('FUV')
    elif pd.notna(row.get('galex_nuv')): flags.append('NUV')
    if not row.get('in_vsx'): flags.append('Not-in-VSX')
    elif row.get('vsx_type') == 'VAR': flags.append('Generic-VAR')
    if not row.get('in_simbad'): flags.append('Not-in-SIMBAD')
    if row['skewness_mag_g_fov'] < -1: flags.append('Outburst-like')
    gaia_class = str(row.get('gaia_vari_class', ''))
    if gaia_class and gaia_class not in ['None', 'nan', '']:
        flags.append(f'Gaia:{gaia_class}')
    
    print(f"  {i+1}. Gaia DR3 {int(row['source_id'])} (TIC {int(row['tic_id']) if pd.notna(row.get('tic_id')) else 'N/A'})")
    print(f"     G={row['phot_g_mean_mag']:.1f}, Amp={row['amplitude']:.2f}mag, Score={row['priority_score']:.1f}")
    print(f"     Evidence: {', '.join(flags) if flags else 'ML anomaly only'}")

## Cell 16: Extract TESS Light Curves for High-Priority Candidates

In [None]:
# Cell 16: Extract TESS light curves for top 10 candidates
import time as _time

print("="*70)
print("TESS LIGHT CURVE EXTRACTION: TOP 10 CANDIDATES")
print("="*70)

# Select candidates by priority score threshold + TESS coverage
priority_threshold = CONFIG['priority_score_min']
candidates_with_tess = top_candidates[
    (top_candidates['n_tess_sectors'] > 0) &
    (top_candidates['priority_score'] >= priority_threshold)
].copy()
n_extract = len(candidates_with_tess)

print(f"\nExtracting light curves for {n_extract} candidates")
print(f"  (priority_score >= {priority_threshold} AND TESS coverage > 0)")

n_rows = (n_extract + 1) // 2
fig, axes = plt.subplots(n_rows, 2, figsize=(18, 4 * n_rows))
axes = axes.flatten()

lc_data = {}  # Store light curve data for later analysis

def fetch_tess_cutout(coord, size, sector, max_retries=3):
    """Fetch TESS cutout with retry logic for transient network errors."""
    for attempt in range(max_retries):
        try:
            cutout = Tesscut.get_cutouts(coordinates=coord, size=size, sector=sector)
            return cutout
        except Exception as e:
            err_str = str(e).lower()
            if any(kw in err_str for kw in ['disconnect', 'connection', 'timeout', 'reset', 'refused']):
                if attempt < max_retries - 1:
                    wait = 5 * (attempt + 1)
                    print(f"  Network error, retrying in {wait}s (attempt {attempt+2}/{max_retries})...", flush=True)
                    _time.sleep(wait)
                    continue
            raise  # Non-network errors or final attempt: re-raise

for i, (idx, target) in enumerate(candidates_with_tess.iterrows()):
    ax = axes[i]
    tic_id = int(target['tic_id']) if pd.notna(target.get('tic_id')) else 0
    gaia_id = int(target['source_id'])

    print(f"\n[{i+1}/{n_extract}] TIC {tic_id} (Gaia DR3 {gaia_id})")

    coord = SkyCoord(ra=target['ra']*u.deg, dec=target['dec']*u.deg, frame='icrs')
    sectors = target['tess_sectors']

    if not sectors or len(sectors) == 0:
        ax.text(0.5, 0.5, 'No sector data', ha='center', va='center', transform=ax.transAxes)
        ax.set_title(f'#{i+1} TIC {tic_id}', fontsize=10)
        continue

    sector = sectors[0]

    try:
        cutout = fetch_tess_cutout(coord, size=5, sector=sector)

        if cutout and len(cutout) > 0:
            hdu = cutout[0]
            data = hdu[1].data

            time = data['TIME']
            flux_cube = data['FLUX']

            # Simple aperture photometry (3x3 central pixels)
            aperture_flux = np.sum(flux_cube[:, 1:4, 1:4], axis=(1, 2))

            # Clean bad data
            good = np.isfinite(aperture_flux) & (aperture_flux > 0)
            time_clean = time[good]
            flux_clean = aperture_flux[good]

            if len(flux_clean) == 0:
                ax.text(0.5, 0.5, 'No valid data', ha='center', va='center', transform=ax.transAxes)
                ax.set_title(f'#{i+1} TIC {tic_id}', fontsize=10)
                continue

            # Normalize
            flux_norm = flux_clean / np.nanmedian(flux_clean)

            # Store for later analysis
            lc_data[tic_id] = {
                'time': time_clean,
                'flux': flux_norm,
                'sector': sector,
                'gaia_id': gaia_id
            }

            # Plot
            ax.scatter(time_clean, flux_norm, s=1, alpha=0.5, c='darkblue')

            # Mark outliers (potential outbursts)
            std = np.nanstd(flux_norm)
            med = np.nanmedian(flux_norm)
            outliers = (flux_norm > med + 3*std) | (flux_norm < med - 3*std)
            if np.any(outliers):
                ax.scatter(time_clean[outliers], flux_norm[outliers], s=8, c='red', zorder=10)

            # Title with VSX type and key flags
            vsx_info = f" [{target['vsx_type']}]" if target.get('in_vsx') and target.get('vsx_type') else ""
            flags = []
            if target.get('has_xray'): flags.append('Xray')
            if pd.notna(target.get('galex_fuv')): flags.append('FUV')
            elif pd.notna(target.get('galex_nuv')): flags.append('NUV')
            flag_str = f" ({','.join(flags)})" if flags else ""

            ax.set_title(f'#{i+1} TIC {tic_id}{vsx_info}{flag_str} | S{sector}', fontsize=10)
            ax.set_ylabel('Norm. Flux', fontsize=8)
            ax.tick_params(labelsize=8)
            ax.grid(True, alpha=0.2)

            # Annotate with key stats
            n_outliers = np.sum(outliers)
            max_excursion = (flux_norm.max() - med) / std if std > 0 else 0
            ax.text(0.02, 0.95, f'N={len(flux_norm)}, Out={n_outliers}, Max={max_excursion:.1f}\u03c3',
                   transform=ax.transAxes, fontsize=7, va='top',
                   bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

            print(f"  Sector {sector}: {len(flux_norm)} pts, {n_outliers} outliers, max {max_excursion:.1f}\u03c3")
        else:
            ax.text(0.5, 0.5, 'No cutout data', ha='center', va='center', transform=ax.transAxes)
            ax.set_title(f'#{i+1} TIC {tic_id}', fontsize=10)

    except Exception as e:
        ax.text(0.5, 0.5, f'Error: {str(e)[:40]}', ha='center', va='center',
                transform=ax.transAxes, fontsize=7)
        ax.set_title(f'#{i+1} TIC {tic_id}', fontsize=10)
        print(f"  Error: {e}")

# Hide unused axes
for j in range(i + 1, len(axes)):
    axes[j].set_visible(False)

# Common x-label for bottom row
for ax in axes[-2:]:
    if ax.get_visible():
        ax.set_xlabel('TESS BJD (days)', fontsize=9)

plt.tight_layout()
lc_file = 'tess_lc_top10.png'
plt.savefig(lc_file, dpi=150, bbox_inches='tight')
plt.show()

print(f"\n✓ Light curves saved to {lc_file}")
print(f"  Successfully extracted: {len(lc_data)} / {n_extract}")

# Summary of interesting features
print(f"\nLight curve assessment:")
for tic_id, lc in lc_data.items():
    flux = lc['flux']
    std = np.nanstd(flux)
    med = np.nanmedian(flux)
    outliers = np.sum((flux > med + 3*std) | (flux < med - 3*std))
    max_exc = (flux.max() - med) / std if std > 0 else 0

    flags = []
    if outliers > 10 and max_exc > 5: flags.append('POTENTIAL OUTBURST')
    if std > 0.05: flags.append('HIGH VARIABILITY')
    if max_exc > 10: flags.append('EXTREME EXCURSION')

    if flags:
        print(f"  TIC {tic_id}: {', '.join(flags)} (max={max_exc:.1f}\u03c3, outliers={outliers})")

## Cell 17: Deep Candidate Investigation

In [None]:
# Cell 17: Deep Candidate Investigation - Extended Parameters & Classification
print("="*70)
print("DEEP CANDIDATE INVESTIGATION")
print("="*70)

# Investigate all candidates above priority threshold
priority_threshold = CONFIG['priority_score_min']
deep_candidates = top_candidates[top_candidates['priority_score'] >= priority_threshold].copy()
n_deep = len(deep_candidates)

print(f"\nQuerying extended Gaia DR3 parameters for {n_deep} candidates")
print(f"  (priority_score >= {priority_threshold})")

# --- Extended Gaia DR3 parameters ---
gaia_ids = [str(int(sid)) for sid in deep_candidates['source_id']]
id_list = ', '.join(gaia_ids)

extended_query = f"""
SELECT source_id, ra, dec, parallax, parallax_error, pmra, pmdec,
       phot_g_mean_mag, bp_rp, teff_gspphot, logg_gspphot,
       mh_gspphot, radial_velocity, ruwe,
       astrometric_excess_noise, phot_bp_rp_excess_factor,
       phot_variable_flag, non_single_star
FROM gaiadr3.gaia_source
WHERE source_id IN ({id_list})
"""

try:
    from astroquery.gaia import Gaia
    job = Gaia.launch_job(extended_query)
    ext_results = job.get_results()
    print(f"  Retrieved extended parameters for {len(ext_results)} sources")
except Exception as e:
    print(f"  Error querying extended params: {e}")
    ext_results = None

# --- CV-specific catalog searches ---
print(f"\nSearching CV-specific catalogs...")

cv_catalog_results = {}
for i, (idx, row) in enumerate(deep_candidates.iterrows()):
    gaia_id = int(row['source_id'])
    coord = SkyCoord(ra=row['ra']*u.deg, dec=row['dec']*u.deg, frame='icrs')
    print(f"\n  [{i+1}/{n_deep}] Gaia DR3 {gaia_id}")
    
    info = {'gaia_id': gaia_id}
    
    # Extended Gaia params
    if ext_results is not None:
        match = ext_results[ext_results['source_id'] == gaia_id]
        if len(match) > 0:
            m = match[0]
            teff = m['teff_gspphot'] if not np.ma.is_masked(m['teff_gspphot']) else None
            logg = m['logg_gspphot'] if not np.ma.is_masked(m['logg_gspphot']) else None
            rv = m['radial_velocity'] if not np.ma.is_masked(m['radial_velocity']) else None
            ruwe = m['ruwe'] if not np.ma.is_masked(m['ruwe']) else None
            plx = m['parallax'] if not np.ma.is_masked(m['parallax']) else None
            nsstar = m['non_single_star'] if not np.ma.is_masked(m['non_single_star']) else None
            
            info.update({'teff': teff, 'logg': logg, 'rv': rv, 'ruwe': ruwe,
                        'parallax': plx, 'non_single_star': nsstar})
            
            if teff: print(f"    Teff = {teff:.0f} K", end='')
            if logg: print(f", log(g) = {logg:.2f}", end='')
            if rv: print(f", RV = {rv:.1f} km/s", end='')
            if ruwe: print(f", RUWE = {ruwe:.2f}", end='')
            print()
            
            if plx and plx > 0:
                dist_pc = 1000.0 / plx
                abs_g = m['phot_g_mean_mag'] + 5 * np.log10(plx / 100)
                info['dist_pc'] = dist_pc
                info['abs_g'] = abs_g
                print(f"    Distance ~ {dist_pc:.0f} pc, M_G = {abs_g:.2f}")
    
    # Search XMM-Newton (4XMM-DR14)
    try:
        xmm = Vizier(columns=['*'], row_limit=3, timeout=10).query_region(
            coord, radius=10*u.arcsec, catalog='IX/68/xmm4d14s')
        if xmm and len(xmm) > 0 and len(xmm[0]) > 0:
            info['xmm_detected'] = True
            info['xmm_matches'] = len(xmm[0])
            print(f"    XMM-Newton: {len(xmm[0])} detection(s)")
        else:
            info['xmm_detected'] = False
    except:
        info['xmm_detected'] = None
    
    # Search eROSITA (if available via VizieR)
    try:
        erosita = Vizier(columns=['*'], row_limit=3, timeout=10).query_region(
            coord, radius=15*u.arcsec, catalog='J/A+A/661/A1')
        if erosita and len(erosita) > 0 and len(erosita[0]) > 0:
            info['erosita_detected'] = True
            print(f"    eROSITA: detected")
        else:
            info['erosita_detected'] = False
    except:
        info['erosita_detected'] = None
    
    # Search SDSS spectroscopy
    try:
        sdss_spec = Vizier(columns=['*'], row_limit=3, timeout=10).query_region(
            coord, radius=3*u.arcsec, catalog='V/154/sdss16')
        if sdss_spec and len(sdss_spec) > 0 and len(sdss_spec[0]) > 0:
            info['sdss_spec'] = True
            print(f"    SDSS spectroscopy: available")
        else:
            info['sdss_spec'] = False
    except:
        info['sdss_spec'] = None
    
    # --- Physical interpretation ---
    classification = []
    confidence_flags = []
    
    # Check color-magnitude position
    bp_rp = row.get('bp_rp', None)
    if bp_rp is not None and bp_rp < 0.5:
        classification.append('Blue color consistent with hot accretion disk or WD')
        confidence_flags.append('blue')
    
    # Check X-ray + UV combination
    has_xray = row.get('has_xray', False) or info.get('xmm_detected', False)
    has_fuv = pd.notna(row.get('galex_fuv'))
    if has_xray and has_fuv:
        classification.append('X-ray + UV: Strong accretion indicator')
        confidence_flags.append('xray+uv')
    elif has_xray:
        classification.append('X-ray detected: possible magnetic CV or coronal emitter')
        confidence_flags.append('xray')
    elif has_fuv:
        classification.append('FUV detected: hot component present')
        confidence_flags.append('fuv')
    
    # Check RUWE for binarity
    if info.get('ruwe') and info['ruwe'] > 1.4:
        classification.append(f'High RUWE ({info["ruwe"]:.2f}): astrometric binary signature')
        confidence_flags.append('binary')
    
    # Check Teff
    if info.get('teff'):
        if info['teff'] > 10000:
            classification.append(f'Hot (Teff={info["teff"]:.0f}K): WD/sdO/sdB or accretion-heated')
        elif info['teff'] < 4500:
            classification.append(f'Cool (Teff={info["teff"]:.0f}K): likely M-dwarf donor or flare star')
    
    # Check variability amplitude
    if row.get('phot_g_n_obs', 0) > 0 and row.get('std_dev_over_rms_err_mag', 0) > 5:
        classification.append('High photometric variability (>5x expected)')
        confidence_flags.append('var')
    
    # Overall assessment
    n_cv_indicators = len(confidence_flags)
    if n_cv_indicators >= 3:
        assessment = 'STRONG CV CANDIDATE'
    elif n_cv_indicators >= 2:
        assessment = 'MODERATE CV CANDIDATE'
    elif n_cv_indicators >= 1:
        assessment = 'WEAK CV CANDIDATE'
    else:
        assessment = 'UNCERTAIN - needs more data'
    
    info['classification_notes'] = classification
    info['assessment'] = assessment
    info['n_cv_indicators'] = n_cv_indicators
    cv_catalog_results[gaia_id] = info
    
    print(f"    Assessment: {assessment}")
    for note in classification:
        print(f"      - {note}")
    
    # Generate finder chart / archive links
    ra_str = f"{row['ra']:.6f}"
    dec_str = f"{row['dec']:.6f}"
    print(f"    Links:")
    print(f"      Aladin: https://aladin.cds.unistra.fr/AladinLite/?target={ra_str}+{dec_str}&fov=0.05")
    print(f"      ESASky: https://sky.esa.int/?target={ra_str}%20{dec_str}&hips=DSS2+color&fov=0.05")

# --- Summary table ---
print(f"\n{'='*70}")
print("DEEP INVESTIGATION SUMMARY")
print(f"{'='*70}")
print(f"{'Gaia DR3 ID':<22} {'Teff':>6} {'log(g)':>7} {'RUWE':>6} {'XMM':>5} {'eROS':>5} {'SDSS':>5} {'Assessment'}")
print("-" * 90)
for gaia_id, info in cv_catalog_results.items():
    teff_s = f"{info.get('teff', 0):.0f}" if info.get('teff') else '---'
    logg_s = f"{info.get('logg', 0):.2f}" if info.get('logg') else '---'
    ruwe_s = f"{info.get('ruwe', 0):.2f}" if info.get('ruwe') else '---'
    xmm_s = 'Y' if info.get('xmm_detected') else ('?' if info.get('xmm_detected') is None else 'N')
    eros_s = 'Y' if info.get('erosita_detected') else ('?' if info.get('erosita_detected') is None else 'N')
    sdss_s = 'Y' if info.get('sdss_spec') else ('?' if info.get('sdss_spec') is None else 'N')
    print(f"{gaia_id:<22} {teff_s:>6} {logg_s:>7} {ruwe_s:>6} {xmm_s:>5} {eros_s:>5} {sdss_s:>5} {info['assessment']}")

# Store for use in priority refinement
top_candidates_deep = cv_catalog_results

## Cell 18: Period Analysis (Lomb-Scargle + Phase Folding)

In [None]:
# Cell 18: Period Analysis for Top Candidates with TESS Data
from astropy.timeseries import LombScargle
from scipy.ndimage import median_filter

print("="*70)
print("PERIOD ANALYSIS: LOMB-SCARGLE + PHASE FOLDING")
print("="*70)

n_period = len(lc_data)
print(f"\nAnalyzing all {n_period} candidates with extracted TESS light curves...")
print("Strategy: detrend outbursts first, then search for orbital period")

period_results = {}

fig, axes = plt.subplots(max(n_period, 1), 2, figsize=(16, 4.5 * max(n_period, 1)), squeeze=False)

for plot_idx, (tic_id, lc) in enumerate(list(lc_data.items())[:n_period]):
    ax_lc = axes[plot_idx][0]   # left: detrended light curve
    ax_ls = axes[plot_idx][1]   # right: periodogram
    t = lc['time']
    flux = lc['flux']
    gaia_id = lc['gaia_id']

    print(f"\n[{plot_idx+1}/{n_period}] TIC {tic_id} (Gaia DR3 {gaia_id})")

    # Look up VSX period
    cand_row = top_candidates[top_candidates['source_id'] == gaia_id]
    vsx_period = None
    if len(cand_row) > 0 and pd.notna(cand_row.iloc[0].get('vsx_period')):
        vsx_period = cand_row.iloc[0]['vsx_period']

    # Calculate cadence and Nyquist limit
    dt = np.diff(np.sort(t))
    dt_clean = dt[dt > 0]
    median_cadence = np.median(dt_clean) if len(dt_clean) > 0 else 30.0 / (24 * 60)
    nyquist_period = 2 * median_cadence
    print(f"  Cadence: {median_cadence*24*60:.1f} min -> Nyquist: {nyquist_period*24*60:.1f} min")

    # --- Step 1: Identify and remove outburst data ---
    med = np.nanmedian(flux)
    std = np.nanstd(flux)

    # For dwarf novae: outbursts are ABOVE quiescence (brightening = flux increase)
    # Use iterative sigma clipping to find quiescent level
    quiescent_mask = np.ones(len(flux), dtype=bool)
    for clip_iter in range(3):
        q_med = np.nanmedian(flux[quiescent_mask])
        q_std = np.nanstd(flux[quiescent_mask])
        if q_std <= 0:
            break
        # Clip points >2.5 sigma above quiescent median (outbursts)
        # Keep points below (dips/eclipses are useful for period finding)
        quiescent_mask = flux < (q_med + 2.5 * q_std)

    t_q = t[quiescent_mask]
    flux_q = flux[quiescent_mask]
    n_outburst = np.sum(~quiescent_mask)
    pct_quiescent = 100 * len(t_q) / len(t)
    print(f"  Quiescent points: {len(t_q)}/{len(t)} ({pct_quiescent:.0f}%), outburst points removed: {n_outburst}")

    # --- Step 2: Detrend quiescent data (remove slow variations) ---
    if len(t_q) > 50:
        # Median filter to remove trends longer than ~1 day
        filter_width = max(3, int(1.0 / median_cadence))  # ~1 day window
        if filter_width % 2 == 0:
            filter_width += 1
        flux_trend = median_filter(flux_q, size=min(filter_width, len(flux_q) // 3 * 2 + 1))
        flux_detrend = flux_q / flux_trend
        # Re-normalize
        flux_detrend = flux_detrend / np.nanmedian(flux_detrend)
    else:
        flux_detrend = flux_q / np.nanmedian(flux_q) if len(flux_q) > 0 else flux_q

    # Plot detrended quiescent light curve
    ax_lc.scatter(t_q, flux_detrend, s=1, alpha=0.4, c='darkblue', label='Quiescent (detrended)')
    outburst_t = t[~quiescent_mask]
    outburst_f = flux[~quiescent_mask] / med  # rough normalization for display
    ax_lc.scatter(outburst_t, np.full_like(outburst_f, np.nanmax(flux_detrend)*1.02),
                  s=8, c='red', marker='v', alpha=0.5, label=f'Outburst ({n_outburst} pts)')
    ax_lc.set_xlabel('TESS BJD')
    ax_lc.set_ylabel('Detrended Flux')
    ax_lc.legend(fontsize=7, loc='upper right')
    ax_lc.set_title(f'TIC {tic_id} - Quiescent', fontsize=10)
    ax_lc.grid(True, alpha=0.2)

    # --- Step 3: Lomb-Scargle on detrended quiescent data ---
    if len(t_q) < 30:
        ax_ls.text(0.5, 0.5, f'Too few quiescent pts ({len(t_q)})',
                   ha='center', va='center', transform=ax_ls.transAxes)
        ax_ls.set_title(f'TIC {tic_id} | Insufficient data', fontsize=10)
        print(f"  Skipping L-S: only {len(t_q)} quiescent points")
        continue

    min_period = nyquist_period
    max_period = min(1.0, (t_q[-1] - t_q[0]) / 3)  # up to 1 day (24 hr) - CV orbital range

    if min_period >= max_period:
        ax_ls.text(0.5, 0.5, 'Period range too narrow', ha='center', va='center', transform=ax_ls.transAxes)
        ax_ls.set_title(f'TIC {tic_id}', fontsize=10)
        print(f"  Skipping: Nyquist ({nyquist_period*24*60:.0f} min) >= max search ({max_period*24*60:.0f} min)")
        continue

    # Frequency grid
    n_freq = int(10 * (1/min_period - 1/max_period) * (t_q[-1] - t_q[0]))
    n_freq = min(max(n_freq, 5000), 100000)
    frequency = np.linspace(1/max_period, 1/min_period, n_freq)

    try:
        ls = LombScargle(t_q, flux_detrend)
        power = ls.power(frequency)

        # Find top 5 peaks
        peak_indices = []
        power_copy = power.copy()
        for _ in range(5):
            if np.max(power_copy) <= 0:
                break
            peak_idx = np.argmax(power_copy)
            peak_indices.append(peak_idx)
            peak_freq = frequency[peak_idx]
            mask = np.abs(frequency - peak_freq) < 0.03 * peak_freq
            power_copy[mask] = 0

        best_freq = frequency[peak_indices[0]]
        best_period = 1 / best_freq
        fap = ls.false_alarm_probability(power.max())

        # Check if VSX period matches any of the top peaks (or harmonics)
        vsx_match_peak = None
        vsx_match_type = None
        if vsx_period and vsx_period * 24 * 60 >= nyquist_period * 24 * 60:
            for pi in peak_indices:
                p = 1/frequency[pi]
                for harmonic, label in [(1, '1:1'), (2, '2:1'), (0.5, '1:2'), (3, '3:1'), (1/3, '1:3')]:
                    test_p = vsx_period * harmonic
                    if test_p > 0 and abs(p - test_p) / test_p < 0.05:
                        vsx_match_peak = p
                        vsx_match_type = label
                        break
                if vsx_match_peak:
                    break

        period_results[tic_id] = {
            'best_period_days': best_period,
            'best_period_min': best_period * 24 * 60,
            'fap': fap,
            'vsx_period': vsx_period,
            'gaia_id': gaia_id,
            'nyquist_min': nyquist_period * 24 * 60,
            'vsx_match': vsx_match_peak is not None,
            'vsx_match_type': vsx_match_type,
            'n_quiescent': len(t_q),
            'n_outburst': n_outburst,
            'top_periods_min': [1/frequency[pi] * 24 * 60 for pi in peak_indices[:5]]
        }

        print(f"  Best L-S period: {best_period*24*60:.2f} min ({best_period:.6f} d)")
        print(f"  FAP: {fap:.2e}")
        top5_str = ', '.join(f'{1/frequency[pi]*24*60:.1f}' for pi in peak_indices[:5])
        print(f"  Top 5 peaks (min): {top5_str}")
        if vsx_period:
            print(f"  VSX period: {vsx_period*24*60:.1f} min")
            if vsx_match_peak:
                print(f"  VSX MATCH: {vsx_match_type} harmonic (L-S peak at {vsx_match_peak*24*60:.1f} min)")
            else:
                if vsx_period * 24 * 60 < nyquist_period * 24 * 60:
                    print(f"  VSX period ({vsx_period*24*60:.0f} min) is BELOW Nyquist limit ({nyquist_period*24*60:.0f} min) - undetectable at this cadence")
                else:
                    print(f"  No match in top L-S peaks")

        # Plot periodogram
        period_arr = 1 / frequency
        ax_ls.semilogx(period_arr * 24 * 60, power, 'b-', lw=0.5)
        ax_ls.axvline(best_period * 24 * 60, color='red', ls='--', lw=1.5,
                      label=f'Best: {best_period*24*60:.1f} min')
        if vsx_period:
            ax_ls.axvline(vsx_period * 24 * 60, color='orange', ls=':', lw=2,
                          label=f'VSX: {vsx_period*24*60:.1f} min')
            # Also mark 2x harmonic of VSX period
            ax_ls.axvline(vsx_period * 2 * 24 * 60, color='orange', ls=':', lw=1, alpha=0.4)
        ax_ls.axvline(nyquist_period * 24 * 60, color='gray', ls='-.', lw=1, alpha=0.5,
                      label=f'Nyquist: {nyquist_period*24*60:.0f} min')
        ax_ls.set_xlabel('Period (min)')
        ax_ls.set_ylabel('L-S Power')
        fap_str = f'{fap:.1e}' if fap > 0 else '<1e-16'
        match_str = f' VSX:{vsx_match_type}' if vsx_match_peak else ''
        ax_ls.set_title(f'TIC {tic_id} | FAP={fap_str}{match_str}', fontsize=10)
        ax_ls.legend(fontsize=7)
        ax_ls.set_xlim(nyquist_period * 24 * 60 * 0.9, max_period * 24 * 60 * 1.1)

    except Exception as e:
        ax_ls.text(0.5, 0.5, f'Error: {str(e)[:30]}', ha='center', va='center',
                   transform=ax_ls.transAxes, fontsize=8)
        ax_ls.set_title(f'TIC {tic_id}', fontsize=10)
        print(f"  Error: {e}")

plt.suptitle('Period Analysis: Outburst-Detrended Quiescent Data', fontsize=14, fontweight='bold', y=1.01)
plt.tight_layout()
plt.savefig('periodograms_top5.png', dpi=150, bbox_inches='tight')
plt.show()
print(f"\n✓ Saved periodograms to periodograms_top5.png")

# Phase-folded light curves for significant periods
sig_periods = {k: v for k, v in period_results.items() if v['fap'] < 0.01}
if sig_periods:
    n_sig = len(sig_periods)
    n_phase_cols = min(n_sig, 4)
    n_phase_rows = (n_sig + n_phase_cols - 1) // n_phase_cols
    fig2, axes2 = plt.subplots(n_phase_rows, n_phase_cols,
                                figsize=(5 * n_phase_cols, 4 * n_phase_rows), squeeze=False)

    for j, (tic_id, pr) in enumerate(list(sig_periods.items())[:8]):
        ax = axes2[j // n_phase_cols][j % n_phase_cols]
        t_q_local = t_q if tic_id == list(lc_data.keys())[plot_idx] else None

        # Re-extract quiescent data for this target
        lc_local = lc_data[tic_id]
        t_loc = lc_local['time']
        f_loc = lc_local['flux']
        q_mask = np.ones(len(f_loc), dtype=bool)
        for _ in range(3):
            qm = np.nanmedian(f_loc[q_mask])
            qs = np.nanstd(f_loc[q_mask])
            if qs <= 0: break
            q_mask = f_loc < (qm + 2.5 * qs)
        t_phase = t_loc[q_mask]
        f_phase = f_loc[q_mask] / np.nanmedian(f_loc[q_mask])

        # Choose fold period: prefer VSX if matched, else best L-S
        fold_period = pr['best_period_days']
        period_label = f"L-S: {pr['best_period_min']:.1f} min"
        if pr['vsx_period'] and pr['vsx_match']:
            fold_period = pr['vsx_period']
            period_label = f"VSX: {pr['vsx_period']*24*60:.1f} min"
        elif pr['vsx_period'] and pr['vsx_period'] * 24 * 60 >= pr['nyquist_min']:
            # Also try VSX period even if not in top peaks
            fold_period = pr['vsx_period']
            period_label = f"VSX: {pr['vsx_period']*24*60:.1f} min (forced)"

        phase = (t_phase % fold_period) / fold_period
        ax.scatter(phase, f_phase, s=1, alpha=0.3, c='navy')
        ax.scatter(phase + 1, f_phase, s=1, alpha=0.3, c='navy')

        # Binned phase curve
        n_bins = 30
        bin_edges = np.linspace(0, 1, n_bins + 1)
        bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
        bin_means = []
        for b in range(n_bins):
            in_bin = (phase >= bin_edges[b]) & (phase < bin_edges[b+1])
            bin_means.append(np.nanmedian(f_phase[in_bin]) if np.any(in_bin) else np.nan)
        bin_means = np.array(bin_means)
        ax.plot(bin_centers, bin_means, 'r-', lw=2, alpha=0.8, label='Binned median')
        ax.plot(bin_centers + 1, bin_means, 'r-', lw=2, alpha=0.8)

        ax.set_xlabel('Phase')
        ax.set_ylabel('Norm. Flux')
        ax.set_title(f'TIC {tic_id}\n{period_label}', fontsize=10)
        ax.set_xlim(0, 2)
        ax.legend(fontsize=7)

    # Hide unused
    for jj in range(j + 1, n_phase_rows * n_phase_cols):
        axes2[jj // n_phase_cols][jj % n_phase_cols].set_visible(False)

    plt.suptitle('Phase-Folded Light Curves (Quiescent Only, FAP < 0.01)', fontsize=12, fontweight='bold')
    plt.tight_layout()
    plt.savefig('phase_folded_top.png', dpi=150, bbox_inches='tight')
    plt.show()
    print(f"✓ Saved phase-folded plots to phase_folded_top.png")
else:
    print("\nNo candidates with significant periods (FAP < 0.01) in quiescent data.")

# Summary
print(f"\n{'='*70}")
print("PERIOD & VARIABILITY SUMMARY")
print(f"{'='*70}")
for tic_id, pr in period_results.items():
    flux = lc_data[tic_id]['flux']
    std = np.nanstd(flux)
    med_f = np.nanmedian(flux)
    n3sig = np.sum((flux > med_f + 3*std) | (flux < med_f - 3*std))
    n5sig = np.sum((flux > med_f + 5*std) | (flux < med_f - 5*std))
    t_loc = lc_data[tic_id]['time']
    baseline = t_loc[-1] - t_loc[0]

    sig_str = "SIGNIFICANT" if pr['fap'] < 0.01 else "not significant"
    print(f"\n  TIC {tic_id} (Gaia DR3 {pr['gaia_id']}):")
    print(f"    Best period (quiescent): {pr['best_period_min']:.2f} min (FAP={pr['fap']:.2e}, {sig_str})")
    print(f"    Nyquist limit: {pr['nyquist_min']:.1f} min")
    print(f"    Top 5 peaks: {', '.join(f'{p:.1f}' for p in pr['top_periods_min'])} min")
    if pr['vsx_period']:
        vsx_min = pr['vsx_period']*24*60
        match_info = f"MATCHED ({pr['vsx_match_type']})" if pr['vsx_match'] else "no match"
        if vsx_min < pr['nyquist_min']:
            match_info = f"BELOW NYQUIST ({vsx_min:.0f} < {pr['nyquist_min']:.0f} min)"
        print(f"    VSX period: {vsx_min:.1f} min - {match_info}")
    print(f"    Data: {pr['n_quiescent']} quiescent + {pr['n_outburst']} outburst pts, {baseline:.1f} day baseline")
    if baseline > 0 and n3sig > 0:
        print(f"    Outliers: {n3sig} (>3sig), {n5sig} (>5sig) | Flare rate: ~{n3sig/baseline:.1f}/day")

print(f"\n  NOTE: TESS FFI cadence (~30 min) limits orbital period detection to >{60:.0f} min.")
print(f"  For CVs with shorter orbital periods, use:")
print(f"    - TESS 2-min short-cadence data (if available)")
print(f"    - ZTF forced photometry via IRSA")
print(f"    - Ground-based fast photometry (time-resolved CCD)")

## Cell 19: Next Steps

In [None]:
# Cell 19: Next steps summary
from IPython.display import Markdown, display

next_steps = """
# Next Steps for Candidate Follow-up

## Automated pipeline completed in this notebook:

### Cross-matching (Cells 7-13)
- **VSX**: Variable star classifications and known periods
- **ROSAT**: X-ray detections (accretion indicator)
- **TESS**: Sector coverage check
- **SIMBAD**: Existing astronomical classifications
- **Gaia DR3 vari_classifier_result**: Automated variability classes
- **ZTF**: Optical light curve coverage and observation counts
- **GALEX**: UV excess from hot accretion disks (FUV/NUV)

### Analysis (Cells 14-18)
- **Multi-wavelength priority scoring**: Composite ranking from all cross-matches
- **TESS light curve extraction**: Top 10 candidates with aperture photometry
- **Deep candidate investigation**: Extended Gaia params (Teff, log(g), RUWE), XMM-Newton, eROSITA, SDSS spectroscopy, physical interpretation
- **Period analysis**: Lomb-Scargle periodograms + phase folding for significant periods

## Remaining manual follow-up:

### 1. For STRONG CV candidates
- **ZTF forced photometry**: Request via IRSA for dense multi-year light curves
- **TESS extended sectors**: Extract all available sectors (this pipeline extracts first sector only)
- **Spectroscopy**: Check SDSS/LAMOST archives or request new observations
  - Key diagnostics: H-alpha emission, He II 4686, Balmer decrement

### 2. Period refinement
- Cross-check Lomb-Scargle periods against ZTF forced photometry
- For eclipsing systems: measure eclipse timing for O-C diagrams
- For superhumpers: look for period excess over orbital period

### 3. Classification confirmation
- Compare with Ritter & Kolb CV catalog (Edition 7.24+)
- Check against AAVSO International Database for historical outbursts
- Verify against Downes et al. CV atlas

### 4. Publication path
- **Quick announcement**: RNAAS (1-2 weeks turnaround)
- **Full characterization**: JAAVSO, A&A, or PASP
- **Multiple new candidates**: Batch paper in MNRAS

## Files generated
- `cv_candidates_YYYYMMDD_HHMM.csv` - Full candidate table with all cross-matches
- `cv_candidates_high_priority_YYYYMMDD_HHMM.csv` - High-priority subset
- `anomaly_detection_v2.png` - ML diagnostic plots
- `tess_lc_top10.png` - TESS light curves for top 10
- `periodograms_top5.png` - Lomb-Scargle periodograms
- `phase_folded_top.png` - Phase-folded light curves (if significant periods found)

## Re-run with different parameters
- Adjust `bp_rp_max` to target bluer/redder sources
- Increase `sample_size` for more candidates
- Focus on specific sky regions (add RA/Dec filters to Gaia query)
- Adjust `n_lc_extract` to extract more/fewer TESS light curves
"""

display(Markdown(next_steps))

print("\n" + "="*70)
print("PIPELINE COMPLETE")
print("="*70)

In [None]:

# Update CONFIG in the running kernel
CONFIG['sample_size'] = 5000
CONFIG['anomaly_score_min'] = 0.3
CONFIG['require_consensus'] = False

print("CONFIG updated in kernel:")
print(f"  sample_size: {CONFIG['sample_size']}")
print(f"  anomaly_score_min: {CONFIG['anomaly_score_min']}")
print(f"  require_consensus: {CONFIG['require_consensus']}")
print(f"  priority_score_min: {CONFIG['priority_score_min']}")
