In [None]:
pip install flask pyngrok flask_ngrok


In [None]:
pip install  torch llm2vec librosa pandas soundfile torchaudio peft timm pyyaml torchao

In [None]:
from llm2vec import LLM2Vec
from transformers import AutoTokenizer, AutoModel, AutoConfig
from peft import PeftModel
from torchao.quantization import quantize_, Int8WeightOnlyConfig
import torch

access_token = REDACTED_TOKEN

model_id = "McGill-NLP/LLM2Vec-Sheared-LLaMA-mntp"

# Tokenizer and model configuration are resolved from the same checkpoint id.
tokenizer = AutoTokenizer.from_pretrained(
    model_id,
    padding=True,
    truncation=True,
    max_length=512,
)
config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)

# Decide the target device once and derive dtype / placement from it.
_use_gpu = torch.cuda.is_available()
print("Using GPU" if _use_gpu else "Using CPU")

# GPU: load in bf16 for speed. CPU: load in fp32, then quantize below.
model = AutoModel.from_pretrained(
    model_id,
    trust_remote_code=True,
    config=config,
    torch_dtype=torch.bfloat16 if _use_gpu else torch.float32,
    device_map="cuda" if _use_gpu else "cpu",
    token=access_token,
)

if not _use_gpu:
    # CPU only: shrink the fp32 weights with torchao int8 weight-only quantization.
    # NOTE(review): torchao is also imported unconditionally at the top of this
    # cell, so a missing package fails there before this fallback can run.
    try:
        from torchao.quantization import quantize_
        print("[INFO] Applying torchao quantization for CPU...")
        quant_config = Int8WeightOnlyConfig(group_size=None)
        print("[INFO] Applying torchao quantization with Int8WeightOnlyConfig...")
        quantize_(model, quant_config)

    except ImportError:
        print("[WARNING] torchao not installed. Run: pip install torchao")
        print("[WARNING] Falling back to non-quantized CPU model.")

# Wrap model + tokenizer in the LLM2Vec encoder (mean pooling, 512-token limit).
l2v = LLM2Vec(model, tokenizer, pooling_mode="mean", max_length=512)

In [None]:
from sklearn.decomposition import IncrementalPCA
from sklearn.preprocessing import StandardScaler
from pathlib import Path

import numpy as np
import pickle
import torch
import os
import joblib

# Module-level slot for a shared PCA trainer; it starts as None (nothing is initialized here)
_pca_trainer = None

class SimplePCATrainer:
    """
    Incrementally fit a StandardScaler + IncrementalPCA pair, one batch at a time.

    The first batch passed to ``process_batch`` chooses the number of
    components and fits the scaler. Every batch (the first included) is then
    scaled with that scaler, fed to ``IncrementalPCA.partial_fit`` and
    projected. A pickle checkpoint is written every 5 batches, and
    ``save_final`` writes the final ``{'pca', 'scaler'}`` bundle.

    Attributes:
        pca: The IncrementalPCA model (None until the first batch is processed).
        scaler: StandardScaler fitted on the first batch only.
        fitted: Boolean indicating if the model has been initialized.
        batch_count_pca: Counter for the number of batches processed.

    Methods:
        process_batch(vectors): Fit incrementally on a batch and return its projection.
        save_final(model_path): Save the PCA model and scaler to the specified path.
    """

    def __init__(self):
        self.pca = None
        self.scaler = StandardScaler()
        self.fitted = False
        self.batch_count_pca = 0

    def _determine_optimal_components(self, vectors):
        """
        Determine the number of PCA components needed to retain 95% variance.

        The result is capped at half the feature dimension and is never less
        than 1, so it is always a valid ``n_components`` value.

        Args:
            vectors: The input data to analyze (2D, samples x features).
        Returns:
            n_components: The chosen number of components (plain int >= 1).
        """
        temp_pca = IncrementalPCA()
        temp_pca.fit(vectors)
        cumsum_var = np.cumsum(temp_pca.explained_variance_ratio_)
        # Index of the first cumulative ratio reaching 95%, converted to a count.
        n_comp_95 = int(np.argmax(cumsum_var >= 0.95)) + 1
        # shape[1] // 2 is 0 for single-feature input; keep at least 1 component.
        upper_bound = max(1, vectors.shape[1] // 2)
        return min(n_comp_95, upper_bound)

    def process_batch(self, vectors):
        """
        Process a batch of vectors, fitting the PCA model incrementally.

        Args:
            vectors: The input data batch to process.
        Returns:
            reduced_vectors: The PCA-transformed data.

        Note: This method saves a checkpoint every 5 batches.
        """
        if not self.fitted:
            # First batch: pick the component count and fit the scaler once.
            n_components = self._determine_optimal_components(vectors)
            self.pca = IncrementalPCA(n_components=n_components, batch_size=1000)
            self.scaler.fit(vectors)
            self.fitted = True
            print(f"Initialized PCA with {n_components} components")

        # Scale with the first-batch statistics, update the PCA, then project.
        vectors_scaled = self.scaler.transform(vectors)
        self.pca.partial_fit(vectors_scaled)
        reduced_vectors = self.pca.transform(vectors_scaled)

        self.batch_count_pca += 1

        # Save checkpoint every 5 batches
        if self.batch_count_pca % 5 == 0:
            os.makedirs("pca_checkpoints", exist_ok=True)
            with open(f"pca_checkpoints/checkpoint_batch_{self.batch_count_pca}.pkl", 'wb') as f:
                pickle.dump({'pca': self.pca, 'scaler': self.scaler}, f)
            print(f"Saved checkpoint at batch {self.batch_count_pca}")

        print(f"Processed batch {self.batch_count_pca}, shape: {vectors.shape} -> {reduced_vectors.shape}")
        return reduced_vectors

    def save_final(self, model_path):
        """
        Save the final PCA model and scaler to the specified path.

        The file contains a pickled dict ``{'pca': ..., 'scaler': ...}``.

        Args:
            model_path: The file path to save the PCA model. A bare file name
                (no directory part) is written to the current directory.

        Returns:
            None

        Raises:
            RuntimeError: If no batch has been processed yet (nothing to save).

        Note: Change the model path as needed in the data_config.yml file.
        """
        if self.pca is None:
            # Fail before touching the filesystem instead of writing an empty bundle.
            raise RuntimeError("No PCA model to save: call process_batch() at least once first.")

        # os.makedirs("") raises, so only create a directory when the path has one.
        target_dir = os.path.dirname(model_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        with open(model_path, 'wb') as f:
            pickle.dump({'pca': self.pca, 'scaler': self.scaler}, f)
        print(f"Final model saved to {model_path}. Total variance explained: {np.sum(self.pca.explained_variance_ratio_):.4f}")

## For Single Input
def load_pca_model(vectors, model_path="models/fusion/pca.pkl"):
    """
    Load a pre-trained PCA model and transform the input vectors.

    Two file layouts are supported:
      * a dict bundle ``{'pca': ..., 'scaler': ...}`` as written by
        ``SimplePCATrainer.save_final`` — the scaler (if present) is applied
        first, then the PCA, mirroring ``SimplePCATrainer.process_batch``;
      * a bare estimator exposing ``transform`` (the previously supported form).

    Args:
        vectors: The input data to transform.
        model_path: The file path of the pre-trained PCA model.

    Returns:
        output: The PCA-transformed data.

    Raises:
        KeyError: If the file holds a dict without a 'pca' entry.

    Note: Change the model path as needed in the data_config.yml file (or set the path file as shown above). Can be used for the main program.
    """
    # NOTE(security): joblib.load unpickles arbitrary objects; only load trusted files.
    loaded = joblib.load(Path(model_path))

    if isinstance(loaded, dict):
        pca = loaded['pca']
        scaler = loaded.get('scaler')
        if scaler is not None:
            vectors = scaler.transform(vectors)
        return pca.transform(vectors)

    return loaded.transform(vectors)

def l2vec_single_train(l2v, lyrics):
    """
    Embed one lyric string and return the result as a NumPy array.

    Args:
        l2v: The LLM2Vec model used for encoding.
        lyrics: The lyric text to encode (wrapped in a one-element list for the encoder).

    Returns:
        vectors: NumPy array holding the encoder output for that single input.
    """
    encoded = l2v.encode([lyrics])
    # Drop autograd tracking and move to host memory before converting to NumPy.
    host_tensor = encoded.detach().cpu()
    return host_tensor.numpy()

# For Batch Processing
def l2vec_train(l2v, lyrics_list):
    """
    Embed a batch of lyric strings with gradient tracking disabled.

    Args:
        l2v: The LLM2Vec model used for encoding.
        lyrics_list: A list of lyric strings to encode.
    Returns:
        vectors: Whatever ``l2v.encode`` returns for the batch (not converted to NumPy).

    Note: This function only encodes the lyrics and does not apply PCA reduction. The PCA reduction can be applied separately in the train.py module.
    """
    # Encoding is inference-only, so run it outside autograd.
    with torch.no_grad():
        return l2v.encode(lyrics_list)

In [None]:
!git clone https://github.com/krislette/bach-or-bot.git
%cd bach-or-bot

In [None]:
import sys
import os

# Make the cloned repository's src/ directory importable by appending it to
# sys.path. The absolute /content/... path assumes the repo was cloned into
# /content (presumably the Colab working directory — confirm if run elsewhere).
sys.path.append('/content/bach-or-bot/src')

# Run the rest of the notebook from the repository root (same /content assumption).
os.chdir('/content/bach-or-bot')

In [None]:
import threading
import torch
import numpy as np
from types import SimpleNamespace

from src.spectttra.feature import FeatureExtractor
from src.spectttra.spectttra import SpecTTTra, build_spectttra_from_cfg, load_frozen_spectttra

# Process-wide cache for the SpecTTTra predictor. Every slot starts unset and
# is filled in by _init_predictor_once(), which assigns them while holding
# _PREDICTOR_LOCK so the model is built only once and then reused.
_PREDICTOR_LOCK = threading.Lock()  # serializes the one-time initialization
_FEAT_EXT = None  # feature extractor returned by build_spectttra
_MODEL = None  # SpecTTTra model returned by build_spectttra
_CFG = None  # SimpleNamespace config assembled during initialization
_DEVICE = None  # torch.device selected during initialization


def build_spectttra(cfg, device, checkpoint_path="/content/bach-or-bot/models/spectttra/spectttra_frozen.pth"):
    """
    Build the FeatureExtractor + SpecTTTra pair and load the frozen checkpoint.

    Args:
        cfg: Configuration object forwarded to ``build_spectttra_from_cfg``.
        device: Device forwarded to both the builder and the checkpoint loader.
        checkpoint_path: Location of the frozen SpecTTTra weights. Defaults to
            the path inside the cloned repository, so existing callers are
            unaffected.

    Returns:
        (feat_ext, model): The feature extractor and the model returned by
        ``load_frozen_spectttra``.
    """
    feat_ext, model = build_spectttra_from_cfg(cfg, device)
    model = load_frozen_spectttra(model, checkpoint_path, device)
    return feat_ext, model


def _init_predictor_once():
    """
    Initialize and cache FeatureExtractor and SpecTTTra once per process.

    The first caller builds the configuration, constructs the feature
    extractor and model through ``build_spectttra``, moves both to the chosen
    device, puts the model in eval mode and stores everything in the
    module-level cache. Later callers return immediately.

    Initialization runs under ``_PREDICTOR_LOCK`` with a second readiness check
    inside the lock, so concurrent first calls build the model only once.
    """

    global _FEAT_EXT, _MODEL, _CFG, _DEVICE

    # Fast path without taking the lock.
    if _MODEL is not None and _FEAT_EXT is not None:
        return

    with _PREDICTOR_LOCK:
        # Another thread may have finished while this one waited for the lock.
        if _MODEL is not None and _FEAT_EXT is not None:
            return

        # Configurations of best performing variant for 120s
        cfg = SimpleNamespace(
            audio=SimpleNamespace(sample_rate=16000, max_time=120, max_len=16000 * 120),
            melspec=SimpleNamespace(
                n_fft=2048,
                hop_length=512,
                win_length=2048,
                n_mels=128,
                f_min=20,
                f_max=8000,
                power=2,
                top_db=80,
                norm="mean_std",
            ),
            model=SimpleNamespace(
                embed_dim=384,
                num_heads=6,
                num_layers=12,
                t_clip=3,
                f_clip=1,
                pre_norm=True,
                pe_learnable=True,
                pos_drop_rate=0.1,
                attn_drop_rate=0.1,
                proj_drop_rate=0.0,
                mlp_ratio=2.67,
            ),
        )

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        feat_ext, model = build_spectttra(cfg, device)
        feat_ext.to(device)

        # Move the model to the device (GPU if available) and switch to eval mode.
        model.to(device).eval()

        # Publish the cache. The unlocked fast path treats "_MODEL and
        # _FEAT_EXT are set" as "ready", so _CFG and _DEVICE are stored first
        # and _MODEL last; a concurrent caller can then never observe a ready
        # predictor whose device or config is still None.
        _CFG = cfg
        _DEVICE = device
        _FEAT_EXT = feat_ext
        _MODEL = model


def spectttra_predict(audio_tensor):
    """
    Run single-input inference with SpecTTTra.

    Args:
        audio_tensor (torch.Tensor): Input waveform of shape (1, num_samples).
            Must already be preprocessed including resampled to the target sampling rate (16 kHz).

    Returns:
        np.ndarray:
            1D embedding vector of shape (embed_dim,). The embedding is obtained
            by mean-pooling the transformer token outputs.
    """
    _init_predictor_once()

    # Read-only access to the cached predictor (no `global` needed for reads).
    device = _DEVICE
    feat_ext = _FEAT_EXT
    model = _MODEL

    # Move waveform to device but keep float for mel extraction
    waveform = audio_tensor.to(device).float()

    with torch.no_grad():
        # Extract mel-spectrogram
        melspec = feat_ext(waveform)

        # Trim or right-pad the last (frame) axis to the length the model expects.
        expected_frames = model.input_temp_dim
        n_frames = melspec.shape[2]
        if n_frames > expected_frames:
            melspec = melspec[:, :, :expected_frames]
        elif n_frames < expected_frames:
            melspec = torch.nn.functional.pad(melspec, (0, expected_frames - n_frames))

        # Mixed precision only on CUDA; on CPU the context is disabled (a no-op).
        # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
        with torch.autocast(device_type=device.type, enabled=(device.type == "cuda")):
            tokens = model(melspec)
            pooled = tokens.mean(dim=1)

    out = pooled.squeeze(0).cpu().numpy()
    return out


def spectttra_train(audio_tensors):
    """
    Batch inference with SpecTTTra.

    Waveforms are right-padded with zeros to the longest one in the batch, and
    the resulting mel-spectrogram is trimmed or padded along the frame axis to
    the model's ``input_temp_dim`` (the same alignment ``spectttra_predict``
    applies), when the model exposes that attribute.

    Args:
        audio_tensors (list[torch.Tensor]): List of tensors (1, num_samples) or (num_samples,)
    Returns:
        np.ndarray of shape (batch_size, embed_dim)
    Raises:
        ValueError: If a tensor is neither 1D nor 2D.
    """
    _init_predictor_once()

    if not audio_tensors:
        return np.empty((0, _CFG.model.embed_dim))

    # Bring every waveform to 2D (rows, num_samples).
    rows = []
    for t in audio_tensors:
        if t.ndim == 1:
            t = t.unsqueeze(0)
        elif t.ndim != 2:
            raise ValueError(f"Unexpected tensor shape: {t.shape}")
        rows.append(t)

    # Longest waveform by sample count (last axis); numel() would over-count
    # tensors that carry more than one row.
    max_len = max(t.shape[-1] for t in rows)

    # Right-pad shorter waveforms with zeros so they can be concatenated.
    normalized = [
        torch.nn.functional.pad(t, (0, max_len - t.shape[-1])) if t.shape[-1] < max_len else t
        for t in rows
    ]

    # Concatenate along dim 0 into one (B, num_samples) batch.
    batch_waveforms = torch.cat(normalized, dim=0).to(_DEVICE).float()

    with torch.no_grad():
        melspec = _FEAT_EXT(batch_waveforms)  # (B, n_mels, n_frames)

        # Align the frame axis with what the model expects, as spectttra_predict does.
        expected_frames = getattr(_MODEL, "input_temp_dim", None)
        if expected_frames is not None:
            n_frames = melspec.shape[2]
            if n_frames > expected_frames:
                melspec = melspec[:, :, :expected_frames]
            elif n_frames < expected_frames:
                melspec = torch.nn.functional.pad(melspec, (0, expected_frames - n_frames))

        # Mixed precision only on CUDA; disabled (no-op) on CPU.
        with torch.autocast(device_type=_DEVICE.type, enabled=(_DEVICE.type == "cuda")):
            tokens = _MODEL(melspec)      # (B, num_tokens, embed_dim)
            pooled = tokens.mean(dim=1)   # (B, embed_dim)

    return pooled.cpu().numpy()

In [None]:
import getpass
import os
import threading
import numpy as np
import torch
from flask import Flask, request, jsonify
from pyngrok import ngrok, conf

# Import your SpecTTTra functions
from __main__ import spectttra_predict, spectttra_train
from __main__ import l2vec_single_train, l2vec_train, l2v

# Setup ngrok Authentication
# The token is read interactively with getpass (so it is not stored in the
# notebook source) and set on pyngrok's default config before any tunnel opens.

print("Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken")
conf.get_default().auth_token = getpass.getpass("Ngrok Authtoken: ")


# Initialize Flask + ngrok
# `app` is the Flask application the route decorators below register against;
# `port` is the local port the ngrok tunnel forwards to.

app = Flask(__name__)
port = 5000

# Open the tunnel and keep its public URL for the log lines and app config.
public_url = ngrok.connect(port).public_url
print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{port}\"")
app.config["BASE_URL"] = public_url

# Flask Routes

@app.route("/")
def home():
    """Serve a static HTML index that lists the API's POST endpoints."""
    landing_page = """
    <h1>L2Vec + SpecTTTra Flask API is running</h1>
    <p>Available endpoints:</p>
    <ul>
      <li>POST /single → {'lyrics': 'string'}</li>
      <li>POST /batch → {'lyrics_list': ['song1', 'song2', ...]}</li>
      <li>POST /spectttra/predict → {'audio': [[...]]}</li>
      <li>POST /spectttra/train → {'audios': [[[...]], [[...]], ...]}</li>
    </ul>
    """
    return landing_page

# --- L2V single lyric ---
@app.route("/single", methods=["POST"])
def single():
    """
    Encode one lyric with LLM2Vec.

    Expects a JSON object with a 'lyrics' field and responds with
    ``{"vectors": [...]}``. A missing, non-JSON or non-object body is a client
    error (400); failures while encoding are reported as 500.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid JSON
        # body, so bad requests are answered with 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        if "lyrics" not in data:
            return jsonify({"error": "Missing 'lyrics' field"}), 400

        lyrics = data["lyrics"]
        vectors = l2vec_single_train(l2v, lyrics)
        return jsonify({"vectors": vectors.tolist()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# --- L2V batch lyrics ---
@app.route("/batch", methods=["POST"])
def batch():
    """
    Encode a list of lyrics with LLM2Vec.

    Expects a JSON object with a 'lyrics_list' field and responds with
    ``{"vectors": [...]}``. A missing, non-JSON or non-object body is a client
    error (400); failures while encoding are reported as 500.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid JSON
        # body, so bad requests are answered with 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        if "lyrics_list" not in data:
            return jsonify({"error": "Missing 'lyrics_list' field"}), 400

        lyrics_list = data["lyrics_list"]
        vectors = l2vec_train(l2v, lyrics_list)
        return jsonify({"vectors": vectors.tolist()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# --- SpecTTTra: single audio ---
@app.route("/spectttra/predict", methods=["POST"])
def predict_audio():
    """
    Embed one audio clip with SpecTTTra.

    Expects a JSON object with an 'audio' field holding the waveform either as
    a flat list of floats or as a single-row nested list (``[[...]]``, the
    form shown on the index page). Responds with ``{"embedding": [...]}``.
    Malformed requests get 400; failures during inference get 500.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        if "audio" not in data:
            return jsonify({"error": "Missing 'audio' field"}), 400

        # Ragged or non-numeric payloads fail the conversion: that is a client error.
        try:
            audio_array = np.array(data["audio"], dtype=np.float32)
        except (TypeError, ValueError):
            return jsonify({"error": "Audio must be 1D list of floats"}), 400

        # Accept the (1, num_samples) form by unwrapping its single row.
        if audio_array.ndim == 2 and audio_array.shape[0] == 1:
            audio_array = audio_array[0]
        if audio_array.ndim != 1:
            return jsonify({"error": "Audio must be 1D list of floats"}), 400
        if audio_array.size == 0:
            return jsonify({"error": "Audio must not be empty"}), 400

        audio_tensor = torch.tensor(audio_array).unsqueeze(0)  # (1, num_samples)
        embedding = spectttra_predict(audio_tensor)
        return jsonify({"embedding": embedding.tolist()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# --- SpecTTTra: batch audio ---
@app.route("/spectttra/train", methods=["POST"])
def train_audio():
    """
    Embed a batch of audio clips with SpecTTTra.

    Expects a JSON object with an 'audios' field: a list whose items are
    waveforms given as flat lists or nested (1, num_samples) lists. Responds
    with ``{"embeddings": [[...], ...]}``. Malformed requests get 400;
    failures during inference get 500.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        if "audios" not in data:
            return jsonify({"error": "Missing 'audios' field"}), 400

        audios = data["audios"]
        if not isinstance(audios, list):
            return jsonify({"error": "'audios' must be a list"}), 400

        audio_tensors = []
        for index, a in enumerate(audios):
            # Ragged or non-numeric items fail the conversion: that is a client error.
            try:
                arr = np.array(a, dtype=np.float32)
            except (TypeError, ValueError):
                return jsonify({"error": f"Audio at index {index} must be a list of floats"}), 400
            if arr.ndim == 1:
                arr = np.expand_dims(arr, axis=0)  # (1, num_samples)
            audio_tensors.append(torch.tensor(arr))

        embeddings = spectttra_train(audio_tensors)
        return jsonify({"embeddings": embeddings.tolist()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Start Flask in Background
# Run the development server on a separate thread so the notebook cell returns.
# The port is passed explicitly so the server always listens where the ngrok
# tunnel points, even if `port` is changed from Flask's default. The reloader
# is disabled because it cannot run outside the main thread.
threading.Thread(target=app.run, kwargs={"port": port, "use_reloader": False}).start()

print("\nYour Colab backend is live!")
print(f"L2V endpoints:\n  {public_url}/single\n  {public_url}/batch")
print(f"SpecTTTra endpoints:\n  {public_url}/spectttra/predict\n  {public_url}/spectttra/train")

In [None]:
import torch

def flush_gpu_cache():
    """Empty PyTorch's CUDA cache when a GPU is present; otherwise just report that none is."""
    if not torch.cuda.is_available():
        print("No GPU available.")
        return

    torch.cuda.empty_cache()
    print("GPU cache flushed.")

# Example usage: Call this function whenever you want to flush the cache
# flush_gpu_cache()