<a href="https://colab.research.google.com/github/rajaranjith/HCL-GenAI-Training/blob/main/GenAI-Application-Developer-FY26-SilverBadge-03Dec2025-Ass2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:

#@title Install/Upgrade Dependencies
!pip -q install -U torch torchaudio soundfile numpy gradio tqdm
!apt -yqq install ffmpeg > /dev/null


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.1/62.1 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m899.7/899.7 MB[0m [31m906.3 kB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m594.3/594.3 MB[0m [31m1.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/10.2 MB[0m [31m101.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.0/88.0 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m954.8/954.8 kB[0m [31m52.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.1/193.1 MB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m48.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:

#@title Verify GPU & Library Versions
import torch, torchaudio, platform
print("Python:", platform.python_version())
print("Torch:", torch.__version__, "| CUDA available:", torch.cuda.is_available())
print("Torchaudio:", torchaudio.__version__)


Python: 3.12.12
Torch: 2.9.1+cu128 | CUDA available: False
Torchaudio: 2.9.1+cu128


In [3]:

#@title Audio Similarity Core (run this cell)

import os
import pickle
from typing import Dict, List, Tuple

import numpy as np
import torch
import torchaudio
from torchaudio.transforms import Resample
from tqdm import tqdm

# ---------- Utilities ----------
AUDIO_EXTS = {".wav", ".mp3", ".flac", ".ogg", ".m4a", ".aac", ".wma", ".opus", ".aiff", ".aif"}

def is_audio_file(path: str) -> bool:
    return os.path.splitext(path)[1].lower() in AUDIO_EXTS

def list_audio_files(root: str) -> List[str]:
    files = []
    for dirpath, _, filenames in os.walk(root):
        for fn in filenames:
            fp = os.path.join(dirpath, fn)
            if is_audio_file(fp):
                files.append(fp)
    return sorted(files)

def load_audio(path: str, target_sr: int, device: torch.device) -> Tuple[torch.Tensor, int, float]:
    """
    Returns:
      waveform: (1, num_samples) float32 tensor on device
      sample_rate: int (== target_sr after resample)
      duration_sec: float
    """
    waveform, sr = torchaudio.load(path)  # (channels, samples)
    if waveform.dim() != 2:
        raise RuntimeError(f"Unexpected audio shape for {path}: {waveform.shape}")

    # Mixdown to mono
    if waveform.size(0) > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    else:
        waveform = waveform[:1, :]

    # Resample if needed
    if sr != target_sr:
        resampler = Resample(orig_freq=sr, new_freq=target_sr)
        waveform = resampler(waveform)

    duration_sec = waveform.size(1) / float(target_sr)
    waveform = waveform.to(device)
    return waveform, target_sr, duration_sec

def chunk_waveform(
    waveform: torch.Tensor,
    sr: int,
    chunk_sec: float = 5.0,
    hop_sec: float = 2.5,
) -> List[torch.Tensor]:
    """
    Split waveform (1, N) into overlapping chunks.
    Returns list of (1, chunk_samples).
    """
    assert waveform.dim() == 2 and waveform.size(0) == 1
    n = waveform.size(1)
    chunk_samples = int(sr * chunk_sec)
    hop_samples = int(sr * hop_sec)
    if n <= chunk_samples:
        return [waveform]

    chunks = []
    start = 0
    while start < n:
        end = min(start + chunk_samples, n)
        ch = waveform[:, start:end]
        if ch.size(1) < int(0.5 * chunk_samples) and len(chunks) > 0:
            break
        chunks.append(ch)
        start += hop_samples
        if end == n:
            break
    return chunks

# ---------- Embedding Extractors ----------
class EmbeddingExtractor:
    """
    method: 'wav2vec2' (default), 'hubert', or 'mfcc'
    Produces a fixed-size L2-normalized embedding per audio.
    """
    def __init__(self, method: str = "wav2vec2", device: str = None):
        self.method = method.lower()
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        self.model = None
        self.bundle = None

        if self.method == "wav2vec2":
            self.bundle = torchaudio.pipelines.WAV2VEC2_BASE
            self.model = self.bundle.get_model().to(self.device).eval()
            self.sample_rate = self.bundle.sample_rate  # 16000
            self.embedding_dim = self.model.encoder_embed_dim  # typically 768
        elif self.method == "hubert":
            self.bundle = torchaudio.pipelines.HUBERT_BASE
            self.model = self.bundle.get_model().to(self.device).eval()
            self.sample_rate = self.bundle.sample_rate  # 16000
            self.embedding_dim = self.model.encoder_embed_dim  # typically 768
        elif self.method == "mfcc":
            self.sample_rate = 16000
            self.n_mfcc = 40
            self.mfcc = torchaudio.transforms.MFCC(
                sample_rate=self.sample_rate,
                n_mfcc=self.n_mfcc,
                melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 64, "center": True},
            ).to(self.device)
            self.embedding_dim = self.n_mfcc * 2  # mean + std
        else:
            raise ValueError("method must be one of: wav2vec2, hubert, mfcc")

    @torch.inference_mode()
    def embed_waveform(self, waveform: torch.Tensor, sr: int) -> torch.Tensor:
        """
        waveform: (1, N) on self.device; sr is original sample rate.
        Returns: (D,) float32 tensor (L2-normalized)
        """
        # Resample to model/mfcc SR
        if sr != self.sample_rate:
            resampler = Resample(orig_freq=sr, new_freq=self.sample_rate).to(self.device)
            waveform = resampler(waveform)

        # Chunk for long files
        chunks = chunk_waveform(waveform, self.sample_rate, chunk_sec=5.0, hop_sec=2.5)

        if self.method in ("wav2vec2", "hubert"):
            embs = []
            for ch in chunks:
                feats_list, _ = self.model.extract_features(ch)  # (list of layers)
                last = feats_list[-1]  # (B=1, T, C)
                pooled = last.mean(dim=1).squeeze(0)  # (C,)
                embs.append(pooled)
            emb = torch.stack(embs, dim=0).mean(dim=0)
        else:
            embs = []
            for ch in chunks:
                mfcc = self.mfcc(ch)  # (1, n_mfcc, time)
                mean = mfcc.mean(dim=2).squeeze(0)
                std = mfcc.std(dim=2).squeeze(0)
                embs.append(torch.cat([mean, std], dim=0))
            emb = torch.stack(embs, dim=0).mean(dim=0)

        emb = emb.float()
        emb = emb / (torch.norm(emb) + 1e-8)
        return emb  # (D,)

# ---------- Indexing & Query ----------
def build_index(audio_dir: str, index_path: str, method: str = "wav2vec2", device: str = None) -> None:
    extractor = EmbeddingExtractor(method=method, device=device)
    sr_target = extractor.sample_rate
    files = list_audio_files(audio_dir)
    if not files:
        print(f"No audio files found in: {audio_dir}")
        return

    index = {
        "method": method,
        "sample_rate": sr_target,
        "embedding_dim": extractor.embedding_dim,
        "items": []  # list of dict{path, embedding(np.float32), duration_sec}
    }

    print(f"Indexing {len(files)} files using method='{method}' on device={extractor.device} ...")
    for path in tqdm(files):
        try:
            waveform, sr, duration = load_audio(path, sr_target, extractor.device)
            emb = extractor.embed_waveform(waveform, sr)  # (D,)
            index["items"].append({
                "path": os.path.abspath(path),
                "embedding": emb.cpu().numpy().astype(np.float32),
                "duration_sec": float(duration),
            })
        except Exception as e:
            print(f"[WARN] Failed: {path} ({e})")

    with open(index_path, "wb") as f:
        pickle.dump(index, f)
    print(f"✅ Saved index with {len(index['items'])} items → {index_path}")

def load_index(index_path: str) -> Dict:
    if not os.path.exists(index_path):
        raise FileNotFoundError(f"Index not found: {index_path}")
    with open(index_path, "rb") as f:
        index = pickle.load(f)
    return index

@torch.inference_mode()
def query_similar(
    query_file: str,
    index_path: str,
    top_k: int = 5,
    method: str = "wav2vec2",
    device: str = None,
) -> List[Tuple[str, float, float]]:
    """
    Returns a list of (path, similarity, duration_sec) sorted by similarity (desc).
    """
    index = load_index(index_path)
    if index["method"] != method:
        print(f"[NOTE] Index was built with method='{index['method']}', but querying with '{method}'.")
        print("      For best results, use the same method in both steps.")

    extractor = EmbeddingExtractor(method=method, device=device)
    sr_target = extractor.sample_rate

    waveform, sr, _ = load_audio(query_file, sr_target, extractor.device)
    q_emb = extractor.embed_waveform(waveform, sr).cpu().numpy().astype(np.float32)
    q_emb = q_emb / (np.linalg.norm(q_emb) + 1e-8)

    paths, durs, embs = [], [], []
    for item in index["items"]:
        paths.append(item["path"])
        durs.append(item["duration_sec"])
        embs.append(item["embedding"])
    if not embs:
        return []

    M = np.vstack(embs)  # (N, D)
    M = M / (np.linalg.norm(M, axis=1, keepdims=True) + 1e-8)
    sims = M @ q_emb  # cosine similarity

    k = min(top_k, len(paths))
    idxs = np.argpartition(-sims, k - 1)[:k]
    idxs = idxs[np.argsort(-sims[idxs])]
    results = [(paths[i], float(sims[i]), float(durs[i])) for i in idxs]
    return results


In [4]:

#@title Create a sample folder (you can skip if you have your own)
import os, shutil

AUDIO_DIR = "/content/my_audio"  #@param {type: "string"}
os.makedirs(AUDIO_DIR, exist_ok=True)

print("Audio directory is:", AUDIO_DIR)
print("Put .wav/.mp3/.m4a/.ogg/.flac etc. files in this folder.")


Audio directory is: /content/my_audio
Put .wav/.mp3/.m4a/.ogg/.flac etc. files in this folder.


In [5]:

#@title (Optional) Upload a few audio files here
from google.colab import files
uploaded = files.upload()  # select multiple audio files
for name in uploaded.keys():
    src = f"/content/{name}"
    dst = os.path.join(AUDIO_DIR, name)
    shutil.move(src, dst)
print("Files moved to:", AUDIO_DIR)


Files moved to: /content/my_audio


In [7]:

#@title Build the index
AUDIO_DIR = AUDIO_DIR  #@param {type: "string"}
INDEX_PATH = "/content/audio_index.pkl"  #@param {type: "string"}
METHOD = "wav2vec2"  #@param ["wav2vec2", "hubert", "mfcc"]
DEVICE = "cpu"  #@param ["auto", "cpu", "cuda"]

_device = None if DEVICE == "auto" else DEVICE
build_index(AUDIO_DIR, INDEX_PATH, method=METHOD, device=_device)


AttributeError: 'Wav2Vec2Model' object has no attribute 'encoder_embed_dim'

In [8]:

#@title Upload a query audio file
from google.colab import files
query_upload = files.upload()

assert len(query_upload) == 1, "Please upload exactly one query file."
query_file = list(query_upload.keys())[0]
query_path = f"/content/{query_file}"
print("Query file at:", query_path)


AssertionError: Please upload exactly one query file.

In [9]:

#@title Find Top‑K similar files
TOP_K = 5  #@param {type:"integer"}
METHOD = METHOD  # keep consistent with index

results = query_similar(query_path, INDEX_PATH, top_k=TOP_K, method=METHOD)
if not results:
    print("No results found (is your index empty?)")
else:
    print(f"\nTop {len(results)} similar to: {query_path}")
    for rank, (path, sim, dur) in enumerate(results, 1):
        print(f"{rank:2d}. sim={sim:0.4f}  dur={dur:0.1f}s  {path}")


NameError: name 'query_path' is not defined