In [None]:
# Libraries imported for use in this project.
import numpy as np 
import pandas as pd 
import os
import glob
from tqdm import tqdm
from time import sleep
from tabulate import tabulate
import matplotlib.pyplot as plt
import librosa as lib
import IPython.display as ipd
from pathlib import PurePosixPath, PureWindowsPath, WindowsPath
import pathlib
import tensorflow as tf
from sklearn.model_selection import train_test_split
import shutil
import soundfile
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical
from keras import Sequential
from keras.layers import Dense,Conv2D,MaxPooling2D,Flatten,Dropout
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from nltk.tokenize import word_tokenize
import re
from langdetect import detect
from sklearn.metrics import accuracy_score
from tensorflow.keras import models
from tensorflow.keras import layers

In [None]:
# Assign directories.
# `directory` must point to the folder that holds the dataset so it can be accessed reliably.
directory = '[home_directory_path]/UGSPEECHDATA/' 

# `dir` must point to the folder where augmentation (or any other file/folder generation) takes place,
# so that the dataset itself is never modified and stays usable in the future.
# NOTE(review): `dir` shadows the Python builtin of the same name. It is concatenated directly with
# sub-folder names (e.g. dir+'working/'), so the value presumably has to end with a path separator.
dir = '[any path of your choice]'

# Canonical column names of the Excel sheets that describe the transcribed audios (order does not matter).
# They are referenced here so that the column names can be normalised later on: this is necessary because
# the sheets of at least two of the selected languages do not name their columns uniformly.
target_column_names = ['FILE_NO.', 'IMAGE_PATH', 'IMAGE_SRC_URL', 'AUDIO_PATH', 'TRANSCRIPTION',
                       'SPEAKER_ID','ORG_NAME', 'PROJECT_NAME', 'LOCALE', 'GENDER', 'AGE',
                       'DEVICE', 'ENVIRONMENT', 'YEAR','FULL_FILENAME', 'FILENAME']

# Locale code of each language, intended for labelling in the data visualisations.
# NOTE(review): no visible cell reads this list (the plotting helpers take their own `locale` argument).
locale = ['ak_gh', 'dga_gh', 'dag_gh', 'ee_gh', 'kpo_gh'] 

# Path of the Excel workbook inside every language folder.
# It is needed to merge the transcribed audios of all five selected languages into one pandas DataFrame.
target_excel = '/selected transcribed audios/selected transcribed audios.xlsx'


In [None]:
# Collect the transcription workbook of every language folder in the dataset directory
def get_transcribed_files():
    """Return the path of the transcription Excel workbook of each language folder.

    Every entry of ``directory`` whose name contains no "." is treated as a
    language folder, and ``target_excel`` is appended to its path.

    Returns:
        list[str]: one workbook path per language folder, sorted by folder
        name so that the order is the same on every run and every machine.
    """
    # os.listdir() yields entries in arbitrary order; sorting keeps positional
    # look-ups into the resulting list stable across runs.
    return [
        os.path.join(directory, name) + target_excel
        for name in sorted(os.listdir(directory))
        if "." not in name
    ]

In [None]:
# Load one Excel workbook into a DataFrame
def read_data_excel(url):
    """Read the Excel file at ``url`` and return its content as a DataFrame.

    The first column is used as the index. Only empty cells are parsed as
    missing values; pandas' default NA markers are switched off.
    """
    return pd.read_excel(url, index_col=0, na_values='', keep_default_na=False)

In [None]:
# Clean up column names so the per-language sheets can be consolidated
mapper = {}  # cumulative {original column label: canonical name} of every rename done so far


def _normalise_column_name(name):
    """Reduce a column label to a comparable key: lower case, '_' for spaces, no trailing '2' / 's'."""
    # str() guards against non-string labels (e.g. numeric headers read from Excel).
    return str(name).lower().strip().replace(' ', '_').removesuffix('2').removesuffix('s')


def format_column_names(dataframe):
    """Rename the columns of ``dataframe`` to the spellings listed in ``target_column_names``.

    A column matches a target when both reduce to the same key (see
    ``_normalise_column_name``). Columns without a match keep their name.

    Args:
        dataframe: DataFrame whose column labels should be normalised.

    Returns:
        A new DataFrame with the matched columns renamed; the input is not modified.
    """
    # Normalise every target once instead of once per (column, target) pair.
    # setdefault keeps the first target for a key, like a first-match search would.
    targets_by_key = {}
    for target_name in target_column_names:
        targets_by_key.setdefault(_normalise_column_name(target_name), target_name)

    # Build the rename mapping for THIS frame only, so the result never depends
    # on which frames were processed before.
    renames = {}
    for col_name in dataframe.columns:
        target_name = targets_by_key.get(_normalise_column_name(col_name))
        if target_name is not None:
            renames[col_name] = target_name

    # Keep the module-level record up to date for anyone inspecting it.
    mapper.update(renames)

    return dataframe.rename(columns=renames)
    

In [None]:
# `frame` collects one cleaned DataFrame per language workbook
frame = []
fileList = get_transcribed_files()  # workbook path of every language folder


def consolidate_lang_datas():
    """Read every workbook in ``fileList``, normalise its column names and append it to ``frame``."""
    frame.extend(
        format_column_names(read_data_excel(workbook))
        for workbook in tqdm(fileList)
    )
    print('Completed')


# Populate `frame`
consolidate_lang_datas()

In [None]:
# Returns frame length
len(frame) 

In [None]:
#Print fileList involved in the creation of the frame
fileList

In [None]:
#First three entries in Frame 1   
frame[0].head(3)

In [None]:
#First three entries in Frame 2
frame[1].head(3)

In [None]:
#First three entries in Frame 3
frame[2].head(3)

In [None]:
#First three entries in Frame 4
frame[3].head(3)

In [None]:
#First three entries in Frame 5
frame[4].head(3)

In [None]:
# Combine all dataframes in `frame` into a single dataframe for processing.
# join='outer' keeps the union of the columns of all frames.
# NOTE(review): ignore_index is not set, so every frame keeps its own index labels; if the sheets
# share labels the combined index contains duplicates — be careful when dropping rows by label.
df_combined = pd.concat(frame, join='outer')

In [None]:
#Structure of dataframe 1 in frame, corresponds to index 0
frame[0].info()

In [None]:
#Structure of dataframe 4 in frame, corresponds to index 3
frame[3].info()

In [None]:
# information on the combined dataframe containing details of selected subcribed audio of all five languages
df_combined.info()

In [None]:
# Discard the columns that are unlikely to influence the training result
_unused_columns = [
    'FILE_NO.', 'IMAGE_SRC_URL', 'ORG_NAME', 'PROJECT_NAME', 'YEAR', 'FILENAME',
    'IMAGE_PATH', 'SPEAKER_ID', 'GENDER', 'AGE', 'DEVICE', 'ENVIRONMENT',
]
final_df = df_combined.drop(columns=_unused_columns)

In [None]:
#Updated dataframe information after eliminating less important columns
final_df.info()

In [None]:
# Converts a Windows-style relative path from the dataset sheets into its POSIX equivalent.
# There is no need for this function when the project runs on a Windows machine.
def get_audio_path_unix(windowsPath, base_dir=None):
    """Return the POSIX path of an audio file that the sheets reference Windows-style.

    Args:
        windowsPath: relative path using Windows separators (backslashes).
        base_dir: folder the relative path is resolved against; defaults to
            the module-level ``directory`` (the dataset root).

    Returns:
        pathlib.PurePosixPath: ``base_dir`` joined with the parts of ``windowsPath``.
    """
    if base_dir is None:
        base_dir = directory
    # The former stand-alone `audio_file.as_posix()` call discarded its result and was removed.
    return PurePosixPath(base_dir, *PureWindowsPath(windowsPath).parts)

In [None]:
# Removes the rows whose audio file cannot be found on disk.
# The rows are filtered with a positional boolean mask instead of being dropped by index label while
# iterating: dropping by label also removes rows of other languages that carry the same label.
# (The former language-prefix test was always true, so the only effective criterion was, and still
# is, whether the file exists.)
def _audio_file_exists(audio_path):
    """Return True when ``audio_path`` is a string whose POSIX equivalent exists on disk."""
    # Non-string values (missing paths) cannot be resolved and count as "not found".
    return isinstance(audio_path, str) and os.path.exists(get_audio_path_unix(audio_path))


audio_found = [_audio_file_exists(audio_path) for audio_path in tqdm(final_df.AUDIO_PATH)]
final_df = final_df[audio_found].copy()

In [None]:
# Get final dataframe information 
final_df.info()

In [None]:
# Drop the rows whose AUDIO_PATH or TRANSCRIPTION value is missing (NaN/None).
# NOTE: this mutates final_df in place, so cells displayed earlier show a stale view after it has run.
final_df.dropna(subset=['AUDIO_PATH', 'TRANSCRIPTION'], inplace=True)
# Show the dataframe summary after the rows were dropped
final_df.info()

In [None]:
#Gets 10 entries from the final dataframe
final_df.head(10)

In [None]:
#Counting the number of entries per each language in the final dataframe
final_df.LOCALE.value_counts()

In [None]:
#Display the plot of Transcription against locale in a bar chart format
grouped_df = final_df.groupby('LOCALE').count()[['TRANSCRIPTION']]

grouped_df.plot(kind='bar')
plt.show()


In [None]:
# Bar chart of how many rows each locale contributes
def data_distribution(data):
    """Plot the number of rows per ``LOCALE`` value of ``data`` as a bar chart."""
    locale_counts = data.LOCALE.value_counts()
    plt.figure(figsize=(16,3))
    locale_counts.plot(kind='bar', title="Data Category distribution")
    plt.show()


In [None]:
#calls the function above
data_distribution(final_df)

In [None]:
# First row of a given locale; used later on for the data visualisations
def get_first_locale_occurence(df, locale):
    """Return a one-row DataFrame with the first row of ``df`` whose LOCALE equals ``locale``.

    The result is empty when no row matches.
    """
    is_locale = df['LOCALE'] == locale
    return df.loc[is_locale].iloc[:1]

In [None]:
# Displays the spectrogram of an audio file
def spectrogram(file_path, locale):
    """Plot the log-frequency power spectrogram of the audio file at ``file_path``.

    Args:
        file_path: path of the audio file to load.
        locale: text prefixed to the plot title (the cells in this notebook
            pass the transcription of the clip).
    """
    y, sr = lib.load(file_path)
    plt.figure(figsize=(16,3))
    # Leading space added so the label and the caption no longer run together.
    plt.title(locale + ' Log-Frequency Power Spectrogram')
    # Magnitude of the short-time Fourier transform in dB, relative to its maximum.
    data = lib.amplitude_to_db(np.abs(lib.stft(y)), ref=np.max)
    # Pass the sampling rate explicitly so the axes always match the loaded signal.
    lib.display.specshow(data, sr=sr, y_axis='log', x_axis='time')
    plt.colorbar()

In [None]:
# Plots the waveform of an audio file
def waveform(file_path, label):
    """Plot the sound wave of the audio file at ``file_path``, titled ``label`` + ' Sound Wave'."""
    samples, _sample_rate = lib.load(file_path)
    plt.figure(figsize=(16, 3))
    plt.title(label + ' Sound Wave')
    lib.display.waveshow(samples, color="blue")

In [None]:
# Audible playback of a raw audio file
def play_raw_audio_File(file_path):
    """Return an IPython audio player for ``file_path``.

    The player is meant for notebooks; it is not usable from an interactive
    shell such as plain IPython.
    """
    player = ipd.Audio(file_path)
    return player

In [None]:
# Display the spectrogram, the waveform and an audio player for the first Akan audio file in the final dataframe
akan1_pd = get_first_locale_occurence(final_df, 'ak_gh')
audio_path_akan = akan1_pd.iloc[0].AUDIO_PATH
transcription_akan = akan1_pd.iloc[0].TRANSCRIPTION
audio_unix_akan = get_audio_path_unix(audio_path_akan)
print(audio_unix_akan)
# Spectrogram, titled with the transcription of the clip
spectrogram(audio_unix_akan, transcription_akan)


# Waveform, titled with the transcription of the clip
waveform(audio_unix_akan, transcription_akan)


# Audio player (rendered because it is the last expression of the cell)
play_raw_audio_File(audio_unix_akan)

In [None]:
# Display the spectrogram, the waveform and an audio player for the first Dagbani audio file in the final dataframe.
# 'dag_gh' is the Dagbani locale (ISO 639-3 code "dag"); 'dga_gh' belongs to Dagaare.
dagbani1_pd = get_first_locale_occurence(final_df, 'dag_gh')
audio_path_dagbani = dagbani1_pd.iloc[0].AUDIO_PATH
transcription_dagbani = dagbani1_pd.iloc[0].TRANSCRIPTION
audio_unix_dagbani = get_audio_path_unix(audio_path_dagbani)
print(audio_unix_dagbani)

# Spectrogram, titled with the transcription of the clip
spectrogram(audio_unix_dagbani, transcription_dagbani)

# Waveform, titled with the transcription of the clip
waveform(audio_unix_dagbani, transcription_dagbani)

# Audio player (rendered because it is the last expression of the cell)
play_raw_audio_File(audio_unix_dagbani)

In [None]:
# Display the spectrogram, the waveform and an audio player for the first Dagaare audio file in the final dataframe.
# 'dga_gh' is the Dagaare locale (ISO 639-3 code "dga"); 'dag_gh' belongs to Dagbani.
dagaare1_pd = get_first_locale_occurence(final_df, 'dga_gh')
audio_path_dagaree = dagaare1_pd.iloc[0].AUDIO_PATH
transcription_dagaare = dagaare1_pd.iloc[0].TRANSCRIPTION
audio_unix_dagaare = get_audio_path_unix(audio_path_dagaree)
print(audio_unix_dagaare)
# Spectrogram, titled with the transcription of the clip
spectrogram(audio_unix_dagaare, transcription_dagaare)

# Waveform, titled with the transcription of the clip
waveform(audio_unix_dagaare, transcription_dagaare)

# Audio player (rendered because it is the last expression of the cell)
play_raw_audio_File(audio_unix_dagaare)

In [None]:
# Display the spectrogram, the waveform and an audio player for the first Ewe audio file in the final dataframe
ewe1_pd = get_first_locale_occurence(final_df, 'ee_gh')
audio_path_ewe = ewe1_pd.iloc[0].AUDIO_PATH
transcription_ewe = ewe1_pd.iloc[0].TRANSCRIPTION
audio_unix_ewe = get_audio_path_unix(audio_path_ewe)
print(audio_unix_ewe)
# Spectrogram, titled with the transcription of the clip
spectrogram(audio_unix_ewe, transcription_ewe)

# Waveform, titled with the transcription of the clip
waveform(audio_unix_ewe, transcription_ewe)

# Audio player (rendered because it is the last expression of the cell)
play_raw_audio_File(audio_unix_ewe)

In [None]:
# Display the spectrogram, the waveform and an audio player for the first Ikposo audio file in the final dataframe
ikposo1_pd = get_first_locale_occurence(final_df, 'kpo_gh')
audio_path_ikposo = ikposo1_pd.iloc[0].AUDIO_PATH
transcription_ikposo = ikposo1_pd.iloc[0].TRANSCRIPTION
audio_unix_ikposo = get_audio_path_unix(audio_path_ikposo)
print(audio_unix_ikposo)
# Spectrogram, titled with the transcription of the clip
spectrogram(audio_unix_ikposo, transcription_ikposo)

# Waveform, titled with the transcription of the clip
waveform(audio_unix_ikposo, transcription_ikposo)

# Audio player (rendered because it is the last expression of the cell)
play_raw_audio_File(audio_unix_ikposo)

In [None]:
# Reduce the dataset by sampling the same number of rows (1350) from every LOCALE group.
# This was done to reduce the data size: final_df has 93,166 rows, which is too much for the
# CPU and memory of the machine used.
# random_state fixes the draw so that re-running the notebook selects the same rows.
df_final_sample = final_df.groupby("LOCALE").sample(n=1350, random_state=42)

In [None]:
#Information on the Dataframe assigned the sampling result, it has a size of 6750, which is quite smaller and manageable as opposed to the initial
# size of 93,166
df_final_sample.info()

In [None]:
#verifies the sampling criteria of obtaining 1350 dataset from each local
df_final_sample.LOCALE.value_counts()

In [None]:
# Creates a working directory for data augmentation purposes. Data augmentation was considered
# because of the imbalance in the class distribution of the labels in the dataframe.
# Data2 receives the sampled audio files (1350 per language x 5 languages == 6750 files) that are
# to be augmented. makedirs(..., exist_ok=True) also creates dir+'working/' and keeps the cell
# re-runnable, whereas os.mkdir raises FileExistsError on a second run.
os.makedirs(dir+'working/Data2', exist_ok=True)


# Copies files into Data2; in the context of our training these folders are set_a and set_b
def fill_folder1_toData2():
    """Copy every audio file listed in ``df_final_sample.AUDIO_PATH`` into the Data2 folder.

    Each file keeps its base name, so files sharing a base name overwrite one another.
    """
    destination = dir+'working/Data2/'

    # Resolve each dataset path to its POSIX form and copy it (with metadata) to Data2
    for audio_file in tqdm(df_final_sample.AUDIO_PATH):
        source_file = get_audio_path_unix(audio_file)
        destination_file = os.path.join(destination, os.path.basename(source_file))
        shutil.copy2(source_file, destination_file)


fill_folder1_toData2()  # start the copy

In [None]:
# Creates the OUT folder that will hold the files copied from Data2 together with their augmented copies.
# exist_ok=True keeps the cell re-runnable (os.mkdir raises FileExistsError when the folder exists).
os.makedirs(dir+'working/OUT', exist_ok=True)

# List the file names found directly inside a folder
def get_fileNames(path):
    """Return the names of the files located directly in ``path`` (sub-folders are not searched)."""
    _root, _subfolders, file_names = next(os.walk(path))
    return file_names

#returns length of files executed by get_fileNames
len(get_fileNames(dir+'working/OUT'))

In [None]:
# Pitch shifting changes the frequency content of an audio signal while preserving its duration.
# For some audio files this form of augmentation produced a corrupted output file.
def changing_pitch(step, src_path, dst_path):
    """Write a pitch-shifted copy of every file in ``src_path`` to ``dst_path``.

    Args:
        step: value passed to librosa's ``pitch_shift`` as ``n_steps``.
        src_path: folder containing the source audio files.
        dst_path: output folder; created when it does not exist.

    Each output is named ``<name before '.mp3'>_<step>.mp3``.
    """
    source_names = get_fileNames(src_path)
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)

    suffix = '_' + str(step) + '.mp3'
    for source_name in tqdm(source_names):
        stem = os.path.basename(source_name).replace(directory, "").split('.mp3')[0]
        samples, sample_rate = lib.load(src_path+'/'+source_name)
        shifted = lib.effects.pitch_shift(samples, sr=sample_rate, n_steps=step)
        soundfile.write(dst_path + '/' + stem + suffix, shifted, sample_rate)

In [None]:
# Pitch-shift every source file (by 2 steps) and keep the originals next to the results
def sound_augmentation(src_path, dst_path):
    """Augment the files of ``src_path`` into ``dst_path``.

    First a pitch-shifted copy of every source file is written to ``dst_path``
    for each configured step, then the unmodified source files are copied
    there as well.
    """
    for step in (2,):  # [2, -2, 2.5, -2.5]
        changing_pitch(step, src_path, dst_path)

    # Copy the untouched originals alongside the pitch-shifted files
    for original_name in get_fileNames(src_path):
        shutil.copy(src_path+'/'+original_name, dst_path)

In [None]:
# Creates the augmented audio data through pitch shifting
def create_new_augmented_data_files():
    """Run the pitch-shift augmentation from Data2 into OUT unless it is already complete.

    OUT counts as complete when it holds exactly two files per source file
    (the original plus its pitch-shifted copy). The previous check compared
    against the number of source files only, which a finished run never
    matches, so the whole augmentation was redone on every call. An
    incomplete OUT folder is deleted and rebuilt.
    """
    source_dir = dir+'working/Data2'
    output_dir = dir+'working/OUT'

    if os.path.exists(output_dir):
        # One pitch-shifted file plus one copied original per source file.
        expected_count = 2 * len(get_fileNames(source_dir))
        if expected_count and len(get_fileNames(output_dir)) == expected_count:
            print('Sound Augmentation Already Done and Saved')
            return
        shutil.rmtree(output_dir)

    sound_augmentation(source_dir, output_dir)


In [None]:
# calls creates create_new_augmented data files function.
create_new_augmented_data_files()

In [None]:
# Maps the name of an augmented file back to the name of the original file
def get_aug_file(file):
    """Return the original file name for an augmented file name.

    Augmented files carry the suffix ``_2.mp3`` (pitch shift) or ``_0.8.mp3``
    (speed change); the suffix is replaced by ``.mp3``. Any other name is
    returned unchanged.

    Only a suffix at the END of the name is recognised: a substring test would
    also truncate names that merely contain the marker somewhere in the middle.
    """
    for suffix in ('_2.mp3', '_0.8.mp3'):
        if file.endswith(suffix):
            return file[:-len(suffix)] + '.mp3'
    return file


In [None]:
# Looks up the transcription that belongs to an audio file in a dataframe
def get_transcription_with_audio(df, audio_file):
    """Return the transcription of the first row of ``df`` that refers to ``audio_file``.

    Two names refer to the same recording when ``get_aug_file`` maps both to
    the same original name. Returns ``None`` when no row matches.
    """
    wanted = get_aug_file(audio_file)
    for candidate in df.FULL_FILENAME:
        if get_aug_file(candidate) != wanted:
            continue
        return df.loc[df['FULL_FILENAME'] == candidate, 'TRANSCRIPTION'].iloc[0]
    return None
        

In [None]:
# Builds a dataframe for the files of a folder, using the sampled dataframe to find each transcription
def create_dataframe(dataframe_name, folder_path):
    """Return a DataFrame with one row per file in ``folder_path``.

    Args:
        dataframe_name: DataFrame the transcriptions are looked up in.
        folder_path: folder whose files are listed.

    Returns:
        DataFrame with the columns ``FULL_FILENAME`` (the file name) and
        ``TRANSCRIPTION`` (``None`` where no transcription was found).
    """
    records = {'FULL_FILENAME': [], 'TRANSCRIPTION': []}
    for audio_name in tqdm(get_fileNames(folder_path)):
        records['FULL_FILENAME'].append(audio_name)
        records['TRANSCRIPTION'].append(get_transcription_with_audio(dataframe_name, audio_name))
    return pd.DataFrame(records)

In [None]:
# creates dataframe containg augmented files and initial audio files of 6750
aug_df = create_dataframe(df_final_sample, dir+'working/OUT')

In [None]:
# provides information on the new created dataframe
aug_df.info()

In [None]:
# Checks for corrupted files and removes them from both the folder and the dataframe
def remove_currupted_audio(folder_path, data_frame):
    """Delete the unreadable audio files of ``folder_path`` and drop their rows.

    A file counts as corrupted when loading it with librosa raises ``EOFError``.

    Args:
        folder_path: folder containing the audio files.
        data_frame: DataFrame with a ``FULL_FILENAME`` column naming those files.

    Returns:
        A DataFrame without the rows of the deleted files; the input frame is
        not modified.
    """
    cur = 0
    not_cur = 0

    for file in tqdm(get_fileNames(folder_path)):
        file_path = os.path.join(folder_path, file)
        try:
            lib.load(file_path)
        except EOFError:
            # Remove the file from the folder ...
            os.remove(file_path)
            # ... and its row(s) from the dataframe
            index = data_frame[(data_frame.FULL_FILENAME == file)].index
            data_frame = data_frame.drop(index)
            cur += 1
        else:
            not_cur += 1

    # The summary used to sit after the return statement and was never printed.
    print('Currupted Files: ' + str(cur))
    print('Uncurrupted Files: ' + str(not_cur))
    return data_frame
        

In [None]:
# Noticed a couples of files upon augmentation ended up currupted hence created a function to detect them and get rid of them
df_aug_updated = remove_currupted_audio(dir+'working/OUT/', aug_df)

In [None]:
# Updated Dataframe state after removing all currupted files
df_aug_updated.info()

In [None]:
# Initial dataframe state prior to augmentation
aug_df.info()

In [None]:
#sort index after removal of currupted files
df_aug_updated.sort_index(inplace=True)

In [None]:
# Compare the augmented dataframe with its cleaned-up version to find the rows that were lost when
# the corrupted files were removed. Those losses led to an uneven class distribution when the dataset
# was split for training and testing. Error raised by StratifiedShuffleSplit:
# 'The least populated class in y has only 1 member, which is too few. The minimum number of groups for any class cannot be less than 2.'
# indicator=True adds a '_merge' column; 'left_only' marks rows present in aug_df only.
df_diff = pd.merge(aug_df, df_aug_updated, how='outer', suffixes=('','_y'), indicator=True)
rows_in_df1_not_in_df2 = df_diff[df_diff['_merge']=='left_only'][aug_df.columns]

# Number of deleted corrupted files (not displayed: it is not the last expression of the cell)
len(rows_in_df1_not_in_df2)

currupted_files_df = rows_in_df1_not_in_df2
currupted_files_df

In [None]:
currupted_files = currupted_files_df['FULL_FILENAME'].tolist() # Get all the list of currupted files
currupted_files #print list

In [None]:
# TEMP receives the original version of every audio file whose augmented copy turned out corrupted,
# so that another, more suitable form of augmentation can be applied to it. This brings the dataset
# distribution back in line (at least two versions of every transcription).
source_folder = dir+"working/Data2/"
destination_folder = dir+"working/TEMP/"
# exist_ok=True keeps the cell re-runnable (os.mkdir raises FileExistsError when the folder exists)
os.makedirs(destination_folder, exist_ok=True)

# Copy the original of every deleted corrupted file from Data2 into TEMP
for file in tqdm(currupted_files):
    file_original_name = get_aug_file(file)
    source = source_folder + file_original_name
    destination = destination_folder

    # isfile() is False for missing paths as well as for folders, so one check is enough
    if os.path.isfile(source):
        shutil.copy(source, destination)
        print('copied', file)
    else:
        print('File doesnt exist')

In [None]:
# Augments audio files by changing the speed of the signal (time stretching). This form of
# augmentation is applied to the files whose pitch-shifted copy became corrupted.
def changing_speed(speed_rate, src_path, dst_path):
    """Write a time-stretched copy of every file in ``src_path`` to ``dst_path``.

    Args:
        speed_rate: value passed to librosa's ``time_stretch`` as ``rate``.
        src_path: folder containing the source audio files.
        dst_path: output folder; created when it does not exist.

    Each output is named ``<name before '.mp3'>_<speed_rate>.mp3``.
    """
    source_names = get_fileNames(src_path)
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)

    suffix = '_' + str(speed_rate) + ".mp3"
    for source_name in tqdm(source_names):
        stem = os.path.basename(source_name).replace(directory, "").split('.mp3')[0]
        samples, sample_rate = lib.load(src_path+"/"+source_name)
        stretched = lib.effects.time_stretch(samples, rate=speed_rate)
        soundfile.write(dst_path + '/' + stem + suffix, stretched, sample_rate)

In [None]:
# Applies the speed-change augmentation with a rate of 0.8
def sound_aug_without_copy(src_path, dst_path):
    """Write a time-stretched copy (rate 0.8) of every file in ``src_path`` to ``dst_path``.

    The source files themselves are not copied to ``dst_path``.
    """
    for rate in (0.8,):
        changing_speed(rate, src_path, dst_path)

In [None]:
# OUT_TEMP receives the newly augmented versions of the files in the TEMP folder.
# exist_ok=True keeps the cell re-runnable (os.mkdir raises FileExistsError when the folder exists).
os.makedirs(dir+'working/OUT_TEMP', exist_ok=True)

# Raising the pitch by 2 corrupted these files and further pitch changes kept failing, so an
# alternative augmentation (speed change) is applied instead.
# sound_aug_without_copy() already processes EVERY file in TEMP, so a single call is enough;
# calling it once per corrupted file re-augmented the whole folder again and again.
if currupted_files:
    sound_aug_without_copy(dir+'working/TEMP', dir+'working/OUT_TEMP')
    

In [None]:
# Counts corrupted files after the re-augmentation to verify that it was successful
def check_currupted_audio(folder_path):
    """Print how many files of ``folder_path`` fail / succeed to load with librosa.

    A file counts as corrupted when loading it raises ``EOFError``.
    ``folder_path`` is concatenated with the file name as is.
    """
    corrupted_count = 0
    readable_count = 0

    for audio_name in tqdm(get_fileNames(folder_path)):
        try:
            lib.load(folder_path+audio_name)
        except EOFError:
            corrupted_count += 1
        else:
            readable_count += 1

    print('Currupted Files: '+(str(corrupted_count)))
    print('Uncurrupted Files: ' + str(readable_count))

In [None]:
# calls check_currupted_audio file function
check_currupted_audio(dir+'working/OUT_TEMP/')

In [None]:
#create a new dataframe for re-augmented currupted files
re_aug_df = create_dataframe(currupted_files_df, dir+'working/OUT_TEMP')

In [None]:
re_aug_df.info()

In [None]:
#add re-augmented dataframe to the updated dataframe which was rid of the currupted files(df_aug_updated)
aug_frames = [df_aug_updated, re_aug_df]
final_aug_df = pd.concat(aug_frames, ignore_index=True)


In [None]:
# Display re-augmnted dataframe information to verify or process taken to restore it to its initial size of 13,500
final_aug_df.info()
display(final_aug_df)

In [None]:
# Copy the re-augmented versions of the corrupted files from OUT_TEMP into the OUT folder

source_folder = dir+"working/OUT_TEMP/"
destination_folder = dir+"working/OUT/"

out_temp_files = get_fileNames(source_folder)

# Go through every file found in OUT_TEMP
for file in tqdm(out_temp_files):
    
    if os.path.exists(source_folder+file):
        # Full path of the file to copy and the folder it is copied to
        source = source_folder + file
        destination = destination_folder

        # Copy regular files only (folders are skipped)
        if os.path.isfile(source):
            shutil.copy(source, destination)
            print('copied', file)

    else:
        print('File doesnt exist')

In [None]:
#splits final_augmented dataframe to its corresponding training and testing feature and label, 75% training data and 25% testing dataset
X_train, X_test, y_train, y_test = train_test_split(final_aug_df['FULL_FILENAME'],final_aug_df['TRANSCRIPTION'], test_size=0.25, random_state=42)

In [None]:
#Display results from training and test split
X_train, X_test, y_train, y_test

In [None]:
# Regex-based tokenizer used for all five languages (language detection proved unreliable for them)
def tokenize_text(text):  # a language parameter could be added in future for language-specific tokens
    """Split ``text`` into word and number tokens.

    A token is a maximal run of letters or a maximal run of digits that is
    delimited by word boundaries; everything else is skipped.
    """
    token_patterns = (
        r'\b[a-zA-Z]+\b',  # Match English words
        r'\b[^\W\d_]+\b',  # Match words of the other languages (letters only: no digits, no special characters)
        r'\b\d+\b',        # Match numbers
    )
    return re.findall('|'.join(token_patterns), text)

In [None]:
# Preprocesses a transcription (the label)
def preprocess_transcription(transcription):
    """Turn a transcription into a list of normalised tokens.

    The text is tokenised with ``tokenize_text``; every token is lower-cased
    (speech carries no letter case) and stripped of punctuation (to remove
    differences caused by punctuation styles).

    The former ``langdetect`` call was removed: its result was never used, it
    sat behind a bare ``except`` that hid every error, and it only slowed the
    function down. The returned tokens are unchanged.

    Args:
        transcription: the transcription text.

    Returns:
        list[str]: the normalised tokens in order of appearance.
    """
    return [re.sub(r'[^\w\s]', '', token.lower()) for token in tokenize_text(transcription)]

In [None]:
# Builds the features (mel spectrograms) and the labels (preprocessed transcriptions)
def load_data(audio_paths, transcriptions):
    """Compute a mel spectrogram and a token list for every (audio file, transcription) pair.

    Args:
        audio_paths: file names relative to the ``working/OUT/`` folder.
        transcriptions: transcription text of each audio file, in the same order.

    Returns:
        tuple: ``(spectrograms, processed_transcriptions)`` — two lists with
        one entry per input pair. Each spectrogram is the dB-scaled mel
        spectrogram of one file; the arrays may differ in their number of
        frames because the audio is not trimmed or padded here.
    """
    spectrograms = []
    processed_transcriptions = []

    for audio_path, transcription in tqdm(zip(audio_paths, transcriptions)):
        # Load the audio at its native sampling rate and compute its mel spectrogram in dB
        y, sr = lib.load(dir+'working/OUT/'+audio_path, sr=None)
        mel = lib.feature.melspectrogram(y=y, sr=sr)
        spectrograms.append(lib.power_to_db(mel, ref=np.max))

        # Preprocess the transcription
        processed_transcriptions.append(preprocess_transcription(transcription))

    # Return the whole list: returning the loop variable handed back only the
    # spectrogram of the LAST file.
    return spectrograms, processed_transcriptions

In [None]:
# Load data and preprocess transcriptions
audio_paths = X_train  # List of audio file paths
transcriptions = y_train  # List of transcriptions
spectrograms, processed_transcriptions = load_data(audio_paths, transcriptions)

In [None]:
# Preprocessed transcription
processed_transcriptions

In [None]:
#melspectrogram extraction of audio file
spectrograms

In [None]:
# Load data and preprocess transcriptions
audio_paths = X_test  # List of audio file paths
transcriptions = y_test  # List of transcriptions
spectrograms_test, processed_transcriptions_test = load_data(audio_paths, transcriptions)

In [None]:
# melspectrogram of audio file and preprocessed version of transcription
spectrograms_test, processed_transcriptions_test

In [None]:
# computes max length of preprocessed transcription
max_length = max(len(inner_list) for inner_list in processed_transcriptions)
max_length

In [None]:
# min length of preprocessed transcription
min_length = min(len(inner_list) for inner_list in processed_transcriptions)
min_length

The difference between the minimum and the maximum length of the preprocessed transcriptions is very wide. It would have been better to check the audio signals during the pre-processing stage of the data, to ensure that the audio length is capped within a given range.

In [None]:
# converts train features to type numpy
spectrograms_to_numpy = np.asarray(spectrograms, dtype="object")
x_train = spectrograms_to_numpy

In [None]:
#converts transcription or label to type numpy
processed_transcriptions_to_numpy = np.asarray(processed_transcriptions, dtype="object")
Y_train = processed_transcriptions_to_numpy
Y_train

In [None]:
# Defines the CNN model
def create_model(n_width,n_height,n_dropout,n_classes):
    """Build a small convolutional classifier (not compiled).

    Args:
        n_width: size of the first spatial axis of one input sample.
        n_height: size of the second spatial axis of one input sample.
        n_dropout: dropout rate applied before the output layer.
        n_classes: number of output classes (units of the softmax layer).

    Returns:
        A Keras ``Sequential`` model expecting inputs of shape
        ``(n_width, n_height, 1)``.
    """
    model = models.Sequential([
        # Conv2D needs a channel axis; a single-channel input has 1 channel.
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(n_width, n_height, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        # n_dropout was accepted but never used before.
        layers.Dropout(n_dropout),
        # Use the argument instead of the module-level num_classes.
        layers.Dense(n_classes, activation='softmax')
    ])
    return model

In [None]:
# Define model parameters
input_shape = x_train.shape
num_classes = len(set(y_train))  # Number of unique tokens in transcriptions

In [None]:
#input shape of features is two dimentional
input_shape

In [None]:
# The number of classes (unique transcriptions) is 6,321, which is a lot. This made it difficult to
# use StratifiedShuffleSplit for splitting: the number of classes exceeds the size of the test set
# whenever the test share is kept below 50%.
# np.unique must be applied to the labels; applied to the integer count it only wraps that number.
classes = np.unique(y_train)
classes

In [None]:
# Creates the CNN model from the dimensions of the training features
def cnn_model(x_train):
    """Build the CNN via ``create_model`` from the first two dimensions of ``x_train``.

    The dropout rate is fixed at 0.5 and the number of classes is read from
    the module-level ``num_classes``.

    NOTE(review): shape[0] and shape[1] are passed as width and height, which
    assumes ``x_train`` is a single 2-D array rather than a batch of samples —
    TODO confirm against how ``x_train`` is built.
    """
    return create_model(x_train.shape[0], x_train.shape[1], 0.5, num_classes)

In [None]:
# attemps to create CNN model fails due to the inhomogenous shape of the x_train dataset. Increasing the dataset through additional augmentation would have fixed it to some extend as
# as capping audio duration to say 3mins, further augmentation was going to be hard because of the limited resources of the machine used(Memory and CPU)
cnn_model(x_train) #this cell failed or raised an error

In [None]:
# This cell was never executed because the preceding cell failed.
# NOTE(review): `model` and `padded_list_to_numpy` are not defined in the visible part of the
# notebook (the result of cnn_model(x_train) is never assigned), so this cell cannot run as is.
# Compile the model
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train the model
# NOTE(review): the same array is passed as features AND labels — presumably a placeholder; verify.
model.fit(padded_list_to_numpy, padded_list_to_numpy, epochs=10, validation_split=0.2)

# Save the CNN model
model.save(dir+'trained_model.h5')

# Evaluate the model
# NOTE(review): X_test / y_test come from train_test_split and hold file names and raw
# transcriptions, not spectrograms and encoded labels — TODO confirm before running.
test_loss, test_acc = model.evaluate(X_test, y_test)
print('Test accuracy:', test_acc)

In [None]:
# This cell was never executed because an earlier cell failed.
# Character error rate of a predicted transcription against the actual transcription
def calculate_cer(predicted_transcription, ground_truth_transcription):
    """Return the character error rate (CER) of a prediction.

    Both strings are stripped of surrounding whitespace and lower-cased. The
    CER is the Levenshtein distance (minimum number of character insertions,
    deletions and substitutions) divided by the length of the ground truth.
    A position-by-position comparison would miss inserted or missing
    characters entirely (e.g. "abc" vs "abcd" would score 0).

    Args:
        predicted_transcription: text produced by the model.
        ground_truth_transcription: reference text.

    Returns:
        float: 0.0 for a perfect match. For an empty ground truth the result
        is 0.0 when the prediction is empty as well and 1.0 otherwise.
    """
    predicted = predicted_transcription.strip().lower()
    reference = ground_truth_transcription.strip().lower()

    if not reference:
        # Avoid dividing by zero: nothing to get wrong vs. everything wrong.
        return 0.0 if not predicted else 1.0

    # Row-by-row dynamic programme; previous[j] is the distance between the
    # reference prefix handled so far and predicted[:j].
    previous = list(range(len(predicted) + 1))
    for i, ref_char in enumerate(reference, start=1):
        current = [i]
        for j, pred_char in enumerate(predicted, start=1):
            substitution = previous[j - 1] + (ref_char != pred_char)
            current.append(min(previous[j] + 1, current[j - 1] + 1, substitution))
        previous = current

    return previous[-1] / len(reference)

In [None]:
# This cell was never executed because an earlier cell failed.

# Load the trained model in case the kernel context was lost.
# load_model is reached through the imported `models` module (the bare name is never imported),
# and the path now matches the location the model is saved to (dir+'trained_model.h5').
loaded_model = models.load_model(dir+'trained_model.h5')

# Perform inference
# NOTE(review): X_test holds file names as produced by train_test_split; the model expects the
# corresponding spectrogram features — TODO build them before running this cell.
y_pred = loaded_model.predict(X_test)

# Evaluate the predictions by calculating the accuracy
# NOTE(review): predict() of a softmax model returns class probabilities; they presumably need to be
# converted to class labels before they can be compared with y_test — verify.
accuracy = accuracy_score(y_test, y_pred)

# Show the evaluation result
print("Accuracy:", accuracy)

In [None]:
# This cell was never executed because an earlier cell failed.
# Calculate the performance on the test dataset.
# NOTE(review): calculate_cer calls .strip() on both arguments, i.e. it expects two strings, whereas
# y_pred / y_test are collections — presumably it has to be applied pair by pair; verify.
calculate_cer(y_pred, y_test)