# Diving Into WFI Thermal Vacuum (TVAC) Bright Star Test Data

## Kernel Information and Read-Only Status

To run this notebook, please select the "Roman Calibration" kernel at the top right of your window.

This notebook is read-only. You can run cells and make edits, but you must save changes to a different location. We recommend saving the notebook within your home directory, or to a new folder within your home (e.g. <span style="font-variant:small-caps;">file > save notebook as > my-nbs/nb.ipynb</span>). Note that a directory must exist before you attempt to add a notebook to it.

## Imports
 Libraries used
- *romancal* for running the processing pipeline
- *roman_datamodels* for opening Roman WFI ASDF files
- *asdf* for opening Roman WFI ASDF files
- *os* for checking if files exist
- *copy* for making copies of Python objects
- *s3fs* for streaming files from an S3 bucket
<font color='red'>Update above</font>

## Introduction
In Fall 2023 and Spring 2024, the Wide Field Instrument (WFI) underwent thermal vacuum (TVAC) to collect performance and trending data in a flight-like environment to assess and data collection in a flight-like environment. To learn more about TVAC, please visit the [RDox pages on the WFI Ground Testing Campaigns](https://roman-docs.stsci.edu/roman-instruments-home/wfi-imaging-mode-user-guide/wfi-detectors/detector-performance/wfi-ground-testing-campaigns#WFIGroundTestingCampaigns-WFI_Testing).

As part of this TVAC2 campaign, the WFI Bright Star Saturation test was conducted to characterize how WFI Sensor Chip Assemblies (SCAs) respond to saturation from a wide range of bright, in-focus point sources, similar to those expected during Roman's science surveys. A telescope simulator was used to project nine simulated point sources with magnitudes ranging from ~4 to ~18 mag through the F146 filter, onto a grid of locations on SCAs 4 and 11.

In this notebook we explore a subset of the point source data. We apply a non-complete set of calibrations with the Roman WFI science calibration pipeline RomanCal (Python package name `romancal`) and visualize the results. 

- <font color='red'>To do: SCA11 - Mag4 is Activity 28; Include a table mapping from activity to mags?</font>
- <font color='red'>Issue caveats that the optical properties of this set-up are not flight-like.</font>

## Tutorial Data
In this tutorial, we use L1 WFI test data files stored in the Nexus S3 bucket. See the [Data Discovery and Access](../data_discovery_and_access/data_discovery_and_access.ipynb) and [Exposure Pipeline](../exposure_pipeline/exposure_pipeline.ipynb) tutorials for more information on how to pull data and run Romancal.
<font color='red'>Notebook links might be incorrect.</font>

In [None]:
%matplotlib inline
import numpy as np
from scipy.signal import convolve2d
import asdf
import os
import copy
import roman_datamodels as rdm
from roman_datamodels.dqflags import pixel as dqflags
from romancal.saturation import SaturationStep
from roman_datamodels.datamodels import SaturationRefModel
import matplotlib.pyplot as plt
import s3fs
import romancal
from romancal.pipeline import ExposurePipeline

## Pull the data
We will ingest data corresponding to a magnitude 4 point source imaged on detector 11 and three subsequent darks.

In [None]:
# Utility function to open a given asdf file
def open_asdf_datamodel(asdf_uri: str, anon: bool = True):
    """
    Open an ASDF file from S3 or locally and return a Roman datamodel
    """
    if asdf_uri.startswith("s3://"):
        fs = s3fs.S3FileSystem(anon=anon)
        with fs.open(asdf_uri, "rb") as fb:
            with asdf.open(fb) as af:
                return rdm.open(af).copy()
    else:
        with open(asdf_uri, "rb") as fb:
            with asdf.open(fb) as af:
                return rdm.open(af).copy()

In [None]:
# This is hard-coded, better to have a search for the right activity/mag/SCA.
Mag = 4
SCA = 11
WFI_TAG = f'WFI{SCA:02d}'
asdf_dir_uri = 's3://stpubdata/roman/nexus/tvac/'
base = asdf_dir_uri + 'OTP00651_BrightStar_TV2b_R1_MCEB/'

# Illuminated image
asdf_file_uri_illum = base + f'Activity_22/TVAC2_TOHOTQUAL_WFISAT_20240506204523_{WFI_TAG}_uncal.asdf'

dm_illum = open_asdf_datamodel(asdf_file_uri_illum, anon=True)

# Final 3 dark exposures collected in this test
asdf_file_uri_d1 = base + f'Activity_40/TVAC2_TOHOTQUAL_WFISAT_20240506225304_{WFI_TAG}_uncal.asdf'
asdf_file_uri_d2 = base + f'Activity_40/TVAC2_TOHOTQUAL_WFISAT_20240506225611_{WFI_TAG}_uncal.asdf'
asdf_file_uri_d3 = base + f'Activity_40/TVAC2_TOHOTQUAL_WFISAT_20240506225917_{WFI_TAG}_uncal.asdf'
dm16 = open_asdf_datamodel(asdf_file_uri_d1, anon=True)
dm17 = open_asdf_datamodel(asdf_file_uri_d2, anon=True)
dm18 = open_asdf_datamodel(asdf_file_uri_d3, anon=True)

# When done:
dm_illum.close(); dm16.close(); dm17.close(); dm18.close()

In [None]:
# Type of datamodel and info
print(type(dm_illum))
dm_illum.info()

## Calibrate the data
We will ingest data corresponding to a magnitude 4 point source imaged through the F146 filter on detector 11 and three subsequent darks.

In [None]:
# Apply data quality initialization; saturation detection; reference pixel correction; linearity
# The rest of the steps are skipped

# Put datamodels in a dictionary
exps = {
    "illum": dm_illum,
    "exp16": dm16,
    "exp17": dm17,
    "exp18": dm18,
}

# Pipeline options, can also add an output directory
pipe_kwargs = {
    "save_results": False,
    "steps": {
        "dark_current": {"skip": True},
        "rampfit": {"skip": True},
        "assign_wcs": {"skip": True},
        "flatfield": {"skip": True},
        "photom": {"skip": True},
        "source_catalog": {"skip": True},
        "tweakreg": {"skip": True},
    },
}

results = {}
for tag, f in exps.items():
    try:
        res = ExposurePipeline.call(f, **pipe_kwargs)
        results[tag] = res
        # Close model if it does not need to be stored in memory
        # f.close()
        # Adding a break to only do the illuminated exposure due to memory limitations
        break 
    except Exception as e:
        print(f"[{tag}] pipeline failed: {e}")
        results[tag] = None
        break

In [None]:
# Example: access returned models
#dm_illum_out = results["illum"]
#dm16_out     = results["exp16"]
#dm17_out     = results["exp17"]
#dm18_out     = results["exp18"]

## Make some plots
<font color='red'>Write something about the ramps?</font>

In [None]:
dm_cube = results["illum"].data
frames = [2, 27, 54]
baseline_frame = 1

# Compute per-frame differences and percentiles
diffs, vmins, vmaxs = [], [], []
for nframe in frames:
    d = dm_cube[nframe] - dm_cube[baseline_frame]
    diffs.append(d)
    vmin_i = np.percentile(d, 2)
    vmax_i = np.percentile(d, 98)
    vmins.append(vmin_i)
    vmaxs.append(vmax_i)
    print(f"frame {nframe}: 2nd={vmin_i:.3g}, 98th={vmax_i:.3g}")

# Plotting
vmin = min(vmins)
vmax = max(vmaxs)
fig, axes = plt.subplots(1, len(frames), figsize=(5*len(frames), 4), constrained_layout=True)
if len(frames) == 1:
    axes = [axes]

im = None
for ax, nframe, d in zip(axes, frames, diffs):
    im = ax.imshow(d, vmin=vmin, vmax=vmax)
    ax.set_title(
        f"SCA {SCA}, Mag {Mag}, frames {nframe}-1",
        fontweight="bold", fontsize=14
    )
    ax.set_xlabel("x [pix]")
    ax.set_ylabel("y [pix]")

# One shared colorbar
cbar = fig.colorbar(im, ax=axes, shrink=0.9)
cbar.set_label("DN", fontweight="bold", fontsize=16)

plt.show()

### Cutouts around the bright source
We can now take a closer look at a cutout around the bright source. The dark core corresponds to pixels that saturate in the first frame of the ramp and is an artifact of the image processing.

In [None]:
# Define the cutout region
col_0=1900
row_0=1900
q1=256
q2=256
# Get the cutouts for the raw and corrected data
raw_cube_cutout = np.asarray(dm_illum.data[:, row_0-q1:row_0+q1, col_0-q2:col_0+q2])
dm_cube_cutout = dm_cube[:,row_0-q1:row_0+q1, col_0-q2:col_0+q2]
#sat_dn_cutout = sat_dn[row_0-q1:row_0+q1, col_0-q2:col_0+q2]

#### Data quality flags and saturation
<font color='red'>Need description here.</font>
https://roman-pipeline.readthedocs.io/en/latest/roman/dq_init/reference_files.html

In [None]:
mask_bits = 1 | 2 | 16 | 512 | 1024 | 2048 | 2097152 | 1048576 # bad pixel, saturated, gw_affected_data, non_science, dead, hot, no sat check, no lin corr available
pixeldq = results["illum"].pixeldq[row_0-q1:row_0+q1, col_0-q2:col_0+q2]
good = (pixeldq & mask_bits) == 0
print("Good pixel count:         ", good.sum(), "of", pixeldq.size)

In [None]:
# Set masked color to white in the colormap
cmap = plt.cm.viridis.copy()
cmap.set_bad(color="white")

# Compute diffs and per-frame percentiles over good pixels only
diffs_masked, vmins, vmaxs = [], [], []
for nframe in frames:
    d = dm_cube_cutout[nframe] - dm_cube_cutout[baseline_frame]
    dm = np.ma.array(d, mask=bad)
    diffs_masked.append(dm)

    if good.any():
        vmin_i = np.percentile(d[good], 2)
        vmax_i = np.percentile(d[good], 98)
    else:
        # if everything is masked
        vmin_i, vmax_i = np.min(d), np.max(d)
    vmins.append(vmin_i); vmaxs.append(vmax_i)
    print(f"frame {nframe}: 2nd={vmin_i:.3g}, 98th={vmax_i:.3g}")

# Shared limits across panels
vmin, vmax = min(vmins), max(vmaxs)

# Plot side-by-side with one shared colorbar
fig, axes = plt.subplots(1, len(frames), figsize=(5*len(frames), 4), constrained_layout=True)
if len(frames) == 1:
    axes = [axes]

im = None
for ax, nframe, dm in zip(axes, frames, diffs_masked):
    im = ax.imshow(dm, vmin=vmin, vmax=vmax, cmap=cmap, origin="lower")
    ax.set_title(f"SCA {SCA}, Mag {Mag}, frames {nframe}-1", fontweight="bold", fontsize=14)
    ax.set_xlabel("x [pix]"); ax.set_ylabel("y [pix]")

cbar = fig.colorbar(im, ax=axes, shrink=0.9)
cbar.set_label("DN", fontweight="bold", fontsize=16)
plt.show()

In [None]:
# There's probably a better way to do this
# Read the saturation thresholds (DN) from the reference file
sat_path = os.path.expanduser("~/crds_cache/references/roman/wfi/roman_wfi_saturation_0030.asdf")
print(sat_path)

with SaturationRefModel(sat_path) as satref:
    # 4096 x 4096 array thresholds in DN
    sat_dn = np.asarray(satref.data)

In [None]:
# Do we want to do any processing of superbias to compare removing that vs using the romancal steps?

In [None]:
# Functions to identify saturation
# homogenize format with earlier function definitions
def _align_sat_dn(sat_dn, frames_hw):
    """
    Make sat_dn match the (H, W) of the frames.
    If sat_dn is 4096x4096 and frames are 4088x4088 (no ref pixels),
    crop 4 px off each edge.
    """
    H, W = frames_hw
    sh, sw = sat_dn.shape
    if (sh, sw) == (H, W):
        return sat_dn
    if (sh - H == 8) and (sw - W == 8):
        # trim reference pixels: center-crop by 4 on each side
        return sat_dn[4:-4, 4:-4]
    raise ValueError(f"sat_dn shape {sat_dn.shape} does not match frames {(H, W)}")

def make_saturation_mask_dn(frames_dn, sat_dn):
    """
    DN-based saturation masks using a per-pixel DN threshold.

    Parameters
    ----------
    frames_dn : (N, H, W) float/uint array
        Non-destructive-read frames in DN.
    sat_dn : (H, W) float array
        Per-pixel saturation limits in DN. NaNs are treated as 'no check' (never saturate).

    Returns
    -------
    sat_mask        : (N, H, W) bool  -- pixels >= sat_dn in each frame
    nearby_sat_mask : (N, H, W) bool  -- 4-connected neighbors of saturated pixels (excluding the saturated ones)
    new_sat_mask    : (N, H, W) bool  -- pixels that become saturated in current frame (unsat in prev)
    diag_sat_mask   : (N, H, W) bool  -- diagonal neighbors (excluding sat + nearby)
    """
    assert frames_dn.ndim == 3, "frames_dn must be (N,H,W)"
    N, H, W = frames_dn.shape

    # Align and sanitize sat_dn
    sat_dn_use = _align_sat_dn(np.asarray(sat_dn), (H, W))
    # Treat NaN thresholds as 'never saturating'
    sat_dn_use = np.where(np.isfinite(sat_dn_use), sat_dn_use, np.inf).astype(frames_dn.dtype, copy=False)

    # Core saturation check (broadcast across frames)
    sat_mask = frames_dn >= sat_dn_use  # (N,H,W) bool

    # Neighborhood kernels (no center in 4-neighborhood kernel)
    kernel_4 = np.array([[0,1,0],
                         [1,0,1],
                         [0,1,0]], dtype=np.uint8)
    kernel_diag = np.array([[1,0,1],
                            [0,0,0],
                            [1,0,1]], dtype=np.uint8)

    nearby_sat_mask = np.zeros_like(sat_mask, dtype=bool)
    diag_sat_mask   = np.zeros_like(sat_mask, dtype=bool)
    new_sat_mask    = np.zeros_like(sat_mask, dtype=bool)

    for i in range(N):
        s = sat_mask[i]

        # Convolve boolean as uint8 to count neighboring saturated pixels
        conv4  = convolve2d(s.astype(np.uint8), kernel_4,  mode='same', boundary='fill', fillvalue=0) > 0
        convdg = convolve2d(s.astype(np.uint8), kernel_diag, mode='same', boundary='fill', fillvalue=0) > 0

        # neighbors that are not themselves saturated
        nearby_sat_mask[i] = (~s) & conv4
        # diagonal neighbors that are neither saturated nor 4-neighbors
        diag_sat_mask[i]   = (~s) & (~nearby_sat_mask[i]) & convdg

        # newly saturated in this frame (unsat previously)
        if i == 0:
            new_sat_mask[i] = s
        else:
            new_sat_mask[i] = s & (~sat_mask[i-1])

    return sat_mask, nearby_sat_mask, new_sat_mask, diag_sat_mask


def make_mean_debiased_difference_image_dn(frames_dn, raw_frames_dn, sat_dn):
    """
    Compute the mean of frame-to-frame differences (DN) while excluding
    saturated pixels and their 4-neighbors based on per-pixel DN limits.

    Parameters
    ----------
    frames_dn     : (N,H,W) array in DN
        Pre-processed frames (e.g., bias-subtracted) used to compute differences.
    raw_frames_dn : (N,H,W) array in DN
        Raw (or minimally processed) frames aligned with `frames_dn` used for saturation masking.
    sat_dn        : (H,W) array in DN
        Per-pixel saturation limits in DN.

    Returns
    -------
    mean_diff_frame : (H,W) float32 array
        Mean of diffs along time, excluding saturated + nearby pixels.
        NaN where no valid samples remain.
    """
    assert frames_dn.shape == raw_frames_dn.shape, "frames_dn and raw_frames_dn must align (N,H,W)"
    # Differences along time axis
    frame_diffs = np.diff(frames_dn, axis=0)  # shape (N-1,H,W)

    # Build masks from the raw data (DN)
    sat_mask, nearby_mask, _, _ = make_saturation_mask_dn(raw_frames_dn, sat_dn)
    # Align mask count with diffs (diffs start at frame 1)
    valid = ~(sat_mask[1:] | nearby_mask[1:])  # (N-1,H,W) bool

    # Weighted mean with binary weights; safe against zero-division
    w = valid.astype(np.float32)
    num = (frame_diffs * w).sum(axis=0, dtype=np.float64)
    den = w.sum(axis=0, dtype=np.float64)

    mean_diff_frame = np.divide(
        num, den,
        out=np.full(den.shape, np.nan, dtype=np.float32),
        where=den > 0
    ).astype(np.float32, copy=False)

    return mean_diff_frame


In [None]:
# Make a cutout for the saturation array to match the earlier raw and calibrated cutouts
sat_dn_cutout = sat_dn[row_0-q1:row_0+q1, col_0-q2:col_0+q2]

# Calculate the Mean slope image
mean_slope_image = make_mean_debiased_difference_image_dn(dm_cube_cutout, raw_cube_cutout, sat_dn_cutout)

# Get the 3D saturation mask
sat_mask, nearby_sat_mask, new_sat_mask, diag_mask= make_saturation_mask_dn(raw_cube_cutout, sat_dn_cutout)

# Get the difference images (AKA the instantaneous slope)
difference_images = np.diff(dm_cube_cutout, axis=0)                                                 

For three representative frames (the 3rd, the middle, and the last), it makes one figure per frame with three side-by-side panels:

Left – the raw image for that frame (scaled by 1e3 for display).

Center – a relative slope map = (difference_images[i-1] / mean_diff) to highlight pixels whose per-frame change deviates from the average slope (saturation and nonlinear effects can push values below 1).

Right – a combined mask visualization encoding saturated/nearby/corrupted pixels via a weighted sum.

In [None]:
# Need to clean up the below:
#frames = cube_cutout                      # shape: (N, y, x)
frames = dm_cube_cutout
N = len(frames)

# Choose exactly three frames: first usable (2), middle, and last
frames_to_plot = sorted({2, max(2, N//2), N-1})
print("Plotting frames:", frames_to_plot)

for i in frames_to_plot:
    if i < 2 or i >= N:
        continue  # safety

    fig, axes = plt.subplots(1, 3, figsize=(10, 3), sharex=True, sharey=True)

    # LEFT: current frame signal (rescale to ~same range as your original)
    cax1 = axes[0].imshow(frames[i] / 1e3, cmap='inferno_r', origin='lower', vmin=0, vmax=80)
    cbar1 = plt.colorbar(cax1, ax=axes[0], label='Signal [$\\times 10^3\\, DN$]')

    # CENTER: difference / mean slope (robust to zeros/NaNs)
    ratio = np.divide(
        difference_images[i-1],                    # assumes difference_images is length N-1
        mean_diff,
        out=np.full_like(difference_images[i-1], np.nan, dtype=np.float32),
        where=np.isfinite(mean_diff) & (mean_diff != 0)
    )
    cax2 = axes[1].imshow(ratio, cmap='RdYlBu', vmin=0.75, vmax=1.25, origin='lower')
    cbar2 = plt.colorbar(cax2, ax=axes[1], label='Frame Slope / Mean Slope')

    # RIGHT: masks (saturated, nearby saturated, corrupted)
    cax3 = axes[2].imshow(sat_mask[i]*3. + nearby_sat_mask[i]*2 + diag_mask[i], origin='lower', cmap='magma_r')
    cbar3 = plt.colorbar(cax3, ax=axes[2], label='Mask')
    
    # Titles & annotation
    axes[0].set_title('Image')
    axes[1].set_title('Difference / Mean Slope')
    axes[2].set_title('Saturated or Corrupted')

    axes[0].text(
        0.05, 0.95, f'Frame {i}', ha='left', va='top', transform=axes[0].transAxes,
        bbox=dict(facecolor='w', edgecolor='0.8', pad=5.0)
    )

    # Keep your zoom window
    #for ax in axes:
    #    ax.set_xlim(6, 36)
    #    ax.set_ylim(6, 36)

    plt.tight_layout()

    # Optional: save just these three figures (folder name updated to mag4)
    # plt.savefig(f'mag4_gif_files/mag4_saturation_mask_frame_{i:03d}.png', dpi=150, bbox_inches='tight')

    plt.show()


## Additional Resources
- [Schlieder et al. 2024](https://ui.adsabs.harvard.edu/abs/2024SPIE13092E..0SS/abstract)
- [TVAC2 Bright Star Saturation Test Summary](https://asd.gsfc.nasa.gov/roman/WFI_Bright_Star/TVAC2_Bright_star_saturation_summary_release.pdf)
- [romancal](https://roman-pipeline.readthedocs.io/en/latest/index.html)
- [Roman Documentation](https://roman-docs.stsci.edu)

## About this Notebook
**Author:** Dana Louie, Robby Wilson, Ami Choi\
**Updated On:** 2025-09-29

***

[Top of Page](#top)
<img style="float: right;" src="https://raw.githubusercontent.com/spacetelescope/notebooks/master/assets/stsci_pri_combo_mark_horizonal_white_bkgd.png" alt="Space Telescope Logo" width="200px"/> 