In [152]:
# all libraries used in this notebook

import os.path
from pathlib import Path
import re
import librosa
import math
import numpy as np
import matplotlib.pyplot as plt
import sys
import importlib
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch import nn
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import pandas as pd
import os
import csv

In [186]:
dataset_id = 3 # ID of the selected datasets, 1 - ethernet, 3 - hva 280, 5 - hva 630
selected_model = "ClickDetectorCNN_64_128_LW.py"
model_weights = "hva280_det_model_run_1_ch1_64_ch2_128.pt"

In [187]:
# import necessary functions from the folder "05_Utilities"

cwd = str(Path.cwd()) ## current working directory, should be the path to "01_Dataset", in which this notebook is located
parent_dir = str(Path(cwd).parent) ## parent dir of the current working directory, should be the project directory
utilities_dir_full_path = os.path.join(parent_dir, "05_Utilities") ## full path to the "05_Utilities" folder

# import moduls for audio file processing and mel-spectrogram plotting

sys.path.append(utilities_dir_full_path) ## add the path of the "05_Utilities" folder to the sys.path list

import audioProcessing ## modul for audio file processing and mel-spectrogram generation
import spectrogramPlotting ## modul for mel-spectrogram visualization
import sharedValues ## modul for shared variables between the classes

importlib.reload(audioProcessing)
importlib.reload(spectrogramPlotting)
importlib.reload(sharedValues)

# make instances of the classes in the modules
#processAudio = processAudio()
processAudio = audioProcessing.processAudio()
spectrogramPlotter = spectrogramPlotting.spectrogramPlotter()
sharedValuesConfig =sharedValues.sharedValuesConfig()

In [188]:
# import detection model

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_architectures_dir = "03_Click_Detection_Model/01_modelArchitectures"
model_architectures_dir_path = os.path.join(parent_dir, model_architectures_dir)
print(model_architectures_dir_path)

if os.path.exists(model_architectures_dir_path):
    sys.path.append(model_architectures_dir_path)
    model_module = importlib.import_module(selected_model[:-3])
    #from ClickDetectorCNN_v1 import ClickDetectorCNN
    ClickDetectorCNN = getattr(model_module, 'ClickDetectorCNN') #access the ClickDetectorCNN class
    model = ClickDetectorCNN(input_channels=1, output_shape=1).to(device)
else:
    print("Model architectures directory does not exist")

model = ClickDetectorCNN(input_channels=1, output_shape=1).to(device)

# loda model weights

model_weights_dir = "03_Click_Detection_Model/02_savedWeights"
model_weights_dir_path = os.path.join(parent_dir, model_weights_dir)


model_weights_full_path = os.path.join(model_weights_dir_path, model_weights)

if os.path.exists(model_weights_full_path):
    model.load_state_dict(torch.load(model_weights_full_path, map_location=device, weights_only=True)) # load model weights, map location of weights to device
    model.to(device)
    print("Model weights have been loaded")
    print(f"model: {model}")
else:
    print("Model weights file does not exist")

/Users/mateseidl/Library/CloudStorage/OneDrive-SZTAKI/_SZTAKI/03_Masters_thesis/01_TUM_Masterarbeit_Ausarbeitung/07_Click_event_detection_software/01_Electrical_Connector_Click-Event_Detection_git_repo/03_Click_Detection_Model/01_modelArchitectures
Model weights have been loaded
model: ClickDetectorCNN(
  (block_1): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (block_2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=65536, out_features=1, bias=True)
    (2): Sigmoid()
  )
)


In [189]:
# import data from selected dataset

audio_datasets = []
audio_datasets_main_dir_path = None

def access_data_from_local_system():
    global audio_datasets_main_dir_path, audio_datasets
    cwd = str(Path.cwd())
    parent_dir = os.path.dirname(cwd)
    audio_datasets_main_dir = "01_Dataset/01_audioDatasets"
    audio_datasets_main_dir_path = os.path.join(parent_dir, audio_datasets_main_dir)

    if os.path.exists(audio_datasets_main_dir_path):
        for i in os.listdir(audio_datasets_main_dir_path):
            #if folder name does not start with a dot
            if i[0] != ".":
                audio_datasets.append(i)
        audio_datasets = sorted(audio_datasets)
    else:
        print("Audio dataset directory does not exist")

    return audio_datasets

audio_datasets = access_data_from_local_system()

print(audio_datasets)

['01_Ethernet', '02_Ethernet_Test', '03_HVA280', '04_HVA280_Test', '05_HVA630', '06_HVA630_Test', '07_Noise_Samples', 'voice_memo_loc_mac.txt']


In [190]:
# import data from selected test directory

long_window = False
if dataset_id == 2:
    long_window = True

def sort_key_func(file_name):
        numbers = re.findall(r'_(\d+)', file_name)
        if numbers:
            return int(numbers[0])
        return file_name

loaded_spec_chunks = None
loaded_spec_chunk_labels = None

dataset_dir_path = os.path.join(audio_datasets_main_dir_path, audio_datasets[dataset_id])

file_list = []
for f in os.listdir(dataset_dir_path):
    if f.endswith('.npz'):
        file_list.append(f)

test_files_sorted = sorted(file_list, key=sort_key_func)

print(test_files_sorted[:5])

['HVA280_test_1_dataset.npz', 'HVA280_test_2_dataset.npz', 'HVA280_test_3_dataset.npz', 'HVA280_test_4_dataset.npz', 'HVA280_test_5_dataset.npz']


In [191]:
def load_test_data(file_id):
    file_fullpath = os.path.join(dataset_dir_path, test_files_sorted[file_id])

    file_name = test_files_sorted[file_id]

    data = np.load(file_fullpath)
    loaded_spec_chunks = data['spec_chunks']
    loaded_spec_chunk_labels = data['labels']

    return loaded_spec_chunks, loaded_spec_chunk_labels, file_name

In [192]:
# normalize spectrogram chunks

def normalize_spectrogram_chunks(spec_chunks):
    # global min and max values for dB range
    global_min = -120
    global_max = 0

    # convert to numpy array
    spec_chunks = np.array(spec_chunks)

    # normalize to the range [0, 1]
    normalized_spectrograms = (spec_chunks - global_min) / (global_max - global_min)

    return normalized_spectrograms

In [193]:
def to_tensor(X, y):
    X = torch.from_numpy(X).type(torch.float32).unsqueeze(1) # convert to torch and add channel dimension
    y = torch.from_numpy(y).type(torch.float32)
    return X, y

In [194]:
def create_data_loader(X, y):
    BATCH_SIZE = len(X)
    test_dataset = torch.utils.data.TensorDataset(X, y)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    return test_loader

In [195]:
def run_prediction(model, test_loader):
    test_acc = 0

    all_preds = []
    all_true_labels = []

    binary_threshold = 0.5

    def accuracy_fn(y_true, y_pred):
            binary_predictions = (y_pred > binary_threshold).float()
            correct = torch.eq(y_true, binary_predictions).sum().item() # torch.eq() calculates where two tensors are equal
            acc = (correct / len(y_pred)) * 100
            return acc

    model.eval()
    with torch.inference_mode():
        for X, y in test_loader:
            X, y = X.to(device), y.to(device)
            # forward pass
            test_pred = model(X)
            test_pred = torch.squeeze(test_pred)

            # calculate accuracy
            binary_predictions = (test_pred > binary_threshold).float()

            test_acc += accuracy_fn(y_true=y, y_pred=test_pred)

            all_preds.extend(binary_predictions.tolist())
            all_true_labels.extend(y.tolist())

        # divide total accuracy by length of test dataloader (per batch)
        test_acc /= len(test_loader)

        test_acc_rounded = round(test_acc, 2)

    true_positive_indiced = [i for i, (pred, true) in enumerate(zip(all_preds, all_true_labels)) if pred == 1.0 and true == 1.0]
    false_positive_indices = [i for i, (pred, true) in enumerate(zip(all_preds, all_true_labels)) if pred == 1.0 and true == 0.0]

    detection_result = None
    first_TP_index = None
    first_FP_index = None

    # delta error
    delta = 1

    if len(true_positive_indiced) == 0 and len(false_positive_indices) == 0:
        detection_result = "Detection failed"
    elif len(true_positive_indiced) == 0 and len(false_positive_indices) > 0:
        detection_result = "Detection failed"
        first_FP_index = min(false_positive_indices)
    elif len(true_positive_indiced) > 0 and len(false_positive_indices) > 0 and min(false_positive_indices) < min(true_positive_indiced)-delta:
        detection_result = "Detection failed"
        first_TP_index = min(true_positive_indiced)
        first_FP_index = min(false_positive_indices)
    elif len(true_positive_indiced) > 0 and len(false_positive_indices) > 0 and min(true_positive_indiced) < min(false_positive_indices)+delta:
        detection_result = "Detection successful"
        first_TP_index = min(true_positive_indiced)
        first_FP_index = min(false_positive_indices)
    else:
        detection_result = "Detection successful"
        first_TP_index = min(true_positive_indiced)


    true_positives = len([i for i, (pred, true) in enumerate(zip(all_preds, all_true_labels)) if pred == 1.0 and true == 1.0])
    true_negatives = len([i for i, (pred, true) in enumerate(zip(all_preds, all_true_labels)) if pred == 0.0 and true == 0.0])
    false_positives = len([i for i, (pred, true) in enumerate(zip(all_preds, all_true_labels)) if pred == 1.0 and true == 0.0])
    false_negatives = len([i for i, (pred, true) in enumerate(zip(all_preds, all_true_labels)) if pred == 0.0 and true == 1.0])

    return test_acc_rounded, true_positives, true_negatives, false_positives, false_negatives, detection_result, first_TP_index, first_FP_index
    

In [196]:
results = []

save_results_dir = "01_testResults"
save_results_dir_path = os.path.join(cwd, save_results_dir)

for file_id in range(len(test_files_sorted)):
    loaded_spec_chunks, loaded_spec_chunk_labels, file_name = load_test_data(file_id)
    normalized_spectrograms = normalize_spectrogram_chunks(loaded_spec_chunks)
    X, y = to_tensor(normalized_spectrograms, loaded_spec_chunk_labels)
    test_loader = create_data_loader(X, y)
    test_acc, true_positives, true_negatives, false_positives, false_negatives, detection_result, first_TP_index, first_FP_index = run_prediction(model, test_loader)

    results.append([file_name, test_acc, true_positives, true_negatives, false_positives, false_negatives, detection_result, first_TP_index, first_FP_index])

csv_file_name = model_weights[:-3] + "_test" + ".csv"

csv_file_path = os.path.join(save_results_dir_path, csv_file_name)

with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['File Name', 'Test Accuracy', 'True Positives', 'True Negatives', 'False Positives', 'False Negatives', 'Detection Result', 'First TP Index', 'First FP Index'])
    writer.writerows(results)