# ZRE Projekt - 2023
#### Honza Černocký a Honza Pavlus, FIT VUT Brno, 2023
#### ` Celkem bodů 29`
---

Principem projektu oprava signálů narušených klipováním pomocí lineární predikce nebo jiných technik. Nachystali jsme 100 signálů z TIMITu tak, že máte k disposici originál i klipovanou verzi. Klipovaná verze simuluje ztrátu nejvyššího bitu. Signály a seznam viz https://www.fit.vutbr.cz/study/courses/ZRE/public/2022-23/. 

Projekt doporučujeme řešit v Pythonu v Google Colab, ale je možné použít i jiný jazyk / prostředí, v tomto případě prosím zašlete protokol a zdrojáky emailem.  

---



In [None]:
# nacteni signalu a listu, at to nemusite hledat
import os

import librosa as librosa

if not os.path.exists('proj_zre_2023.tgz'):
    !wget https://www.fit.vutbr.cz/study/courses/ZRE/public/2022-23/proj_zre_2023.tgz
    !tar xfz proj_zre_2023.tgz
!ls

# make graphs interactive
!pip install mpld3
%matplotlib inline
import scipy
import mpld3
mpld3.enable_notebook()

## Úloha 1 - Detektor klipovaných signálů - `2 body `

Navrhněte a naprogramujte detektor klipovaných částí signálu. Pozor, klipované mohou být jak kladné, tak záporné vzorky. Zobrazte několik příkladů těchto klipovaných částí v časové a ve frekvenční oblasti (spektrogram). 

---

### Vaše řešení:

Popište vaše řešení

In [None]:
import numpy as np
import scipy.io.wavfile as wav
import matplotlib.pyplot as plt
import librosa

Fs = 16000
# load files
def load_dir(path):
    folder, _, files = next(os.walk(path))
    #signals, srs = zip(*[librosa.load(os.path.join(folder, f),sr=None) for f in files])
    srs, signals = zip(*[wav.read(os.path.join(folder, f)) for f in files])
    #signals = [s.astype(np.float32) / 2**15 for s in signals]
    assert all(np.array(srs) == Fs)
    return tuple(np.array(s - np.mean(np.array(s))) for s in signals)

signals_clp = load_dir('clipped_signals')
signals_orig = load_dir('orig_signals')

def detect_clipped(signal):
    # try to detect threshold
    mx = np.max(signal)
    # return indices of clipped samples
    return np.array(np.where(np.abs(signal) >= mx))

def plot_clipped(signal, clipped_indices, spectr=False, title=None):
    # try to find part where clipping is "most" visible
    mid = np.median(clipped_indices).astype(int)
    # get frame around mid
    src = signal[mid-256:mid+256]
    clipped_src = clipped_indices - mid
    clipped_i = np.where(np.abs(clipped_src) < 256)[1]
    clipped_src = clipped_src[0][clipped_i] + 256

    # plot frame and it's spectrogram
    plt.figure(figsize=(20, 10))
    plt.subplot(2, 1, 1)
    plt.title(title)
    plt.plot(src)
    plt.scatter(clipped_src, src[clipped_src], c='r')
    plt.gca().axes.get_yaxis().set_label_text('Sample')
    if spectr:
        plt.subplot(2, 1, 2)
        plt.specgram(src, Fs=16000)
        plt.gca().axes.get_xaxis().set_visible(False)
        plt.gca().axes.get_yaxis().set_label_text('Hz')
    plt.show()

test_file_clp = signals_clp[0]
clipped = detect_clipped(test_file_clp)
plot_clipped(test_file_clp, clipped, spectr=True, title="Clipped signal")

## Úloha 2 - Výpočet SNR - `2 body `

Napište funkci pro výpočet poměru signálu k šumu, kde šum (jmenovatel) bude chyba (rozdíl mezi klipovaným signálem a originálem) a signál (čitatel) je originální signál. Spočítejte SNR způsobené klipováním pro všech 100 signálů. Berte je v úvahu celé, nejen klipované části.

---

### Vaše řešení:

$ SNR = 10 \cdot \log_{10} \left( \frac{\sum_{i=1}^{N} x_i^2}{\sum_{i=1}^{N} (x_i - y_i)^2} \right) $


Popište vaše řešení

In [None]:
def snr(orig, modified):
    return 10 * np.log10(np.sum(orig**2) / np.sum((orig - modified)**2))
test_file_orig = signals_orig[0]
test_file_clp = signals_clp[0]
print("sample SNR:", snr(test_file_orig, test_file_clp), "dB")



## Úloha 3 - Oprava pomocí predikce zleva - ` 5 bodů`

Napište funkci, která bude opravovat klipované části. Měla by pracovat tak, že na L=240 vzorcích signálu před klipovanou částí odhadne koeficienty prediktoru a ten využije k náhradě klipovaných vzorků predikcí z minulosti. Vyberte 2 opravené segmenty a zobrazte je. Připravte dvě promluvy k přehrání (pokud budete pracovat v Py notebooku, přímo v něm, pokud v něčem jiném, jako několik WAV souborů). Vyzkoušejte délky prediktoru od P=2 do P=20. Pro každou délku vyhodnoťte na všech 100 signálech SNR a spočítejte redukci SNR oproti původnímu stavu (SNRR - v procentech). 

---


### Vaše řešení:

Popište vaše řešení

In [None]:
def lpc(signal, ordr):
    #r = np.correlate(signal,signal, mode='full')
    #r = r[len(r)//2:len(r)//2 + ordr + 1]
    #a = np.linalg.inv(scipy.linalg.toeplitz(r[:-2])) @ -r[1:-1]
    a = librosa.lpc(signal, order=ordr-1)[1:]
    return a,np.hstack([1.0, -a])

def fix_LPC(signal, p=20, l=240):
    clips = detect_clipped(signal)
    repaired = signal.copy()
    for i in clips[0]:
        if i+l > len(repaired):
            break
        a,b = lpc(repaired[i - l:i], p)
        repaired[i] = scipy.signal.lfilter(-a, [1.0], repaired[i - l:i])[-1]
    return repaired


signals_leftLPC = []
for i in range(2): # TODO remove 2 len(signals_orig)
    print("\n File ", i, ":")
    clp,orig = signals_clp[i], signals_orig[i]
    snr_denom = snr(orig,clp)
    for p in range(2,21,18): # TODO remove 18 -> 1,2
        repr = fix_LPC(clp)
        snr_num = snr(orig,repr)
        print("P=",p," SNRR=",round((snr_num/snr_denom)*100,0),"%\t")
        if p == 20:
            signals_leftLPC.append(repr)



In [None]:
from IPython.display import Audio
cl1 = 0
#clipped = detect_clipped(signals_leftLPC[cl1])
#plot_clipped(signals_clp[cl1], clipped, title="Clipped signal")
#plot_clipped(signals_leftLPC[cl1], clipped,title="Repaired signal")

fix, ax = plt.subplots()
ax.plot(signals_orig[cl1], label="original")
ax.plot(signals_leftLPC[cl1], label="repaired")
ax.plot(signals_clp[cl1], label="clipped")
ax.plot(signals_leftLPC[cl1] - signals_clp[cl1], label="healed")
ax.legend()
plt.show()


Audio(signals_leftLPC[cl1], rate=Fs, normalize = False)

In [None]:
cl1 = 1

fix, ax = plt.subplots()
ax.plot(signals_orig[cl1], label="original")
ax.plot(signals_leftLPC[cl1], label="repaired")
ax.plot(signals_clp[cl1], label="clipped")
ax.plot(signals_leftLPC[cl1] - signals_clp[cl1], label="healed")
ax.legend()
plt.show()


Audio(signals_leftLPC[cl1], rate=Fs, normalize = False)

## Úloha 4 - Oprava pomocí predikce zprava - ` 4 body`

Zopakujte předchozí cvičení, ale tentokrát predikujte klipované vzorky zprava (tedy z budoucnosti). Ve výpočtech budete tedy muset obracet signály nebo použít nekauzální filtry. Udělejte to samé, co v minulé cvičení, tedy vzorky k zobrazení a poslechu, závislost SNR a SNRR na délce prediktoru.

### Vaše řešení:

Popište vaše řešení

In [None]:
# invert signal and predict
signals_rightLPC = []
for i in range(2): # TODO remove 2 len(signals_orig)
    clp,orig = signals_clp[i], signals_orig[i]
    clp,orig = clp[::-1], orig[::-1]
    snr_denom = snr(orig,clp)
    for p in range(2,21,18): # TODO remove 18 -> 1,2
        repr = fix_LPC(clp)
        snr_num = snr(orig,repr)
        print("P=",p," SNRR=",round((snr_num/snr_denom)*100,0),"%\t")
        if p == 20:
            signals_rightLPC.append(repr[::-1])

In [None]:
cl1 = 0

fix, ax = plt.subplots()
ax.plot(signals_orig[cl1], label="original")
ax.plot(signals_rightLPC[cl1], label="repaired")
ax.plot(signals_clp[cl1], label="clipped")
ax.plot(signals_rightLPC[cl1] - signals_clp[cl1], label="healed")
ax.legend()
plt.show()


Audio(signals_rightLPC[cl1], rate=Fs, normalize = False)

In [None]:
cl1 = 1

fix, ax = plt.subplots()
ax.plot(signals_orig[cl1], label="original")
ax.plot(signals_rightLPC[cl1], label="repaired")
ax.plot(signals_clp[cl1], label="clipped")
ax.plot(signals_rightLPC[cl1] - signals_clp[cl1], label="healed")
ax.legend()
plt.show()


Audio(signals_rightLPC[cl1], rate=Fs, normalize = False)

## Úloha 5 - Lineární kombinace obou predikcí - ` 4 body`

Pro klipované vzorky proveďte lineární kombinaci obou predikcí, můžete vyzkoušet různé interpolace (s konstantními váhami 0,5 a 0,5, s klesajícím oknem zleva a stoupajícím zprava, cokoliv jiného). Proveďte opět ty samé kroky jako v minulých cvičeních, tedy vzorky k zobrazení a poslechu, závislost SNR a SNRR na délce prediktoru. 

---


### Vaše řešení:

Popište vaše řešení

In [None]:
weight = 0.5
signals_combinedLPC = []
for i in range(2): # TODO remove 2 len(signals_orig)
    print("\n File ", i, ":")
    clp,orig = signals_clp[i], signals_orig[i]
    snr_denom = snr(orig,clp)
    for p in range(2,21,18): # TODO remove 18 -> 1,2
        left_lpc = fix_LPC(clp)
        right_lpc = fix_LPC(clp[::-1])[::-1]
        combined  = left_lpc * weight + right_lpc * (1-weight)
        snr_nom = snr(orig,combined)
        print("P=",p," SNRR=",round((snr_nom/snr_denom)*100,0),"%\t")
        #append if last
        if p == 20:
            signals_combinedLPC.append(combined)


In [None]:
cl1 = 0

fix, ax = plt.subplots()
ax.plot(signals_orig[cl1], label="original")
ax.plot(signals_combinedLPC[cl1], label="repaired")
ax.plot(signals_clp[cl1], label="clipped")
ax.plot(signals_combinedLPC[cl1] - signals_clp[cl1], label="healed")
ax.legend()
plt.show()


Audio(signals_combinedLPC[cl1], rate=Fs, normalize = False)

In [None]:
cl1 = 1

fix, ax = plt.subplots()
ax.plot(signals_orig[cl1], label="original")
ax.plot(signals_combinedLPC[cl1], label="repaired")
ax.plot(signals_clp[cl1], label="clipped")
ax.plot(signals_combinedLPC[cl1] - signals_clp[cl1], label="healed")
ax.legend()
plt.show()


Audio(signals_combinedLPC[cl1], rate=Fs, normalize = False)

## Úloha 6 - Cokoliv lepšího - ` 12 bodů`

Navrhněte a implementujte jakékoliv zlepšení rekonstrukce, možné příklady jsou: 

*   Optimalizace délky okna. 
*   Korektní práce s minulými či budoucími vzorky, pokud je za sebou více klipovaných částí (neměly by vstupovat do korelace pro výpočet R[k] a už vůbec ne do predikce vzorků)
*   Využití dynamické délky okna a dynamické délky prediktoru podle charakteru signálu. 
* Dlouhodobá predikce využívající základního tónu. 
* Analytické odvození, jak má predikce vzorků probíhat simultánně z levého či pravého kontextu nebo nalezení v literatuře a implementace. 
* Využití strojového učení (např. neuronová síť predikující vzorky)
* Cokoliv jiného  

Pro navrženou metodu proveďte opět ty samé kroky jako v minulých cvičeních, tedy vzorky k zobrazení a poslechu, a výpočet závislost SNR a SNRR. Komentujte výsledky. **Toto cvičení se bude obhajovat v pondělí 15. května**, nachystajte si prosím max. 10-minutovou presentaci - pár slajdů s popisem, co jste dělali, SNRR, ploty. 

---





### Vaše řešení:

Popište vaše řešení

In [None]:
def DCT_dictionary(frame_size=256, redundancy_factor=2):
    """
    DCT dictionary of size (frame_size)x(frame_size*redundancy_factor)
    """

    m = frame_size * redundancy_factor
    wa = np.hamming(frame_size) # analysis window TODO: don't

    t = np.arange(frame_size)  # time
    k = np.arange(m)  # frequency

    D = np.cos(np.pi/m * np.dot((t[np.newaxis, :].T+1/2), k[np.newaxis, :]+1/2))

    D = np.diag(wa).dot(D) # Analysis window
    D = D/np.sqrt(np.sum(D**2,0)) # normalize columns
    return	D

def consistentIHT(signal, clipped_samples_mat, D, iters = 100, non_zero = 32):
    """
    Consistent iterative hard thresholding for signal declipping, Kitic et al, ICASSP 2013

    Inputs:
        - clipped_samples_mat: bool vector, True if sample is clipped
        - D: fixed DCT dictionary

    Outputs:
        - A: sparse activation matrix
        - cost: loss vector
    """

    # initialize sparse coefficient matrix:
    a = np.zeros((D.shape[1]))

    mu = 1/np.linalg.norm(D,2)**2  # gradient descent parameter

    clipped_pos = np.logical_and(clipped_samples_mat, signal >= 0) # positive clipped samples
    clipped_neg = np.logical_and(clipped_samples_mat, signal <= 0) # negative clipped samples

    cost = np.nan*np.empty(iters+1) # save cost at each iteration

    # compute residual:
    residual = signal - D @ a
    # enforce clipping consistency:
    residual[clipped_pos] = np.maximum(residual[clipped_pos],0)
    residual[clipped_neg] = np.minimum(residual[clipped_neg],0)

    cost[0] = np.sum(residual**2)

    for it in range(1,iters):
        # gradient descent step:
        a = a + mu * D.T@residual

        # hard thresholding:
        a = a * (abs(a) >= np.sort(np.abs(a),0)[a.shape[0]-non_zero])

        # compute residual:
        residual = signal - D @ a
        # enforce clipping consistency:
        residual[clipped_pos] = np.maximum(residual[clipped_pos],0)
        residual[clipped_neg] = np.minimum(residual[clipped_neg],0)

        # compute cost:
        cost[it] = np.sum(residual**2)
        if cost[it] < 1e-4:
            break

    return a, cost


def repair_clipped(signal):
    """
    Repair clipped samples using consistent iterative hard thresholding
    """
    mx = np.max(signal)
    clipped_samples = np.logical_or(signal<=-mx,signal>=mx)
    a, _ = consistentIHT(signal,clipped_samples,D_DCT)
    return D_DCT@a

D_DCT = DCT_dictionary()


# Hodnocení

Viz zadání. Jak je v ZRE zvykem, student/ka s nejinovativnějším / nejzajímavějším řešením dostane láhev dobrého červeného.