In [None]:
# Mount Google Drive into the Colab filesystem; every project path used below
# lives under /content/drive.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
# Re-mount Google Drive without touching the mount point on disk.
# NOTE: never `rm -rf /content/drive` to "reset" the mount -- while Drive is
# mounted that command walks into the mounted folder and can delete the files
# stored in Drive itself. `force_remount=True` is the supported way to refresh.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


In [None]:
!pip install librosa tqdm




In [None]:
import os
import torch
import librosa
import numpy as np

import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm


In [None]:
# --- Audio framing -----------------------------------------------------------
SAMPLE_RATE = 16000                      # target sampling rate passed to librosa
DURATION = 3                             # clip length in seconds
NUM_SAMPLES = SAMPLE_RATE * DURATION     # samples per fixed-length clip

# --- Log-mel front end (speech-style framing) --------------------------------
N_MELS = 64
N_FFT = 400        # 25 ms
HOP_LENGTH = 160  # 10 ms

# --- Optimisation ------------------------------------------------------------
BATCH_SIZE = 16
EPOCHS = 10
LR = 5e-4

# --- Reproducibility ---------------------------------------------------------
# Weight initialisation, dropout and DataLoader shuffling all draw from the
# torch RNG; seed it (and numpy) so that Restart & Run All gives repeatable runs.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Class index == position in LANGUAGES.
LANGUAGES = ['en', 'de', 'es', 'tr', 'ja']
LABEL_MAP = {lang: i for i, lang in enumerate(LANGUAGES)}

# Raw .wav files are read from BASE_DATA/<lang>; cached log-mel tensors are
# written to SAVE_MEL/<lang>.
BASE_DATA = "/content/drive/MyDrive/LID_Project/data"
SAVE_MEL  = "/content/drive/MyDrive/LID_Project/logmel_3s"


In [None]:
def load_audio_3s(path):
    """Load an audio file as a mono waveform of exactly NUM_SAMPLES samples.

    The file is decoded and resampled to SAMPLE_RATE by librosa. Longer clips
    keep only their first NUM_SAMPLES samples; shorter clips are right-padded
    with zeros.

    Args:
        path: Path of the audio file to read.

    Returns:
        1-D numpy array of length NUM_SAMPLES.
    """
    waveform, _ = librosa.load(path, sr=SAMPLE_RATE, mono=True)

    shortfall = NUM_SAMPLES - len(waveform)
    if shortfall < 0:
        # Too long: keep the leading window only.
        return waveform[:NUM_SAMPLES]

    # Too short (or exact): append `shortfall` zeros at the end.
    return np.pad(waveform, (0, shortfall))


In [None]:
FIXED_FRAMES = 300  # number of time frames every feature matrix is forced to


def extract_logmel(audio):
    """Turn a waveform into a normalised log-mel feature matrix.

    Steps: mel power spectrogram -> dB scale relative to the clip maximum ->
    per-clip zero-mean / unit-variance normalisation -> pad or crop the time
    axis to FIXED_FRAMES.

    Args:
        audio: 1-D waveform sampled at SAMPLE_RATE.

    Returns:
        float32 tensor of shape (N_MELS, FIXED_FRAMES).
    """
    power_spec = librosa.feature.melspectrogram(
        y=audio,
        sr=SAMPLE_RATE,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        n_mels=N_MELS,
        power=2.0,
    )
    db_spec = librosa.power_to_db(power_spec, ref=np.max)

    # Per-clip standardisation; the epsilon guards against a zero std.
    features = (db_spec - db_spec.mean()) / (db_spec.std() + 1e-9)

    # Force a fixed time length: zero-pad on the right or crop the tail.
    missing = FIXED_FRAMES - features.shape[1]
    if missing > 0:
        features = np.pad(features, ((0, 0), (0, missing)))
    else:
        features = features[:, :FIXED_FRAMES]

    return torch.tensor(features, dtype=torch.float32)


In [None]:
# Pre-compute log-mel features once and cache them on Drive as .pt tensors,
# one sub-folder per language. Already-cached files are skipped, so the cell is
# cheap to re-run.
os.makedirs(SAVE_MEL, exist_ok=True)

MAX_PER_LANG = 3000

for lang in LANGUAGES:
    src = os.path.join(BASE_DATA, lang)
    dst = os.path.join(SAVE_MEL, lang)
    os.makedirs(dst, exist_ok=True)

    # os.listdir returns entries in arbitrary order; sort so that the
    # MAX_PER_LANG subset is the same on every run.
    files = sorted(f for f in os.listdir(src) if f.endswith(".wav"))
    files = files[:MAX_PER_LANG]  # limit

    print(f"Processing {lang}: {len(files)} files")

    for f in tqdm(files):
        wav_path = os.path.join(src, f)
        # splitext only swaps the final extension (str.replace would also
        # rewrite a ".wav" occurring in the middle of the name).
        mel_path = os.path.join(dst, os.path.splitext(f)[0] + ".pt")

        if os.path.exists(mel_path):
            continue

        audio = load_audio_3s(wav_path)
        logmel = extract_logmel(audio)

        # Write to a temp name and rename afterwards so an interrupted run
        # never leaves a truncated .pt file that later runs would trust.
        tmp_path = mel_path + ".tmp"
        torch.save(logmel, tmp_path)
        os.replace(tmp_path, mel_path)


Processing en: 3000 files


100%|██████████| 3000/3000 [00:01<00:00, 1543.40it/s]


Processing de: 3000 files


100%|██████████| 3000/3000 [00:08<00:00, 351.17it/s]


Processing es: 3000 files


100%|██████████| 3000/3000 [00:03<00:00, 913.63it/s]


Processing tr: 3000 files


100%|██████████| 3000/3000 [00:09<00:00, 326.23it/s]


Processing ja: 3000 files


100%|██████████| 3000/3000 [00:00<00:00, 5228.47it/s]


In [None]:
class LogMelDataset(Dataset):
    """Index of cached log-mel tensors laid out as ``base_path/<lang>/*.pt``.

    ``items`` is a list of ``(tensor_path, label)`` pairs, grouped by language
    and sorted by file name inside each language so that the order (and any
    seeded split derived from it) does not depend on filesystem listing order.

    Args:
        base_path: Root folder that contains one sub-folder per language.
        languages: Optional sequence of sub-folder names; the label of each
            language is its position in the sequence. Defaults to the
            notebook-level LANGUAGES / LABEL_MAP.
    """

    def __init__(self, base_path, languages=None):
        if languages is None:
            languages, label_map = LANGUAGES, LABEL_MAP
        else:
            label_map = {lang: i for i, lang in enumerate(languages)}

        self.items = []

        for lang in languages:
            folder = os.path.join(base_path, lang)
            # sorted(): os.listdir order is arbitrary.
            for f in sorted(os.listdir(folder)):
                if f.endswith(".pt"):
                    self.items.append(
                        (os.path.join(folder, f), label_map[lang])
                    )

    def __len__(self):
        """Number of indexed feature files."""
        return len(self.items)

    def __getitem__(self, idx):
        """Return ``(features, label)`` with a leading channel axis added."""
        path, label = self.items[idx]
        x = torch.load(path).unsqueeze(0)  # [1, n_mels, T]
        return x, label


In [None]:
# Index every cached .pt feature file under SAVE_MEL; only `dataset.items`
# (the (path, label) list) is used by the split below.
dataset = LogMelDataset(SAVE_MEL)


In [None]:
from sklearn.model_selection import train_test_split

# Split 80/20 inside each language separately so that both subsets keep the
# same per-language proportions; the fixed random_state makes the shuffle
# repeatable for a given item order.
train_items, val_items = [], []

for lang in LANGUAGES:
    label = LABEL_MAP[lang]
    lang_items = [entry for entry in dataset.items if entry[1] == label]

    train_lang, val_lang = train_test_split(
        lang_items, test_size=0.2, random_state=42
    )

    train_items += train_lang
    val_items += val_lang


In [None]:
class SplitDataset(Dataset):
    """Dataset over a fixed list of ``(tensor_path, label)`` pairs.

    Each sample is returned as a 2-channel tensor: channel 0 is the stored
    feature matrix, channel 1 is its first-order difference along time.
    """

    def __init__(self, items):
        self.items = items

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        path, label = self.items[idx]
        static = torch.load(path)  # [n_mels, T]

        # Frame-to-frame difference has T-1 columns; append one zero column so
        # it lines up with the static features again.
        step = torch.diff(static, dim=1)
        step = torch.cat([step, step.new_zeros(static.shape[0], 1)], dim=1)

        return torch.stack((static, step)), label  # [2, n_mels, T]


In [None]:
# Wrap the two (path, label) lists; SplitDataset loads each tensor lazily and
# adds the delta channel on access.
train_set = SplitDataset(train_items)
val_set   = SplitDataset(val_items)


In [None]:
# Settings shared by both loaders; only the shuffling differs.
_LOADER_KWARGS = dict(batch_size=BATCH_SIZE, num_workers=0)

# Training batches are reshuffled every epoch; validation keeps a fixed order.
train_loader = DataLoader(train_set, shuffle=True, **_LOADER_KWARGS)
val_loader = DataLoader(val_set, shuffle=False, **_LOADER_KWARGS)


In [None]:
class CNN_GRU_LanguageID(nn.Module):
    """CNN front end + bidirectional GRU + MLP head for language ID.

    Input is ``[B, 2, n_mels, T]`` (static + delta channels); output is
    ``[B, num_classes]`` unnormalised logits.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes=5):
        super().__init__()

        def conv_block(in_ch, out_ch):
            # 3x3 conv (size-preserving) -> batch norm -> ReLU
            return [
                nn.Conv2d(in_ch, out_ch, 3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
            ]

        # CNN feature extractor. Each MaxPool2d(2) halves BOTH the frequency
        # and the time axis, so the output grid is (n_mels // 4, T // 4).
        layers = []
        layers += conv_block(2, 32)
        layers.append(nn.MaxPool2d(2))
        layers += conv_block(32, 64)
        layers.append(nn.MaxPool2d(2))
        layers += conv_block(64, 128)
        self.cnn = nn.Sequential(*layers)

        # Bidirectional GRU over the (down-sampled) time axis.
        self.gru = nn.GRU(
            input_size=128,
            hidden_size=128,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
        )

        # Classifier head; 2 * 128 inputs because the GRU is bidirectional.
        self.fc = nn.Sequential(
            nn.Linear(128 * 2, 128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Map ``[B, 2, n_mels, T]`` features to ``[B, num_classes]`` logits."""
        feature_map = self.cnn(x)                  # [B, 128, n_mels/4, T/4]
        per_frame = feature_map.mean(dim=2)        # pool frequency -> [B, 128, T/4]
        sequence = per_frame.transpose(1, 2)       # [B, T/4, 128]

        encoded, _ = self.gru(sequence)            # [B, T/4, 256]
        utterance = encoded.mean(dim=1)            # pool time -> [B, 256]

        return self.fc(utterance)


In [None]:
model = CNN_GRU_LanguageID().to(DEVICE)

# Per-class loss weights, indexed by class id (same order as LANGUAGES).
weights = torch.tensor([1.0, 1.2, 1.2, 1.3, 1.3], device=DEVICE)
criterion = nn.CrossEntropyLoss(weight=weights)

optimizer = optim.Adam(model.parameters(), lr=LR)



In [None]:
def evaluate(model, loader, num_chunks=3):
    """Return classification accuracy (in percent) of `model` on `loader`.

    Every input is cut along the time axis into `num_chunks` equal chunks (a
    remainder at the end is dropped). The model is run on each chunk and the
    softmax probabilities are averaged before taking the argmax -- the same
    rule `predict_language` uses. Averaging avoids the arbitrary tie-break of
    a hard majority vote when all chunks disagree.

    Args:
        model: Module mapping ``[B, C, F, T']`` to ``[B, num_classes]`` logits.
            It is switched to eval mode and left in eval mode.
        loader: Iterable of ``(x, y)`` batches with ``x`` of shape
            ``[B, C, F, T]`` and ``y`` of shape ``[B]``.
        num_chunks: Number of time chunks to average over (default 3).

    Returns:
        Accuracy in the range 0-100; 0.0 if the loader yields no samples.
    """
    model.eval()

    # Run on whatever device the model lives on (CPU for parameter-free models).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    correct = 0
    total = 0

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)      # [B, C, F, T]
            y = y.to(device)

            chunk_len = x.shape[-1] // num_chunks
            if chunk_len == 0:
                # Input shorter than num_chunks frames: score it as one piece
                # instead of feeding zero-width chunks to the model.
                chunks = [x]
            else:
                chunks = [
                    x[..., i * chunk_len:(i + 1) * chunk_len]
                    for i in range(num_chunks)
                ]

            probs_sum = None
            for xs in chunks:
                probs = torch.softmax(model(xs), dim=1)
                probs_sum = probs if probs_sum is None else probs_sum + probs

            final_preds = (probs_sum / len(chunks)).argmax(dim=1)

            correct += (final_preds == y).sum().item()
            total += y.size(0)

    if total == 0:
        return 0.0
    return 100 * correct / total


In [None]:
# Best validation accuracy seen so far (percent) and where the corresponding
# weights are stored. Re-run this cell before re-running training to reset.
best_acc = 0.0
BEST_MODEL_PATH = "/content/drive/MyDrive/LID_Project/best_model.pt"


In [None]:
# Train for EPOCHS epochs. After EVERY epoch: evaluate on the validation set,
# checkpoint the weights if accuracy improved, and log loss / accuracy.
# (The checkpoint and logging steps must stay inside the epoch loop; outside
# it they would only ever see the final epoch.)
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0

    for x, y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}"):
        x = x.to(DEVICE)
        y = y.to(DEVICE)

        optimizer.zero_grad()
        loss = criterion(model(x), y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # evaluate() leaves the model in eval mode; model.train() at the top of
    # the loop switches it back for the next epoch.
    val_acc = evaluate(model, val_loader)

    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print("âœ… Best model saved")

    print(f"Loss: {total_loss/len(train_loader):.4f} | "
          f"Val Acc: {val_acc:.2f}%")



Epoch 1/10: 100%|██████████| 1476/1476 [1:31:47<00:00,  3.73s/it]
Epoch 2/10: 100%|██████████| 1476/1476 [01:50<00:00, 13.42it/s]
Epoch 3/10: 100%|██████████| 1476/1476 [01:49<00:00, 13.50it/s]
Epoch 4/10: 100%|██████████| 1476/1476 [01:49<00:00, 13.48it/s]
Epoch 5/10: 100%|██████████| 1476/1476 [01:48<00:00, 13.55it/s]
Epoch 6/10: 100%|██████████| 1476/1476 [01:48<00:00, 13.59it/s]
Epoch 7/10: 100%|██████████| 1476/1476 [01:49<00:00, 13.48it/s]
Epoch 8/10: 100%|██████████| 1476/1476 [01:48<00:00, 13.56it/s]
Epoch 9/10: 100%|██████████| 1476/1476 [01:49<00:00, 13.53it/s]
Epoch 10/10: 100%|██████████| 1476/1476 [01:49<00:00, 13.50it/s]


âœ… Best model saved
Loss: 0.5149 | Val Acc: 64.61%


In [None]:
# best_acc is whatever the training cell last stored when it saved a checkpoint.
print(f"BEST VALIDATION ACCURACY: {best_acc:.2f}%")


BEST VALIDATION ACCURACY: 64.61%


In [None]:
# Restore the checkpointed weights onto the current device and switch to
# inference mode (disables dropout, uses BatchNorm running statistics).
best_state = torch.load(BEST_MODEL_PATH, map_location=DEVICE)
model.load_state_dict(best_state)
model.eval()

print("âœ… Best model loaded for demo")


âœ… Best model loaded for demo


In [None]:
def process_demo_audio(wav_path):
    """Build a single-item model input batch from one audio file.

    Applies the same pipeline as training: fixed-length load, log-mel
    extraction, then a delta channel (first-order time difference, zero-padded
    by one frame at the end).

    Args:
        wav_path: Path of the audio file.

    Returns:
        Tensor of shape ``[1, 2, n_mels, T]``.
    """
    static = extract_logmel(load_audio_3s(wav_path))

    # T-1 differences plus one trailing zero column -> same width as `static`.
    step = torch.diff(static, dim=1)
    step = torch.cat([step, step.new_zeros(static.shape[0], 1)], dim=1)

    # Stack into channels, then add the batch axis.
    return torch.stack((static, step)).unsqueeze(0)


In [None]:
import torch.nn.functional as F

def predict_language(wav_path):
    """Predict the language of one audio file.

    The feature tensor is cut into three equal time chunks; the model's softmax
    output is averaged over the chunks and the most probable class is returned.

    Args:
        wav_path: Path of the audio file.

    Returns:
        Tuple ``(language_code, probabilities)`` where ``probabilities`` is a
        numpy array with one entry per language in LANGUAGES order.
    """
    n_chunks = 3
    x = process_demo_audio(wav_path).to(DEVICE)
    width = x.shape[-1] // n_chunks

    accumulated = torch.zeros(len(LANGUAGES)).to(DEVICE)

    with torch.no_grad():
        for k in range(n_chunks):
            window = x[..., k * width:(k + 1) * width]
            # Batch size is 1, so row 0 holds the class distribution.
            accumulated += F.softmax(model(window), dim=1)[0]

    mean_probs = accumulated / n_chunks
    best = mean_probs.argmax().item()

    return LANGUAGES[best], mean_probs.cpu().numpy()


In [None]:
# Run the model on every .wav file in the demo folder and report accuracy.
# Files are expected to be named "<lang>_<anything>.wav".
DEMO_PATH = "/content/drive/MyDrive/LID_Project/demo"

correct = 0
total = 0

for demo_file in sorted(os.listdir(DEMO_PATH)):
    if not demo_file.endswith(".wav"):
        continue

    demo_audio = os.path.join(DEMO_PATH, demo_file)

    # Ground-truth language is the filename prefix before the first "_".
    true_lang = demo_file.split("_")[0]

    # Model prediction and per-language probabilities.
    pred_lang, probs = predict_language(demo_audio)

    # accuracy count
    total += 1
    if pred_lang == true_lang:
        correct += 1

    print("\nðŸŽ§ Audio:", demo_file)
    print("âœ… True language:     ", true_lang)
    print("ðŸ¤– Predicted language:", pred_lang)
    print("Probabilities:")
    for lang, p in zip(LANGUAGES, probs):
        print(f"  {lang}: {p:.3f}")

# Overall demo accuracy; guard against an empty folder (division by zero).
if total:
    print("\nðŸ“Š DEMO ACCURACY:", 100 * correct / total, "%")
else:
    print("No .wav files found in", DEMO_PATH)



ðŸŽ§ Audio: de_demo_1.wav
âœ… True language:      de
ðŸ¤– Predicted language: de
Probabilities:
  en: 0.015
  de: 0.930
  es: 0.046
  tr: 0.006
  ja: 0.003

ðŸŽ§ Audio: de_demo_2.wav
âœ… True language:      de
ðŸ¤– Predicted language: en
Probabilities:
  en: 0.451
  de: 0.098
  es: 0.007
  tr: 0.104
  ja: 0.340

ðŸŽ§ Audio: en_demo_1.wav
âœ… True language:      en
ðŸ¤– Predicted language: en
Probabilities:
  en: 0.921
  de: 0.005
  es: 0.007
  tr: 0.052
  ja: 0.014

ðŸŽ§ Audio: en_demo_2.wav
âœ… True language:      en
ðŸ¤– Predicted language: en
Probabilities:
  en: 0.472
  de: 0.054
  es: 0.318
  tr: 0.022
  ja: 0.134

ðŸŽ§ Audio: es_demo_1.wav
âœ… True language:      es
ðŸ¤– Predicted language: es
Probabilities:
  en: 0.379
  de: 0.003
  es: 0.586
  tr: 0.017
  ja: 0.015

ðŸŽ§ Audio: es_demo_2.wav
âœ… True language:      es
ðŸ¤– Predicted language: ja
Probabilities:
  en: 0.009
  de: 0.007
  es: 0.316
  tr: 0.064
  ja: 0.604

ðŸŽ§ Audio: ja_demo_1.wav
âœ… True language:      ja
ðŸ¤–