In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from dask.distributed import Client, LocalCluster
client = Client(n_workers=1,
                threads_per_worker=4,
                memory_limit='15GB')
client

In [None]:
import copy
import sys
import xarray as xr
import numpy as np
import dask.array as da
import time
import os

import dask

import matplotlib.pyplot as plt
import hvplot.xarray
import holoviews as hv
import scipy.constants
import scipy

sys.path.append("../..")
import processing_dask as pr
import plot_dask

sys.path.append("../../../preprocessing/")
from generate_chirp import generate_chirp

In [None]:
prefix = "/media/thomas/Extreme SSD/orca_paper_data_files/phase_noise/b205/20240222_203345"

zero_sample_idx = 159
sig_speed = scipy.constants.speed_of_light * (2/3)

zarr_path = pr.save_radar_data_to_zarr(prefix, zarr_base_location="/home/thomas/Documents/StanfordGrad/RadioGlaciology/test_tmp_zarr_cache/", skip_if_cached=True)

zarr_path

In [None]:
raw = xr.open_zarr(zarr_path)
raw = raw[{'pulse_idx': slice(0, 50000)}]

In [None]:
chirp_ts, chirp = generate_chirp(raw.config)

compressed = pr.pulse_compress(raw, chirp,
                               fs=raw.config['GENERATE']['sample_rate'],
                               zero_sample_idx=zero_sample_idx,
                               signal_speed=sig_speed).persist()

In [None]:
ts = np.logspace(np.log10(250e-6), np.log10(2.5*60), 10)
#ts = np.logspace(np.log10(2e-2), np.log10(300), 20)
#ts = np.logspace(np.log10(2e-2), np.log10(1000), 20)

expected_reflector_distance_1way = 50 # m
reflector_peak_tol_bins = 2 # bins (on each side)
noise_start_distance_1way = 1000 # m

## Noise Floor Variance

In [None]:
actual_stack_t = np.nan * np.zeros_like(ts)
actual_stack_n = np.zeros_like(ts, dtype=int)

# Statistics to compute
stack_noise_var = np.nan * np.zeros_like(ts)
stack_noise_mean = np.nan * np.zeros_like(ts)
stack_signal_peak_pwr_mean = np.nan * np.zeros_like(ts)
stack_signal_peak_pwr_variance = np.nan * np.zeros_like(ts)
stack_signal_peak_phase = np.nan * np.zeros_like(ts)
stack_signal_peak_phase_variance = np.nan * np.zeros_like(ts)

In [None]:
for t_idx, t in enumerate(ts):
    if not np.isnan(stack_noise_mean[t_idx]):
        continue # Skip if already computed (in case of interruption and restart)
    
    timestamp = time.time() # Track computation time 

    actual_stack_n[t_idx] = max(1, int(t / raw.attrs['config']['CHIRP']['pulse_rep_int']))
    actual_stack_t[t_idx] = actual_stack_n[t_idx] * raw.attrs['config']['CHIRP']['pulse_rep_int'] # TODO: Account for errors?
    print(f"[{t_idx+1}/{len(ts)}] \tt={actual_stack_t[t_idx]} \tn_stack={actual_stack_n[t_idx]}")
    
    with dask.config.set(**{'array.slicing.split_large_chunks': False}):

        stacked = pr.stack(compressed, actual_stack_n[t_idx])
        compressed_pwr = xr.apply_ufunc(lambda x: np.abs(x)**2, stacked, dask='parallelized').chunk("auto")
        
        noise_pwr = compressed_pwr["radar_data"].where((compressed_pwr.reflection_distance > noise_start_distance_1way)).dropna('travel_time').chunk("auto")
        
        stack_noise_var[t_idx] = noise_pwr.var(dim="travel_time").mean().compute().item()
        stack_noise_mean[t_idx] = noise_pwr.mean().compute().item()

        # Compute the signal peak power and phase
        
        expected_peak_idx = (np.abs(stacked.reflection_distance - expected_reflector_distance_1way)).argmin().item()

        peak_idxs = stacked["radar_data"].reduce(
            lambda x, axis: (np.abs((x[:, expected_peak_idx-reflector_peak_tol_bins:expected_peak_idx+reflector_peak_tol_bins]))
                             ).argmax(axis=axis) + expected_peak_idx-reflector_peak_tol_bins, dim='travel_time').persist()
        
        true_peak_idx = peak_idxs[0].compute().item()
        if not (peak_idxs == true_peak_idx).all().compute().item():
            print("WARNING: Peak indices are not all the same!")

        peak_phases = xr.apply_ufunc(
            lambda x, idx: np.angle(x[idx]),
            stacked["radar_data"], peak_idxs,
            input_core_dims=[['travel_time'],[]], # The dimension operated over -- aka "don't vectorize over this"
            output_core_dims=[[]], # The output dimensions of the lambda function itself
            exclude_dims=set(("travel_time",)), # Dimensions to not vectorize over
            vectorize=True, # Vectorize other dimensions using a call to np.vectorize
            dask="parallelized", # Allow dask to chunk and parallelize the computation
            output_dtypes=[np.float32], # Needed for dask: explicitly provide the output dtype
            #dask_gufunc_kwargs={"output_sizes": {'travel_time': 1}} # Also needed for dask:
            # explicitly provide the output size of the lambda function. See
            # https://docs.dask.org/en/stable/generated/dask.array.gufunc.apply_gufunc.html
        ).persist()

        peak_pwr = xr.apply_ufunc(
            lambda x, idx: np.abs(x[idx])**2,
            stacked["radar_data"], peak_idxs,
            input_core_dims=[['travel_time'],[]], # The dimension operated over -- aka "don't vectorize over this"
            output_core_dims=[[]], # The output dimensions of the lambda function itself
            exclude_dims=set(("travel_time",)), # Dimensions to not vectorize over
            vectorize=True, # Vectorize other dimensions using a call to np.vectorize
            dask="parallelized", # Allow dask to chunk and parallelize the computation
            output_dtypes=[np.float32], # Needed for dask: explicitly provide the output dtype
            #dask_gufunc_kwargs={"output_sizes": {'travel_time': 1}} # Also needed for dask:
            # explicitly provide the output size of the lambda function. See
            # https://docs.dask.org/en/stable/generated/dask.array.gufunc.apply_gufunc.html
        ).persist()

        stack_signal_peak_pwr_mean[t_idx] = peak_pwr.mean().compute().item()
        stack_signal_peak_pwr_variance[t_idx] = peak_pwr.var().compute().item()
        stack_signal_peak_phase[t_idx] = peak_phases.mean().compute().item()
        stack_signal_peak_phase_variance[t_idx] = peak_phases.var().compute().item()
        
    
    print(f"Completed in {time.time() - timestamp} seconds from {len(noise_pwr)} stacked pulses")

In [None]:
fig, (ax_noise_var, ax_noise_mean, ax_signal_mag, ax_signal_ph, ax_signal_ph_var) = plt.subplots(5, 1, figsize=(10, 20))

ax_noise_var.scatter(actual_stack_n, stack_noise_var)
ax_noise_var.set_title("Noise Variance")
ax_noise_var.loglog()
ax_noise_var.grid()

ax_noise_mean.scatter(actual_stack_n, stack_noise_mean)
ax_noise_mean.set_title("Noise Mean")
ax_noise_mean.loglog()
ax_noise_mean.grid()

ax_signal_mag.scatter(actual_stack_n, stack_signal_peak_pwr)
ax_signal_mag.set_title("Signal Magnitude")
ax_signal_mag.loglog()
ax_signal_mag.grid()

ax_signal_ph.scatter(actual_stack_n, stack_signal_peak_phase)
ax_signal_ph.set_title("Signal Phase")
ax_signal_ph.semilogx()
ax_signal_ph.grid()

ax_signal_ph_var.scatter(actual_stack_n, stack_signal_peak_phase_variance)
ax_signal_ph_var.set_title("Signal Phase Variance")
ax_signal_ph_var.loglog()
ax_signal_ph_var.grid()



In [None]:
fig, axs = plt.subplots(2, 1, figsize=(10, 10))
ax_pwr, ax_ph = axs

ax_pwr.plot(actual_stack_t, stack_signal_peak_pwr_mean)
# Add a shaded region for the variance
ax_pwr.fill_between(actual_stack_t, stack_signal_peak_pwr_mean - np.sqrt(stack_signal_peak_pwr_variance),
                    stack_signal_peak_pwr_mean + np.sqrt(stack_signal_peak_pwr_variance), alpha=0.4)
ax_pwr.set_title("Signal Power")
ax_pwr.loglog()

ax_ph.plot(actual_stack_t, stack_signal_peak_phase)
# Add a shaded region for the variance
ax_ph.fill_between(actual_stack_t, stack_signal_peak_phase - np.sqrt(stack_signal_peak_phase_variance),
                    stack_signal_peak_phase + np.sqrt(stack_signal_peak_phase_variance), alpha=0.4)
ax_ph.set_title("Signal Phase")
ax_ph.semilogx()

for ax in axs:
    ax.grid(True)

plt.show()

In [None]:
fig, ax = plt.subplots()
ax.loglog()
ax.scatter(actual_stack_n, stack_noise_var)
ax.set_xlabel('n_stack')
ax.set_ylabel('Variance of noise floor (2-4km)')
ax.set_title(f"pulse_rep_int = {raw.attrs['config']['CHIRP']['pulse_rep_int']} s")
plt.grid()
#fig.savefig(output_base_stack + ".png")

## Signal peak phase

In [None]:
# Signal
reflector_distance_expected = 25
expected_peak_idx = (np.abs(compressed.reflection_distance - reflector_distance_expected)).argmin().item()

peak_idxs = compressed["radar_data"].reduce(
    lambda x, axis: (np.abs((x[:, expected_peak_idx-5:expected_peak_idx+5]))).argmax(axis=axis) + expected_peak_idx-5,
    dim='travel_time')
peak_idxs.persist()
true_peak_idx = peak_idxs[0].compute().item()
if not (peak_idxs == true_peak_idx).all().compute().item():
    print("WARNING: Peak indices are not all the same!")

In [None]:
expected_internal_path_idx = (np.abs(compressed.reflection_distance)).argmin().item()
expected_internal_path_idx

In [None]:
peak_phases = xr.apply_ufunc(
        lambda x, idx: np.angle(x[idx]),
        compressed["radar_data"], peak_idxs,
        input_core_dims=[['travel_time'],[]], # The dimension operated over -- aka "don't vectorize over this"
        output_core_dims=[[]], # The output dimensions of the lambda function itself
        exclude_dims=set(("travel_time",)), # Dimensions to not vectorize over
        vectorize=True, # Vectorize other dimensions using a call to np.vectorize
        dask="parallelized", # Allow dask to chunk and parallelize the computation
        output_dtypes=[np.float32], # Needed for dask: explicitly provide the output dtype
        #dask_gufunc_kwargs={"output_sizes": {'travel_time': 1}} # Also needed for dask:
        # explicitly provide the output size of the lambda function. See
        # https://docs.dask.org/en/stable/generated/dask.array.gufunc.apply_gufunc.html
    ).persist()

In [None]:
fs = raw.attrs['config']['GENERATE']['sample_rate']

actual_dt = np.zeros_like(ts)
var = np.zeros_like(ts)

for t_idx, t in enumerate(ts):
    print(f"[{t_idx}/{len(ts)}] \tt={t}")
    pulses = max(1, int(t / raw.attrs['config']['CHIRP']['pulse_rep_int']))
    actual_dt[t_idx] = pulses * raw.attrs['config']['CHIRP']['pulse_rep_int']
    ph_group_mean = peak_phases.rolling(pulse_idx=pulses).mean()
    var[t_idx] = ((ph_group_mean[:-pulses].drop_indexes("pulse_idx") - ph_group_mean[pulses:].drop_indexes("pulse_idx"))**2).mean().compute().item()

In [None]:
output_base_2svar = os.path.join("20230628-outputs/", raw.attrs["basename"]+"-2svar")

d = xr.Dataset({"var_2s": ("dt", var)}, coords={"dt": actual_dt})
d.to_netcdf(output_base_2svar + ".nc")

In [None]:
fig, ax = plt.subplots()
ax.loglog()
ax.scatter(actual_dt, var)
ax.set_xlabel('Time [s]')
ax.set_ylabel('Two sample phase variance')
ax.set_title(f"pulse_rep_int = {raw.attrs['config']['CHIRP']['pulse_rep_int']} s")
plt.grid()
fig.savefig(output_base_2svar + ".png")

In [None]:
output_base_phase = os.path.join("20230628-outputs/", raw.attrs["basename"]+"-phase")

peak_idx_plot = peak_idxs.hvplot.scatter(x='pulse_idx')
peak_phase_plot = peak_phases.hvplot.scatter(x='pulse_idx', datashade=True)
peak_phase_rolling_plot = peak_phases.rolling(pulse_idx=100).mean().hvplot.scatter(x='pulse_idx', datashade=True)

In [None]:
hv.save(peak_idx_plot, output_base_phase+"-peak-idx.png", fmt='png')
hv.save(peak_phase_plot, output_base_phase+"-peak-phase.png", fmt='png')
hv.save(peak_phase_rolling_plot, output_base_phase+"-peak-phase-rolling.png", fmt='png')

hv.save(peak_idx_plot, output_base_phase+"-peak-idx.html", fmt='widgets')
hv.save(peak_phase_plot, output_base_phase+"-peak-phase.html", fmt='widgets')
hv.save(peak_phase_rolling_plot, output_base_phase+"-peak-phase-rolling.html", fmt='widgets')

peak_idx_plot, peak_phase_plot, peak_phase_rolling_plot