# Content and Objective

+ Show estimation of psd w. ESPRIT
+ Method: Get noise, filtered noise and sinusoid, and perform psd estimation

In [None]:
# importing
from typing import List, Union
import numpy as np
from numpy.random import default_rng
from numpy.typing import ArrayLike
from scipy import signal
from enum import Enum, auto

import matplotlib.pyplot as plt
from shutil import which

# showing figures inline
%matplotlib inline

In [None]:
# plotting options
plt.rcParams.update({
    "font.size": 36,
    "text.usetex": bool(which("latex")),
    "figure.figsize": (30, 8)
})


# Helper Functions

### Functions for estimating spectra 

In [None]:
def find_periodogram(y: ArrayLike, omega: ArrayLike) -> ArrayLike:
    """
    estimates the periodogram out from the observation at given frequencies

    :param y observation
    :param omega frequencies
    :return psd estimator
    """
    N = len(y)
    per = sum(y[p] * np.exp(-1j * omega * (p + 1)) for p in range(N))
    per = (abs(per)**2) / N

    return per


def est_acf(y: ArrayLike, est_type: str) -> ArrayLike:
    """
    estimates the discrete autocorrelation function (vector) given an array of observation

    :param y observation
    :param est_type "biased" or "unbiased"
    :return estimated acf, centered around 0
    """

    N = np.size(y)
    r = np.zeros_like(y)

    # loop lags of acf
    for k in np.arange(0, N):

        temp = np.sum(y[k:N] * np.conjugate(y[0:(N - k)]))

        # type of estimator
        if est_type == 'biased':
            r[k] = temp / N
        elif est_type == 'unbiased':
            r[k] = temp / (N - k)

    # find values for negative indices
    r_reverse = np.conjugate(r[::-1])

    return np.append(r_reverse[0:len(r) - 1], r)


def find_esprit_estimate(y: ArrayLike, n: int, m: int,
                         omega: ArrayLike) -> ArrayLike:
    """
    estimates a pseudospectrum from the sampled signal at given frequencies using ESPRIT
    
    :param y observation
    :param n order
    :param m size of observation 
    :param omega frequencies
    :return PSD estimate
    """

    N = len(y)
    per = np.ones(len(omega), dtype=complex)

    # estimating the autocorrelation matrix
    R = np.zeros([m, m], dtype=complex)
    r = est_acf(y, 'biased')
    for p in np.arange(0, m):
        R[p, :] = r[N - 1 - p:N - 1 - p + m]

    # find eigendecomposition and assign signal space matrix S
    # second operation reverses order to have largest EV first
    Lambda_R, X_R = np.linalg.eig(R)
    indices = Lambda_R.argsort()[m - n:m][::-1]
    S = X_R[:, indices]

    # eliminating first and last row
    S1 = np.matrix(S[:S.shape[0] - 1, :])
    S2 = np.matrix(S[1:, :])

    # constructing matrix Phi = (S1^* S1)^-1 S1^* S2
    # and finding eigenvalues
    Phi = np.linalg.pinv(S1) * S2
    Lambda_Phi, X_Phi = np.linalg.eig(Phi)

    # frequencies and amplitudes as angle and abs of the eigenvalues
    freq_est = -np.angle(Lambda_Phi)
    ampl_est = np.abs(Lambda_Phi)

    # find denominator by multiplying contribution of each eigenvalue
    for p in np.arange(0, n):
        per *= (np.exp(1j * omega) - ampl_est[p] * np.exp(1j * freq_est[p]))

    return 1 / (np.abs(per)**2)


# Parameters

In [None]:
# number of samples and according time vector
N = int(1e2)
N_vec = np.arange(0, N)

# noise variance
sigma2 = 2

# number of realizations for averaging
N_real = int(1e2)

# number of freq. points and freq. range
N_freq = 512
Ome = np.linspace(-np.pi, np.pi, N_freq)

# Number of sinusoids in line spectrum
n = 3

# apply if noise should be filtered
filtered = False
noise_filter_taps = None
# activate parameter "filtered" in parameters if you like to see filtered noise
if filtered:
    # filter parameters
    cutoff_freq = 1.0 / 4.0
    ripple_db = 60  # ripples and transition width of the filter
    width = 1 / 5.0
    N_filter, beta = signal.kaiserord(
        ripple_db, width)  # find filter order and beta parameter
    noise_filter_taps = signal.firwin(N_filter,
                                      cutoff=cutoff_freq,
                                      window=('kaiser', beta))


### Different signals to be used

In [None]:
class SignalType(Enum):
    """
    Class for setting the noise type
    """
    WHITE_NOISE = 1
    AR_SPECTRUM = auto()
    # filtered noise from Kroschel's publication (requires a frequency)
    AR_KROSCHEL = auto()
    # Wideband noise from Kroschel's publication (requires a frequency)
    WIDEBAND_NOISE_KROSCHEL = auto()
    DUAL_SINES = auto()
    DUAL_SINUSOIDS = auto()
    MULTI_SINUSOIDS = auto()


def get_signal(signal_type: SignalType,
               N: int,
               sigma2: float,
               noise_filter_taps: Union[None, ArrayLike] = None,
               rng: Union[None, np.random.Generator] = default_rng(),
               type_specific_arguments=None) -> np.ndarray:
    '''
    Generate different kinds of noisy signals
    
    :param signal_type type of signal. See SignalType.
    :param N length of signal
    :sigma2 noise variance
    :noise_filter_taps for white noise, specify None (default). For filtered noise, specify filter taps.
    :rng the random number generator. Leave at default for true random values
    :type_specific_arguments Some generators require further parameters
    '''
    # make noise
    if rng is None:
        # make it reproducible; if you want different noise each run, pass
        # rng = default_rng() to the get_signal call
        rng = default_rng(42)
    noise = np.random.normal(scale=sigma2**0.5, size=N)
    N_vec = np.arange(0, N)

    # if noise filter taps were passed, use them to shape the noise
    if noise_filter_taps:
        noise = signal.lfilter(noise_filter_taps, 1.0, noise)
        noise /= np.linalg.norm(noise)

    ST = SignalType
    # different types of signal
    generators = {
        ST.WHITE_NOISE:
        lambda _: noise,
        ST.AR_SPECTRUM:
        lambda _: signal.lfilter(
            b=[1], a=[1, -2.2137, 2.9403, -2.1697, 0.9606], x=noise),
        ST.AR_KROSCHEL:
        lambda freq: signal.lfilter(b=[1],
                                    a=[1 - 1.372, 1.843, -1.238, .849],
                                    x=freq * sigma2**0.5 * np.arange(0, N)),
        ST.WIDEBAND_NOISE_KROSCHEL:
        lambda freq: signal.lfilter(b=[1, 0, 0, 0, -.5],
                                    a=[1],
                                    x=freq * sigma2**0.5 * np.arange(0, N)),
        ST.DUAL_SINES:
        lambda _: noise + sum(
            np.sin(freq * np.arange(0, N)) for freq in {1.0, 1.2}),
        ST.DUAL_SINUSOIDS:
        lambda _: sum(
            np.exp(1j * freq * np.arange(0, N))
            for freq in {1.0, 1.2}) + noise,
        ST.MULTI_SINUSOIDS:
        lambda freqs_ampls: sum(ampl * np.exp(1j * freq * np.arange(0, N))
                                for freq, ampl in freqs_ampls.items()) + noise
    }
    return generators[signal_type](type_specific_arguments)


# Loop for realizations

In [None]:
# initialize arrays
psd_per = np.empty([N_real, N_freq])
psd_esprit = np.empty([N_real, N_freq])
psd_esprit_plus_one = np.empty([N_real, N_freq])
if n > 1:
    psd_esprit_minus_one = np.empty([N_real, N_freq])

# loop for realizations
for _k in range(N_real):

    # generate signal
    if n == 2:
        y = get_signal(SignalType.DUAL_SINUSOIDS,
                       N,
                       sigma2,
                       noise_filter_taps=noise_filter_taps)
    # alternatively: make sinusoids that probably aren't too far apart in spectrum, and have narrowly distributed amplitude
    elif n > 2:
        gen = default_rng(n)
        freq = gen.exponential(scale=np.pi / 3, size=n) % np.pi - np.pi / 2
        ampl = gen.rayleigh(scale=1, size=n)
        y = get_signal(
            SignalType.MULTI_SINUSOIDS,
            N,
            sigma2,
            noise_filter_taps=noise_filter_taps,
            type_specific_arguments={f: a
                                     for f, a in zip(freq, ampl)})

    # find estimations
    psd_per[_k, :] = find_periodogram(y, Ome)

    # esprit estimate (correct parameter)
    psd_esprit[_k, :] = find_esprit_estimate(y, n, N, Ome)
    # esprit estimate (estimate one more tone than there is)
    psd_esprit_plus_one[_k, :] = find_esprit_estimate(y, n + 1, N, Ome)
    # esprit estimate (estimate one lesstone than there is)
    if n > 1:
        psd_esprit_minus_one[_k, :] = find_esprit_estimate(y, n - 1, N, Ome)


# Plotting

In [None]:
# plot psds
def plot(psd_realizations: List[ArrayLike],
         frequencies: ArrayLike,
         title="PSD estimate",
         xlabel="$\Omega$",
         ylabel="\Omega",
         ownfigure=True):
    if ownfigure:
        plt.figure()
    # averaging and finding variance
    psd_average = psd_realizations.mean(axis=0)
    max_average_db = 10 * np.log10(np.max(psd_average))
    min_average_db = 10 * np.log10(np.min(psd_average))

    for estimator_realization in psd_realizations:
        plt.plot(frequencies,
                 10 * np.log10(estimator_realization) - max_average_db,
                 color="black",
                 alpha=4.0 / N_real)
    plt.plot(frequencies, 10 * np.log10(psd_average) - max_average_db)
    plt.grid(True)
    plt.title(title)
    plt.axis([-np.pi, np.pi, min_average_db - max_average_db - 1, 1])
    plt.xlabel(xlabel)
    plt.ylabel(f"$\Phi_{ylabel}(\Omega)$ (dB)")


plot(psd_per, Ome, "Periodogram", ylabel="\mathrm{P}")
plot(psd_esprit, Ome, f"ESPRIT (correct order {n})", ylabel="\mathrm{esprit}")
plot(psd_esprit_plus_one, Ome, f"esprit (order {n+1})", ylabel="\mathrm{esprit}")
if n > 1:
    plot(psd_esprit_minus_one,
         Ome,
         f"esprit (order {n-1})",
         ylabel="\mathrm{ESPRIT}")
