In [None]:

# Cell: Import Libraries
# Standard library
import json
import os
import pickle
import random
import shutil
import tarfile
from collections import defaultdict
from glob import glob

# Third-party
import g2p_en
import librosa
import librosa.display
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import python_speech_features as psf
import requests
import soundfile as sf
from hmmlearn import hmm
from IPython.display import Audio
from scipy.io.wavfile import read
from sklearn.metrics import accuracy_score
from tqdm import tqdm

In [None]:
# Download directory for NLTK resources: the current user's ~/nltk_data.
# Derived from the home directory instead of a hard-coded absolute path so the
# cell works on any machine/account.
download_dir = os.path.join(os.path.expanduser("~"), "nltk_data")
os.makedirs(download_dir, exist_ok=True)

# Download both resources to the specified directory
for nltk_resource in ("punkt", "averaged_perceptron_tagger_eng"):
    nltk.download(nltk_resource, download_dir=download_dir)

In [None]:
# Local file name the archive is saved under (relative to the working directory)
download_path = "train-clean-100.tar.gz"

# Remote location of the train-clean-100 archive
url = "http://www.openslr.org/resources/12/train-clean-100.tar.gz"

# Function to download a file with progress bar
def download_file(url, destination_path, chunk_size=1024 * 1024, timeout=30):
    """Stream ``url`` to ``destination_path`` while showing a tqdm progress bar.

    The payload is first written to ``destination_path + ".part"`` and only
    renamed to ``destination_path`` once the whole response has been written,
    so an interrupted download never leaves a truncated file under the final
    name (other cells use the mere existence of that file to skip downloading).

    Args:
        url: Address of the file to fetch.
        destination_path: Final path of the downloaded file.
        chunk_size: Number of bytes requested per streamed chunk.
        timeout: Timeout in seconds passed to ``requests.get``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        Any exception raised while streaming or writing is re-raised after the
        partial file has been removed.
    """
    partial_path = destination_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            total_size = int(r.headers.get('content-length', 0))
            # total=None lets tqdm show a counter when the size is unknown
            with open(partial_path, 'wb') as f, tqdm(
                total=total_size or None,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                desc="Downloading",
            ) as progress:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)
                        progress.update(len(chunk))
    except BaseException:
        # Do not leave a half-written file behind, then surface the error
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise
    # Publish the finished download under its final name in one step
    os.replace(partial_path, destination_path)

# # Download the file
# download_file(url, download_path)

In [None]:
# Function to extract tar.gz file
def extract_file(tar_path, dest_dir):
    """Extract the gzip-compressed tar archive ``tar_path`` into ``dest_dir``.

    Extraction refuses archive members that would be written outside
    ``dest_dir``: the standard-library ``"data"`` extraction filter is used
    when this Python version provides it, otherwise member paths are checked
    manually before extracting.

    Errors are reported with a printed message rather than raised (best-effort,
    as before).

    Args:
        tar_path: Path to the ``.tar.gz`` archive.
        dest_dir: Directory to extract into.

    Returns:
        None.
    """
    try:
        with tarfile.open(tar_path, 'r:gz') as tar:
            if hasattr(tarfile, "data_filter"):
                tar.extractall(path=dest_dir, filter="data")
            else:
                # Older Python: reject members resolving outside dest_dir
                dest_root = os.path.realpath(dest_dir)
                for member in tar.getmembers():
                    target = os.path.realpath(os.path.join(dest_root, member.name))
                    if os.path.commonpath([dest_root, target]) != dest_root:
                        raise tarfile.TarError(f"Unsafe path in archive: {member.name}")
                tar.extractall(path=dest_dir)
        print(f"Dataset extracted to {dest_dir}")
    except Exception as e:
        print(f"Error during extraction: {e}")

# Fetch the archive only when it is not already on disk
if not os.path.exists(download_path):
    download_file(url, download_path)

# Extraction target: an "audio_files" folder inside the current working directory
extracted_dir = os.path.join(os.getcwd(), "audio_files")

# Extract once; otherwise report why nothing was extracted
if os.path.exists(extracted_dir):
    print(f"Dataset already extracted at {extracted_dir}")
elif os.path.exists(download_path):
    extract_file(download_path, extracted_dir)
else:
    print(f"Download path does not exist: {download_path}")
        

In [None]:
# Cell: Locate all .flac files
# Recursively collect every .flac file below the extraction directory.
flac_files = glob(os.path.join(extracted_dir, "**", "*.flac"), recursive=True)
print(f"Found {len(flac_files)} .flac files")
# Guard the example so an empty result reports clearly instead of raising IndexError
if flac_files:
    print("Example file:", flac_files[0])
else:
    print(f"No .flac files found under {extracted_dir}")

In [None]:
# Input tree holding the .flac files, and output tree for the .wav copies
source_root = "audio_files/LibriSpeech/train-clean-100"
wav_root = "train-clean-100-wav"

# Every .flac file found anywhere below source_root (full paths)
flac_files = [
    os.path.join(root, file)
    for root, _, files in os.walk(source_root)
    for file in files
    if file.endswith(".flac")
]

# Convert and save .wav files, mirroring the source directory layout
def convert_flac_to_wav(flac_files, source_root, wav_root):
    """Re-encode each FLAC file as a WAV file under ``wav_root``.

    The path of every input relative to ``source_root`` is reused below
    ``wav_root`` with the extension replaced by ``.wav``; missing directories
    are created. A "Saved: ..." line is printed per file.

    Args:
        flac_files: Iterable of FLAC file paths to convert.
        source_root: Directory the FLAC paths are made relative to.
        wav_root: Directory receiving the converted files.
    """
    for src in tqdm(flac_files, desc="Converting to WAV"):
        # e.g. <speaker>/<chapter>/<utterance> without the extension
        stem, _ext = os.path.splitext(os.path.relpath(src, source_root))
        wav_path = os.path.join(wav_root, stem + ".wav")

        os.makedirs(os.path.dirname(wav_path), exist_ok=True)

        # Decode with soundfile and write back at the same sample rate
        samples, rate = sf.read(src)
        sf.write(wav_path, samples, rate)
        print(f"Saved: {wav_path}")

# convert_flac_to_wav(flac_files, source_root, wav_root)


In [None]:
# Function to copy transcripts to the new structure
def copy_transcripts(source_root, wav_root):
    """Copy every ``*.trans.txt`` file from ``source_root`` into ``wav_root``.

    Each transcript lands in the directory that has the same path relative to
    ``wav_root`` as its original directory has relative to ``source_root``.
    A "Copied: ..." line is printed per file.

    Args:
        source_root: Directory tree searched for transcript files.
        wav_root: Root of the destination tree.
    """
    for current_dir, _, names in tqdm(os.walk(source_root), desc="Copying Transcripts"):
        transcripts = [name for name in names if name.endswith(".trans.txt")]
        if not transcripts:
            continue

        # Directory of the transcript(s), expressed relative to source_root
        target_dir = os.path.join(wav_root, os.path.relpath(current_dir, source_root))
        os.makedirs(target_dir, exist_ok=True)

        for name in transcripts:
            destination = os.path.join(target_dir, name)
            shutil.copy(os.path.join(current_dir, name), destination)
            print(f"Copied: {destination}")

# Call the function to copy transcripts
# copy_transcripts(source_root, wav_root)


In [None]:
# List the converted .wav files.
# Uses the `glob` function imported in the imports cell; a local `import glob`
# here would rebind that name to the module and break earlier cells on re-run.
wav_files = glob("train-clean-100-wav/**/*.wav", recursive=True)
print(f"Found {len(wav_files)} .wav files")


In [None]:
# Cell: Extract MFCC features

def extract_mfcc(wav_path, num_mfcc=13):
    """Compute MFCC features for one WAV file.

    The file is read with ``scipy.io.wavfile.read`` (which returns the sample
    rate first, then the samples) and passed to ``python_speech_features.mfcc``.

    Args:
        wav_path: Path of the WAV file.
        num_mfcc: Number of cepstral coefficients requested (``numcep``).

    Returns:
        The value returned by ``psf.mfcc`` — presumably a
        (frames, num_mfcc) array; TODO confirm.
    """
    sample_rate, samples = read(wav_path)
    return psf.mfcc(samples, sample_rate, numcep=num_mfcc)

# Smoke test: features of the first listed file
sample_mfcc = extract_mfcc(wav_path=wav_files[0])
print(f"MFCC shape: {sample_mfcc.shape}")

In [None]:
# Cell: Visualize waveform and MFCCs
def visualize_waveform_and_mfcc(wav_path):
    """Plot the waveform and the MFCCs of one audio file side by side.

    The audio is loaded at its native sample rate (``sr=None``) and 13 MFCCs
    are computed with librosa. The MFCC matrix is passed to ``specshow``
    untransposed so that frames run along the x-axis, matching the
    ``x_axis='time'`` labelling.

    Args:
        wav_path: Path of the audio file to display.
    """
    signal, sr = librosa.load(wav_path, sr=None)
    mfcc = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=13)

    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(signal)
    plt.title("Waveform")

    plt.subplot(1, 2, 2)
    # Columns are frames; transposing would put the coefficients on the time axis
    librosa.display.specshow(mfcc, sr=sr, x_axis='time')
    plt.colorbar()
    plt.title("MFCC")

    plt.tight_layout()
    plt.show()

# Plot the first listed file as a visual check
visualize_waveform_and_mfcc(wav_path=wav_files[0])


In [None]:

# Build a subset of roughly one hour of audio by taking files from wav_files
# in listing order until the accumulated duration reaches the target.
target_duration = 60 * 60  # 1 hour in seconds
selected_files = []

current_duration = 0
for file in wav_files:
    if current_duration >= target_duration:
        break
    # Read only the header: duration = frames / sample rate (no full decode needed)
    file_info = sf.info(file)
    current_duration += file_info.frames / file_info.samplerate
    selected_files.append(file)

# Iterating (instead of indexing in a while-loop) means running out of files
# ends the selection instead of raising IndexError.
if current_duration < target_duration:
    print(f"Warning: only {current_duration / 60:.2f} minutes of audio available")

# Save the selected files list
with open("selected_files.json", "w") as f:
    json.dump(selected_files, f)

print(f"Selected {len(selected_files)} files, total duration: {current_duration / 60:.2f} minutes")

In [None]:
# Reload the saved subset so this cell runs from the file written earlier
with open("selected_files.json", "r") as f:
    selected_files = json.load(f)

# One record per successfully processed file: {"path": ..., "mfcc": ...}
mfcc_data = []

for wav_path in tqdm(selected_files, desc="Extracting MFCCs"):
    try:
        # soundfile returns (samples, sample_rate)
        signal, sr = sf.read(wav_path)

        # Only 1-D numpy arrays are processed; anything else is reported and skipped
        if not (isinstance(signal, np.ndarray) and signal.ndim == 1):
            print(f"Warning: {wav_path} has an invalid signal format")
            continue

        mfcc_feat = psf.mfcc(signal, sr, numcep=13)

        # Features must come back as a 2-D numpy array
        if not (isinstance(mfcc_feat, np.ndarray) and mfcc_feat.ndim == 2):
            print(f"Warning: {wav_path} returned invalid MFCC features")
            continue

        # Stored as nested lists so the record is JSON-serialisable
        mfcc_data.append(dict(path=wav_path, mfcc=mfcc_feat.tolist()))

    except Exception as e:
        # Best effort: report the failing file and keep going
        print(f"Error processing {wav_path}: {e}")

# Persist all records in a single JSON file
with open("mfcc_features_subset.json", "w") as f:
    json.dump(mfcc_data, f)

print("Saved MFCC features to mfcc_features_subset.json")


In [None]:
from g2p_en import G2p

# Grapheme-to-phoneme converter instance
g2p = G2p()

# Demonstrate the conversion on a single word
word = "hello"
phonemes = g2p(word)
print("Word: {} -> Phonemes: {}".format(word, phonemes))


In [None]:
import re

def clean_transcript(transcript):
    """Lowercase ``transcript`` and drop everything except a-z and whitespace.

    NOTE(review): a function with the same name is defined again in a later
    cell of this notebook; the later definition replaces this one.

    Args:
        transcript: Raw transcript text.

    Returns:
        The normalised text; whitespace is left untouched.
    """
    return re.sub(r'[^a-z\s]', '', transcript.lower())

# Demonstrate the cleaning step on a short sentence
transcript = "Hello, world! How are you?"
cleaned_transcript = clean_transcript(transcript)
print("Cleaned Transcript: {}".format(cleaned_transcript))

In [None]:
import os

# Function to get the transcript for a given wav file
def get_transcript_for_wav(wav_path):
    """Return the transcript text belonging to ``wav_path``.

    The speaker and chapter ids are taken from the two directories directly
    above the file (``.../<speaker>/<chapter>/<utterance>.wav``). Both ``\\``
    and ``/`` are accepted as separators, so paths recorded on Windows also
    work on other platforms. The transcript is looked up in
    ``train-clean-100-wav/<speaker>/<chapter>/<speaker>-<chapter>.trans.txt``,
    where each line is ``<utterance-id> <text>``.

    Args:
        wav_path: Path of the WAV file.

    Returns:
        The stripped transcript text, or ``None`` when the path has fewer than
        three components, the transcript file is missing, or no line starts
        with the utterance id.
    """
    # Normalise separators so the split does not depend on the host OS
    parts = [part for part in wav_path.replace("\\", "/").split("/") if part]
    if len(parts) < 3:
        print(f"Cannot derive speaker/chapter from path: {wav_path}")
        return None

    speaker_id, chapter_id, file_name = parts[-3], parts[-2], parts[-1]  # e.g. '19', '198'
    audio_id = os.path.splitext(file_name)[0]  # e.g. '19-198-0001'

    # Construct the transcript filename based on speaker_id and chapter_id
    transcript_file = os.path.join("train-clean-100-wav", speaker_id, chapter_id, f"{speaker_id}-{chapter_id}.trans.txt")

    try:
        with open(transcript_file, "r", encoding="utf-8") as f:
            for line in f:
                # Compare the first column exactly rather than by substring
                utterance_id, _, text = line.strip().partition(" ")
                if utterance_id == audio_id:
                    return text.strip()
    except FileNotFoundError:
        print(f"Transcript file {transcript_file} not found.")
        return None  # If the transcript file is not found, return None

    return None  # Return None if no matching transcript is found


In [None]:
# Instantiate the G2p converter that convert_to_phonemes (defined below) calls.
# NOTE(review): an earlier cell already binds `g2p = G2p()`; this rebinds the same name.
g2p = G2p()

# Step 1: Normalise the transcript text before phoneme conversion
def clean_transcript(transcript):
    """Lowercase ``transcript`` and remove every non-letter, non-whitespace character.

    NOTE(review): this redefines the ``clean_transcript`` from an earlier cell
    of this notebook.

    Args:
        transcript: Raw transcript text.

    Returns:
        The normalised text; whitespace is left untouched.
    """
    return re.sub(r"[^a-zA-Z\s]", "", transcript.lower())

# Step 2: Convert a cleaned transcript to phonemes
def convert_to_phonemes(cleaned_transcript):
    """Convert a cleaned transcript into one flat list of phoneme symbols.

    The text is split on whitespace and the module-level ``g2p`` converter is
    called once per word; the per-word results are concatenated in order.

    Args:
        cleaned_transcript: Text as produced by ``clean_transcript``.

    Returns:
        List of the items yielded by ``g2p`` for each word, in word order.
    """
    return [symbol for word in cleaned_transcript.split() for symbol in g2p(word)]

In [None]:
# Read back the MFCC records written by the extraction step
with open("mfcc_features_subset.json", "r") as f:
    mfcc_data = json.load(f)

# Records extended with the phoneme sequence of the matching transcript
mfcc_with_phonemes = []

for data in tqdm(mfcc_data, desc="Mapping MFCCs to Phonemes"):
    wav_path = data["path"]

    # Files without a transcript are left out
    transcript = get_transcript_for_wav(wav_path)
    if transcript is not None:
        # transcript -> normalised text -> phoneme list
        phonemes = convert_to_phonemes(clean_transcript(transcript))
        mfcc_with_phonemes.append({
            "path": wav_path,
            "mfcc": data["mfcc"],
            "phonemes": phonemes,
        })

# Persist the combined records (MFCC + phonemes)
with open("mfcc_with_phonemes.json", "w") as f:
    json.dump(mfcc_with_phonemes, f)

print("Mapped MFCCs to Phonemes and saved.")


In [None]:
# All distinct phoneme symbols that occur in the mapped data
phoneme_set = {
    phoneme
    for data in mfcc_with_phonemes
    for phoneme in data["phonemes"]
}

# Indices follow the sorted order of the symbols, starting at 0
phoneme_to_index = dict(zip(sorted(phoneme_set), range(len(phoneme_set))))

# Keep the mapping on disk for the later training/evaluation cells
with open("phoneme_to_index.json", "w") as f:
    json.dump(phoneme_to_index, f)

print("Phoneme-to-index mapping created and saved.")


In [None]:
# Attach a "phoneme_indices" list to every record (the records are modified in place)
for data in tqdm(mfcc_with_phonemes, desc="Converting Phonemes to Indices"):
    # Symbols missing from the mapping fall back to the "<UNK>" index, or -1 if there is none
    data["phoneme_indices"] = [
        phoneme_to_index[phoneme]
        if phoneme in phoneme_to_index
        else phoneme_to_index.get("<UNK>", -1)
        for phoneme in data["phonemes"]
    ]

# Persist the extended records (MFCC + phonemes + phoneme indices)
with open("mfcc_with_phoneme_indices.json", "w") as f:
    json.dump(mfcc_with_phonemes, f)

print("Phonemes converted to indices and saved.")


In [None]:
# Report how many records are available before building the arrays
print(f"mfcc_with_phonemes contains {len(mfcc_with_phonemes)} elements")

# X: one 2-D MFCC array per record; lengths: number of rows of each array,
# i.e. where one sequence ends and the next begins after concatenation
X = [np.array(data["mfcc"]) for data in mfcc_with_phonemes]
lengths = [len(mfcc) for mfcc in X]

# Stack all sequences along the first axis into a single 2-D array
X_concat = np.concatenate(X, axis=0)

# Save both so they can be reloaded without recomputation
np.save("mfcc_features_concat.npy", X_concat)
np.save("mfcc_lengths.npy", lengths)

print("Features and sequence lengths prepared for HMM training and saved.")


In [None]:
# Symbol -> index mapping written by the mapping cell
with open("phoneme_to_index.json", "r") as f:
    phoneme_to_index = json.load(f)

# Inverse lookup: index -> symbol
index_to_phoneme = dict((idx, phoneme) for phoneme, idx in phoneme_to_index.items())

# Records holding the MFCC frames and the phoneme indices of each file
with open("mfcc_with_phoneme_indices.json", "r") as f:
    mfcc_data = json.load(f)

# phoneme index -> list of MFCC segments assigned to that phoneme
phoneme_sequences = defaultdict(list)

for sample in mfcc_data:
    mfcc = np.array(sample["mfcc"])
    labels = sample["phoneme_indices"]

    # Skip samples without labels or with fewer frames than labels
    if not labels or len(mfcc) < len(labels):
        continue

    # Approximate alignment: every phoneme gets an equally sized run of frames;
    # frames left over after the integer division are not assigned
    chunk_size = len(mfcc) // len(labels)
    for i, phoneme in enumerate(labels):
        segment = mfcc[i * chunk_size:(i + 1) * chunk_size]
        phoneme_sequences[phoneme].append(segment)

# Train one HMM per phoneme
phoneme_models = {}
n_components = 3  # Number of hidden states per phoneme model
hmm_random_state = 42  # Fixed seed so the HMM initialisation is reproducible across runs

for phoneme_idx, sequences in tqdm(phoneme_sequences.items(), desc="Training HMMs"):
    try:
        # Concatenate all MFCC segments for the phoneme; `lengths` tells
        # hmmlearn where each individual segment ends
        X = np.concatenate(sequences, axis=0)
        lengths = [len(seq) for seq in sequences]

        # Initialize and train the HMM
        model = hmm.GaussianHMM(
            n_components=n_components,
            covariance_type="diag",
            n_iter=100,
            random_state=hmm_random_state,
        )
        model.fit(X, lengths)

        # Rows of the transition matrix that are entirely zero cannot be
        # normalised; give them a small uniform mass first
        laplace_smoothing_constant = 1e-5
        zero_rows = np.all(model.transmat_ == 0, axis=1)
        if np.any(zero_rows):
            print(f"Applying Laplace smoothing to phoneme {index_to_phoneme[phoneme_idx]}")
            model.transmat_[zero_rows] += laplace_smoothing_constant

        # Normalize the transition matrix rows to ensure they sum to 1
        model.transmat_ /= model.transmat_.sum(axis=1, keepdims=True)

        phoneme_models[phoneme_idx] = model
    except Exception as e:
        # Best effort: a phoneme whose model cannot be trained is reported and skipped
        print(f"Failed to train HMM for phoneme {index_to_phoneme[phoneme_idx]}: {e}")

# Save all phoneme models to a file
with open("hmm_phoneme_models.pkl", "wb") as f:
    pickle.dump(phoneme_models, f)

print("All phoneme HMMs trained, smoothed (if needed), and saved.")


In [None]:
# Load test data (MFCCs with phoneme indices)
with open("mfcc_with_phoneme_indices.json", "r") as f:
    test_data = json.load(f)

# Load trained phoneme models
# NOTE: pickle.load can execute arbitrary code; only load a file produced by this notebook.
with open("hmm_phoneme_models.pkl", "rb") as f:
    phoneme_models = pickle.load(f)

# Load the phoneme -> index mapping and build the reverse (index -> phoneme)
# lookup here, so indices_to_phonemes does not depend on a variable left over
# from the training cell.
with open("phoneme_to_index.json", "r") as f:
    phoneme_to_index = json.load(f)

index_to_phoneme = {idx: phoneme for phoneme, idx in phoneme_to_index.items()}


# Function to label every frame with the best-scoring phoneme model
def decode_with_hmms_viterbi(mfcc_sequence, models):
    """Pick, for each frame, the model key whose ``score`` is highest.

    Every frame is reshaped to a single-row 2-D array and passed to
    ``model.score`` of each entry in ``models``; frames are scored
    independently of one another.
    NOTE(review): despite the name, no Viterbi path search is performed here.

    Args:
        mfcc_sequence: Iterable of per-frame arrays (each must support ``reshape``).
        models: Mapping from phoneme key to an object exposing ``score(frame)``.

    Returns:
        List with one key of ``models`` per frame. On equal scores the key that
        comes first in ``models`` wins.
    """
    decoded = []
    for vector in mfcc_sequence:
        row = vector.reshape(1, -1)  # one sample, all coefficients
        log_scores = {}
        for phoneme_idx, phoneme_model in models.items():
            log_scores[phoneme_idx] = phoneme_model.score(row)
        decoded.append(max(log_scores, key=log_scores.get))
    return decoded

# Function to convert indices to phonemes
def indices_to_phonemes(indices):
    """Translate phoneme indices into symbols via the module-level ``index_to_phoneme``.

    Args:
        indices: Iterable of keys of ``index_to_phoneme``.

    Returns:
        List of the looked-up symbols, in input order.

    Raises:
        KeyError: If an index is not present in ``index_to_phoneme``.
    """
    symbols = []
    for idx in indices:
        symbols.append(index_to_phoneme[idx])
    return symbols


In [None]:
# Load test data (MFCCs with phoneme indices)
# NOTE(review): this is the same file the HMMs were trained on, so the numbers
# below describe fit to the training data rather than held-out performance.
with open("mfcc_with_phoneme_indices.json", "r") as f:
    test_data = json.load(f)

# Collect predicted and actual phoneme sequences
predicted_phonemes = []
actual_phonemes = []

# Position-aligned labels used for the overall accuracy. The decoder returns
# one label per frame while the reference has one label per phoneme, so the
# two sequences generally differ in length; only the overlapping prefix is
# compared (the same trimming the per-sample accuracy uses). Feeding the
# untrimmed lists to accuracy_score fails on mismatched lengths.
flat_predicted = []
flat_actual = []

for i, data in enumerate(tqdm(test_data, desc="Testing Model")):
    mfcc = np.array(data["mfcc"])  # MFCC sequence
    actual_sequence = data["phoneme_indices"]  # Actual phoneme indices

    # Frame-wise decoding with the trained phoneme models
    predicted_sequence = decode_with_hmms_viterbi(mfcc, phoneme_models)

    print(f"Actual:    {actual_sequence[:10]}")
    print(f"Predicted: {predicted_sequence[:10]}")

    # --- Per-sample accuracy logging ---
    min_len = min(len(actual_sequence), len(predicted_sequence))
    trimmed_actual = actual_sequence[:min_len]
    trimmed_predicted = predicted_sequence[:min_len]
    match_count = sum(a == b for a, b in zip(trimmed_actual, trimmed_predicted))
    sample_accuracy = match_count / min_len if min_len > 0 else 0
    print(f"Sample {i+1}: Accuracy = {sample_accuracy * 100:.2f}%")
    # -----------------------------------

    # Convert indices to phonemes (full sequences are kept for the examples below)
    predicted_phonemes.append(indices_to_phonemes(predicted_sequence))
    actual_phonemes.append(indices_to_phonemes(actual_sequence))

    # Equal-length prefixes feed the overall metric
    flat_predicted.extend(predicted_phonemes[-1][:min_len])
    flat_actual.extend(actual_phonemes[-1][:min_len])

# Calculate overall accuracy (skipped when there is nothing to compare)
if flat_actual:
    accuracy = accuracy_score(flat_actual, flat_predicted)
    print(f"\nOverall Accuracy: {accuracy * 100:.2f}%")
else:
    print("\nOverall Accuracy: n/a (no comparable labels)")

# Show a few example predictions
for i in range(min(5, len(predicted_phonemes))):
    print(f"\nTest Sample {i + 1}:")
    print(f"Predicted: {predicted_phonemes[i]}")
    print(f"Actual:    {actual_phonemes[i]}")
    print("-" * 50)
