In [3]:
import numpy as np
import math
import matplotlib as mpl
import matplotlib.pyplot as plt
plt.style.use('seaborn-deep')
from mpl_toolkits import mplot3d as mpl3d
from scipy import signal, stats, linalg, fft

# import our 'specsens' package with the help of 'sys'
import sys
sys.path.insert(0, '..')
import specsens as ss

In [14]:
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('seaborn-deep')
import multiprocessing as mp
import tqdm
from functools import partial

from specsens import util
from specsens import util_sim
from specsens import WirelessMicrophone
from specsens import WhiteGaussianNoise
from specsens import Stft
from specsens import WidebandEnergyDetector

def generation(f_sample, length_sec, itrs, noise_power, signal_power,
               noise_uncert, window, fft_len, f_center, num_bands,
               band_noise_est, seeds):
    """Simulate one environment and average two noise power estimates.

    The noise power of the environment is drawn once from a normal
    distribution (mean ``noise_power``, standard deviation ``noise_uncert``)
    and is then used for all ``itrs`` iterations. In every iteration a
    wireless microphone signal plus white Gaussian noise is generated and
    the noise power is estimated in two ways:

    * 'classic': energy of band ``band_noise_est`` reported by the wideband
      energy detector, divided by ``fft_len / num_bands`` and passed through
      ``ss.util.dB_power``.
    * 'eigenvalue': the third value (``fit_kde``) returned by
      ``ss.noise_est.estimate``.

    Note: relies on the module-level name ``ss`` (the ``specsens`` package)
    being available in the namespace this function runs in.

    Parameters
    ----------
    f_sample : sample rate handed to the signal generators and the STFT
    length_sec : length of every generated signal in seconds
    itrs : number of iterations run in this environment
    noise_power : mean of the noise power distribution (passed on with dB=True)
    signal_power : signal power (passed on with dB=True)
    noise_uncert : standard deviation of the noise power distribution
    window : window passed to ``Stft``
    fft_len : FFT length passed to ``Stft`` and ``WidebandEnergyDetector``
    f_center : center frequency of the wireless microphone signal
    num_bands : number of bands used by the wideband energy detector
    band_noise_est : index of the band used for the classic estimate
    seeds : indexable with three seeds, used in this order for the wireless
        microphone, the white Gaussian noise and the local rng

    Returns
    -------
    tuple
        ``(mean classic estimate, mean eigenvalue estimate)`` over all
        iterations, each computed with ``np.mean``.
    """

    # create new signal objects
    wm = WirelessMicrophone(f_sample=f_sample, t_sec=length_sec, seed=seeds[0])
    wgn = WhiteGaussianNoise(f_sample=f_sample,
                             t_sec=length_sec,
                             seed=seeds[1])

    # local rng
    rng = np.random.default_rng(seeds[2])

    # calculate noise power with uncertainty (drawn once per generation)
    gen_noise_power = rng.normal(loc=noise_power, scale=noise_uncert)

    # Plain lists are used for logging; 'np.append' would copy the whole
    # array on every iteration.
    noise_est_list_classic = []
    noise_est_list_eigenvalue = []

    # 'inner' iterations loop
    for _ in range(itrs):

        # generate signal
        sig = wm.soft(f_center=f_center, power=signal_power, dB=True)

        # generate noise
        noise = wgn.signal(power=gen_noise_power, dB=True)

        # create actual signal
        both = sig + noise

        # create a Short Time Fourier Transform object
        sft = Stft(n=fft_len, window=window)

        # use the stft to transform the signal into the frequency domain
        f, psd = sft.stft(both, f_sample, normalized=False, dB=False)

        # create a Wideband Energy Detector object
        fed = WidebandEnergyDetector(num_bands=num_bands,
                                     f_sample=f_sample,
                                     fft_len=fft_len,
                                     freqs=f)

        # compute energy for all bands
        bands = fed.detect(psd)

        # classic estimate: band energy divided by the number of fft bins
        # per band, then handed to 'dB_power' (presumably a conversion to dB)
        noise_est_classic = bands[band_noise_est] / (fft_len / num_bands)
        noise_est_classic = ss.util.dB_power(noise_est_classic)

        # eigenvalue estimate: only the 'fit_kde' result is used
        _mean, _fit_hist, fit_kde, _mle = ss.noise_est.estimate(
            both, int(f_sample * length_sec), l=50)
        noise_est_eigenvalue = fit_kde

        # log estimates
        noise_est_list_classic.append(noise_est_classic)
        noise_est_list_eigenvalue.append(noise_est_eigenvalue)

    # calculate average estimate and return
    return np.mean(noise_est_list_classic), np.mean(noise_est_list_eigenvalue)


def estimation_sim(
    gens=50,  # generations, number of environments
    itrs=300,  # iterations, number of tests in each environment
    f_sample=1e6,  # in Hz
    signal_power=0.,  # in dB
    f_center=-1e5,  # signal center frequency
    noise_power=0.,  # in dB
    length_sec=None,  # length of each section in seconds
    num_samples=None,  # number of samples
    noise_uncert=0.0,  # standard deviation of the noise normal distribution
    seed=None,  # random seed used for rng
    num_procs=None,  # number of processes to run in parallel
    window='box',  # window used with fft
    fft_len=1024,  # samples used for fft
    num_bands=1,  # total number of bands
    band_noise_est=None):  # band to use for noise estimation
    """Run the noise estimation simulation in parallel and print the results.

    ``gens`` environments are simulated by calling ``generation`` in a
    ``multiprocessing`` pool; every environment runs ``itrs`` iterations.
    The per-environment estimates are averaged and printed together with
    the value of ``ss.util.dB_rel_err`` relative to ``noise_power``.

    Parameters
    ----------
    gens : number of environments (generations) to simulate
    itrs : number of iterations in each environment
    f_sample : sample rate in Hz
    signal_power : signal power in dB
    f_center : signal center frequency
    noise_power : noise power in dB
    length_sec : signal length in seconds; used only if ``num_samples``
        is None
    num_samples : signal length in samples; takes precedence over
        ``length_sec``
    noise_uncert : standard deviation of the noise power distribution
    seed : entropy for ``np.random.SeedSequence``; every generation
        receives a tuple of three child seeds spawned from it
    num_procs : number of worker processes; None uses the cpu count,
        limited to ``gens``
    window : window used with the fft
    fft_len : number of samples used for the fft
    num_bands : total number of bands
    band_noise_est : index of the band used for noise estimation (required)

    Returns
    -------
    None
        Results are only printed.

    Raises
    ------
    AssertionError
        If ``num_procs`` is not in ``1..gens``, if neither ``num_samples``
        nor ``length_sec`` is given (or the given one is not positive), or
        if ``band_noise_est`` is None.
    """

    # set number of processes used; the default is limited to 'gens' so that
    # small simulations also run on machines with many cores
    if num_procs is None:
        num_procs = min(mp.cpu_count(), gens)
    assert num_procs > 0, 'num_procs must be greater than 0'
    assert num_procs <= gens, 'num_procs must be less or equal to gens'

    # check and calculate length (in seconds and number of samples)
    if num_samples is not None:
        assert num_samples > 0., 'num_samples must be greater than 0'
        length_sec = num_samples / f_sample
    elif length_sec is not None:
        assert length_sec > 0., 'length_sec must be greater than 0'
        num_samples = int(f_sample * length_sec)
    else:
        # explicit raise, so the check is not stripped when running with -O
        raise AssertionError('either num_samples or length_sec needed')

    # fail early with a clear message; a missing band would otherwise only
    # surface as a TypeError in the '%d' formatting below
    assert band_noise_est is not None, 'band_noise_est must be set'

    print('---- Simulation parameters ----')
    print('Generations:    %d' % (gens))
    print('Iterations:     %d' % (itrs))
    print('Total iters:    %d' % (gens * itrs))
    print('Signal power:   %.2f dB' % (signal_power))
    print('Sig cent. freq: %.1f Hz' % (f_center))
    print('Noise power:    %.2f dB' % (noise_power))
    print('Noise uncert:   %.2f dB' % (noise_uncert))
    print('SNR:            %.2f dB' % (signal_power - noise_power))
    print('Signal length:  %.6f s' % (length_sec))
    print('Signal samples: %d' % (num_samples))
    print('FFT length:     %d' % (fft_len))
    print('Num. of bands:  %d' % (num_bands))
    print('Band noise est: %d' % (band_noise_est))

    print('---- Running simulation ----')
    print('Using %d processes on %d cores' % (num_procs, mp.cpu_count()))

    # generate child seeds for wm, wgn and the local rng of each generation
    seed_seq = np.random.SeedSequence(seed)
    seeds = list(
        zip(seed_seq.spawn(gens), seed_seq.spawn(gens), seed_seq.spawn(gens)))

    # prepare parallel execution; 'seeds' is the only argument left open
    p = mp.Pool(processes=num_procs)
    f = partial(generation, f_sample, length_sec, itrs, noise_power,
                signal_power, noise_uncert, window, fft_len, f_center,
                num_bands, band_noise_est)

    try:
        # run simulation while showing progress bar
        res = list(tqdm.tqdm(p.imap(f, seeds), total=gens))
        p.close()
    except BaseException:
        # do not leave worker processes behind if a generation fails
        p.terminate()
        raise
    finally:
        # cleanup parallel execution
        p.join()

    # 'unwrap' res tuples
    est_classic_list = [r[0] for r in res]
    est_eigenvalue_list = [r[1] for r in res]

    est_classic = np.mean(est_classic_list)
    est_eigenvalue = np.mean(est_eigenvalue_list)

    print('---- Simulation stats ----')
    print('Actual:   %.4f dB' % (noise_power))
    print('Classic:  %.4f dB  err: %.4f dB' %
      (est_classic, ss.util.dB_rel_err(noise_power, est_classic)))
    print('Eigenval: %.4f dB  err: %.4f dB' %
      (est_eigenvalue, ss.util.dB_rel_err(noise_power, est_eigenvalue)))

    return None

# Compare both noise estimators at -15 dB SNR (0 dB signal, 15 dB noise).
# Keyword arguments are listed in the order of the 'estimation_sim' signature.
_ = estimation_sim(
    gens=100,  # number of simulated environments
    itrs=10,  # tests run inside each environment
    f_sample=1e6,  # sample rate in Hz
    signal_power=0.0,  # dB
    f_center=-1.5e5,  # center frequency of the signal
    noise_power=15.0,  # dB
    length_sec=0.008192,  # seconds per signal (8192 samples at 1 MHz)
    noise_uncert=0.0,  # std. deviation of the noise power distribution in dB
    seed=None,  # no fixed seed; pass a value for reproducible runs
    num_procs=None,  # let 'estimation_sim' choose the number of processes
    window='box',  # fft window
    fft_len=8192,  # fft size in samples
    num_bands=4,  # number of bands the spectrum is divided into
    band_noise_est=3)  # index of the band used for noise estimation


---- Simulation parameters ----
Generations:    100
Iterations:     10
Total iters:    1000
Signal power:   0.00 dB
Sig cent. freq: -150000.0 Hz
Noise power:    15.00 dB
Noise uncert:   0.00 dB
SNR:            -15.00 dB
Signal length:  0.008192 s
Signal samples: 8192
FFT length:     8192
Num. of bands:  4
Band noise est: 3
---- Running simulation ----
Using 8 processes on 8 cores


100%|██████████| 100/100 [08:15<00:00,  4.95s/it]

---- Simulation stats ----
Actual:   15.0000 dB
Classic:  14.9964 dB  err: -30.8633 dB
Eigenval: 15.0179 dB  err: -23.8592 dB



