In [1]:
import numpy as np
import pandas as pd
from numpy.fft import fft
import soundfile as sf
import os

import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio


### Helper

In [25]:
from typing import List, Tuple

def load_measurements(f: int, modulation: str, version: str = "records"):
  base_path = f"{version}/{modulation}/{round(int(f)*1e-3, 2)}kHz"
  
  paths = get_all_wav(base_path)
  paths = [path for path in map(
    lambda path: (int(path[0].lstrip("meas_").rstrip("deg")), path[1]), paths)]
  paths.sort(key=lambda path: path[0])

  angles = [angle for angle in map(lambda path: path[0], paths)]
  
  records = []
  for path in paths:
    records.append(sf.read(path[1])[0])

  return (np.array(records), np.array(angles))

def split_signals(signals: np.ndarray, noise_range: Tuple[int, int], signal_range: Tuple[int, int]):
  noise = []
  signal = []

  for rec in signals:
    noise.append(rec[noise_range[0]:noise_range[1]])
    signal.append(rec[signal_range[0]: signal_range[1]])

  return (np.array(signal), np.array(noise))

def get_all_wav(path: str) -> List[Tuple[str, str]]:
  files = os.listdir(path)

  file_paths = []
  for i, f in enumerate(files):
    name, ext = os.path.splitext(f)

    if ext != ".wav": continue

    file_paths.append((name, os.path.join(path, f)))
  
  return file_paths

def calc_fft(signals: np.ndarray, fs: int, N: int):
  vS = fft(signals, n=N*2, axis=1)
  vf = np.linspace(0, int(fs/2), N)

  vS = vS[:,0:N]

  return (vS, vf)

def moving_average(signals: np.ndarray, w: int):
  weights = np.ones(w)/w
  return np.array([sig for sig in map(lambda sig: np.convolve(sig, weights, "same"), signals)])

def get_idx_from_bandwidth(f: np.ndarray, bandwidth: Tuple[int, int]):
  try:
    i_min = np.where(f <= bandwidth[0])[0][-1]
  except IndexError:
    i_min = 0

  try:
    i_max = np.where(f >= bandwidth[1])[0][0]
  except IndexError:
    i_max = len(f) -1

  return (i_min, i_max)

def get_peak(S: np.ndarray, f: np.ndarray, bandwidth: Tuple[int, int]):
  i_min, i_max = get_idx_from_bandwidth(f, bandwidth)

  return np.max(S[:, i_min:i_max], 1)

def get_snr(S: np.ndarray, f: np.ndarray, bandwidth: Tuple[int, int]):
  i_min, i_max = get_idx_from_bandwidth(f, bandwidth)

  noise = np.sum(S[:, 0:i_min], 1) + np.sum(S[:, i_max::], 1)
  sig = np.sum(S[:, i_min:i_max], 1)

  return sig / noise


def get_signal_power(S: np.ndarray, f: np.ndarray, bandwidth: Tuple[int, int]):
  i_min, i_max = get_idx_from_bandwidth(f, bandwidth)
  
  return np.sum(S[:,i_min:i_max], 1)

def circular_plot(E: np.ndarray, phi: np.ndarray):
  E_dB = 20 * np.log10(E/np.max(E))

  fig = go.FigureWidget()
  fig.update_layout(
      autosize=False,
      width=800,
      height=500,
      margin=dict(
          l=40,
          r=30,
          b=40,
          t=30,
          pad=4
      ),
      polar=dict(radialaxis_range=[-30, 0])
  )

  fig.add_scatterpolar(theta=phi, r=E_dB)

def dB(E: np.ndarray):
  return 10 * np.log10(E)

# Load Measurements

In [26]:
#frequencies = [100, 400, 1.5e3, 10e3]
frequencies = [50, 100, 200, 400, 600, 800, 1000, 1200, 1500, 2000, 3000, 4000, 5000, 8000, 10000, 15000, 20000]

fs = 44100
total_duration = 10.5

N = 50000

signal_split = (4, 10)
noise_split = (0, 2)

vt_sig = np.linspace(0, signal_split[1] - signal_split[0], (signal_split[1] - signal_split[0]) * fs)
vt_noise = np.linspace(0, noise_split[1] - noise_split[0], (noise_split[1] - noise_split[0]) * fs)

### AM

In [27]:
am_signals = []
am_noise = []
am_signals_fft = []
am_filtered_signals_fft = []
am_noise_fft = []

am_phi = None
am_vf = None

for f in frequencies:
  records, am_phi = load_measurements(f, "AM")
  
  signals, noise = split_signals(records, np.multiply(noise_split, fs), np.multiply(signal_split, fs))
  am_signals.append(signals)
  am_noise.append(noise)

  signals_fft, am_vf = calc_fft(signals, fs, N)
  am_signals_fft.append(signals_fft)

  noise_fft, am_vf = calc_fft(noise, fs, N)
  am_noise_fft.append(noise_fft)

  am_filtered_signals_fft.append(signals_fft - noise_fft)

am_signals = np.array(am_signals)
am_noise = np.array(am_noise)
am_signals_fft = np.array(am_signals_fft)
am_filtered_signals_fft = np.array(am_filtered_signals_fft)
am_noise_fft = np.array(am_noise_fft)


### FM

In [11]:
fm_signals = []
fm_noise = []
fm_signals_fft = []
fm_filtered_signals_fft = []
fm_noise_fft = []

fm_phi = None
fm_vf = None

for f in frequencies:
  records, fm_phi = load_measurements(f, "FM", "recordsV2")

  signals, noise = split_signals(records, np.multiply(
      noise_split, fs), np.multiply(signal_split, fs))
  fm_signals.append(signals)
  fm_noise.append(noise)

  signals_fft, fm_vf = calc_fft(signals, fs, N)
  fm_signals_fft.append(signals_fft)

  noise_fft, fm_vf = calc_fft(noise, fs, N)
  fm_noise_fft.append(noise_fft)

  fm_filtered_signals_fft.append(signals_fft - noise_fft)

fm_signals = np.array(fm_signals)
fm_noise = np.array(fm_noise)
fm_signals_fft = np.array(fm_signals_fft)
fm_filtered_signals_fft = np.array(fm_filtered_signals_fft)
fm_noise_fft = np.array(fm_noise_fft)


# Check Measurements

In [23]:
plot_signals = am_signals[1]
plot_signals_fft = am_filtered_signals_fft[1]
plot_vf = am_vf
plot_phi = am_phi

### Single Signal - TIME

In [257]:
fig = px.line(x=vt_sig, y=plot_signals[0])
fig.show(renderer="browser")

### Single Signal - FREQUENCY

In [24]:
fft_fig = go.Figure()
fft_fig.add_trace(go.Scatter(x=plot_vf, y=dB(np.abs(plot_signals_fft[0])), mode="lines", name="Signal with Noise"))

fft_fig.update_layout(
    xaxis_title=r"$f \text{ in } Hz$",
    yaxis_title=r"$\text{Magnitude in } dB$",
    #    yaxis=dict(range=[0, 2**12]),
    xaxis=dict(range=(0, np.log10(fs/2)), type="log"),
)

pio.write_image(fft_fig, "beamforming_fm_4.0kHz.pdf", width=700, height=400)
fft_fig.show(renderer="browser")


invalid value encountered in log10



### All Signals with the same Frequency

In [330]:
fft_fig = go.Figure()

for idx, sig in enumerate(plot_signals_fft):
  fft_fig.add_trace(go.Scatter(x=plot_vf, y=dB(np.abs(
      sig)), mode="lines", name=r"$\varphi =" + str(plot_phi[idx]) + "^\circ$"))

fft_fig.update_layout(
    xaxis_title=r"$f \text{ in } Hz$",
    yaxis_title=r"$\text{Magnitude in } dB$",
    #    yaxis=dict(range=[0, 2**12]),
    xaxis=dict(type="log"),
)

fft_fig.show(renderer="browser")


### All Frequencies at the same angle

In [362]:
plot_signals_fft = fm_filtered_signals_fft[:,0,:]
plot_vf = fm_vf
plot_phi = fm_phi

In [363]:
fft_fig = go.Figure()

for idx, sig in enumerate(plot_signals_fft):
  fft_fig.add_trace(go.Scatter(x=plot_vf, y=dB(np.abs(
      sig)), mode="lines", name=r"$f =" + str(round(frequencies[idx]*1e-3, 2)) + "kHz$"))

fft_fig.update_layout(
    xaxis_title=r"$f \text{ in } Hz$",
    yaxis_title=r"$\text{Magnitude in } dB$",
    #    yaxis=dict(range=[0, 2**12]),
    xaxis=dict(type="log"),
)

fft_fig.show(renderer="browser")


# Polar Plot

## AM

### Peaks

In [28]:
am_peaks = []

for idx, signal in enumerate(am_filtered_signals_fft):
  f_sig = frequencies[idx]
  bandwidth = [f_sig - 100, f_sig + 100]

  am_peaks.append(get_peak(np.abs(signal), am_vf, bandwidth))

am_peaks_norm = np.array(am_peaks) / np.max(am_peaks)


In [333]:
am_peaks = np.max(np.abs(am_filtered_signals_fft), axis=2)
am_peaks_norm = am_peaks / np.max(am_peaks)

In [29]:
am_polar_fig = go.Figure()
am_polar_fig.update_layout(
    polar=dict(radialaxis_range=[-35, 0])
)

for idx, peaks in enumerate(am_peaks_norm):
  am_polar_fig.add_scatterpolar(r=dB(peaks), theta=am_phi, name=r"$f =" + str(round(frequencies[idx]*1e-3, 2)) + r"kHz$")

pio.write_image(am_polar_fig, "beamforming_am_polar.pdf", width=600, height=500)
am_polar_fig.show(renderer="browser")

### Power

In [259]:
am_power = []
for idx, signal in enumerate(am_filtered_signals_fft):
  f_sig = frequencies[idx]
  bandwidth = [f_sig - 1, f_sig + 1]

  am_power.append(get_signal_power(np.abs(signal), am_vf, bandwidth))

am_power_norm = np.array(am_power) / np.max(am_power)

In [260]:
am_polar_power_fig = go.Figure()
am_polar_power_fig.update_layout(
    #    polar=dict(radialaxis_range=[-30, 0])
)

for idx, power in enumerate(am_power_norm):
  am_polar_power_fig.add_scatterpolar(
      r=dB(power), theta=am_phi, name=r"$f =" + str(round(frequencies[idx]*1e-3, 2)) + r"kHz$")

am_polar_power_fig.show(renderer="browser")


### SNR

In [261]:
am_snr = []
for idx, signal in enumerate(am_filtered_signals_fft):
  f_sig = frequencies[idx]
  bandwidth = [f_sig - 1, f_sig + 1]

  am_snr.append(get_snr(np.abs(signal), am_vf, bandwidth))

am_snr_norm = np.array(am_snr) / np.max(am_snr)


In [262]:
am_polar_snr_fig = go.Figure()
am_polar_snr_fig.update_layout(
    #    polar=dict(radialaxis_range=[-30, 0])
)

for idx, snr in enumerate(am_snr_norm):
  am_polar_snr_fig.add_scatterpolar(
      r=dB(snr), theta=am_phi, name=r"$f =" + str(round(frequencies[idx]*1e-3, 2)) + r"kHz$")

am_polar_snr_fig.show(renderer="browser")


## FM

### Peaks

In [18]:
fm_peaks = []

for idx, signal in enumerate(fm_filtered_signals_fft):
  f_sig = frequencies[idx]
  bandwidth = [f_sig -100, f_sig + 100]

  fm_peaks.append(get_peak(np.abs(signal), fm_vf, bandwidth))

fm_peaks_norm = np.array(fm_peaks) / np.max(fm_peaks)


In [340]:
fm_peaks = np.max(np.abs(fm_filtered_signals_fft), axis=2)
fm_peaks_norm = fm_peaks / np.max(fm_peaks)


In [19]:
fm_polar_fig = go.Figure()
fm_polar_fig.update_layout(
    polar=dict(radialaxis_range=[-35, 0])
)

for idx, peaks in enumerate(fm_peaks_norm):
  fm_polar_fig.add_scatterpolar(
      r=dB(peaks), theta=fm_phi, name=r"$f =" + str(round(frequencies[idx]*1e-3, 2)) + r"kHz$")

pio.write_image(fm_polar_fig, "beamforming_fm_polar.pdf", width=600, height=500)
fm_polar_fig.show(renderer="browser")


### Power

In [291]:
fm_power = []
for idx, signal in enumerate(fm_filtered_signals_fft):
  f_sig = frequencies[idx]
  bandwidth = [f_sig - 1, f_sig + 1]

  fm_power.append(get_signal_power(np.abs(signal), fm_vf, bandwidth))

fm_power_norm = np.array(fm_power) / np.max(fm_power)


In [292]:
fm_polar_power_fig = go.Figure()

for idx, power in enumerate(fm_power_norm):
  fm_polar_power_fig.add_scatterpolar(
      r=dB(power), theta=fm_phi, name=r"$f =" + str(round(frequencies[idx]*1e-3, 2)) + r"kHz$")

fm_polar_power_fig.show(renderer="browser")


### SNR

In [294]:
fm_snr = []
for idx, signal in enumerate(fm_filtered_signals_fft):
  f_sig = frequencies[idx]
  bandwidth = [f_sig - 1, f_sig + 1]

  fm_snr.append(get_snr(np.abs(signal), fm_vf, bandwidth))

fm_snr_norm = np.array(fm_snr) / np.max(fm_snr)

In [295]:
fm_polar_snr_fig = go.Figure()
fm_polar_snr_fig.update_layout(
    #    polar=dict(radialaxis_range=[-30, 0])
)

for idx, snr in enumerate(fm_snr_norm):
  fm_polar_snr_fig.add_scatterpolar(
      r=dB(snr), theta=fm_phi, name=r"$f =" + str(round(frequencies[idx]*1e-3, 2)) + r"kHz$")

fm_polar_snr_fig.show(renderer="browser")
