In [1]:
import ikrlib as ilib
import numpy as np
import os
import librosa
import soundfile as sf
import noisereduce as nr
import scipy
from pydub import AudioSegment
from pydub.silence import detect_nonsilent



In [2]:
# Feature-pipeline switches; each one gates a cell further down the notebook.
data_pre_emphasis = True                   # load audio through pre_emphasis() instead of ilib.wav16khz2mfcc()
coefficients_normalization = True          # apply min_max_normalize() to the coefficients
delta_coefficients_enabled = True          # append delta and delta-delta coefficients
cepstral_mean_subtraction_enabled = False  # apply cepstral_mean_subtraction() to the features

In [None]:
def audio_adjust(dir):
    """Strip silent stretches from every ``.wav`` file directly inside ``dir``.

    Each recording is loaded with pydub, its non-silent intervals are detected
    and concatenated, and the result is exported under the same file name into
    the ``rs`` subdirectory of ``dir`` (created when missing).

    Args:
        dir: Path of the directory holding the input recordings.
    """
    min_silence_len = 1000  # minimum length of a silent stretch (ms)
    silence_thresh = -44    # silence threshold (dB)

    print(f"Removing silence from records in directory {dir}")
    output_dir = os.path.join(dir, "rs")
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)

    for f in os.listdir(dir):
        # Check the real extension: comparing only the last three characters
        # would also accept names such as "notawav".
        if not f.endswith(".wav"):
            continue

        audio = AudioSegment.from_wav(os.path.join(dir, f))
        nonsilent_intervals = detect_nonsilent(audio, min_silence_len, silence_thresh)

        # Concatenate the intervals that are not silent.
        non_silent_audio = AudioSegment.empty()
        for start, end in nonsilent_intervals:
            non_silent_audio += audio[start:end]

        # Save the trimmed audio.
        non_silent_audio.export(os.path.join(output_dir, f), format="wav")

# Strip silence from every dataset split; output goes to "<split>/rs".
for dataset_dir in ("target_train", "non_target_train", "target_dev", "non_target_dev"):
    audio_adjust(dataset_dir)

Removing silence from records in directory target_train
Removing silence from records in directory non_target_train


In [None]:
def reduce_noise(dir):
    """Run noise reduction on the silence-stripped recordings of ``dir``.

    File names are taken from the ``.wav`` files directly inside ``dir``; the
    audio itself is read from ``dir/rs/<name>`` and the processed result is
    written to ``dir/rn/<name>`` (``rn`` is created when missing).

    Args:
        dir: Path of the dataset directory that contains the ``rs`` subdirectory.
    """
    print(f"Removing noise from records in directory {dir}")
    output_dir = os.path.join(dir, "rn")
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)

    for f in os.listdir(dir):
        # Check the real extension: comparing only the last three characters
        # would also accept names such as "notawav".
        if not f.endswith(".wav"):
            continue

        # sr=None keeps the sampling rate stored in the file.
        audio, sr = librosa.load(os.path.join(dir, "rs", f), sr=None)

        # The first 0.5 seconds serve as the noise profile.
        # NOTE(review): the input already had silence removed, so this span may
        # contain speech rather than pure noise - confirm this is intended.
        noise_sample = audio[:int(sr * 0.5)]

        reduced_audio = nr.reduce_noise(y=audio, sr=sr, y_noise=noise_sample)

        sf.write(os.path.join(output_dir, f), reduced_audio, sr)

# Denoise every dataset split; input is "<split>/rs", output goes to "<split>/rn".
for dataset_dir in ("target_train", "non_target_train", "target_dev", "non_target_dev"):
    reduce_noise(dataset_dir)

In [None]:
def data_augumentation(dir):
    """Write time-stretched, pitch-shifted and time-shifted copies of each recording.

    For every ``.wav`` file directly inside ``dir`` three augmented variants are
    produced through ``ilib`` and exported next to the original. The variant tag
    (``_stretched_aug``, ``_pitch_shifted_aug``, ``_time_shifted_aug``) is
    inserted five characters before the end of the file stem, which is the
    naming scheme the notebook already used.

    Files whose name already contains ``_aug`` are skipped, so re-running the
    cell does not augment earlier augmentation output again.

    Args:
        dir: Path of the directory holding the recordings to augment.
    """
    print(f"Augmenting records in directory {dir}")
    for f in os.listdir(dir):
        # Check the real extension and leave earlier augmentation output alone.
        if not f.endswith(".wav") or "_aug" in f:
            continue

        input_file = os.path.join(dir, f)
        print("Data augumentation of file: " + input_file)

        time_stretched_audio = ilib.apply_time_stretching(input_file)
        pitch_shifted_audio = ilib.apply_pitch_shifting(input_file, semitones=2)
        time_shifted_audio = ilib.apply_time_shifting(input_file, shift_ms=500)

        # Split on the extension of the file name itself; searching the whole
        # path for the first "." breaks as soon as a directory contains a dot.
        stem, extension = os.path.splitext(f)
        head, tail = stem[:-5], stem[-5:]

        stretched_file = os.path.join(dir, head + "_stretched_aug" + tail + extension)
        pitch_shifted_file = os.path.join(dir, head + "_pitch_shifted_aug" + tail + extension)
        time_shifted_file = os.path.join(dir, head + "_time_shifted_aug" + tail + extension)

        time_stretched_audio.export(stretched_file, format="wav")
        pitch_shifted_audio.export(pitch_shifted_file, format="wav")
        time_shifted_audio.export(time_shifted_file, format="wav")

# Only the training splits are augmented.
for training_dir in ("target_train/rn", "non_target_train/rn"):
    data_augumentation(training_dir)

In [None]:
def pre_emphasis(dir):
    """Extract MFCC features from pre-emphasised audio for each ``.wav`` in ``dir``.

    Files are visited in sorted name order so that the position of an entry in
    the returned list is reproducible between runs.

    Args:
        dir: Path of the directory holding the recordings.

    Returns:
        A list with one ``ilib.extract_mfcc`` result per processed file.

    Raises:
        AssertionError: If a recording is not sampled at 16000 Hz.
    """
    data = []
    for f in sorted(os.listdir(dir)):
        # Check the real extension: comparing only the last three characters
        # would also accept names such as "notawav".
        if not f.endswith(".wav"):
            continue

        input_file = os.path.join(dir, f)
        print("Proccessing input file: " + input_file)
        sample_rate, audio_samples = ilib.read_wav_file(input_file)

        # Validate before doing any work on the samples.
        assert sample_rate == 16000, f"{input_file}: expected 16000 Hz, got {sample_rate}"

        emphasized_audio = ilib.apply_pre_emphasis(audio_samples)
        data.append(ilib.extract_mfcc(emphasized_audio, sample_rate))
    return data

if data_pre_emphasis:
    # Training data: the per-file feature matrices are stacked into one array per class.
    target_train_features = pre_emphasis('target_train/rn')
    train_t = np.vstack(target_train_features)
    non_target_train_features = pre_emphasis('non_target_train/rn')
    train_n = np.vstack(non_target_train_features)

    # Development data stays a list with one feature matrix per recording.
    test_t = pre_emphasis('target_dev/rn')
    test_n = pre_emphasis('non_target_dev/rn')

In [None]:
if not data_pre_emphasis:
    # Load the features of every split through ilib.wav16khz2mfcc.
    target_train_mfcc = ilib.wav16khz2mfcc('target_train/rn').values()
    non_target_train_mfcc = ilib.wav16khz2mfcc('non_target_train/rn').values()

    # Development data is kept per recording.
    test_t = ilib.wav16khz2mfcc('target_dev/rn').values()
    test_n = ilib.wav16khz2mfcc('non_target_dev/rn').values()

    # The training coefficients are concatenated into a single array per class.
    train_t = np.vstack(list(target_train_mfcc))
    train_n = np.vstack(list(non_target_train_mfcc))

In [None]:
def min_max_normalize(data):
    """Scale each column of ``data`` linearly into the range [0, 1].

    The minimum and maximum are taken along axis 0. A column whose values are
    all equal has a zero range; it is mapped to 0 instead of producing NaN
    through a 0/0 division.

    Args:
        data: Array-like of numbers; statistics are computed along axis 0.

    Returns:
        A float array with the same shape as ``data``.
    """
    data = np.asarray(data, dtype=float)
    min_vals = np.min(data, axis=0)
    value_range = np.max(data, axis=0) - min_vals
    # Dividing by 1 leaves the zero numerator of a constant column at 0.
    safe_range = np.where(value_range == 0, 1.0, value_range)
    return (data - min_vals) / safe_range

if coefficients_normalization:
    # Each class is scaled with its own per-column minimum and maximum.
    train_t, train_n = min_max_normalize(train_t), min_max_normalize(train_n)

In [None]:
def compute_deltas(cepstral_coeffs, window_size=2):
    """Compute regression-based delta coefficients along the frame axis.

    For frame ``t`` the delta is
    ``sum_{n=1..N} n * (c[t+n] - c[t-n]) / (2 * sum_{n=1..N} n**2)`` with
    ``N = window_size``. Frames beyond either end of the sequence are replaced
    by the first / last frame, so the window is always symmetric. This keeps
    the weights summing to zero at the edges (a constant signal gets zero
    deltas everywhere) and avoids a 0/0 division for a single-frame input.

    Args:
        cepstral_coeffs: 2-D array-like of shape (num_frames, num_coeffs).
        window_size: Number of frames taken on each side; must be at least 1.

    Returns:
        A float array of shape (num_frames, num_coeffs).

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    coeffs = np.asarray(cepstral_coeffs, dtype=float)
    num_frames, num_coeffs = coeffs.shape
    deltas = np.zeros((num_frames, num_coeffs))
    if num_frames == 0:
        # Edge padding is undefined for an empty axis; nothing to compute anyway.
        return deltas

    padded = np.pad(coeffs, ((window_size, window_size), (0, 0)), mode="edge")
    for offset in range(1, window_size + 1):
        # Frame t sits at index window_size + t of the padded array.
        ahead = padded[window_size + offset:window_size + offset + num_frames]
        behind = padded[window_size - offset:window_size - offset + num_frames]
        deltas += offset * (ahead - behind)

    return deltas / (2 * sum(offset * offset for offset in range(1, window_size + 1)))

if delta_coefficients_enabled:
    # Target class: first-order deltas, then deltas of those deltas.
    train_t_delta_coeffs = compute_deltas(train_t, window_size=2)
    train_t_derivative_delta_coeffs = compute_deltas(train_t_delta_coeffs, window_size=2)

    # Non-target class: same two steps.
    train_n_delta_coeffs = compute_deltas(train_n, window_size=2)
    train_n_derivative_delta_coeffs = compute_deltas(train_n_delta_coeffs, window_size=2)

    # Append both sets as extra columns behind the original coefficients.
    train_t = np.hstack((train_t, train_t_delta_coeffs, train_t_derivative_delta_coeffs))
    train_n = np.hstack((train_n, train_n_delta_coeffs, train_n_derivative_delta_coeffs))

In [None]:
def cepstral_mean_subtraction(cepstral_coeffs):
    """Return ``cepstral_coeffs`` with the axis-0 mean removed from every row.

    Args:
        cepstral_coeffs: Array of coefficients; the mean is taken along axis 0.

    Returns:
        An array of the same shape whose axis-0 mean is zero.
    """
    per_coefficient_mean = np.mean(cepstral_coeffs, axis=0, keepdims=True)
    return np.subtract(cepstral_coeffs, per_coefficient_mean)

if cepstral_mean_subtraction_enabled:
    # Each class is centred on its own mean.
    train_t, train_n = cepstral_mean_subtraction(train_t), cepstral_mean_subtraction(train_n)

In [None]:
# Fixed seed so that Restart & Run All reproduces the same initial models.
GMM_INIT_SEED = 42
gmm_init_rng = np.random.default_rng(GMM_INIT_SEED)

# --- Target model ---
M_t = 5  # number of Gaussian components
# Initial means: M_t distinct training frames. Sampling without replacement
# prevents two components from starting at the same point, and index 0 is a
# valid choice (the previous randint(1, len) could never pick the first frame).
MUs_t = train_t[gmm_init_rng.choice(len(train_t), M_t, replace=False)]
COVs_t = [np.cov(train_t.T)] * M_t  # initial covariance: the covariance of all target frames
Ws_t = np.ones(M_t) / M_t           # uniform initial weights

# --- Non-target model ---
M_n = 30  # number of Gaussian components
MUs_n = train_n[gmm_init_rng.choice(len(train_n), M_n, replace=False)]
# One covariance per component: the list length must be M_n so that it matches
# MUs_n and Ws_n (it was built with M_t entries before).
COVs_n = [np.cov(train_n.T)] * M_n
Ws_n = np.ones(M_n) / M_n

In [None]:
NUM_TRAINING_ITERATIONS = 30

for jj in range(NUM_TRAINING_ITERATIONS):
    # Every pass feeds the current parameters back into ilib.train_gmm;
    # TTL_t / TTL_n are the likelihood values it reports for each model.
    Ws_t, MUs_t, COVs_t, TTL_t = ilib.train_gmm(train_t, Ws_t, MUs_t, COVs_t)
    Ws_n, MUs_n, COVs_n, TTL_n = ilib.train_gmm(train_n, Ws_n, MUs_n, COVs_n)
    print(f'Iteration: {jj} Total log likelihood: {TTL_t} for target {TTL_n} for non target')

In [None]:
# Prior probabilities of the target and the non-target class.
P_t = 0.5
P_n = 1.0 - P_t

# Score every target development recording with both models. The feature
# pipeline mirrors the one applied to the training data: normalise, derive the
# deltas from the normalised coefficients, then (optionally) subtract the mean.
score = []
for tst in test_t:
    test_features = tst.copy()

    if coefficients_normalization:
        # NOTE(review): min/max are taken per recording here, whereas the
        # training data was scaled with statistics of the whole training set -
        # confirm this mismatch is intended.
        test_features = min_max_normalize(test_features)

    if delta_coefficients_enabled:
        # Deltas must come from the same (normalised) coefficients they are
        # appended to; they were previously computed from the raw `tst`.
        test_delta_coeffs = compute_deltas(test_features, window_size=2)
        test_derivative_delta_coeffs = compute_deltas(test_delta_coeffs, window_size=2)
        test_features = np.concatenate(
            (test_features, test_delta_coeffs, test_derivative_delta_coeffs), axis=1
        )

    if cepstral_mean_subtraction_enabled:
        test_features = cepstral_mean_subtraction(test_features)

    ll_t = ilib.logpdf_gmm(test_features, Ws_t, MUs_t, COVs_t)
    ll_n = ilib.logpdf_gmm(test_features, Ws_n, MUs_n, COVs_n)
    # Difference of the two (summed log-likelihood + log prior) terms;
    # a positive value favours the target model.
    score.append((np.sum(ll_t) + np.log(P_t)) - (np.sum(ll_n) + np.log(P_n)))
print(score)
print(f"Fraction of correctly recognized targets: {np.mean(np.array(score) > 0) * 100}%")

In [None]:
# Score every non-target development recording with both models. P_t and P_n
# are not assigned in this cell and must already be defined. The feature
# pipeline mirrors the one applied to the training data: normalise, derive the
# deltas from the normalised coefficients, then (optionally) subtract the mean.
score = []
for tst in test_n:
    test_features = tst.copy()

    if coefficients_normalization:
        # NOTE(review): min/max are taken per recording here, whereas the
        # training data was scaled with statistics of the whole training set -
        # confirm this mismatch is intended.
        test_features = min_max_normalize(test_features)

    if delta_coefficients_enabled:
        test_delta_coeffs = compute_deltas(test_features, window_size=2)
        test_derivative_delta_coeffs = compute_deltas(test_delta_coeffs, window_size=2)
        # Append the deltas to the normalised coefficients they were derived
        # from; the raw `tst` was concatenated here before, which discarded
        # the normalisation.
        test_features = np.concatenate(
            (test_features, test_delta_coeffs, test_derivative_delta_coeffs), axis=1
        )

    if cepstral_mean_subtraction_enabled:
        test_features = cepstral_mean_subtraction(test_features)

    ll_t = ilib.logpdf_gmm(test_features, Ws_t, MUs_t, COVs_t)
    ll_n = ilib.logpdf_gmm(test_features, Ws_n, MUs_n, COVs_n)
    # Difference of the two (summed log-likelihood + log prior) terms;
    # a negative value favours the non-target model.
    score.append((np.sum(ll_t) + np.log(P_t)) - (np.sum(ll_n) + np.log(P_n)))
print(score)
print(f"Fraction of correctly recognized non targets: {np.mean(np.array(score) < 0) * 100}%")