In [None]:
import glob
import os
import random
import warnings

import librosa
import librosa.display
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC, OneClassSVM

warnings.filterwarnings('ignore')

In [None]:
def spec_to_image(spec, eps=1e-6):
    mean = spec.mean()
    std = spec.std()
    spec_norm = (spec - mean) / (std + eps)
    spec_min, spec_max = spec_norm.min(), spec_norm.max()
    spec_scaled = 255 * (spec_norm - spec_min) / (spec_max - spec_min)
    spec_scaled = spec_scaled.astype(np.uint8)
        
    return spec_scaled

def get_melspectrogram_db(file_path, aug=False, sr=48000, n_fft=2048, hop_length=256, n_mels=128, fmin=20, fmax=8300, top_db=80, duration=3):
    """Load an audio file and return its mel spectrogram in decibels.

    The waveform is forced to exactly ``duration`` seconds: shorter clips are
    reflect-padded on both sides, longer clips are truncated from the end.

    Parameters
    ----------
    file_path : str
        Path of the audio file, passed to ``librosa.load``.
    aug : bool, optional
        When true, the waveform is pitch-shifted by a random amount in
        [-1, 1] semitone steps with probability 0.5.
    sr : int or None, optional
        Sample rate requested from ``librosa.load``. The rate returned by the
        loader is the one used for all later computations.
    n_fft, hop_length, n_mels, fmin, fmax
        Forwarded to ``librosa.feature.melspectrogram``.
    top_db : float, optional
        Forwarded to ``librosa.power_to_db``.
    duration : float, optional
        Clip length in seconds (default 3, the value previously hard-coded).

    Returns
    -------
    np.ndarray
        The dB-scaled mel spectrogram returned by ``librosa.power_to_db``.
    """
    # Use the rate the loader actually returns so that sr=None also works.
    wav, sr = librosa.load(file_path, sr=sr)

    # Ensure the audio is exactly `duration` seconds long.
    target_len = int(duration * sr)
    missing = target_len - wav.shape[0]
    if missing > 0:
        # Split the padding so that left + right == missing; padding
        # ceil(missing / 2) on both sides overshoots by one sample when
        # `missing` is odd.
        left = missing // 2
        wav = np.pad(wav, (left, missing - left), mode='reflect')
    else:
        wav = wav[:target_len]

    if aug and random.random() < 0.5:
        # Pitch-shift by up to one semitone in either direction.
        wav = librosa.effects.pitch_shift(wav, sr=sr, n_steps=random.uniform(-1, 1))

    spec = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=n_fft, hop_length=hop_length,
                                          n_mels=n_mels, fmin=fmin, fmax=fmax)
    return librosa.power_to_db(spec, top_db=top_db)


        
# image_post = spec_to_image(get_melspectrogram_db(file_path, aug=self.augs))[np.newaxis, ...]
# image_pre = spec_to_image(get_melspectrogram_db(file_path.replace('post', 'pre'), 
#                                                         aug=self.augs))[np.newaxis, ...]     
# combined = np.concatenate([image_pre, image_post])
# label = torch.tensor(self.c2i[row['class']], dtype=torch.long)

In [None]:
# 97 total: 77 train, 10 val, 10 test

### Splitting data

In [None]:
random.seed(10)

# Number of recording ids kept for modelling.
N_TRAIN_IDS = 95

# Data files. glob returns entries in arbitrary, OS-dependent order, so the
# lists are sorted to make the id slice below reproducible across machines.
data_path_pre = 'wetvoice/extractedMPTs/pre swallow/**'
data_files_pre = sorted(x for x in glob.glob(data_path_pre) if '.wav' in x and 'android' not in x)

data_path_post = 'wetvoice/extractedMPTs/post swallow/**'
data_files_post = sorted(x for x in glob.glob(data_path_post) if '.wav' in x and 'android' not in x)

# Recording id = file name up to the `_post` / `_pre` marker. os.path.basename
# is used instead of splitting on '/' so Windows path separators also work.
ids = [os.path.basename(x).split('_post')[0] for x in data_files_post]

# Keep only pre-swallow files whose id also has a post-swallow recording.
train_ids = set(ids[:N_TRAIN_IDS])
train_files = [x for x in data_files_pre if os.path.basename(x).split('_pre')[0] in train_ids]
# val_files = [x for x in data_files_pre if x.split('/')[-1].split('_pre')[0] in ids[77:87]]
# test_files = [x for x in data_files_pre if x.split('/')[-1].split('_pre')[0] in ids[87:]]

len(train_files)

#### Labels

In [None]:
# Spreadsheet with one row per recording; later cells read its
# 'Audio File Name' and 'Aspiration  ' columns.
labels_xlsx = 'audio file numbers and aspiration values.xlsx'
df = pd.read_excel(io=labels_xlsx)
df.head(n=5)

In [None]:
# Build an id -> aspiration lookup once instead of filtering the frame for
# every file. drop_duplicates keeps the first row per id, matching the
# previous `.values[0]` behaviour.
LABEL_COLUMN = 'Aspiration  '  # the spreadsheet header carries two trailing spaces
aspiration_by_id = (
    df.drop_duplicates('Audio File Name')
      .set_index('Audio File Name')[LABEL_COLUMN]
      .to_dict()
)

train_labels = []
for path in train_files:
    # File names look like `<id>_pre...wav`; the id is numeric in the sheet.
    file_id = int(os.path.basename(path).split('_pre')[0])
    if file_id not in aspiration_by_id:
        raise KeyError(f'No aspiration label found for recording {file_id} ({path})')
    # Normalise case/whitespace so 'yes' or 'Yes ' is not silently treated as 0.
    is_aspiration = str(aspiration_by_id[file_id]).strip().lower() == 'yes'
    train_labels.append(1 if is_aspiration else 0)

len(train_labels)

## KNN

In [None]:
def extract_features(f_path):
    """Return the time-averaged MFCC vector of an audio file.

    The file is loaded with librosa's default settings, MFCCs are computed
    with librosa's defaults, and each coefficient is averaged over axis 1.
    """
    signal, rate = librosa.load(f_path)
    coefficients = librosa.feature.mfcc(y=signal, sr=rate)
    return np.mean(coefficients, axis=1)

# `train_files` holds the pre-swallow recordings. The matching post-swallow
# recording sits in the 'post swallow' folder with `_post` in its file name,
# so swapping 'pre' -> 'post' in the path yields its partner.
# (The previous replace('post', 'pre') was a no-op on these paths, which made
# both lists identical and every feature vector all zeros.)
audio_paths_a = train_files
audio_paths_b = [x.replace('pre', 'post') for x in train_files]

# Guard against comparing a recording with itself.
assert all(a != b for a, b in zip(audio_paths_a, audio_paths_b)), \
    'pre/post paths are identical; feature differences would be zero'

# One feature vector per subject: absolute MFCC change from pre to post swallow.
features = np.array([
    np.abs(extract_features(path_a) - extract_features(path_b))
    for path_a, path_b in zip(audio_paths_a, audio_paths_b)
])
labels = np.array(train_labels)

# Split the dataset, keeping the class balance equal in both parts.
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.2, random_state=42, stratify=labels
)

In [None]:
# Baseline k-nearest-neighbours classifier on the raw feature differences,
# scored on the held-out split.
knn = KNeighborsClassifier(n_neighbors=10).fit(X_train, y_train)
predictions = knn.predict(X_test)

print(classification_report(y_true=y_test, y_pred=predictions))

In [None]:
# Same model scored on its own training data, for comparison with the
# held-out report above (note: this overwrites `predictions`).
predictions = knn.predict(X_train)
print(classification_report(y_true=y_train, y_pred=predictions))

### Pipeline

In [None]:
# Chain standardisation, a 3-component PCA projection and a 5-neighbour KNN
# so that all three steps are fitted on the training split only.
pipeline = make_pipeline(StandardScaler(), PCA(n_components=3), KNeighborsClassifier(n_neighbors=5))

predictions = pipeline.fit(X_train, y_train).predict(X_test)
print(classification_report(y_true=y_test, y_pred=predictions))

## Random Forest

In [None]:
# Random forest on the unscaled features. The forest is trained on X_train
# directly; the scaler that used to be fitted here on the full data set was
# never fed to the model, so it has been dropped.
rf = RandomForestClassifier(n_estimators=200, random_state=42)

rf.fit(X_train, y_train)
predictions = rf.predict(X_test)

# Evaluate the model on the held-out split
print(classification_report(y_test, predictions))

## SVM

In [None]:
# One-class SVM: learn only the "no aspiration" class (label 0) and treat
# anything it rejects as aspiration. The model is fitted on the normal samples
# of the training split and scored on the held-out split; fitting and scoring
# on the full data set would evaluate the model on its own training samples.
normal_features = X_train[y_train == 0]

scaler = StandardScaler()
normal_features_scaled = scaler.fit_transform(normal_features)

oc_svm = OneClassSVM(kernel='rbf', gamma='auto')
oc_svm.fit(normal_features_scaled)

# Reuse the scaler fitted on the normal training samples.
X_test_scaled = scaler.transform(X_test)

# OneClassSVM.predict returns -1 for outliers and +1 for inliers; map
# outliers to the positive (aspiration) class.
predictions = (oc_svm.predict(X_test_scaled) == -1).astype(int)

# Evaluate the model
print(classification_report(y_test, predictions))