Downloading the necessary modules and libraries for the project.

In [None]:
# Run these lines in a terminal, then select the resulting environment as the kernel for this .ipynb file
# conda create --name comp.sgn.120 python=3.11.3
# conda activate comp.sgn.120
# conda install numpy=1.26.2
# pip install ipykernel --upgrade
# conda install -c conda-forge ffmpeg

# Uncomment these lines to install the required packages if you haven't already
# !pip install pydub==0.25.1
# !pip install tqdm==4.66.1
# !pip install librosa==0.10.1
# !pip install matplotlib==3.7.2
# !pip install scikit-learn==1.3.2
# !pip install scipy==1.11.4 
# !pip install pandas==2.1.4


Necessary Modules and Libraries

In [None]:
import os

import numpy as np
import pandas as pd
from tqdm import tqdm, tqdm_pandas
tqdm.pandas()

# Database loading and Feature extraction
from pydub import AudioSegment
import librosa as lb
import librosa.display
from scipy.stats import skew
from scipy.signal import hamming, hann


# Representation
import IPython.display as ipd
import matplotlib.pyplot as plt

# Training and evaluating the model
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from sklearn.model_selection import GridSearchCV
from sklearn.decomposition import PCA

from sklearn.inspection import DecisionBoundaryDisplay

from sklearn.svm import SVC



Function for reading the data

In [None]:
def readFolder(folder):
    """Return the paths of all directories found beneath ``folder``.

    The tree is traversed with ``os.walk``, so sub-directories at every depth
    are included. Each entry is the walked parent path joined with the
    directory name. ``folder`` itself is not part of the result.
    """
    return [
        os.path.join(parent, child)
        for parent, subdirs, _ in os.walk(folder)
        for child in subdirs
    ]


def readFileInFolder(folder):
    """Return the paths of all files found beneath ``folder``.

    The tree is traversed with ``os.walk``, so files in nested
    sub-directories are included. Each entry is the walked parent path joined
    with the file name.
    """
    collected = []
    for parent, _, filenames in os.walk(folder):
        collected.extend(os.path.join(parent, fname) for fname in filenames)
    return collected

Function for extracting the features

In [None]:
# def getMFCC(y, sr, n_mel=40, hop_size=128, n_fft=512):
#     mfcc = lb.feature.mfcc(
#         y=y, sr=sr, n_mfcc=n_mel, hop_length=hop_size, norm="ortho", n_fft=n_fft
#     )
#     return mfcc


def _summary_stats(feature):
    """Reduce ``feature`` along its last axis to six stacked statistics.

    The statistics are, in order: mean, standard deviation, skewness, maximum,
    median and minimum. A 1-D input yields 6 values; a 2-D input with ``r``
    rows yields ``6 * r`` values (all means first, then all stds, ...).
    """
    return np.hstack(
        (
            np.mean(feature, axis=-1),
            np.std(feature, axis=-1),
            skew(feature, axis=-1),
            np.max(feature, axis=-1),
            np.median(feature, axis=-1),
            np.min(feature, axis=-1),
        )
    )


def getMFCC(name, path):
    """Extract a fixed-length feature vector from one audio file.

    The file is loaded at its native sampling rate and the following
    frame-wise features are computed: 40 MFCCs, zero-crossing rate, spectral
    rolloff, spectral centroid, row 0 of the spectral contrast, and spectral
    bandwidth. Each feature row is summarised by mean, std, skewness, max,
    median and min, giving ``(40 + 5) * 6 = 270`` values.

    :param name: path of the audio file to load
    :param path: unused; kept because callers pass ``path=None``
    :return: ``pd.Series`` of 270 values; all zeros if the file could not be
        loaded or processed (a message naming the file is printed)
    """
    n_mel = 40
    hop_size = 128
    n_fft = 512
    n_scalar_features = 5  # zcr, rolloff, centroid, contrast[0], bandwidth
    n_stats = 6  # mean, std, skew, max, median, min
    n_features = (n_mel + n_scalar_features) * n_stats

    try:
        # Loading is inside the try so an unreadable file takes the same
        # fallback path as a failing feature computation.
        data, sr = lb.load(name, sr=None)

        # The file is loaded at its native rate, so that rate must be passed
        # on; otherwise librosa assumes its default of 22050 Hz.
        frame_features = (
            lb.feature.mfcc(
                y=data,
                sr=sr,
                n_mfcc=n_mel,
                hop_length=hop_size,
                norm="ortho",
                n_fft=n_fft,
            ),
            lb.feature.zero_crossing_rate(y=data)[0],
            lb.feature.spectral_rolloff(y=data, sr=sr)[0],
            lb.feature.spectral_centroid(y=data, sr=sr)[0],
            lb.feature.spectral_contrast(y=data, sr=sr)[0],
            lb.feature.spectral_bandwidth(y=data, sr=sr)[0],
        )
        return pd.Series(np.hstack([_summary_stats(f) for f in frame_features]))
    except Exception as exc:  # best effort: one bad file must not abort the run
        print(f'bad file: {name} ({exc})')
        # Same length as a successful result so the rows stay aligned.
        return pd.Series([0] * n_features)


def getMelSpectrogram(y, sr, n_mel=40, hop_size=128, n_fft=512):
    """Compute a mel spectrogram with ``lb.feature.melspectrogram``.

    :param y: audio signal passed to librosa as ``y``
    :param sr: sampling rate of ``y``
    :param n_mel: number of mel bands (``n_mels``)
    :param hop_size: hop length between frames (``hop_length``)
    :param n_fft: FFT window size
    :return: whatever ``lb.feature.melspectrogram`` returns for these settings
    """
    stft_settings = {"hop_length": hop_size, "n_fft": n_fft}
    return lb.feature.melspectrogram(y=y, sr=sr, n_mels=n_mel, **stft_settings)

Utilities for the project

In [None]:
def convert_to_labels(preds, i2c, k=2):
    """Convert per-class scores into the best label and the top-k class ids.

    :param preds: iterable of score vectors, one per sample
    :param i2c: index to class dictionary
    :param k: number of top-scoring class indices to keep per sample
    :return: tuple ``(ans, ids)``. ``ans`` holds the highest-scoring class
        name for each sample, or ``''`` when the top-k slice is empty.
        ``ids`` holds the top-k class indices for each sample, best first.
    """
    ans = []  # best label per sample
    ids = []  # top-k class indices per sample
    for scores in preds:
        # Indices sorted by descending score, truncated to the k best.
        top_k = np.argsort(scores)[::-1][:k].tolist()
        ids.append(top_k)
        # Look the best label up directly. Joining the labels with spaces and
        # splitting again would truncate class names that contain a space.
        ans.append(i2c[top_k[0]] if top_k else '')
    return ans, ids

Data preprocessing

In [None]:
# Tram_Train: https://freesound.org/people/publictransport/packs/36726/
# Tram_Train: https://freesound.org/people/ali.abdelsalam/packs/36722/
# Bus_Train: https://freesound.org/people/emmakyllikki/packs/36810/
# Bus_Train: https://freesound.org/people/glingden/packs/36807/
# Tram_Test: My own recording
# Bus_Test: My own recording
folder_list = readFolder("audio")

# Folder name -> class label given to every file found beneath that folder.
folder_labels = {
    "Bus_Test": "bus",
    "Bus_Train": "bus",
    "Tram_Test": "tram",
    "Tram_Train": "tram",
}
folder_to_read = list(folder_labels)

files_by_folder = {name: [] for name in folder_to_read}
label = {}
for folder in folder_list:
    # Match against the directory's own name, not the whole path. readFolder
    # also returns nested sub-directories, and matching the full path would
    # let e.g. "audio/Bus_Train/pack1" re-match "Bus_Train" and replace the
    # complete file list with only that sub-directory's files.
    folder_name = os.path.basename(os.path.normpath(folder))
    for name in folder_to_read:
        if name not in folder_name:
            continue
        # readFileInFolder is recursive, so nested files are picked up here.
        for file in readFileInFolder(folder):
            # Accumulate rather than overwrite, skipping files already seen.
            if file not in label:
                files_by_folder[name].append(file)
                label[file] = folder_labels[name]

# os.walk order depends on the filesystem; sorting keeps the row order, and
# therefore the seeded train/validation split, reproducible.
bus_test = sorted(files_by_folder["Bus_Test"])
bus_train = sorted(files_by_folder["Bus_Train"])
tram_test = sorted(files_by_folder["Tram_Test"])
tram_train = sorted(files_by_folder["Tram_Train"])

In [None]:
def _load_m4a(path, tmp_wav="temp.wav"):
    """Load an m4a file by converting it to a temporary WAV file first.

    :param path: path of the m4a file
    :param tmp_wav: path of the intermediate WAV file; removed before returning
    :return: the ``(y, sr)`` result of ``lb.load`` at the native sampling rate
    """
    sound = AudioSegment.from_file(path, format="m4a")
    try:
        # export() returns the file object it wrote to. Close it so the file
        # is not left open when it is loaded and deleted below.
        sound.export(tmp_wav, format="wav").close()
        return lb.load(tmp_wav, sr=None)
    finally:
        # Clean up even when export or loading fails.
        if os.path.exists(tmp_wav):
            os.remove(tmp_wav)


# Read the audio files
bus_train_audio = [lb.load(file, sr=None) for file in bus_train]
tram_train_audio = [lb.load(file, sr=None) for file in tram_train]

# Read the audio files in m4a format
bus_test_audio = [_load_m4a(file) for file in bus_test]
tram_test_audio = [_load_m4a(file) for file in tram_test]

In [None]:
# Report how many clips were loaded for each class and split.
audio_counts = [
    ("Number of bus train audio files: ", bus_train_audio),
    ("Number of tram train audio files: ", tram_train_audio),
    ("Number of bus test audio files: ", bus_test_audio),
    ("Number of tram test audio files: ", tram_test_audio),
]
for message, clips in audio_counts:
    print(message, len(clips))


In [None]:
# Prepare Data
# Bus files come first, then tram files, in both splits.
train_fnames = pd.DataFrame({"fname": bus_train + tram_train})["fname"]
test_fnames = pd.DataFrame({"fname": bus_test + tram_test})["fname"]

# getMFCC returns one pd.Series per file; progress_apply shows a tqdm
# progress bar while it runs.
train_data = train_fnames.progress_apply(getMFCC, path=None)
print("done loading train mfcc")

test_data = test_fnames.progress_apply(getMFCC, path=None)
print("done loading test mfcc")


In [None]:
# Re-attach the file names (same order as used for feature extraction) and
# look up each file's class in the `label` dictionary.
train_data["fname"] = bus_train + tram_train
train_data["label"] = [label[fname] for fname in train_data["fname"]]

print("Train data:")
# print(train_data)


In [None]:
# Re-attach the file names (same order as used for feature extraction) and
# look up each file's class in the `label` dictionary.
test_data["fname"] = bus_test + tram_test
test_data["label"] = [label[fname] for fname in test_data["fname"]]

print("Test data:")
# print(test_data)

In [None]:
# Adapted from "Random Forest using MFCC features": https://www.kaggle.com/amlanpraharaj/random-forest-using-mfcc-features
non_feature_cols = ['label', 'fname']

# Training feature matrix: every column except the file name and the label.
train_features = train_data.drop(columns=non_feature_cols)
feature_names = list(train_features.columns)
X = train_features.values

# Sorted unique class names define the integer encoding.
labels = np.sort(np.unique(train_data["label"].values))
num_class = len(labels)

# Class name -> index and index -> class name lookups.
c2i = {cls: idx for idx, cls in enumerate(labels)}
i2c = {idx: cls for idx, cls in enumerate(labels)}

y = np.array([c2i[cls] for cls in train_data["label"].values])

# Test features and targets use the encoding learned from the training labels.
X_test = test_data.drop(columns=non_feature_cols).values
y_test = np.array([c2i[cls] for cls in test_data["label"].values])


In [None]:
# Apply scaling for PCA
# The scaler is fitted on the training features only and then reused
# unchanged for the test features.
scaler = StandardScaler().fit(X)
X_scaled = scaler.transform(X)
X_test_scaled = scaler.transform(X_test)

In [None]:
# Apply PCA for dimension reduction
N_PCA_COMPONENTS = 65

# Fit on the scaled training features, then project both splits.
pca = PCA(n_components=N_PCA_COMPONENTS)
pca.fit(X_scaled)
X_pca, X_test_pca = (pca.transform(matrix) for matrix in (X_scaled, X_test_scaled))

# Fraction of the training variance kept by the retained components.
print(sum(pca.explained_variance_ratio_))

In [None]:
# Build a KNN model
# Hold out 20% of the PCA-projected training data for validation.
X_train, X_val, y_train, y_val = train_test_split(
    X_pca, y, test_size=0.2, random_state=42, shuffle=True
)

# clf = SVC(kernel = 'rbf', probability=True)
# clf.fit(X_train, y_train)

# The pipeline re-standardizes the PCA components before the 2-nearest-
# neighbour classifier.
clf = Pipeline(
    steps=[("scaler", StandardScaler()), ("knn", KNeighborsClassifier(n_neighbors=2))]
)
clf.fit(X_train, y_train)

# accuracy_score is documented as (y_true, y_pred): ground truth goes first.
print(accuracy_score(y_val, clf.predict(X_val)))


In [None]:
# Test the model with the test data
# accuracy_score is documented as (y_true, y_pred): ground truth goes first.
print(accuracy_score(y_test, clf.predict(X_test_pca)))


In [None]:
# Define the parameter grid: C from 0.001 to 10, gamma from 0.001 to 10
# C_grid = [0.001, 0.01, 0.1, 1, 10]
# gamma_grid = [0.001, 0.01, 0.1, 1, 10]
# param_grid = {'C': C_grid, 'gamma' : gamma_grid}

# grid = GridSearchCV(SVC(kernel='rbf'), param_grid, cv = 3, scoring = "accuracy")
# grid.fit(X_train, y_train)

# # Find the best model
# print(grid.best_score_)

# print(grid.best_params_)

# print(grid.best_estimator_)

In [None]:
# Fit the entire training sets
# clf.fit(X_pca, y)
# str_preds, _ = convert_to_labels(clf.predict_proba(X_test_pca), i2c, k=2)

# # Write to outputs
# subm = pd.DataFrame()
# # subm['fname'] = audio_test_files
# # subm['label'] = str_preds
# subm['fname'] = test_data["fname"]
# subm['label'] = str_preds
# subm.to_csv('submission.csv', index=False)