<a href="https://colab.research.google.com/github/karankumar211/Native-Language-Identification-Project/blob/main/Task3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# CELL 1: Install Condacolab
# Installs the `condacolab` helper with pip, imports it, and calls
# condacolab.install() to set up conda inside this runtime.
# NOTE(review): condacolab.install() presumably restarts the kernel when it
# finishes, so this cell should be run on its own before the rest — confirm.
print("Installing Condacolab...")
!pip install condacolab
import condacolab
condacolab.install()

In [None]:
# CELL 2: Install Conda Packages
# Checks the condacolab setup, then installs the Montreal Forced Aligner (MFA)
# from the conda-forge channel.
import condacolab
# Run before anything else in this cell so a broken conda setup is noticed
# before the long install starts (presumably check() raises on failure — confirm).
condacolab.check()
print("Conda is active.")

print("Installing MFA using Conda (this may take a few minutes)...")
# -y answers the confirmation prompt so the install can run unattended.
!conda install -c conda-forge montreal-forced-aligner -y
# NOTE: this message is printed unconditionally; it does not inspect the exit
# status of the conda command above.
print("MFA installed successfully.")

In [None]:
# CELL 3: Install Pip Packages
# Installs the Python libraries used by the later cells with the pip that
# belongs to the interpreter running this kernel.
import sys
# sys.executable is the interpreter behind this kernel; invoking
# `<python> -m pip` installs into that interpreter's environment rather than
# whichever `pip` happens to be first on PATH.
python_executable_path = sys.executable
print(f"Using Python executable at: {python_executable_path}")

print("Installing all Python libraries (transformers, librosa, etc.) using conda's pip...")
# NOTE(review): the next cell does `import datasets`, but `datasets` is not in
# this list — confirm it is already available in this environment.
# NOTE(review): versions are not pinned, so a re-run may pick up newer releases.
!{python_executable_path} -m pip install transformers torch torchaudio librosa soundfile huggingface_hub requests beautifulsoup4 tqdm scikit-learn
# Printed unconditionally; the pip exit status is not checked.
print("All Python libraries installed successfully.")

In [None]:
# CELL 4: Verify Install & Set Backend
import datasets
import os
import warnings

# Silence UserWarning-category warnings from here on.
warnings.filterwarnings('ignore', category=UserWarning)

# 1. Verify MFA
# Smoke test: asking the `mfa` CLI for a sub-command's help text only works if
# the executable installed in CELL 2 can be found.
print("\nVerifying MFA installation (this should show the help menu)...")
!mfa model download --help

# 2. Set Audio Backend (Fixes the torchcodec bug)
# NOTE(review): nothing else in the visible notebook reads
# datasets.config.AUDIO_DECODER or loads audio through `datasets` (audio is
# fetched with hf_hub_download and decoded with librosa) — confirm this
# setting is still needed and that the attribute is honoured by `datasets`.
datasets.config.AUDIO_DECODER = "torchaudio"
print(f"\nAudio decoder set to: {datasets.config.AUDIO_DECODER}")

In [None]:
# CELL 5: Download & Unzip Audio Data
from huggingface_hub import hf_hub_download
import zipfile
import os
# Fetch the zipped corpus from the Hugging Face Hub and unpack it into the
# local "data" folder that the later cells scan for .wav files.
extract_dir = "data"

print("Downloading dataset 'DarshanaS/IndicAccentDb' (3.2 GB)...")
zip_path = hf_hub_download(
    repo_id="DarshanaS/IndicAccentDb",
    filename="IndicAccentDB.zip",
    repo_type="dataset",
)

os.makedirs(extract_dir, exist_ok=True)
print("Extracting to 'data' folder...")
archive = zipfile.ZipFile(zip_path, 'r')
try:
    archive.extractall(extract_dir)
finally:
    # Always release the file handle, even if extraction fails part-way.
    archive.close()
print("Extraction complete.")

In [None]:
# CELL 6: Parse Transcripts
import glob
import os
import re
from bs4 import BeautifulSoup
import requests
from tqdm.auto import tqdm

# Download the Harvard sentence lists and flatten the page text into one
# ordered list of upper-cased transcripts with '.', ',' and '?' removed.
print("Parsing harvard_sentences.html...")
url = "https://www.cs.columbia.edu/~hgs/audio/harvard.html"
# A timeout keeps the cell from hanging on a stalled connection, and
# raise_for_status() stops an HTTP error page from being parsed as if it
# were the sentence list.
response = requests.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
text = soup.get_text()
lines = text.split('\n')
all_harvard_sentences = []
for line in lines:
    line = line.strip()
    # Drop blank lines and the "List N" headings; every other text line on the
    # page is kept, in page order.
    if not line or line.startswith("List "): continue
    sentence = line.upper().replace('.', '').replace(',', '').replace('?', '')
    all_harvard_sentences.append(sentence)
print(f"Successfully parsed {len(all_harvard_sentences)} sentences.")

# The filename -> transcript lookup later in this cell computes indices as
# (list - 1) * 10 + (sentence - 1), i.e. it assumes exactly ten entries per
# list and no stray non-sentence lines. A count that is not a multiple of ten
# means that assumption is broken and transcripts may be shifted.
if len(all_harvard_sentences) % 10 != 0:
    print(
        f"WARNING: parsed {len(all_harvard_sentences)} lines, which is not a multiple of 10; "
        "non-sentence lines may have been included and transcript indices may be misaligned."
    )

print("\nCreating transcript map...")
all_file_paths = glob.glob("data/**/*.wav", recursive=True)

# Two filename conventions are recognised:
#   "..._List<L>_<S>.wav" -> sentence S of list L (ten sentences per list)
#   "...(<N>).wav"        -> N-th sentence overall
list_pattern = re.compile(r'_List(\d+)_(\d+)\.wav', re.IGNORECASE)
num_pattern = re.compile(r'\((\d+)\)\.wav', re.IGNORECASE)


def _harvard_index_from_filename(wav_name):
    """Return the 0-based sentence index encoded in `wav_name`, or -1 if none.

    The "(N).wav" form takes precedence when a name matches both conventions.
    """
    numbered = num_pattern.search(wav_name)
    if numbered:
        return int(numbered.group(1)) - 1
    listed = list_pattern.search(wav_name)
    if listed:
        return (int(listed.group(1)) - 1) * 10 + (int(listed.group(2)) - 1)
    return -1


# Keep only recordings whose filename resolves to a sentence we actually parsed.
transcript_map = {}
for audio_path in all_file_paths:
    sentence_index = _harvard_index_from_filename(os.path.basename(audio_path))
    if sentence_index < 0 or sentence_index >= len(all_harvard_sentences):
        continue
    transcript_map[audio_path] = all_harvard_sentences[sentence_index]

clean_file_list = list(transcript_map)
print(f"Found {len(clean_file_list)} clean files for alignment.")

In [None]:
# CELL 7: Create 'data_clean' Corpus
import shutil
from tqdm.auto import tqdm

# Build the flat corpus MFA will align: for every recording that has a
# transcript, copy the .wav into data_clean and write a same-named .lab file
# holding its transcript text.
print("Creating a 'data_clean' corpus for alignment...")
clean_dir = "data_clean"
os.makedirs(clean_dir, exist_ok=True)
files_written = 0
# basename -> source path already copied. data_clean is flat, so two source
# files with the same basename would otherwise overwrite each other silently.
# The first one wins, which matches the word-level extraction cell: it looks a
# basename up under data/ and takes the first match to derive the label.
copied_sources = {}
for audio_path in tqdm(clean_file_list, desc="Copying clean files"):
    try:
        sentence_text = transcript_map[audio_path]
        base_filename = os.path.basename(audio_path)
        if base_filename in copied_sources:
            print(
                f"Skipping {audio_path}: a file named {base_filename} was already "
                f"copied from {copied_sources[base_filename]}"
            )
            continue
        # Swap only the extension; str.replace('.wav', '.lab') would also
        # rewrite a '.wav' that happens to occur inside the name.
        stem, _extension = os.path.splitext(base_filename)
        lab_filename = stem + '.lab'
        new_wav_path = os.path.join(clean_dir, base_filename)
        new_lab_path = os.path.join(clean_dir, lab_filename)
        shutil.copy(audio_path, new_wav_path)
        # Explicit encoding so the transcript bytes do not depend on the
        # runtime's locale.
        with open(new_lab_path, 'w', encoding='utf-8') as f:
            f.write(sentence_text)
        copied_sources[base_filename] = audio_path
        files_written += 1
    except Exception as e:
        # Best effort: report the failing file and keep going with the rest.
        print(f"Error processing {audio_path}: {e}")
print(f"\nClean corpus created. Copied {files_written} .wav/.lab pairs.")

In [None]:
# CELL 8: Download MFA Models
# Fetches the pretrained pronunciation dictionary and acoustic model that the
# alignment cell refers to by name.
# NOTE(review): the dictionary is `english_us_arpa` while the acoustic model is
# `english_mfa`; confirm these two use the same phone set, since the aligner
# needs every dictionary phone to exist in the acoustic model.
print("Downloading MFA models (Dictionary and Acoustic)...")
!mfa model download dictionary english_us_arpa
!mfa model download acoustic english_mfa
# Printed unconditionally; the exit status of the downloads is not checked.
print("MFA models downloaded successfully.")

In [None]:
# CELL 9: Run Forced Alignment
print("Starting the Montreal Forced Aligner (MFA)...")
print("This will process all 3,207 clean files.")
!mfa align data_clean english_us_arpa english_mfa data_aligned
print("\n--- MFA Alignment Complete! ---")

In [None]:
# CELL 10: Load Task 1 Results from Google Drive
import numpy as np
import json
from google.colab import drive
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

# Restore the sentence-level (Task 1) features and labels saved on Google
# Drive, then retrain the two sentence-level SVMs so their accuracies are
# available for the final comparison table.
print("Mounting Google Drive to load Task 1 results...")
drive.mount('/content/drive')
save_path = '/content/drive/MyDrive/Colab_Project_Data/'
print("Loading processed Task 1 data from Google Drive...")

try:
    # --- Cached feature matrices, labels and the label vocabulary ---
    X_hubert_sentence = np.load(os.path.join(save_path, 'X_hubert.npy'))
    X_mfcc_sentence = np.load(os.path.join(save_path, 'X_mfcc.npy'))
    y_labels_sentence = np.load(os.path.join(save_path, 'y_labels.npy'))
    label_json_path = os.path.join(save_path, 'label_to_int.json')
    with open(label_json_path, 'r') as f:
        label_to_int = json.load(f)
    # Inverse lookup: integer class id -> label name.
    int_to_label = {int(i): name for name, i in label_to_int.items()}
    print("--- Task 1 Data Loaded Successfully ---")

    print("\nRe-calculating Task 1 (Sentence) accuracies...")
    # Both splits share the seed and the stratification vector.
    sentence_split_options = dict(test_size=0.2, random_state=42, stratify=y_labels_sentence)
    X_mfcc_train_s, X_mfcc_test_s, y_train_s, y_test_s = train_test_split(
        X_mfcc_sentence, y_labels_sentence, **sentence_split_options
    )
    X_hubert_train_s, X_hubert_test_s, _, _ = train_test_split(
        X_hubert_sentence, y_labels_sentence, **sentence_split_options
    )

    # MFCC baseline: the scaler is fitted on the training rows only.
    mfcc_scaler_s = StandardScaler()
    mfcc_scaler_s.fit(X_mfcc_train_s)
    mfcc_svm_s = SVC(random_state=42)
    mfcc_svm_s.fit(mfcc_scaler_s.transform(X_mfcc_train_s), y_train_s)
    mfcc_test_predictions_s = mfcc_svm_s.predict(mfcc_scaler_s.transform(X_mfcc_test_s))
    acc_mfcc = accuracy_score(y_test_s, mfcc_test_predictions_s)

    # HuBERT embeddings: same recipe.
    hubert_scaler_s = StandardScaler()
    hubert_scaler_s.fit(X_hubert_train_s)
    hubert_svm_s = SVC(random_state=42)
    hubert_svm_s.fit(hubert_scaler_s.transform(X_hubert_train_s), y_train_s)
    hubert_test_predictions_s = hubert_svm_s.predict(hubert_scaler_s.transform(X_hubert_test_s))
    acc_hubert = accuracy_score(y_test_s, hubert_test_predictions_s)

    print(f"Task 1 Accuracies restored: MFCC={acc_mfcc*100:.2f}%, HuBERT={acc_hubert*100:.2f}%")
except Exception as e:
    # Report the failure, then re-raise so the notebook stops here instead of
    # continuing without the Task 1 variables.
    print(f"--- ERROR loading from Drive: {e} ---")
    raise

In [None]:
# CELL 11: Load HuBERT Model
import torch
from transformers import AutoFeatureExtractor, HubertModel
# Fixed settings used by the word-level feature extraction cell:
# target_sr is the rate audio is resampled to before it is fed to HuBERT,
# ssl_ckpt is the checkpoint name passed to from_pretrained below.
target_sr = 16000
ssl_ckpt = 'facebook/hubert-base-ls960'

# Prefer the GPU when one is visible to torch; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Using device: {device}")

print(f"Loading model: {ssl_ckpt} onto device...")
feature_extractor = AutoFeatureExtractor.from_pretrained(ssl_ckpt)
hubert_model = HubertModel.from_pretrained(ssl_ckpt)
hubert_model = hubert_model.to(device)
print("Model loaded successfully.")

In [None]:
# CELL 12: Manual TextGrid Parser
import re

def parse_textgrid(file_path, tier_name="words"):
    """
    A simple parser for long-format .TextGrid files that extracts labelled
    intervals from the word tier.

    A TextGrid can hold several interval tiers (MFA output typically carries a
    word tier and a phone tier). Scanning the whole file for interval blocks
    would return phone segments mixed in with the words, so only tiers whose
    name ends with `tier_name` are read.

    Args:
        file_path: Path to the .TextGrid file.
        tier_name: Case-insensitive tier name (or name suffix, so that names
            like "speaker - words" also match) to read. Pass None to read
            every tier. If the file has no recognisable tier headers, or no
            tier matches, all interval blocks in the file are read.

    Returns:
        A list of (start_time, end_time, word) tuples in file order, with
        times as floats. Intervals whose text is empty (silence) are skipped.

    Note:
        Only the long format ("intervals [n]:" followed by xmin/xmax/text
        lines) is recognised; a file in another layout yields an empty list.
    """
    # Explicit encoding so parsing does not depend on the runtime's locale.
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # One labelled interval block: "intervals [n]:", then xmin, xmax, text.
    interval_pattern = re.compile(
        r"intervals\s*\[\d+\]:\s*"
        r"xmin\s*=\s*([\d\.]+)\s*"
        r"xmax\s*=\s*([\d\.]+)\s*"
        r"text\s*=\s*\"(.*?)\"\s*",
        re.DOTALL
    )

    searchable_text = content
    if tier_name is not None:
        wanted_suffix = tier_name.strip().lower()
        # Each tier starts with "item [n]:"; the bare "item []:" list header
        # has no digits and is therefore not a split point. Element 0 is the
        # file header and is dropped.
        tier_chunks = re.split(r"item\s*\[\d+\]\s*:", content)[1:]
        selected_chunks = []
        for chunk in tier_chunks:
            # The tier's own name sits in its header, before the first
            # interval block; do not look inside interval labels.
            tier_header = chunk.split("intervals", 1)[0]
            name_match = re.search(r"name\s*=\s*\"(.*?)\"", tier_header)
            if name_match and name_match.group(1).strip().lower().endswith(wanted_suffix):
                selected_chunks.append(chunk)
        if selected_chunks:
            searchable_text = "\n".join(selected_chunks)
        # else: no usable tier information — fall back to scanning the whole file.

    intervals = []
    for match in interval_pattern.finditer(searchable_text):
        start_time = float(match.group(1))
        end_time = float(match.group(2))
        word = match.group(3).strip()

        # Skip silence (which MFA often marks as "")
        if word:
            intervals.append((start_time, end_time, word))

    return intervals

print("TextGrid parser function defined.")
# Test it on one file
# Local import: this cell only imports `re`, so without it the sanity check
# works only if an earlier cell happened to import glob in the same session.
import glob

aligned_grids = glob.glob("data_aligned/*.TextGrid")
if not aligned_grids:
    # Fail with an actionable message instead of a bare IndexError.
    raise FileNotFoundError(
        "No .TextGrid files found in 'data_aligned'. Run the forced-alignment cell (CELL 9) first."
    )
example_grid = aligned_grids[0]
print(f"Parsing example file: {example_grid}")
example_words = parse_textgrid(example_grid)
print(f"Found words: {example_words[:5]}...")

In [None]:
# CELL 13: Run Word-Level Feature Extraction
import glob
import os
import numpy as np
import librosa
from tqdm.auto import tqdm
import torch
import warnings
warnings.filterwarnings('ignore', category=UserWarning)

# For every aligned recording, cut out each word interval and compute two
# feature vectors per word: the mean of 20 MFCCs over time, and the mean of
# HuBERT's last hidden state over time. The accent label is the integer id of
# the folder the original recording lives in.
print("Starting WORD-LEVEL feature extraction...")
textgrid_files = glob.glob("data_aligned/*.TextGrid")
print(f"Found {len(textgrid_files)} .TextGrid files to process.")

# Index the original recordings once (basename -> containing folder name)
# instead of running a recursive glob per TextGrid. A dict lookup is also an
# exact match, whereas interpolating the basename into a glob pattern would
# treat characters such as '[' as wildcards. setdefault keeps the first path
# seen for a basename, which is what taking element [0] of a per-file glob did.
label_dir_by_basename = {}
for original_path in glob.glob("data/**/*.wav", recursive=True):
    label_dir_by_basename.setdefault(
        os.path.basename(original_path),
        os.path.basename(os.path.dirname(original_path)),
    )

hubert_word_features = []
mfcc_word_features = []
word_labels = []

for grid_path in tqdm(textgrid_files, desc="Processing words"):
    try:
        # Swap only the extension (str.replace would touch any '.TextGrid'
        # occurring inside the name as well).
        base_name = os.path.splitext(os.path.basename(grid_path))[0] + '.wav'
        wav_path = os.path.join("data_clean", base_name)
        if base_name not in label_dir_by_basename:
            raise FileNotFoundError(f"no original recording named {base_name!r} under 'data/'")
        label_string = label_dir_by_basename[base_name]
        label_int = label_to_int[label_string]

        # sr=None keeps the file's native sample rate.
        waveform_raw, sr_raw = librosa.load(wav_path, sr=None)

        # --- Use our new parser ---
        word_segments = parse_textgrid(grid_path)
        if not word_segments: continue

        for (start_time, end_time, word) in word_segments:
            if (end_time - start_time) < 0.1: continue # Skip very short words

            start_sample = int(start_time * sr_raw); end_sample = int(end_time * sr_raw)
            word_audio_raw = waveform_raw[start_sample:end_sample]
            # An interval that lies past the end of the decoded audio yields
            # an empty slice; there is nothing to extract from it.
            if word_audio_raw.size == 0: continue

            # MFCC
            mfccs = librosa.feature.mfcc(y=word_audio_raw, sr=sr_raw, n_mfcc=20)
            mfcc_vector = np.mean(mfccs, axis=1)

            # HuBERT
            word_audio_16k = librosa.resample(word_audio_raw, orig_sr=sr_raw, target_sr=target_sr)
            inputs = feature_extractor(word_audio_16k, sampling_rate=target_sr, return_tensors='pt').to(device)
            with torch.no_grad():
                outputs = hubert_model(inputs.input_values)
            hubert_vector = outputs.last_hidden_state.mean(dim=1).squeeze(0).cpu().numpy()

            # Append all three only after both extractions succeeded. If the
            # MFCC were appended first and HuBERT then raised, the lists would
            # end up with different lengths and the rows would no longer line
            # up with their labels.
            mfcc_word_features.append(mfcc_vector)
            hubert_word_features.append(hubert_vector)
            word_labels.append(label_int)
    except Exception as e:
        # Best effort: report the file and move on; words already collected
        # from it are kept.
        print(f"Error processing file {grid_path}: {e}")

X_hubert_words = np.array(hubert_word_features)
X_mfcc_words = np.array(mfcc_word_features)
y_labels_words = np.array(word_labels)

# Row i of both matrices and label i must describe the same word.
assert len(X_hubert_words) == len(X_mfcc_words) == len(y_labels_words), \
    "word-level feature matrices and labels are out of sync"

print(f"\nWord-level feature extraction complete!")
print(f"HuBERT word feature matrix shape: {X_hubert_words.shape}")
print(f"MFCC word feature matrix shape: {X_mfcc_words.shape}")
print(f"Word labels array shape: {y_labels_words.shape}")

In [None]:
# CELL 14: Run Word-Level Train/Test/Compare
from sklearn.metrics import classification_report

# Train one SVM per feature type on the word-level features and print the
# resulting accuracies next to the sentence-level ones from CELL 10.
print(f"--- Task 3: Word-Level vs. Sentence-Level ---")

# Both splits share the seed and the stratification vector.
# NOTE(review): the split is over individual words, so words cut from the same
# recording can presumably land in both the train and the test set — confirm
# whether a per-recording (grouped) split is wanted for a fair comparison.
word_split_options = dict(test_size=0.2, random_state=42, stratify=y_labels_words)
X_mfcc_train_w, X_mfcc_test_w, y_train_w, y_test_w = train_test_split(
    X_mfcc_words, y_labels_words, **word_split_options
)
X_hubert_train_w, X_hubert_test_w, _, _ = train_test_split(
    X_hubert_words, y_labels_words, **word_split_options
)
print(f"\nTotal sentence samples: {len(y_labels_sentence)}")
print(f"Total word samples: {len(y_labels_words)}")

# Scalers are fitted on the training rows only.
mfcc_scaler_w = StandardScaler()
mfcc_scaler_w.fit(X_mfcc_train_w)
hubert_scaler_w = StandardScaler()
hubert_scaler_w.fit(X_hubert_train_w)

print("Training WORD-LEVEL MFCC Model (SVM)...")
mfcc_svm_w = SVC(random_state=42)
mfcc_svm_w.fit(mfcc_scaler_w.transform(X_mfcc_train_w), y_train_w)
X_mfcc_test_w_scaled = mfcc_scaler_w.transform(X_mfcc_test_w)
y_pred_mfcc_w = mfcc_svm_w.predict(X_mfcc_test_w_scaled)
acc_mfcc_w = accuracy_score(y_test_w, y_pred_mfcc_w)

print("Training WORD-LEVEL HuBERT Model (SVM)...")
hubert_svm_w = SVC(random_state=42)
hubert_svm_w.fit(hubert_scaler_w.transform(X_hubert_train_w), y_train_w)
X_hubert_test_w_scaled = hubert_scaler_w.transform(X_hubert_test_w)
y_pred_hubert_w = hubert_svm_w.predict(X_hubert_test_w_scaled)
acc_hubert_w = accuracy_score(y_test_w, y_pred_hubert_w)

# Side-by-side summary; acc_mfcc / acc_hubert come from the sentence-level cell.
print("\n\n--- FINAL COMPARISON (Task 3) ---")
print("                          | MFCCs      | HuBERT")
print(f"Sentence-Level Accuracy:    | {acc_mfcc * 100:.2f}%     | {acc_hubert * 100:.2f}%")
print(f"Word-Level Accuracy:        | {acc_mfcc_w * 100:.2f}%     | {acc_hubert_w * 100:.2f}%")