In [None]:
"""
NOTE: Clear cell output occasionally to optimize UI efficiency
"""

In [None]:
"""
Part 1: Create catalog
This script finds all .tt0.subim.fits files in VLASS epochs 2.1 and 2.2
from https://archive-new.nrao.edu/vlass/se_continuum_imaging/ and records 
the target folder URL, epoch, tile name, and target name into a CSV file 
in the current working directory
"""

import os
import csv
import re
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

# Base URL of the archive listing and the epoch subdirectories (relative to
# top_url) that main() iterates over
top_url = "https://archive-new.nrao.edu/vlass/se_continuum_imaging/"
epoch_dirs = ["VLASS2.1/", "VLASS2.2/"]

# Patterns
# fits_pattern: an href that ends in ".tt0.subim.fits" (case-insensitive)
fits_pattern = re.compile(r".*\.tt0\.subim\.fits$", re.IGNORECASE)
# target_pattern: "J" or "j", six digits, a sign, six digits; searched for
# inside a matching FITS href to obtain the target name
target_pattern = re.compile(r"[Jj]\d{6}[+-]\d{6}")

# Output file: catalog CSV path, relative to the current working directory
output_csv = "epoch2_continuum_catalog.csv"


def load_processed(csv_file):
    """Collect the 'Link' values already recorded in an existing catalog.

    Returns an empty set when ``csv_file`` does not exist, so a fresh run and
    a resumed run can be handled the same way by the caller.
    """
    if not os.path.exists(csv_file):
        return set()
    with open(csv_file, newline='') as handle:
        return {record['Link'] for record in csv.DictReader(handle)}


def list_tiles(epoch_url, timeout=60):
    """Return the tile subdirectory hrefs listed on an epoch index page.

    Parameters:
        epoch_url: URL of the epoch directory listing to fetch.
        timeout: seconds handed to ``requests.get``; without it a stalled
            server would block the crawl indefinitely.

    Returns:
        The href strings, exactly as they appear on the page, that end with
        '/' and start with 'T'.

    Raises:
        requests.RequestException: on connection failure, timeout, or an
            HTTP error status (via ``raise_for_status``).
    """
    print(f"[PARSE] Getting tiles in {epoch_url}")
    resp = requests.get(epoch_url, timeout=timeout)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')
    hrefs = (a['href'] for a in soup.find_all('a', href=True))
    return [href for href in hrefs
            if href.endswith('/') and href.startswith('T')]


def list_targets(tile_url, timeout=60):
    """Return the target subdirectory hrefs listed on a tile index page.

    Parameters:
        tile_url: URL of the tile directory listing to fetch.
        timeout: seconds handed to ``requests.get``; without it a stalled
            server would block the crawl indefinitely.

    Returns:
        The href strings, exactly as they appear on the page, that end with
        '/', start with 'vlass2.' (case-insensitive) and contain '.se.'.

    Raises:
        requests.RequestException: on connection failure, timeout, or an
            HTTP error status (via ``raise_for_status``).
    """
    print(f"[PARSE] Getting targets in {tile_url}")
    resp = requests.get(tile_url, timeout=timeout)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')
    hrefs = (a['href'] for a in soup.find_all('a', href=True))
    return [href for href in hrefs
            if href.endswith('/')
            and href.lower().startswith('vlass2.')
            and '.se.' in href]


def process_target(epoch, tile, target_dir, tile_url, writer, processed, timeout=60):
    """Record one target directory in the catalog unless it is already there.

    Parameters:
        epoch: epoch label written to the 'Epoch' column.
        tile: tile name written to the 'Tile' column.
        target_dir: href of the target directory, relative to ``tile_url``.
        tile_url: URL of the tile listing that contains ``target_dir``.
        writer: ``csv.writer`` receiving a [link, epoch, tile, target] row.
        processed: set of target URLs already in the catalog; updated in
            place when a row is written.
        timeout: seconds handed to ``requests.get``; without it a stalled
            server would block the crawl indefinitely.

    Raises:
        requests.RequestException: on connection failure, timeout, or an
            HTTP error status (via ``raise_for_status``).
    """
    target_url = urljoin(tile_url, target_dir)
    if target_url in processed:
        print(f"[SKIP] Already processed {target_url}")
        return

    print(f"[PARSE] Opening {target_url}")
    resp = requests.get(target_url, timeout=timeout)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')

    for a in soup.find_all('a', href=True):
        href = a['href']
        if not fits_pattern.match(href):
            continue
        # The target name is taken from the FITS file name; it is left
        # blank when the name does not contain a J-style designation.
        match = target_pattern.search(href)
        target_name = match.group(0) if match else ''
        writer.writerow([target_url, epoch, tile, target_name])
        print(f"[FOUND] {target_url} | Epoch {epoch} | Tile {tile} | Target {target_name}")
        processed.add(target_url)
        # One catalog row per target folder: stop at the first matching file.
        break


def main():
    """Build (or resume building) the catalog CSV for every configured epoch.

    Existing catalog links are loaded first; if any are found the file is
    appended to, otherwise it is created from scratch with a header row.
    """
    seen = load_processed(output_csv)
    resuming = bool(seen)

    with open(output_csv, 'a' if resuming else 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        if not resuming:
            writer.writerow(['Link', 'Epoch', 'Tile', 'Target'])

        for epoch_dir in epoch_dirs:
            epoch_url = urljoin(top_url, epoch_dir)
            # "VLASS2.1/" -> "2.1"
            epoch_label = epoch_dir.replace('VLASS', '').strip('/')
            for tile_dir in list_tiles(epoch_url):
                tile_url = urljoin(epoch_url, tile_dir)
                tile_name = tile_dir.rstrip('/')
                for target_dir in list_targets(tile_url):
                    process_target(epoch_label, tile_name, target_dir,
                                   tile_url, writer, seen)

    print(f"[COMPLETE] Created {output_csv}")


# Run the catalog crawl (Part 1) when this code executes as the main module
if __name__ == '__main__':
    main()

In [None]:
"""
Part 2: Download .fits files
Read the catalog CSV, open and scan each URL for .tt0.subim.fits files,
and download them to a directory
"""

import os

# Output directory for downloaded FITS files; also the default folder that
# update_catalog() reads from
outdir = "epoch2_continuum_fits"

def download_fits(csv_file=output_csv, outdir=outdir, timeout=60):
    """Download every .tt0.subim.fits file linked from the catalog's folders.

    Parameters:
        csv_file: catalog CSV with a 'Link' column of folder URLs.
        outdir: directory that receives the files (created if missing).
        timeout: seconds handed to each ``requests.get`` call so a stalled
            connection raises instead of hanging.

    Files already present in ``outdir`` are skipped. Each download is written
    to ``<name>.part`` and only renamed to its final name once complete, so
    an interrupted transfer can never be mistaken for a finished file on the
    next run.

    Raises:
        requests.RequestException: on connection failure, timeout, or an
            HTTP error status (via ``raise_for_status``).
    """
    # Ensure directory exists
    os.makedirs(outdir, exist_ok=True)

    with open(csv_file, newline='') as f:
        for row in csv.DictReader(f):
            folder_url = row['Link']
            print(f"[SCAN] {folder_url}")
            resp = requests.get(folder_url, timeout=timeout)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, 'html.parser')

            # Find and download matching FITS files
            for a in soup.find_all('a', href=True):
                href = a['href']
                if not fits_pattern.match(href):
                    continue

                file_url = urljoin(folder_url, href)
                filename = os.path.basename(href)
                dest = os.path.join(outdir, filename)

                if os.path.exists(dest):
                    print(f"[SKIP] {filename}")
                    continue

                print(f"[GET] {file_url}")
                partial = dest + '.part'
                try:
                    # The context manager releases the streamed connection
                    # even when the transfer fails part-way through.
                    with requests.get(file_url, stream=True, timeout=timeout) as file_resp:
                        file_resp.raise_for_status()
                        with open(partial, 'wb') as out_f:
                            for chunk in file_resp.iter_content(chunk_size=8192):
                                out_f.write(chunk)
                    os.replace(partial, dest)
                finally:
                    # Only present here if the download did not complete.
                    if os.path.exists(partial):
                        os.remove(partial)
                print(f"[SAVE] {dest}")

def main_download():
    """Entry point for Part 2: call download_fits() with its default arguments."""
    # Download FITS using the catalog
    download_fits()

# Run the download step (Part 2) when this code executes as the main module
if __name__ == '__main__':
    main_download()

In [None]:
"""
Part 3: Get RA/Dec
For each catalog row, find the matching downloaded FITS file, read the
observation start/end (DATE-OBS / DATE-END) from its header with astropy,
compute the RA/Dec in degrees of the brightest pixel via the image WCS,
and append these values to the catalog
"""

from astropy.io import fits
from astropy.coordinates import SkyCoord
from astropy.wcs import WCS
import astropy.units as u
import numpy as np
import os
import csv

def _split_iso_datetime(stamp):
    """Split 'YYYY-MM-DDThh:mm:ss' into (date, time); time is '' if absent."""
    date_part, _, time_part = stamp.partition('T')
    return date_part, time_part


def _read_fits_info(filepath):
    """Return (ra_deg, dec_deg, date_obs, date_end) for one FITS image.

    RA/Dec are the sky coordinates of the brightest (NaN-ignoring) pixel.
    """
    hdr = fits.getheader(filepath)
    date_obs = str(hdr.get('DATE-OBS', '') or '')
    date_end = str(hdr.get('DATE-END', '') or '')

    # Read full data array and drop length-1 axes
    data = np.squeeze(fits.getdata(filepath, memmap=False))
    wcs_full = WCS(hdr)
    try:
        wcs = wcs_full.celestial
    except Exception:
        wcs = wcs_full

    # Find brightest pixel and convert to sky coordinates
    y_pix, x_pix = np.unravel_index(np.nanargmax(data), data.shape)
    skycoord = wcs.pixel_to_world(x_pix, y_pix)
    return skycoord.ra.deg, skycoord.dec.deg, date_obs, date_end


def _write_catalog_atomically(csv_file, fieldnames, rows):
    """Write rows to a temp file, then swap it in place of csv_file."""
    tmp_path = f"{csv_file}.tmp"
    try:
        with open(tmp_path, 'w', newline='') as f_out:
            writer = csv.DictWriter(f_out, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        os.replace(tmp_path, csv_file)
    finally:
        # Only present here if the write did not complete.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def update_catalog(csv_file=output_csv, fits_folder=outdir):
    """
    Update the existing catalog and append columns for RA, Dec, Date Start, Date
    End, Time Start, and Time End

    Parameters:
        csv_file: catalog CSV to update in place.
        fits_folder: directory holding the downloaded .tt0.subim.fits files.

    Rows that already have an RA value are left untouched. A row whose FITS
    file is missing or unreadable gets blank values so a later run retries it.
    The catalog is rewritten through a temporary file, and the rewrite also
    happens if processing is interrupted, so existing rows are never lost and
    rows finished so far keep their results.
    """
    # Verify FITS directory exists
    if not os.path.isdir(fits_folder):
        print(f"[ERROR] Not found: {fits_folder}")
        return

    new_cols = ['RA', 'Dec', 'Date Start', 'Date End', 'Time Start', 'Time End']

    # Read existing CSV into memory
    with open(csv_file, newline='') as f_in:
        reader = csv.DictReader(f_in)
        rows = list(reader)
        existing_cols = list(reader.fieldnames or [])

    if not existing_cols:
        # No header row at all: nothing to update, leave the file alone.
        print(f"[ERROR] Empty catalog: {csv_file}")
        return

    # Only append missing columns
    fieldnames = existing_cols + [col for col in new_cols if col not in existing_cols]

    # List the folder once (sorted, so the chosen match is deterministic)
    fits_files = sorted(fn for fn in os.listdir(fits_folder)
                        if fn.endswith('.tt0.subim.fits'))
    blank = dict.fromkeys(new_cols, '')

    try:
        for row in rows:
            # Skip if already contains RA
            if row.get('RA') not in (None, ''):
                continue

            target = row.get('Target') or ''
            # An empty target would be a substring of every file name, so it
            # must never be used for matching.
            matches = [fn for fn in fits_files if target in fn] if target else []

            if not matches:
                print(f"[ERROR] No file found for {target}")
                row.update(blank)
                continue

            filepath = os.path.join(fits_folder, matches[0])
            try:
                ra, dec, date_obs, date_end = _read_fits_info(filepath)
            except Exception as e:
                # Best effort per file: report and leave the row blank.
                print(f"[ERROR] Cannot read {filepath}: {e}")
                row.update(blank)
                continue

            # Parse date and time separately
            obs_date, obs_time = _split_iso_datetime(date_obs)
            end_date, end_time = _split_iso_datetime(date_end)
            print(f"[SUCCESS] {target} | RA={ra} | Dec={dec} | Date Start={obs_date} | Time Start={obs_time} | Date End={end_date} | Time End={end_time}")
            row.update({
                'RA': ra,
                'Dec': dec,
                'Date Start': obs_date,
                'Date End': end_date,
                'Time Start': obs_time,
                'Time End': end_time,
            })
    finally:
        _write_catalog_atomically(csv_file, fieldnames, rows)

    print(f"[COMPLETE] Updated {csv_file}")


def main_update():
    """Entry point for Part 3: call update_catalog() with its default arguments."""
    # Extract header info
    update_catalog()

# Run the catalog update (Part 3) when this code executes as the main module
if __name__ == '__main__':
    main_update()

In [None]:
"""
Part 4: Calculate Median Time (for plot display)
Takes the midpoint between Time Start and Time End to get a single
representative observation time for display on charts
"""

import datetime


def compute_med_time(csv_file=output_csv):
    """
    Compute median of Time Start and Time End and append with a Median
    Time column

    Parameters:
        csv_file: catalog CSV to update in place.

    The value written is the midpoint between the two times, formatted as
    HH:MM:SS. Observations that cross midnight are handled: the day offset
    comes from the Date Start / Date End columns when both parse, otherwise
    an end time earlier than the start time is taken to be on the next day.
    Rows that already have a Median Time are left unchanged.
    """
    seconds_per_day = 86400

    # Read existing CSV into memory
    with open(csv_file, newline='') as f_in:
        reader = csv.DictReader(f_in)
        rows = list(reader)
        existing_cols = list(reader.fieldnames or [])

    if not existing_cols:
        # No header row at all: nothing to update, leave the file alone.
        print(f"[ERROR] Empty catalog: {csv_file}")
        return

    # Determine if column exists
    fieldnames = existing_cols + ([] if 'Median Time' in existing_cols else ['Median Time'])

    # Parse time string (with or without microseconds)
    def parse_time(ts):
        for fmt in ("%H:%M:%S.%f", "%H:%M:%S"):
            try:
                return datetime.datetime.strptime(ts, fmt)
            except ValueError:
                continue
        return None

    # Seconds elapsed since midnight for a parsed time
    def seconds_of_day(dt):
        return dt.hour * 3600 + dt.minute * 60 + dt.second + dt.microsecond / 1e6

    # Whole days from start date to end date, or None if either is unusable
    def days_between(start, end):
        try:
            first = datetime.datetime.strptime(start, "%Y-%m-%d")
            last = datetime.datetime.strptime(end, "%Y-%m-%d")
        except (TypeError, ValueError):
            return None
        return (last - first).days

    # Compute median time for each row
    for row in rows:
        target = row.get('Target', '')
        # Skip if already has data
        if row.get('Median Time', ''):
            print(f"[SKIP] Median Time already computed for {target}")
            continue
        ts = row.get('Time Start', '')
        te = row.get('Time End', '')
        # Error if missing time data
        if not ts or not te:
            print(f"[WARN] No time data for {target}")
            row['Median Time'] = ''
            continue
        # Parse start and end times
        dt_s = parse_time(ts)
        dt_e = parse_time(te)
        if dt_s is None or dt_e is None:
            print(f"[WARN] Unrecognized time format: {ts} / {te}")
            row['Median Time'] = ''
            continue
        sec_s = seconds_of_day(dt_s)
        sec_e = seconds_of_day(dt_e)
        # Put the end time on the correct day before averaging; averaging
        # raw clock times is wrong when the span crosses midnight.
        day_offset = days_between(row.get('Date Start'), row.get('Date End'))
        if day_offset is None or day_offset < 0:
            day_offset = 1 if sec_e < sec_s else 0
        sec_e += day_offset * seconds_per_day
        avg_sec = ((sec_s + sec_e) / 2) % seconds_per_day
        # Convert back to HH:MM:SS
        h = int(avg_sec // 3600)
        m = int((avg_sec % 3600) // 60)
        s = int(avg_sec % 60)
        median_str = f"{h:02d}:{m:02d}:{s:02d}"
        row['Median Time'] = median_str
        print(f"[SUCCESS] {row.get('Target')} | Median Time={median_str}")

    # Rewrite catalog with updated data
    with open(csv_file, 'w', newline='') as f_out:
        writer = csv.DictWriter(f_out, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    print(f"[COMPLETE] Updated {csv_file}")


def main_median():
    """Entry point for Part 4: call compute_med_time() with its default arguments."""
    # Compute median times
    compute_med_time()


# Run the median-time step (Part 4) when this code executes as the main module
if __name__ == '__main__':
    main_median()