  ## (3) Výběr částí extrahovaných nahrávek – cross-korelace

In [None]:
import numpy as np
import soundfile
from scipy import signal
import matplotlib.pyplot as plt
import IPython
import os

if not os.path.exists('plots'):
    os.makedirs('plots')
    
# Načte WAVky.
nomask_data, nomask_fs = soundfile.read('../audio/maskoff_tone.wav')
mask_data, mask_fs = soundfile.read('../audio/maskon_tone.wav')

# Vzorkovací frekvence obou souborů musí být 16 000 Hz.
assert nomask_fs == mask_fs == 16000

# Z nahrávky bez masky vyberu 1 s (16k vzorků) počínaje libovolným vzorkem.
nomask_start_sample = 16625
nomask_data = np.copy(nomask_data[nomask_start_sample:(nomask_start_sample+16000)])

# soundfile.read() vrací np pole hodnot float64, které jsou mezi -1.0 a 1.0, nemělo by být potřeba znovu je 
# ustředňovat a normalizovat, udělat to každopádně můžeme, předpokládám, že by to mělo vyjít stejně.
# zdroje: https://nbviewer.jupyter.org/github/zmolikova/ISS_project_study_phase/blob/master/Obecne_Python_tipy.ipynb
#         https://pysoundfile.readthedocs.io/en/latest/#soundfile.read
nomask_data -= np.mean(nomask_data)
nomask_data /= np.abs(nomask_data).max()

mask_data -= np.mean(mask_data)
mask_data /= np.abs(mask_data).max()

# Spočítáme korelaci a plotneme.
# Nejlepší shodu se signálem nomask má signál mask v části začínající indexem odpovídajícím indexu 
# položky v poli correlated s nejvyšší hodnotou.
correlated = np.correlate(mask_data, nomask_data, mode='valid')
max_corr_index = np.argmax(correlated)

fig, (nomask, mask, corr, res) = plt.subplots(4, 1, figsize=(15, 9))

nomask.set_title('Bez roušky (náhodně vybraný 1s úsek)')
nomask.set_xlabel('vzorek')
nomask.plot(np.arange(nomask_start_sample, nomask_start_sample+16000), nomask_data)

mask.set_title('S rouškou (celá nahrávka)')
mask.set_xlabel('vzorek')
mask.plot(mask_data)

corr.set_title('Korelace')
corr.plot(correlated)

res.set_title('Část signálu s rouškou s nejvyšší korelací')
res.plot(np.arange(max_corr_index, max_corr_index+16000), mask_data[max_corr_index:(max_corr_index+16000)])
res.set_xlabel('vzorek')

mask.axvline(x=max_corr_index, color='r')
corr.axvline(x=max_corr_index, color='r')

fig.tight_layout()
fig.show()

print('Počet hodnot korelace: ' + str(len(correlated)))
print('Počet vzorků signálu s rouškou: ' + str(len(mask_data)))
print('Nejvyšší korelace je na indexu: {}'.format(max_corr_index))

In [None]:
# Dále budeme pracovat už jen s nalezeným nejvíce odpovídajícím 1s úsekem
mask_data = np.copy(mask_data[max_corr_index:(max_corr_index+16000)])

# Provedeme ustřednění a normalizaci (protože jsme z původní nahrávky vzali jen část, je třeba udělat
# to znovu, i když už jsem to jednou udělal v předchozím kroku)
nomask_data -= np.mean(nomask_data)
nomask_data /= np.abs(nomask_data).max()

mask_data -= np.mean(mask_data)
mask_data /= np.abs(mask_data).max()

## (3) Rozdělení na rámce

In [None]:
# zdroj: https://superkogito.github.io/blog/SignalFraming.html
def framing(a, stride_length = 320, stride_step = 160):
    nrows = ((a.size - stride_length) // stride_step) + 1
    n = a.strides[0]
    return np.lib.stride_tricks.as_strided(a,
                                        shape=(nrows, stride_length),
                                        strides=(stride_step*n, n))
nomask_frames = framing(nomask_data)
mask_frames = framing(mask_data)

# Zobrazíme jeden libovolný rámec
show_frame_i = 98
show_frame_x = np.arange(show_frame_i * 160, show_frame_i * 160 + 320)

fig, (nomask, mask) = plt.subplots(2, 1, figsize=(10, 4), sharex=True)
nomask.set_title('Bez roušky, rámec ' + str(show_frame_i))
nomask.plot(show_frame_x, nomask_frames[show_frame_i])

mask.set_title('S rouškou, rámec ' + str(show_frame_i))
mask.set_xlabel('vzorek')
mask.plot(show_frame_x, mask_frames[show_frame_i])

fig.tight_layout()
fig.show()


## (4) Kontrola podobnosti základního tónu – autokorelace

In [None]:
def center_clip(sig, threshold=0.7):
    max_val = np.abs(sig).max() * threshold
    min_val = -max_val

    for i in range(0, len(sig)):
        sig[i] = 1 if sig[i] > max_val else (-1 if sig[i] < min_val else 0)

# Vlastní (a velmi neefektivní) implementace autokorelace
def auto_corr(sig):    
    #return np.correlate(sig, sig, mode='full')[len(sig)-1:]
    ret = np.zeros(len(sig))
    for k in range(0, len(sig)):
        shifted = np.append(np.roll(sig, -k)[0:len(sig)-k], np.zeros(k))
        ret[k] = np.sum(sig * shifted)
    
    return ret

# Pro analýzu si připravme kopie, ať nemodifikujeme původní data
nomask_frames_clipped = np.copy(nomask_frames)
mask_frames_clipped = np.copy(mask_frames)

nomask_lags = np.zeros(len(nomask_frames))
mask_lags = np.zeros(len(mask_frames))

# Center clipping (změní data ve zkopírovaných polích)
for i in range(0, len(nomask_frames)):
    center_clip(nomask_frames_clipped[i])
    center_clip(mask_frames_clipped[i])

# Plotneme libovolný rámec
fig, (frame, clipped, corr, ax) = plt.subplots(4, 1, figsize=(12, 10))
show_frame_i = 0

frame.set_title('Rámec ' + str(show_frame_i))
frame.grid(alpha=0.5, linestyle='--')
frame.plot(nomask_frames[show_frame_i])
t = np.abs(nomask_frames[show_frame_i]).max() * 0.7 # trochu redundantní
frame.axhline(y=t, color='r')
frame.axhline(y=-t, color='r')
frame.set_xlabel('vzorek')

clipped.set_title('Rámec po centrálním klipování')
clipped.plot(nomask_frames_clipped[show_frame_i])
clipped.grid(alpha=0.5, linestyle='--')
clipped.set_xlabel('vzorek')

# Číslo vzorku pro práh spočítáme jako vzorkovací frekvence / práh
threshold = int(16000/300)

# Autokorelace (vrací nová pole)
for i in range(0, len(nomask_frames)):
    nomask_frames_clipped[i] = auto_corr(nomask_frames_clipped[i])
    mask_frames_clipped[i] = auto_corr(mask_frames_clipped[i])
    # Max lags
    nomask_lags[i] = np.argmax(nomask_frames_clipped[i][threshold:]) + threshold
    mask_lags[i] = np.argmax(mask_frames_clipped[i][threshold:]) + threshold

# Maximální lag zvoleného rámce, který plotujeme
max_lag = int(nomask_lags[show_frame_i])
print('Hodnota lagu v rámci {}: {}'.format(show_frame_i, max_lag))

corr.set_title('Autokorelace')
corr.grid(alpha=0.5, linestyle='--')
corr.plot(nomask_frames_clipped[show_frame_i])
lag_stem = corr.stem([max_lag], [nomask_frames_clipped[show_frame_i][max_lag]], linefmt='r', markerfmt='bo')
thr_line = corr.axvline(x=threshold, color='k')
corr.legend([lag_stem, thr_line], ['Lag', 'Práh'])


# Základní frekvence = vzorkovací frekvence / lag
nomask_base_freqs = 16000/nomask_lags
mask_base_freqs = 16000/mask_lags

ax.set_title('Základní frekvence rámců')
ax.plot(nomask_base_freqs, label='Bez masky')
ax.plot(mask_base_freqs, label='S maskou')
ax.set_xticks(np.arange(0, len(nomask_base_freqs) + 5, 5))
ax.legend()
ax.grid(alpha=0.5, linestyle='--')
ax.set_xlabel('rámec')
fig.tight_layout()
fig.show()

# Spočítáme střední hodnoty a rozptyly
nomask_mean = np.mean(nomask_base_freqs)
nomask_dev = np.std(nomask_base_freqs)
mask_mean = np.mean(mask_base_freqs)
mask_dev = np.std(mask_base_freqs)

print('Bez masky:\n - střední hodnota: {} Hz\n - rozptyl: {} Hz²\nS maskou:\n - střední hodnota: {} Hz\n - rozptyl: {} Hz²'
         .format(nomask_mean, nomask_dev, mask_mean, mask_dev))

## (5) Spektrogramy

In [None]:
import time

def make_spectogram(fr, name, title):
    window = signal.windows.hann(320)
    fr = np.copy(fr) * window
    fr_dft = np.fft.fft(fr, n=1024)
    
    fr_log = 10*np.log10((fr_dft*np.conj(fr_dft)).real+1e-13)[:, 0:512]

    plt.figure(figsize=(15,6))

    times = np.arange(0.01, 0.98, 0.01)
    freqs = np.arange(0, 1024/2) / 1024 * 16000

    plt.pcolormesh(times, freqs, fr_log.T[:, 1:-1], shading='auto')#, vmin=-60, vmax=38)
    plt.yticks(np.append(np.array([0, 220, 440, 660]), np.arange(1000, 8000, 500)))
    plt.gca().set_title(title)
    plt.gca().set_xlabel('Čas [s]')
    plt.gca().set_ylabel('Frekvence [hz]')
    cbar = plt.colorbar()#ticks=np.arange(-60,37,10))

    plt.tight_layout()
    plt.show()
    
    plt.figure(figsize=(15,4))
    plt.gca().set_title('Spektrum rámce 0')
    plt.plot(freqs, fr_log[0])
    plt.show()
    return fr_dft
    
nomask_fr = make_spectogram(nomask_frames, 'spec_nomask', 'Spektogram bez roušky')
mask_fr = make_spectogram(mask_frames, 'spec_mask', 'Spektogram s rouškou')

### Okénková funkce

In [None]:
freqs = np.arange(0, 1024/2) / 1024 * 16000

fr_i = 98
fr_basic_dft = np.fft.fft(nomask_frames[fr_i], n=1024)
# Vytvoří okénkovou funkci na 320 vzorcích
win = signal.windows.hann(320)

_, (win_time, win_freq) = plt.subplots(1, 2, figsize=(15,4))

win_time.set_title('Časová oblast')
times = np.linspace(0, 0.99, 320)
win_time.plot(times, win)
win_time.grid(alpha=0.5, linestyle='--')
win_time.set_xlabel('Čas [s]')
win_time.set_ylabel('Koeficient')

win_freq.set_title('Spektrální oblast')
win_dft = np.fft.fft(win, n=1024)
win_log = 10*np.log10((win_dft*np.conj(win_dft)).real+1e-13)[0:512]
win_freq.plot(freqs, win_log)
win_freq.set_xlabel('Frekvence [Hz]')
win_freq.set_ylabel('Výkon [dB]')

win_freq.grid(alpha=0.5, linestyle='--')
plt.tight_layout()
plt.savefig('plots/window_func.pdf')
plt.show()

# Pronásobíme původní rámec okénkovou funkcí
frame_windowed = nomask_frames[fr_i] * win
# DFT pro plotnutí spektra
fr_windowed_dft = np.fft.fft(frame_windowed, n=1024) 

fr_basic_log = 10*np.log10((fr_basic_dft*np.conj(fr_basic_dft)).real+1e-13)[0:512]
fr_windowed_log = 10*np.log10((fr_windowed_dft*np.conj(fr_windowed_dft)).real+1e-13)[0:512]

fig = plt.figure(figsize=(15, 4))
plt.gca().set_title('Spektrum výkonu rámce {}'.format(fr_i))
plt.plot(freqs, fr_basic_log)
plt.plot(freqs, fr_windowed_log)
plt.legend(['Bez okénkové funkce', 'S okénkovou funkcí'])
plt.gca().set_xlabel('Frekvence [Hz]')
plt.gca().set_ylabel('Výkon [dB]')
plt.grid(alpha=0.5, linestyle='--')
plt.tight_layout()
plt.savefig('plots/window_func_frame_comp.pdf')
plt.show()

## Mimo zadání, test vytvoření filtru a jeho použití na jedné dvojici rámců

In [None]:
fr_i = 4

nm_orig = np.fft.fft(nomask_frames[fr_i], n=1024)
m_orig = np.fft.fft(mask_frames[fr_i], n=1024)

nm = nomask_fr[fr_i]
m = mask_fr[fr_i]

H_orig = m_orig/nm_orig
H = m/nm

_, ax = plt.subplots(3, 1, figsize=(14,7))

ax[0].plot(nomask_frames[fr_i])
ax[0].set_title('Bez masky (jeden rámec)')

ax[1].plot(mask_frames[fr_i])
ax[1].set_title('S maskou (jeden rámec)')

impulse_resp_orig = np.fft.ifft(H_orig)
impulse_resp = np.fft.ifft(H)

mask_sim_orig = signal.lfilter(impulse_resp_orig.real[0:512], [1.0], nomask_frames[fr_i])
mask_sim = signal.lfilter(impulse_resp.real[0:512], [1.0], nomask_frames[fr_i])

ax[2].plot(mask_sim_orig)
ax[2].plot(mask_sim)
ax[2].set_title('S maskou (simulace filtru)')
ax[2].legend(['Bez okénkové fce', 'S okénkovou fcí'])

for ax1 in ax:
    ax1.grid(alpha=0.5, linestyle='--')
    
plt.tight_layout()
plt.savefig('plots/window_func_simulation_comp.pdf')
plt.show()

## (6) Frekvenční charakteristika

In [None]:
H = mask_fr/nomask_fr # Najednou vypočítá podíly všech odpovídajících hodnot v rámcích

H_mean = np.mean(H, axis=0) # Spočítá průměry odpovídajících položek napříč rámci
H_log = 10*np.log10((H_mean*np.conj(H_mean)).real)
freqs = np.arange(0, 1024/2) / 1024 * 16000

smoothed = signal.savgol_filter(H_log[0:256], 255, 11)
print('Minimum na frekvenci {} Hz'.format(freqs[np.argmin(smoothed)]))

_, ax = plt.subplots(1, 1, figsize=(12,4))
ax.plot(freqs, H_log[0:512])
ax.set_title('Frekvenční charakteristika')
ax.set_xlabel('Frekvence [Hz]')
ax.set_ylabel('Výkon [dB]')
ax.grid(alpha=0.5, linestyle='--')

plt.tight_layout()
plt.show()

_, ax = plt.subplots(1, 1, figsize=(12,4))
ax.plot(freqs, H_log[0:512])
ax.plot(freqs[0:256], smoothed)
ax.axvline(x=freqs[np.argmin(smoothed)], color='r')

ax.set_title('Frekvenční charakteristika')
ax.set_xlabel('Frekvence [Hz]')
ax.set_ylabel('Výkon [dB]')
ax.grid(alpha=0.5, linestyle='--')

plt.tight_layout()
plt.show()

## (7) Impulsní odezva

In [None]:
def idft(X, n=1024):
    padded = np.pad(X, (0, n - len(X)), constant_values=((0, 0), ))
    # Výstupní pole
    ret = np.zeros(padded.size, dtype='complex128')
    
    # Předpočítání mocnin e pro všechny dvojice "k, n"
    es = np.zeros(n**2, dtype='complex128').reshape(n, n)
    for k in range(0, n):
        for na in range(0, n):
            es[k][na] = np.exp(2j*np.pi*k*na/n)    
    
    # Výpočet IDFT
    for k in range(0, n):
        ret[k] = np.sum(padded * es[k]) / n

    return ret

# IDFT vyjde v komplexních číslech, ale úhly budou vždycky buď 0, nebo pi,
# což značí pouhé otočení znaménka reálného čísla. Můžeme (musíme) s klidem vzít pouze reálné složky.
impulse_resp_a = np.fft.ifft(H_mean, n=1024)[0:512].real
impulse_resp = idft(H_mean)[0:512].real
print("Moje IDFT a numpy IFFT jsou shodné: " + str(np.allclose(impulse_resp_a, impulse_resp, atol=0)))

plt.figure(figsize=(12, 4))
plt.plot(impulse_resp)
plt.gca().set_title('Impulsní odezva')
plt.grid(alpha=0.5, linestyle='--')
plt.tight_layout()
plt.show()

## (8) Simulace roušky

In [None]:
nomask_sent_data, _ = soundfile.read('../audio/maskoff_sentence.wav')
mask_sent_data, fs = soundfile.read('../audio/maskon_sentence.wav')
nomask_data, _ = soundfile.read('../audio/maskoff_tone.wav')

mask_simulation = signal.lfilter(impulse_resp, [1.0], nomask_sent_data)
mask_simulation_tone = signal.lfilter(impulse_resp, [1.0], nomask_data)

fig, (nomask, mask, sim_mask, nomask_and_sim, mask_and_sim) = plt.subplots(5, 1, figsize=(14, 14))

nomask.set_title('Věta bez roušky')
nomask.plot(nomask_sent_data)
nomask.grid(alpha=0.5, linestyle='--')

mask.set_title('Věta s rouškou')
mask.plot(mask_sent_data)
mask.grid(alpha=0.5, linestyle='--')

sim_mask.set_title('Simulace věty s rouškou')
sim_mask.plot(mask_simulation)
sim_mask.grid(alpha=0.5, linestyle='--')

nomask_and_sim.set_title('Věta bez roušky a simulace')
nomask_and_sim.plot(nomask_sent_data)
nomask_and_sim.plot(mask_simulation)
nomask_and_sim.legend(['Bez roušky', 'Simulace'])
nomask_and_sim.grid(alpha=0.5, linestyle='--')

mask_and_sim.set_title('Věta s rouškou a simulace')
mask_and_sim.plot(nomask_sent_data)
mask_and_sim.plot(mask_sent_data, color='red')
mask_and_sim.plot(mask_simulation, color='orange')
mask_and_sim.legend(['Bez roušky', 'S rouškou', 'Simulace'])
mask_and_sim.grid(alpha=0.5, linestyle='--')

fig.tight_layout()
fig.show()

IPython.display.display(IPython.display.Audio(nomask_sent_data, rate=fs))
IPython.display.display(IPython.display.Audio(mask_sent_data, rate=fs))
IPython.display.display(IPython.display.Audio(mask_simulation, rate=fs))

IPython.display.display(IPython.display.Audio(nomask_data, rate=fs))
IPython.display.display(IPython.display.Audio(mask_simulation_tone, rate=fs))

soundfile.write('../audio/sim_maskon_sentence_window.wav', mask_simulation, fs)
soundfile.write('../audio/sim_maskon_tone_window.wav', mask_simulation_tone, fs)

In [None]:
# Pro zajímavost: různé korelace
nomask_sent_data -= np.mean(nomask_sent_data)
nomask_sent_data /= np.abs(nomask_sent_data).max()
mask_sent_data -= np.mean(mask_sent_data)
mask_sent_data /= np.abs(mask_sent_data).max()
mask_simulation -= np.mean(mask_simulation)
mask_simulation /= np.abs(mask_simulation).max()

sent_autocor = np.correlate(mask_sent_data, mask_sent_data, mode='full')
nom_cor = np.correlate(nomask_sent_data, mask_simulation, mode='full')
sim_cor = np.correlate(mask_sent_data, mask_simulation, mode='full')
recs_cor = np.correlate(mask_sent_data, nomask_sent_data, mode='full')

print('Autokorelace nahrávky s maskou: ' + str(np.max(np.abs(sent_autocor))))
print('Korelace simulace a nahrávky bez masky: ' + str(np.max(np.abs(nom_cor))))
print('Korelace simulace a nahrávky s maskou: ' + str(np.max(np.abs(sim_cor))))
print('Korelace nahrávky bez masky a nahrávky s maskou: ' + str(np.max(np.abs(recs_cor))))