In [None]:
import os
import numpy as np
import re
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, freqz, hilbert
from datetime import datetime, timedelta
from scipy.interpolate import CubicSpline, interp1d
from mpl_toolkits.axes_grid1 import make_axes_locatable

plt.rcParams["figure.dpi"] = 300

In [None]:
def convert_gps_time(header_values):
    """Converts GPS time in milliseconds to week number, seconds-of-week, and UTC date"""
    gps_seconds_total = header_values // 1000
    gps_milliseconds = header_values % 1000
    #gps_seconds_total -= 18
    gps_weeks = gps_seconds_total // (7 * 24 * 3600)
    gps_seconds = (gps_seconds_total % (7 * 24 * 3600)) + (gps_milliseconds / 1000.0)
    gps_epoch = datetime(1980, 1, 6)
    utc_dates = np.array([gps_epoch + timedelta(weeks=int(week), seconds=float(sec)) 
                          for week, sec in zip(gps_weeks, gps_seconds)])
    hms_time = [utc.strftime("%H:%M:%S.%f")[:-3] for utc in utc_dates]
    return gps_weeks, gps_seconds, utc_dates, hms_time

def load_motion_data(file_path, data_start=18):
    """ Load GPS/IMU motion data from NovAtel logs"""
    dtypes = [
        ('time', 'U10'),  
        ('date', 'U10'),
        ('week_number', 'U10'),
        ('seconds_of_week', 'f8'),
        ('lat', 'f8'),
        ('lon', 'f8'),
        ('height-ell', 'f8'),
        ('height-msl', 'f8'),
        ('undulation', 'f8'),
        ('x_ecef', 'f8'),
        ('y_ecef', 'f8'),
        ('z_ecef', 'f8'),
        ('pitch', 'f8'),
        ('roll', 'f8'),
        ('heading', 'f8'),
        ('cog', 'f8')
    ]
    
    data = np.genfromtxt(file_path, skip_header=data_start, dtype=dtypes)
    return data

def extract_motion_data(data, gps_start_seconds, gps_end_seconds):
    """Extracts motion data for the given SAR data time range."""
    gps_seconds = data['seconds_of_week']
    sorted_indices = np.argsort(gps_seconds)
    gps_seconds = gps_seconds[sorted_indices]
    data = data[sorted_indices]
    start_idx = np.searchsorted(gps_seconds, gps_start_seconds, side='left')
    end_idx = np.searchsorted(gps_seconds, gps_end_seconds, side='right')
    start_idx = max(0, start_idx - 1)
    end_idx = min(len(gps_seconds) - 1, end_idx)
    extracted_motion = data[start_idx:end_idx + 1]
    return extracted_motion

def resample_motion_data(motion_data, radar_time_grid):
    """Resamples motion data using a CubicSpline to match the SAR time grid."""
    motion_timestamps = motion_data['seconds_of_week']
    resampled_motion = np.zeros(len(radar_time_grid), dtype=motion_data.dtype)
    resampled_motion['seconds_of_week'] = np.round(radar_time_grid, decimals=6)
    for field in motion_data.dtype.names:
        if field in ['time', 'date', 'week_number']:
            resampled_motion[field] = motion_data[field][0]
            continue
        motion_values = motion_data[field]
        if np.issubdtype(motion_values.dtype, np.number):
            interpolator = CubicSpline(motion_timestamps, motion_values, extrapolate=True)
            resampled_motion[field] = np.round(interpolator(radar_time_grid), decimals=6)
        else:
            resampled_motion[field] = motion_data[field][0]
    return resampled_motion

def sinc_interp_1d(data, x_actual, x_uniform, sinc_window=8, window_type=None, beta=8.6):
    """Interpolate SAR data in azimuth using sinc-interpolation."""

    N_uniform = len(x_uniform)
    N_rg = data.shape[1]
    data_interp = np.zeros((N_uniform, N_rg), dtype=data.dtype)

    dx = np.median(np.diff(x_actual))  #estimated spacing

    for i, xu in enumerate(x_uniform):
        #relative positions
        rel_pos = (xu - x_actual) / dx

        #limit to window
        mask = np.abs(rel_pos) < sinc_window
        rel = rel_pos[mask]

        if len(rel) == 0:
            continue

        #sinc weights
        weights = np.sinc(rel)

        #window functions
        if window_type == 'hamming':
            window = np.hamming(len(rel))
            weights *= window
        elif window_type == 'hanning':
            window = np.hanning(len(rel))
            weights *= window
        elif window_type == 'kaiser':
            window = np.kaiser(len(rel), beta)
            weights *= window

        #normalize and interpolate
        weights /= np.sum(weights)
        data_interp[i] = np.dot(weights, data[mask])

    return data_interp

# Load Raw Radar Data: #

In [None]:
def natural_sort_key(s):
    """Sort files naturally by extracting numeric parts."""
    return [int(text) if text.isdigit() else text for text in re.split(r'(\d+)', s)]

def load_data(directory, n_samp, header_samp, start_sample=0, end_sample=None, start_idx=0, stop_idx=-1):
    """Loads SAR data from binary files, allowing flexible sample range selection.
    Inputs:
    
    n_samp: number of total samples (should be 100,004 for 2025 SNOWWI datasets)
    header_samp: number of header samples (should be 4 for 2025 SNOWWI datasets)
    start_sample: desired starting range sample
    end_sample: desired ending range sample
    start_idx: starting radar data file
    stop_idx: ending radar data file

    """
    
    files = sorted(os.listdir(directory), key=natural_sort_key)
    num_files = len(files)
    
    if stop_idx == -1 or stop_idx >= num_files:
        stop_idx = num_files - 1
    if start_idx < 0 or start_idx >= num_files or stop_idx < start_idx:
        raise ValueError("Invalid start_idx or stop_idx range.")
    
    selected_files = files[start_idx:stop_idx + 1]
    num_selected_files = len(selected_files)
    
    first_file_path = os.path.join(directory, selected_files[0])
    first_file_size = os.path.getsize(first_file_path)

    #calculate records per file based on full `n_samp`
    num_records_per_file = first_file_size // (n_samp * 2)
    total_records = num_selected_files * num_records_per_file

    #set default `end_sample` if not provided
    if end_sample is None or end_sample > (n_samp - header_samp):
        end_sample = n_samp - header_samp

    if start_sample < 0 or start_sample >= end_sample:
        raise ValueError("Invalid start_sample or end_sample range.")

    #compute the final number of samples to be loaded per record
    num_samples_per_record = end_sample - start_sample

    #pre-allocate arrays
    header_array = np.empty((total_records, header_samp), dtype=np.uint16)
    radar_data = np.empty((total_records, num_samples_per_record), dtype=np.int16)

    record_idx = 0

    for file in selected_files:
        file_path = os.path.join(directory, file)
        
        with open(file_path, 'rb') as f:
            #read full records per file
            data = np.fromfile(f, dtype=np.int16).reshape(-1, n_samp)
            
            num_records = data.shape[0]
            header_array[record_idx:record_idx + num_records] = data[:, :header_samp]
            radar_data[record_idx:record_idx + num_records] = data[:, header_samp + start_sample : header_samp + end_sample]

            record_idx += num_records
    
    #convert header to integer values if necessary
    bit_shifts = np.array([2**48, 2**32, 2**16, 2**0])
    header_values = np.dot(header_array, bit_shifts)

    return radar_data, header_values, start_sample, end_sample

# Preprocessing: #
The chirp scaling algorithm, like other frequency-domain SAR processing techniques, assumes an ideal flight geometry. A key requirement is uniform along-track spacing between azimuth samples. Because these algorithms rely on FFT operations, such uniformity must be enforced through resampling. Although non-uniform Fourier transform methods exist, they are not employed here.

## Objectives: ##

- Load radar data

- Load motion (GPS/IMU) data

- Estimate the constant platform velocity

- Resample the radar data to achieve uniform azimuth spacing

In [None]:
start_time = datetime.now() #let's time the computation


sar_data_path = r'<path to the date files>'
start_idx = 720 #these files contain corner reflectors
stop_idx = 740

radar_data, header_values, start_sample, end_sample = load_data(sar_data_path, n_samp=100004, header_samp=4, start_sample=0, end_sample=None, start_idx=start_idx, stop_idx=stop_idx)
print('Radar Date Type: ', radar_data.dtype)
print('Radar Data Shape: ', radar_data.shape) #should be of shape (Naz, Nrg)
gps_weeks, gps_seconds, utc_dates, hms_time = convert_gps_time(header_values)

motion_data_path = r'~/motion data/20250205_gm.txt'
motion_data = load_motion_data(motion_data_path)

extracted_motion = extract_motion_data(motion_data, gps_start_seconds=gps_seconds[0], gps_end_seconds=gps_seconds[-1])
radar_time_grid = np.round(np.arange(gps_seconds[0], gps_seconds[-1], 1/1000), decimals=6) #prf means 1/1000 time spacing
print("flightline segment start time:", gps_seconds[0], "flightline segment end time:", gps_seconds[-1])
resampled_motion = resample_motion_data(extracted_motion, radar_time_grid)

#compute x,y,z velocities
prf = 1e3
dt = 1/prf

vx = np.diff(resampled_motion['x_ecef']) / dt
vx = np.append(vx, vx[-1])  #extend last value to match length
vy = np.diff(resampled_motion['y_ecef']) / dt
vy = np.append(vy, vy[-1])
vz = np.diff(resampled_motion['z_ecef']) / dt
vz = np.append(vz, vz[-1])

velocity = np.sqrt(vx**2 + vy**2 + vz**2)  #instantaneous speed (m/s)

distance = np.cumsum(velocity * dt)
distance -= distance[0] #start at 0

#create uniformly spaced distance vector
uniform_distance = np.linspace(distance[0], distance[-1], len(distance))  # same number of samples

#interpolate SAR data onto uniform distance grid (azimuth axis)
resampled_data = np.zeros_like(radar_data, dtype=np.int16)
for i in range(radar_data.shape[1]):  #loop over range bins
    f = interp1d(distance, radar_data[:, i], kind='linear', fill_value="extrapolate")
    resampled_data[:, i] = f(uniform_distance)

#compute average constant velocity
total_distance = distance[-1] - distance[0]
total_time = len(velocity) * dt
constant_velocity = total_distance / total_time
print("Constant velocity used:", constant_velocity)

In [None]:
def compute_flight_direction(x_ecef, y_ecef, z_ecef):
    """Returns the (unit) direction vector of the reference trajectory."""
    coords = np.stack([x_ecef, y_ecef, z_ecef], axis=1)
    mean = coords.mean(axis=0)
    u, s, vh = np.linalg.svd(coords - mean)
    direction = vh[0]
    direction /= np.linalg.norm(direction)
    return direction  #this is the local along-track direction

def ecef_to_local(x_ecef, y_ecef, z_ecef, direction_vec, 
                  origin_ecef=None):
    """Converts arrays of ECEF coordinates into platform-centric (along, cross, up),
    but first subtracts an origin so that 0,0,0 is your starting point."""
    #stack into an (N,3) array
    coords = np.stack([x_ecef, y_ecef, z_ecef], axis=1)
    #choose an origin: first sample or mean
    if origin_ecef is None:
        origin_ecef = coords[0] #or coords.mean(axis=0)
    #shift so origin is zero
    coords_rel = coords - origin_ecef
    
    #build constant unit-axes
    z_unit = origin_ecef / np.linalg.norm(origin_ecef)  #up
    x_unit = direction_vec                          
    y_unit = np.cross(z_unit, x_unit);  y_unit /= np.linalg.norm(y_unit)
    #re-orthogonalize X just to be safe
    x_unit = np.cross(y_unit, z_unit);  x_unit /= np.linalg.norm(x_unit)
    
    #now project all points relative to origin
    x_along = coords_rel @ x_unit
    y_cross = coords_rel @ y_unit
    z_up    = coords_rel @ z_unit

    return x_along, y_cross, z_up

def compute_ideal_trajectory(along_track, cross_track, up):
    """Fit a 3D straight line to (along_track, cross_track, up) platform-centric positions.
    Returns arrays for along_ideal, cross_ideal, up_ideal (projected points on the ideal line),
    oriented so “forward” is positive and anchored to start at (0,0,0)."""
    #build Nx3 array & center on the mean
    coords = np.stack([along_track, cross_track, up], axis=1)
    mean   = coords.mean(axis=0)
    M      = coords - mean

    #PCA via SVD to get the principal direction
    _, _, vh = np.linalg.svd(M, full_matrices=False)
    direction = vh[0]           #unit-length up to sign

    #make sure it points from start to end (not backwards)
    if np.dot(direction, coords[-1] - coords[0]) < 0:
        direction = -direction

    #project every point onto that line
    t_proj   = M @ direction               #scalar distance along the line for each sample
    projected = mean + np.outer(t_proj, direction)

    #shift so that the very first point is at the origin (0,0,0)
    projected -= projected[0]

    #unpack
    along_ideal, cross_ideal, up_ideal = projected[:,0], projected[:,1], projected[:,2]
    return along_ideal, cross_ideal, up_ideal

In [None]:
#motion compensation terms
times = resampled_motion['seconds_of_week'] - resampled_motion['seconds_of_week'][0]
x = resampled_motion['x_ecef']
y = resampled_motion['y_ecef']
z = resampled_motion['z_ecef']
elev = resampled_motion['heightell']
roll = resampled_motion['roll']
pitch = resampled_motion['pitch']
cog = resampled_motion['cog']
heading = resampled_motion['heading']

cog_corrected = np.array(
    [cog_i if cog_i < 180 else cog_i - 360 for cog_i in cog] #need to bound COG properly
)
yaw = np.mod(heading - cog_corrected + 360, 360) #yaw needs to be calculated from COG and Heading because it is not output by the NovAtel
yaw_corrected = np.array(
    [yaw_i if yaw_i < 180 else yaw_i - 360 for yaw_i in yaw]
)

#compute flight direction in ECEF
direction_vec = compute_flight_direction(
    resampled_motion['x_ecef'],
    resampled_motion['y_ecef'],
    resampled_motion['z_ecef'],
)

#convert ECEF to local (along-track, cross-track, up), zeroed at origin
x_along, y_cross, z_up = ecef_to_local(
    resampled_motion['x_ecef'],
    resampled_motion['y_ecef'],
    resampled_motion['z_ecef'],
    direction_vec
)

#fit the ideal straight‐line trajectory in local frame
x_along_ideal, y_cross_ideal, z_up_ideal = compute_ideal_trajectory(
    x_along,
    y_cross,
    z_up
)

#deviations in cross-track and up dimensions for 1st-Order Motion Compensation (MOCO)
delta_y = y_cross - y_cross_ideal
print("Mean Cross Track Deviation: ", np.mean(delta_y))
delta_z = z_up - z_up_ideal
print("Mean Height Deviation: ", np.mean(delta_z))

In [None]:
radar_data = resampled_data

In [None]:
#free up memory
del sar_data_path, motion_data_path, gps_weeks, gps_seconds, utc_dates, hms_time, y_cross, z_up, x_along, y_cross_ideal 
del z_up_ideal, direction_vec, x_along_ideal, resampled_data, vx, vy, vz, cog, extracted_motion, resampled_motion
del distance, header_values, heading, motion_data, radar_time_grid, yaw

# Plot Un-Modified Motion: #
This is useful for knowing how the motion is behaving over the flightline. It serves as a quick eye-ball check for how much squint is happening (mostly due to yaw).

In [None]:
fig, axs = plt.subplots(5, 1, figsize=(14, 18), sharex=True)

axs[0].plot(times, yaw_corrected, label='Yaw Corrected', color='tab:blue')
axs[0].set_ylabel('Yaw (deg)', fontsize=14)
axs[0].legend(loc='upper right')
axs[0].grid(True)

axs[1].plot(times, pitch, label='Pitch', color='tab:orange')
axs[1].set_ylabel('Pitch (deg)', fontsize=14)
axs[1].legend(loc='upper right')
axs[1].grid(True)

axs[2].plot(times, roll, label='Roll', color='tab:green')
axs[2].set_ylabel('Roll (deg)', fontsize=14)
axs[2].legend(loc='upper right')
axs[2].grid(True)

axs[3].plot(times, velocity, label='Velocity', color='tab:red') #this is prior to making the velocity constant, so it looks "noisy"
axs[3].set_ylabel('Velocity (m/s)', fontsize=14)
axs[3].legend(loc='upper right')
axs[3].grid(True)

axs[4].plot(times, elev, label='Elevation', color='tab:purple')
axs[4].set_ylabel('Elevation (m)', fontsize=14)
axs[4].set_xlabel('Time (s)', fontsize=14)
axs[4].legend(loc='upper right')
axs[4].grid(True)

plt.tight_layout()
plt.show()

# SAR Parameters: #
Below are the SAR parameters used for the last 20250205 GM flightline

In [None]:
fs = 1.2288e9 #sampling frequency of the SNOWWI system
t_p = 14e-6 #pulse width for Grand Mesa flightlines (determined preflight from slant range to ground)
c = 3e8 #speed of light
f0 = 5.39e9 #center frequency for C-band
wavelength = c/f0
print("Wavelength: ", wavelength, "[m]")
print("PRF : ", prf, "[Hz]")
V_r = constant_velocity #m/s
del constant_velocity
f_low = 340e6 #part of frequency stack for C-band
f_high = 420e6 #part of frequency stack for C-band
f_c = (f_high + f_low) / 2
print("Center Frequency : ", f_c, "[Hz]")
bandwidth = f_high - f_low
print("Bandwidth : ", bandwidth, "[Hz]")
chirp_delay = 0
K_r = (f_high - f_low)/t_p
print("Range Chirp Rate : ", K_r, "[Hz/s]")
print("PRI : ", dt, "[s]")

dx = np.mean(np.diff(uniform_distance))   #m/sample
prf_eff = V_r / dx                        #samples/sec
print("Effective PRF:", prf_eff)
#f_a = np.fft.fftfreq(N_az, d=1/prf_eff)

# Quadrature Demodulation (via Hilbert): #
Next, we need to convert the real-valued data to complex-valued which is performed using the Hilbert transform. Below is a great resource for more information on discrete complex downconversion.

https://www.dsprelated.com/showarticle/153.php

Since we are going to take FFTs and the FFT of a real-valued input is Hermitian-symmetric, we need to convert to complex values to get a single-sidebanded signal. A real-valued dataset would result in a double sidebanded FFT (folded over 0 Hz or DC) which would make it very difficult to distinguish direction.

In [None]:
def butter_bandpass_filter(data, lowcut, highcut, fs, order=5, plot=False):
    nyq   = 0.5 * fs
    low   = lowcut  / nyq
    high  = highcut / nyq
    b, a  = butter(order, [low, high], btype='band')
    #normalize at center frequency
    center_freq = (lowcut + highcut) / 2
    w, h = freqz(b, a, worN=[2*np.pi*center_freq/fs])
    if plot:
        w, h = freqz(b, a, worN=data.shape[1])
        plt.figure(figsize=(10, 6))
        plt.plot(w * nyq / np.pi, abs(h), label='Frequency Response')  # Convert normalized frequency to Hz
        plt.axvline(lowcut, color='red', linestyle='--', label='Low Cutoff Frequency')
        plt.axvline(highcut, color='red', linestyle='--', label='High Cutoff Frequency')
        plt.title('Frequency Response of Bandpass Filter')
        plt.xlabel('Frequency (Hz)')
        plt.ylabel('Gain')
        plt.grid()
        plt.legend()
        plt.show()

    #b = b / abs(h[0])
    y = filtfilt(b, a, data, axis=1) #forward/backward filtering to preserve phase
    return y

In [None]:
radar_data = butter_bandpass_filter(radar_data, f_low-1e6, f_high+1e6, fs, order=5, plot=True)

# (Optional) Troubleshooting Plot: #
Plots the FFT of the middle azimuth line (so FFT over range).

In [None]:
#fft_mid_azm = np.fft.fft(radar_data[radar_data.shape[0]//2, :])
#fft_mid_azm_freqs = np.fft.fftfreq(len(fft_mid_azm), d=1/fs)
#plt.figure(figsize=(10, 6), dpi=500)
#plt.plot(fft_mid_azm_freqs, np.abs(fft_mid_azm), label='FFT of Middle Azimuth Bin', color='blue')
#plt.axvline(x=340e6, color='red', linestyle='--', label='Low Frequency (340 MHz)')
#plt.axvline(x=420e6, color='red', linestyle='--', label='High Frequency (420 MHz)')
#plt.title('FFT of Middle Azimuth Bin', fontsize=24, fontweight='bold')
#plt.xlabel('Frequency (Hz)', fontsize=20)
#plt.ylabel('Magnitude', fontsize=20)
#plt.xlim(0, fs/2) 
#plt.grid(True)
#plt.legend(fontsize=16)
#plt.tight_layout()

In [None]:
def analytic_downconvert(data, fs, f_c):
    """Downconvert real-valued SAR data to complex baseband using Hilbert transform."""
    analytic = hilbert(data, axis=1)
    N_rg = analytic.shape[1]
    n = np.arange(N_rg)

    mix = np.exp(-1j * 2 * np.pi * f_c * n / fs)
    analytic *= mix

    return analytic

iq_data = analytic_downconvert(radar_data, fs, f_c)
del radar_data  #free memory, doing this immediately after because radar_data is huge

# Remove DC Bias: #
I don't know if this is still needed but doing so just in case.

In [None]:
iq_data -= iq_data.mean(axis=1, ).reshape(-1, 1) #remove DC bias

# Azimuth & Range Calculations: #

In [None]:
data = iq_data[:,:50000] #preserving memory by removing half of the range samples
N_az = data.shape[0]
print("Number of Azimuth Lines : ", N_az)
t = np.linspace(0, data.shape[0] / prf, data.shape[0])  #slow time

N_rg = data.shape[1]
print("Number of Range Samples : ", N_rg)
dr = c / (2*fs)
print("Range Separation : ", dr, "[m]")
R_offset = 1.35e3 #range offset for Grand Mesa
#H = 2247  #height of the platform above ground in meters
Rmin = R_offset
Rmax = Rmin + (N_rg * dr)
print("Range Vector : ", Rmin, "to", Rmax, "[m]")
R = np.linspace(Rmin, Rmax, N_rg)
tau = 2 * R / c #fast time delay

# First-Order MOCO: #
First-order motion compensation corrects deviations of the platform trajectory from its ideal, straight-line path. Conceptually, if we imagine a straight reference line representing the nominal flight path, any deviation of the actual trajectory from this line introduces a change in the radar-to-target range. This range error translates directly into a phase error in the received signal.

To correct for this, the actual trajectory is projected onto the reference (ideal) path, and a corresponding phase correction term is applied to the radar data to remove the range difference between the measured and reference trajectories. First-order MOCO compensates for phase errors due to translational motion deviations along the line of sight.

# Calculating the LOS1 (first-order moco) Phase Term: #
- LOS1 is line-of-sight phase term for 1st-order MOCO.
This is performed by the following:
$$
rlos1 = -\Delta y \cdot \cos{\gamma_{l_{ref}}} + \Delta z \cdot \sin{\gamma_{l_{ref}}}
$$

where,

$$
\gamma_{l_{ref}} = \arcsin{\frac{h}{r_{ref}}}
$$

In [None]:
def first_order_moco(delta_y, delta_z, wavelength, h, r_ref):
    """First-Order motion compensation which corrects for translational motion errors."""
    #y is LEFT, look is LEFT -> LOS_y = +cosψ, LOS_z = -sinψ
    #use direction cosines without arcsin:
    uz = np.clip(h / r_ref, -1.0, 1.0)                 # = sinψ  (>=0)
    uy = np.sqrt(np.maximum(0.0, 1.0 - uz**2))         # = cosψ  (>=0)

    #ΔR1 = -Δy*cosψ + Δz*sinψ   (left-looking, y=left)
    dR1 = -delta_y * uy + delta_z * uz
    H_mc1 = np.exp(-1j * (4*np.pi / wavelength) * dR1)
    return H_mc1, dR1


r_ref = R[R.shape[0] // 2]  #scalar ref slant range
print("Reference Range:", r_ref)
H = 2247 #height per azimuth sample, this is a rough estimate, better estimates should be determined via DEM
print("Height per Azimuth Sample:", H)
ratio = H / r_ref
print("min:", np.min(ratio), "max:", np.max(ratio))
H_mc1, dR1 = first_order_moco(delta_y, delta_z, wavelength, H, r_ref)
data = (data * H_mc1[:, None]).astype(np.complex64)

# Calculate Doppler Bandwidth Using 3dB Sinc Beamwidth: #

In [None]:
#low squint case:
def doppler_bandwidth_estimation(V_r):
    doppler_band = 2 * 0.886 * V_r
    return doppler_band

In [None]:
dop_b_est = doppler_bandwidth_estimation(V_r)
print("Estimated Doppler Bandwidth (Hz):", dop_b_est)

# Calulate Exposure Time: #

In [None]:
def exposure_time_estimation(dop_b_est, V_r):
    """Estimate the exposure time based on the Doppler bandwidth and velocity for low squint case."""
    exposure_time = 0.886 * V_r / dop_b_est
    return exposure_time

In [None]:
exposure_time = exposure_time_estimation(dop_b_est, V_r)
print("Estimated Exposure Time (s):", exposure_time)

# Convert Exposure Time to Azimuth Samples: 

In [None]:
#function to convert exposure time to azimuth samples
def exposure_time_to_azimuth_samples(exposure_time, prf):
    """Convert exposure time to azimuth samples."""
    azimuth_samples = int(np.round(exposure_time * prf))
    return azimuth_samples

In [None]:
azimuth_zero_pad_len = exposure_time_to_azimuth_samples(exposure_time, prf)
print("Azimuth Zero Padding Length:", azimuth_zero_pad_len)

# Azimuth Fourier Transform: #

In [None]:
data = np.pad(data, ((0, azimuth_zero_pad_len), (0, 0)), mode='constant') #pad to prevent circular convolution troubles
data = np.fft.fft(data, axis=0).astype(np.complex64)
f_a = np.fft.fftfreq(data.shape[0], d=1/prf)

# Calculate Doppler Bandwidth: #

In [None]:
#function to determine Doppler centroid estimate from the data (brightest points in the range-Doppler image)
def estimate_doppler_centroid(data, f_a):
    """
    Estimate the Doppler centroid from the range-Doppler image.
    Returns the estimated Doppler frequency and its index.
    """
    mag = np.abs(data)
    peak_doppler_indices = np.argmax(mag, axis=0)  #axis=0: Doppler dimension
    peak_doppler_freqs = f_a[peak_doppler_indices]  #Doppler frequencies corresponding to the peak magnitude
    return peak_doppler_freqs, peak_doppler_indices 

def calculate_squint_from_doppler(f_dc, wavelength, V_r):
    """
    Calculate the squint angle from the Doppler centroid frequency.
    f_dc: Doppler centroid frequency
    wavelength: Wavelength of the radar signal
    V_r: Mean velocity of the platform
    """
    #squint angle formula
    squint_angle = np.arcsin((f_dc * wavelength) / (2 * V_r))
    return squint_angle #radians

In [None]:
bw = np.deg2rad(3) #beamwidth in radians, estimated beamwidth for C-band is around 3 deg
f_dc = estimate_doppler_centroid(data, f_a)[0]  #estimate Doppler centroid frequency
f_dc_mean = np.mean(f_dc[17203:])  #mean Doppler centroid frequency
print("Doppler Centroid Frequency (Hz):", f_dc)
print("Mean Doppler Centroid Frequency (Hz):", f_dc_mean)
squint_angle = calculate_squint_from_doppler(f_dc, wavelength, V_r)  #Calculate squint angle from Doppler centroid
theta_sq = np.mean(squint_angle[17203:]) #was 17300:
print("Mean Squint Angle (degrees):", np.rad2deg(theta_sq))
f_a_min = 2 * V_r / wavelength * np.sin(theta_sq + bw/2)
f_a_max = 2 * V_r / wavelength * np.sin(theta_sq - bw/2)
print("f_a_min: ", f_a_min, "[Hz]")
print("f_a_max: ", f_a_max, "[Hz]")
print("Doppler Bandwidth (Hz):", np.abs(f_a_max - f_a_min))

In [None]:
#plot Doppler centroid bandwidth over Doppler spectrum
plt.figure(figsize=(20, 10), dpi=500)
plt.imshow(20 * np.log10(np.abs(np.fft.fftshift(data, axes=0))), vmin=60, vmax=100, aspect='auto', cmap='gray', interpolation='none', extent=[R[0], R[-1], f_a.max(), f_a.min()])

# Overlay Doppler centroid and bandwidth
if 'f_a_min' in locals() and 'f_a_max' in locals() and 'f_dc_mean' in locals():
    plt.axhline(f_dc_mean, color='r', linestyle='--', linewidth=1.5, label=r'Doppler Centroid')
    plt.axhline(f_a_min, color='g', linestyle='--', linewidth=1.5, label=r'$f_{a}$ Low')
    plt.axhline(f_a_max, color='b', linestyle='--', linewidth=1.5, label=r'$f_{a}$ High')
else:
    print("Doppler centroid and bandwidth variables not found. Please run the Doppler centroid estimation cell.")

plt.title('Range/Doppler Domain (After Azimuth FFT)', fontsize=24, fontweight='bold')
plt.xlabel('Slant Range [m]', fontsize=20)
plt.ylabel(r'$f_a$ [Hz]', fontsize=20)

cbar = plt.colorbar(label='Magnitude (dB)')
cbar.ax.tick_params(labelsize=16)

plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.legend(fontsize=16)

plt.tight_layout()
plt.show()

In [None]:
del iq_data  #free memory

# Effective FM Rate: #
- The effective FM chirp rate in range,
$$
K_s(f; r) = \frac{K}{1+\frac{(\frac{2\lambda K r}{c^2}(\frac{\lambda f}{2 V(r)})^2)}{{[1-\left(\frac{\lambda f}{2 V(r)}\right)^2]^{3/2}}}}
$$

- The range distortion factor,

$$
\alpha(f; r) = \frac{2\lambda\left(\frac{\lambda f}{2V(r)}\right)^2}{c^2[1-\left(\frac{\lambda f}{2V(r)}\right)^2]^{3/2}}
$$

- Range distortion is not a function of the FM rate but is a function of the geometry
- This factor, if not compensated leads to range defocus and has a strong dependence on Doppler frequency and a weak dependence on range

In [None]:
#corrected implementation for range distortion factor
def range_distortion_factor(freq, wavelength, V_r):
    numerator = 2 * wavelength * (wavelength * freq / (2 * V_r))**2
    denominator = c**2 * (1 - (wavelength * freq / (2 * V_r))**2)**(3/2)
    return numerator / denominator

#corrected implementation for effective chirp rate
def K_s(K_r, r_ref, freq, wavelength, V_r):
    alpha = range_distortion_factor(freq, wavelength, V_r)
    denominator = 1 + (K_r * r_ref * alpha)
    return K_r / denominator


#range_distortion_value = range_distortion_factor(f_a - f_dc_mean, wavelength, V_r).astype(np.float64)
#print("Range distortion factor = ",range_distortion_value)
#print("Maximum range distortion factor = ", max(abs(range_distortion_value)))
#K_s_value = K_s(K_r, r_ref, f_a - f_dc_mean, wavelength, V_r).astype(np.float64)
#print("Effective chirp rate (K_s) = ", K_s_value)
#print("Maximum effective chirp rate = ", max(abs(K_s_value)), "[Hz/s]")
#print("K_s Shape : ", K_s_value.shape)
#print("range_distortion_value Shape : ", range_distortion_value.shape)

fa_mask = (f_a >= f_a_max) & (f_a <= f_a_min) if f_a_max < f_a_min else (f_a >= f_a_min) & (f_a <= f_a_max)
range_distortion_value = np.zeros_like(f_a)
K_s_value = np.zeros_like(f_a)

range_distortion_value[fa_mask] = range_distortion_factor(f_a[fa_mask] - f_dc_mean, wavelength, V_r)
K_s_value[fa_mask] = K_s(K_r, r_ref, f_a[fa_mask] - f_dc_mean, wavelength, V_r)

In [None]:
plt.figure(figsize=(6,4))
plt.plot(f_a[fa_mask], range_distortion_value[fa_mask], 'b.')
plt.xlabel('$f$ [Hz]')
plt.ylabel(r'$\alpha(f)$')
plt.title('Range Distortion Factor')
plt.grid()

In [None]:
plt.figure(figsize=(6,4))
plt.plot(f_a[fa_mask], K_s_value[fa_mask], 'b.')
plt.xlabel('$f$ [Hz]')
plt.ylabel(r'$K_e(f)$')
plt.title('Effective Chirp Rate')
plt.grid()

In [None]:
#implement curvature factor in a function
def C_s(freq, wavelength, V_r):
    numerator = 1
    denominator = np.sqrt(1 - (wavelength * freq / (2 * V_r))**2)
    return (numerator / denominator) - 1

#C_s_dc_value = C_s(mean_doppler_centroid, wavelength, V_r).astype(np.float64)
C_s_value = np.zeros_like(f_a)
C_s_value[fa_mask] = C_s(f_a[fa_mask] - f_dc_mean, wavelength, V_r).astype(np.float64)
print("Range Curvature Factor : ", C_s_value)
print("Maximum range curvature factor : ", max(abs(C_s_value)))
print("C_s Shape : ", C_s_value.shape)

In [None]:
plt.figure(figsize=(6,4))
plt.plot(f_a[fa_mask], C_s_value[fa_mask], 'b.')
plt.xlabel('$f$ [Hz]')
plt.ylabel(r'$C(f)$')
plt.title('Range Curvature Factor')
plt.grid()

In [None]:
def tau_ref(r_ref, C_s, c=3e8):
    return (2 * r_ref / c)*(1 + C_s)

tau_ref_value = np.zeros_like(f_a)
tau_ref_value[fa_mask] = tau_ref(r_ref, C_s_value[fa_mask], c).astype(np.float64)
print("Time loci of the reference range : ", tau_ref_value, "[s]")
print("Maximum time locus of the reference range : ", max(abs(tau_ref_value)), "[s]")
print("Tau_ref Shape : ", tau_ref_value.shape)

In [None]:
plt.figure(figsize=(6,4))
plt.plot(f_a[fa_mask], tau_ref_value[fa_mask], 'b.')
plt.xlabel('Azimuth Frequency [Hz]')
plt.ylabel(r'$\tau_{ref}$ [s]')
plt.title('Time Loci of the Reference Range')
plt.grid()

In [None]:
#reference range curvature
r_ref_curvature = 0.5 * c * tau_ref_value
print("Reference range curvature: ", r_ref_curvature)

In [None]:
def phi_1(K_s, C_s, tau, tau_ref):
    return np.exp(-1j * np.pi * K_s * C_s * ((tau - tau_ref) ** 2))

print("Range times = ", tau)
print("Reference range times = ", tau_ref_value)
phi_1_value = np.zeros(data.shape, dtype=np.complex64)  # initialize phi_1_value array
print("Azimuth FFT shape = ", data.shape)

#fa_mask = (f_a >= f_a_max) & (f_a <= f_a_min) if f_a_max < f_a_min else (f_a >= f_a_min) & (f_a <= f_a_max)
#remember to use the same fa_mask in azimuth compression
a_count = 0 
for i in range(data.shape[0]):
    if fa_mask[i]:
        phi_1_value[i, :] = phi_1(K_s_value[i], C_s_value[i], tau, tau_ref_value[i])
        a_count += 1
    else:
        phi_1_value[i, :] = 0.0

print("First phase term = ", phi_1_value)
print("First phase term shape = ", phi_1_value.shape)
print("Number of valid azimuth lines = ", a_count)

# (Optional) Troubleshooting Plot: #
Plots the first phase term.

In [None]:
#plt.figure(figsize=(20,10), dpi=500)
#plt.imshow(np.angle(np.fft.fftshift(phi_1_value, axes=0)), aspect='auto', cmap='hsv', interpolation='none', extent=[R[0], R[-1], f_a.max(), f_a.min()])
#plt.axvline(x=R[R.shape[0]//2],color='r',linestyle='--',label='reference range')
#plt.axhline(y=f_dc_mean, color='r', linestyle='--', label='Doppler centroid')
#plt.plot(r_ref_curvature, f_a, 'b.', label='reference range curvature')

#plt.title('First Phase Term', fontsize=24, fontweight='bold')
#plt.xlabel('Range [m]', fontsize=20)
#plt.ylabel('Azimuth Frequency [Hz]', fontsize=20)

#cbar = plt.colorbar(label='Phase (radians)')
#cbar.ax.tick_params(labelsize=16)

#plt.xticks(fontsize=16)
#plt.yticks(fontsize=16)
#plt.legend()

#plt.tight_layout()
#plt.show()

In [None]:
data = np.multiply(data, phi_1_value).astype(np.complex64)

In [None]:
del range_distortion_value, phi_1_value, tau_ref_value

In [None]:
pad_width = int(np.ceil((t_p * fs)/2))

#always pad at the end of axis=1 (range dimension)
data = np.pad(data, ((0, 0), (0, pad_width)), mode='constant')
print("Data shape after padding:", data.shape)

data = np.fft.fft(data, axis=1).astype(np.complex64)

f_r = np.fft.fftfreq(N_rg + pad_width, d=1/fs)

print("Range FFT freq shape:", f_r.shape)
print("Range frequencies before shift:", f_r)

In [None]:
#data = np.fft.fft(data, axis=1).astype(np.complex64)
#f_r = np.fft.fftfreq(N_rg, d=1/fs)

In [None]:
def phi_2_1(f_tau, K_s, C_s):
    phase_correction = np.exp(1j * np.pi * (f_tau**2) / (K_s * (1 + C_s))) #range focusing/compression
    return phase_correction

def phi_2_2(f_tau, r_ref, C_s, c=3e8):
    linear_phase = np.exp(4j * (np.pi / c) * f_tau * (r_ref * C_s)) #RCMC
    return linear_phase

# Range Compression in Two-Dimensional Frequency Domain: #
This achieves range compression via a two-dimensional frequency-domain matched filter (first term in the exponential below).

$$
\Phi_2(f_\tau, f; r_{ref}) = \exp \left( -j\pi \frac{f_\tau^2}{K_s(f; f_{ref})[1 + C_s(f)]} \right) \cdot \exp \left( +j\frac{4\pi}{c} f_\tau r_{ref} C_s(f) \right)
$$


In [None]:
phi_2_1_value = np.zeros(data.shape, dtype=np.complex64)
fr_mask = np.abs(f_r) <= (K_r * t_p) / 2
hamming_window = np.hamming(np.sum(fr_mask))
window_idx = 0

for j, fr in enumerate(f_r):
    if not fr_mask[j]:
        continue
    taper = hamming_window[window_idx]
    for i in range(data.shape[0]):
        if fa_mask[i]:
            denom = K_s_value[i] * (1 + C_s_value[i])
            if denom != 0:
                phi_2_1_value[i, j] = taper * np.exp(1j * np.pi * (fr**2) / denom)
        else:
            phi_2_1_value[i, j] = 0.0
    window_idx += 1

# (Optional) Troubleshooting Plot:
This plots the range compression phase term.

In [None]:
#plt.figure(figsize=(20,10), dpi=500)
#plt.imshow(np.angle(np.fft.fftshift(phi_2_1_value)), aspect='auto', cmap='hsv', interpolation='none', extent=[f_r.min()/1e6, f_r.max()/1e6, f_a.max(), f_a.min()])

#plt.title('Range Matched Filter', fontsize=24, fontweight='bold')
#plt.xlabel(r'$f_{r}$ [MHz]', fontsize=20)
#plt.ylabel(r'$f_{a}$ [Hz]', fontsize=20)

#cbar = plt.colorbar(label='Phase (radians)')
#cbar.ax.tick_params(labelsize=16)

#plt.xticks(fontsize=16)
#plt.yticks(fontsize=16)

#plt.tight_layout()
#plt.show()

In [None]:
data = np.multiply(data, phi_2_1_value).astype(np.complex64)

In [None]:
del phi_2_1_value

# (Optional) Troubleshooting Plot:
This plots the data in the range/Doppler domain as a quality check.

In [None]:
#plt.figure(figsize=(20,10), dpi=500)
#plt.imshow(20*np.log10(np.abs(np.fft.ifft(np.fft.fftshift(data), axis=1))), vmin=80, vmax=120, aspect='auto', cmap='gray', interpolation='none', extent=[R[0], R[-1], f_a.max(), f_a.min()])
#plt.axvline(x=R[R.shape[0]//2],color='r',linestyle='--',label='reference range')
#plt.plot(r_ref_curvature, f_a, 'b.', label='reference range curvature')

#plt.title('Range Compressed (Magnitude)', fontsize=24, fontweight='bold')
#plt.xlabel(r'Slant Range [m]', fontsize=20)
#plt.ylabel(r'$f_{a}$ [Hz]', fontsize=20)

#cbar = plt.colorbar(label='Magnitude (dB)')
#cbar.ax.tick_params(labelsize=16)

#plt.xticks(fontsize=16)
#plt.yticks(fontsize=16)
#plt.legend()

#plt.tight_layout()
#plt.show()

# Range Cell Migration Correction in Two-Dimensional Frequency Domain: #
This achieves range cell migration correction in the two-dimensional frequency domain (second term in the exponential below).

$$
\Phi_2(f_\tau, f; r_{ref}) = \exp \left( -j\pi \frac{f_\tau^2}{K_s(f; f_{ref})[1 + C_s(f)]} \right) \cdot \exp \left( +j\frac{4\pi}{c} f_\tau r_{ref} C_s(f) \right)
$$


In [None]:
phi_2_2_value = np.zeros_like(data, dtype=np.complex64)

#apply boolean mask to include azimuth frequencies within bandwidth
#fa_mask = (f_a >= f_a_max) & (f_a <= f_a_min) if f_a_max < f_a_min else (f_a >= f_a_min) & (f_a <= f_a_max)
for i in range(data.shape[0]):
    if fa_mask[i]:
        phi_2_2_value[i, :] = phi_2_2(f_r, r_ref, C_s_value[i], c)
    else:
        phi_2_2_value[i, :] = 0.0  # Set to zero outside the valid bandwidth

# (Optional) Troubleshooting Plot:
This plots the the RCMC phase term.

In [None]:
#plt.figure(figsize=(20,10), dpi=500)
#plt.imshow(np.angle(np.fft.fftshift(phi_2_2_value)), aspect='auto', cmap='hsv', interpolation='none', extent=[f_r.min(), f_r.max(), f_a.max(), f_a.min()])
#plt.axvline(x=R[R.shape[0]//2],color='r',linestyle='--', linewidth=1, label='reference range')
#plt.plot(r_ref_curvature, f_a, 'b--', linewidth=0.5, label='reference range curvature')

#plt.title('RCMC Phase Term (Phase)', fontsize=24, fontweight='bold')
#plt.xlabel(r'$f_{\tau}$ [MHz]', fontsize=20)
#plt.ylabel(r'$f$ [Hz]', fontsize=20)

#cbar = plt.colorbar(label='Phase (radians)')
#cbar.ax.tick_params(labelsize=16)

#plt.xticks(fontsize=16)
#plt.yticks(fontsize=16)
#plt.legend()

#plt.tight_layout()
#plt.show()

In [None]:
data = np.multiply(data, phi_2_2_value).astype(np.complex64)

In [None]:
del phi_2_2_value

# (Optional) Troubleshooting Plot:
This plots the the data in the range/Doppler domain as a quality check.

In [None]:
#plt.figure(figsize=(20,10), dpi=500)
#plt.imshow(20*np.log10(np.abs(np.fft.ifft(np.fft.fftshift(data), axis=1))), vmin=80, vmax=120, aspect='auto', cmap='gray', interpolation='none', extent=[R[0], R[-1], f_a.max(), f_a.min()])
#plt.axvline(x=R[R.shape[0]//2],color='r',linestyle='--',label='reference range')
#plt.plot(r_ref_curvature, f_a, 'b.', label='reference range curvature')

#plt.title('RCMC', fontsize=24, fontweight='bold')
#plt.xlabel(r'Slant Range [m]', fontsize=20)
#plt.ylabel(r'$f_{a}$ [Hz]', fontsize=20)

#cbar = plt.colorbar(label='Magnitude (dB)')
#cbar.ax.tick_params(labelsize=16)

#plt.xticks(fontsize=16)
#plt.yticks(fontsize=16)
#plt.legend()

#plt.tight_layout()
#plt.show()

# Range Inverse Fourier Transform: #

In [None]:
data = np.fft.ifft(data, axis=1).astype(np.complex64)

data = data[:, :N_rg] #remove extra range padding

# Second-Order MOCO: #
Second-order motion compensation corrects for residual motion errors that remain after first-order correction, particularly those caused by range-dependent motion effects. While first-order MOCO assumes all targets experience the same range correction (using a single reference range), second-order MOCO compensates for each range cell experiencing a slightly different motion-induced path length error.

These higher-order terms arise from the curvature of the flight path and the variation of the line-of-sight geometry across the swath. In practice, second-order MOCO applies a range-dependent phase correction to each pixel or range bin, accounting for differing range deviations between the actual and reference trajectories.

# Calculating LOS2 (second-order moco) Phase Term: #
- LOS2 is line-of-sight phase term for 2nd-order MOCO. This is performed by the following:
$$
rlos2 = -\Delta y \cdot \cos{\gamma_{l_{r}}} + \Delta z \cdot \sin{\gamma_{l_{r}}}
$$

where,

$$
\gamma_{l_{r}} = \arcsin{\frac{h}{r}}
$$

In [None]:
def second_order_moco(delta_y, delta_z, rlos1, lam, h, R):
    uz = np.clip(h / R, -1.0, 1.0)                     #sinψ(r)
    uy = np.sqrt(np.maximum(0.0, 1.0 - uz**2))         #cosψ(r)

    #left-looking, y=left:
    rlos = -delta_y[:, None] * uy[None, :] + delta_z[:, None] * uz[None, :]
    rlos2 = rlos - rlos1[:, None]
    return np.exp(-1j * (4*np.pi / lam) * rlos2).astype(np.complex64), rlos2

In [None]:
#azimuth ifft because 2nd-order moco needs to be applied to 2-D time domain data
data = np.fft.ifft(data, axis=0).astype(np.complex64)
data = data[:N_az, :]  #remove extra azimuth padding
print("Data shape after azimuth IFFT: ", data.shape)

#2nd-order moco
H_mc2, rlos2 = second_order_moco(delta_y, delta_z, dR1, wavelength, H, R)
#apply 2nd-order moco
data = (data * H_mc2).astype(np.complex64)

#azimuth fft back to range/Doppler domain for azimuth compression
data = np.pad(data, ((0, azimuth_zero_pad_len), (0, 0)), mode='constant')
data = np.fft.fft(data, axis=0).astype(np.complex64)
print("Data shape after azimuth FFT: ", data.shape)
#f_a already calculated above

# Third Phase Term: #
- The phase residual term (range/Doppler phase multiplier) is given by:
$$
\Phi_3(\tau, f) = \exp \left( -j \frac{2\pi}{\lambda} c\tau \left[ 1 - \left[ 1 - \left( \frac{\lambda f}{2V(r = \tau c/2)} \right)^2 \right]^{1/2} \right] + j\Theta_D(f; r) \right)
$$

- Theta delta term:
$$
\Theta_D(f; r) = \frac{4\pi}{c^2} K_s(f; f_{ref}) \left[ 1 + C_s(f) \right] C_s(f) (r - r_{ref})^2
$$

In [None]:
def theta_D(K_s, C_s, r, r_ref, c=3e8):
    return (4 * np.pi / (c**2)) * K_s * (1 + C_s) * C_s * ((r - r_ref)**2)

def phi_3(tau, freq, wavelength, V_r, theta_D_val):
    return np.exp(
        -1j * (2 * np.pi / wavelength) * c * tau *
        (1 - np.sqrt(1 - (((wavelength * freq) / (2 * V_r))**2)))
        + (1j * theta_D_val)
    )

print("Range time = ", tau, "[s]")
phi_3_value = np.zeros((data.shape[0], data.shape[1]), dtype=np.complex64)
theta_D_value = np.zeros((data.shape[0], data.shape[1]), dtype=np.float64)
print("Azimuth frequencies = ", f_a)

#create boolean mask for valid Doppler frequencies
fa_mask = (f_a >= f_a_max) & (f_a <= f_a_min) if f_a_max < f_a_min else (f_a >= f_a_min) & (f_a <= f_a_max)

#construct full-length taper with zeros outside valid region
azimuth_window = np.zeros_like(f_a)
azimuth_window[fa_mask] = np.hamming(np.sum(fa_mask))

for i in range(data.shape[0]):
    taper = azimuth_window[i]
    if taper > 0:
        theta_D_value[i, :] = theta_D(K_s_value[i], C_s_value[i], R, r_ref)
        phi_3_value[i, :] = taper * phi_3(tau, f_a[i] - f_dc_mean, wavelength, V_r, theta_D_value[i, :])
    else:
        theta_D_value[i, :] = 0.0
        phi_3_value[i, :] = 0.0

print("theta_D = ", theta_D_value)
print("phi_3 = ", phi_3_value)

# (Optional) Troubleshooting Plot:
The plots below represent the residual compensation term as a result of the chirp scaling operation (first phase term) and the phase of the azimuth compression term.

In [None]:
#plt.figure(figsize=(20,10), dpi=500)
#plt.imshow(np.fft.fftshift(theta_D_value, axes=0), aspect='auto', cmap='hsv', interpolation='none', extent=[R[0], R[-1], f_a.max(), f_a.min()])

#plt.title('Residual Phase Term (Phase)', fontsize=24, fontweight='bold')
#plt.xlabel(r'Slant Range [m]', fontsize=20)
#plt.ylabel(r'$f$ [Hz]', fontsize=20)

#cbar = plt.colorbar(label='Phase (radians)')
#cbar.ax.tick_params(labelsize=16)

#plt.xticks(fontsize=16)
#plt.yticks(fontsize=16)

#plt.tight_layout()
#plt.show()

In [None]:
#plt.figure(figsize=(20,10), dpi=500)
#plt.imshow(np.angle(np.fft.fftshift(phi_3_value, axes=0)), aspect='auto', cmap='hsv', interpolation='none', extent=[R[0], R[-1], f_a.max(), f_a.min()])

#plt.title('Azimuth Compression Phase Term (Phase)', fontsize=24, fontweight='bold')
#plt.xlabel(r'Slant Range [m]', fontsize=20)
#plt.ylabel(r'$f$ [Hz]', fontsize=20)

#cbar = plt.colorbar(label='Phase (radians)')
#cbar.ax.tick_params(labelsize=16)

#plt.xticks(fontsize=16)
#plt.yticks(fontsize=16)

#plt.tight_layout()
#plt.show()

In [None]:
#multiply range ifft by phi_3
data = np.multiply(data, phi_3_value).astype(np.complex64)

In [None]:
del phi_3_value, theta_D_value

# Azimuth Inverse Fourier Transform: #

In [None]:
azm_ifft_output = np.fft.ifft(data, axis=0).astype(np.complex64)
azm_ifft_output = azm_ifft_output[:N_az, :]  #remove extra azimuth padding

In [None]:
del data

In [None]:
end_time = datetime.now()
print(f"Processing completed in {end_time - start_time} seconds.")
print("Script finished successfully.")

In [None]:
along_track_distance =  times * V_r

In [None]:
plt.figure(figsize=(25, 5), dpi=500)
ax = plt.gca()  #get the current axis
im = ax.imshow(20*np.log10(np.abs(azm_ifft_output)), vmin=20, vmax=70, aspect='equal', cmap='gray', interpolation='none', extent=[R[0], R[-1], along_track_distance[-1], 0])

#add title and labels
plt.title(f'CSA Image ({f0 / 1e9} GHz, Files {start_idx} to {stop_idx} from Flightline: 20250205T235149)', fontsize=12, fontweight='bold')
plt.xlabel('Slant Range [m]', fontsize=16)
plt.ylabel('Along-Track Distance [m]', fontsize=16)

#create an axis on the right side of the current axis for the color bar
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="1%", pad=0.05)

#create the color bar and make it the same height as the image
cbar = plt.colorbar(im, cax=cax)
cbar.ax.tick_params(labelsize=16)
cbar.set_label('Magnitude (dB)', fontsize=16)

#set the font size for ticks
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.savefig(f'csa_image_20250205T235149_files_{start_idx}_to_{stop_idx}_1st2ndMOCO.png', dpi=1000)

plt.show()

# Corner Reflector Analysis: #

In [None]:
from scipy.ndimage import maximum_filter

#magnitude and a small “window” to look for maxima
mag = np.abs(azm_ifft_output)
footprint = np.ones((15,15))    # 15×15-pixel neighborhood

#find pixels that equal the local max in their 15×15 window
local_max = (mag == maximum_filter(mag, footprint=footprint))

#threshold these peaks to ignore speckle
thr = np.percentile(mag, 99.99975)  #top 0.1% amplitudes
candidates = np.where(local_max & (mag >= thr))
cr_rows, cr_cols = candidates

print("Found", len(cr_rows), "reflector candidates")

In [None]:
plt.figure(figsize=(10,6))
plt.imshow(20*np.log10(mag), cmap='gray', vmin=20, vmax=70)
plt.scatter(cr_cols, cr_rows, s=30, facecolors='none', edgecolors='yellow')
plt.title("Local‐maxima corner‐reflector picks")
plt.show()

In [None]:
#slant-range in meters:
r_axis = np.arange(azm_ifft_output.shape[1]) * (c/(2*fs))
#along-track in meters (centered on each CR later):
t_axis = np.arange(azm_ifft_output.shape[0]) / prf
v_avg  = np.mean(velocity)  # or your known platform speed

#plot cuts for each CR
img_db = 20*np.log10(mag)

for idx, (r, c) in enumerate(zip(cr_rows, cr_cols)):
    range_profile = img_db[r, :] #range cut through CR
    az_profile    = img_db[:, c] #azimuth cut through CR
    az_axis = (t_axis - t_axis[r]) * v_avg #zero-center the along-track axis at the CR

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10,4), dpi=300)

    #range cuts
    ax1.plot(r_axis, range_profile, '-k')
    ax1.axvline(r_axis[c], color='r', linestyle='--')
    ax1.set_xlabel("Slant‐Range (m)")
    ax1.set_ylabel("Amplitude (dB)")
    ax1.set_title(f"CR#{idx+1} Range Cut")

    #azimuth cuts
    ax2.plot(az_axis, az_profile, '-k')
    ax2.axvline(0, color='r', linestyle='--')
    ax2.set_xlabel("Along‐Track Offset (m)")
    ax2.set_ylabel("Amplitude (dB)")
    ax2.set_title(f"CR#{idx+1} Azimuth Cut")

    plt.tight_layout()
    plt.show()

In [None]:
def compute_3db_beamwidth(x_axis, profile_db):
    """Given an axis x_axis (meters) and a profile in dB,
    returns (half_power_level, left_idx, right_idx, beamwidth_m)"""
    peak_idx   = np.argmax(profile_db)
    peak_level = profile_db[peak_idx]
    half_power = peak_level - 3.0

    #find left crossing
    left_candidates = np.where(profile_db[:peak_idx] <= half_power)[0]
    left_idx = left_candidates[-1] if left_candidates.size else 0

    #find right crossing
    right_candidates = np.where(profile_db[peak_idx:] <= half_power)[0]
    right_idx = peak_idx + right_candidates[0] if right_candidates.size else len(profile_db)-1

    bw = x_axis[right_idx] - x_axis[left_idx]
    return half_power, left_idx, right_idx, bw

for idx, (r, c) in enumerate(zip(cr_rows, cr_cols)):
    az_profile = img_db[:, c]
    az_axis    = (t_axis - t_axis[r]) * v_avg

    #compute 3 dB points
    hp_lvl, iL, iR, bw = compute_3db_beamwidth(az_axis, az_profile)

    fig, ax = plt.subplots(figsize=(6,4), dpi=300)

    ax.plot(az_axis, az_profile, '-k')
    ax.axhline(hp_lvl, color='C1', linestyle='--',
               label=f'–3 dB @ {hp_lvl:.1f} dB')
    ax.axvline( az_axis[iL], color='C1', linestyle=':')
    ax.axvline( az_axis[iR], color='C1', linestyle=':')
    ax.text(0, hp_lvl-1, f'BW₃dB = {bw:.1f} m',
            color='C1', ha='center', va='top',
            bbox=dict(facecolor='white', alpha=0.6, edgecolor='none'))

    ax.set_xlabel("Along‐Track Offset (m)")
    ax.set_ylabel("Amplitude (dB)")
    ax.set_title(f"CR#{idx+1} Azimuth Cut & 3 dB Beamwidth")
    ax.legend(loc='upper right')
    plt.tight_layout()
    plt.show()