# Lyrics Clustering Pipeline
This notebook replicates the functionality of `lyrics_clustering.py` so the analysis can be run step by step.


In [None]:
import os
from pathlib import Path
import string
from collections import defaultdict
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.metrics import silhouette_score
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk import pos_tag, word_tokenize
from sentence_transformers import SentenceTransformer

# Fetch the NLTK data used by word_tokenize, stopwords, WordNetLemmatizer and
# pos_tag. Newer NLTK releases look up 'punkt_tab' and
# 'averaged_perceptron_tagger_eng' instead of the legacy pickled resources, so
# both generations are requested here.
# NOTE(review): an id unknown to the installed NLTK version is expected to
# fail quietly (download returns False) rather than raise - confirm.
for _resource in (
    'punkt',
    'punkt_tab',
    'stopwords',
    'wordnet',
    'averaged_perceptron_tagger',
    'averaged_perceptron_tagger_eng',
):
    nltk.download(_resource, quiet=True)

# English stop words as a set for O(1) membership tests in preprocess().
STOP_WORDS = set(stopwords.words('english'))
# Translation table that deletes every ASCII punctuation character.
PUNCT_TABLE = str.maketrans('', '', string.punctuation)


In [None]:
def get_wordnet_pos(tag: str) -> str:
    """Map a POS tag to a WordNet part-of-speech letter.

    Only the first character of ``tag`` is inspected (case-insensitively):
    ``J`` -> ``'a'``, ``N`` -> ``'n'``, ``V`` -> ``'v'``, ``R`` -> ``'r'``.

    Args:
        tag: POS tag string, e.g. ``'NN'`` or ``'VBD'``.

    Returns:
        One of ``'a'``, ``'n'``, ``'v'``, ``'r'``. Unrecognised or empty tags
        fall back to ``'n'``.
    """
    # An empty tag has no first character; use the same default as any other
    # unrecognised tag instead of raising IndexError.
    if not tag:
        return 'n'
    return {
        'J': 'a',
        'N': 'n',
        'V': 'v',
        'R': 'r',
    }.get(tag[0].upper(), 'n')


def preprocess(text: str, method: str = 'lemma', ngram: int = 1) -> str:
    """Normalise one lyric into a space-separated string of terms.

    Steps: tokenise with ``word_tokenize``, lowercase, strip punctuation
    characters, drop empty tokens and English stop words, then stem
    (``method == 'stem'``) or lemmatise (any other value) each token.

    Args:
        text: Raw lyric text.
        method: ``'stem'`` selects the Porter stemmer; anything else selects
            POS-aware WordNet lemmatisation.
        ngram: Window size. Values above 1 emit every run of ``ngram``
            consecutive tokens; otherwise the tokens are emitted as-is.

    Returns:
        The terms joined by single spaces. N-grams are themselves
        space-joined, so their boundaries are not marked in the result.
    """
    words = []
    for raw in word_tokenize(text):
        cleaned = raw.lower().translate(PUNCT_TABLE)
        # Tokens that were pure punctuation are empty after translation.
        if cleaned and cleaned not in STOP_WORDS:
            words.append(cleaned)

    if method == 'stem':
        stem = PorterStemmer().stem
        words = [stem(word) for word in words]
    else:
        lemmatizer = WordNetLemmatizer()
        words = [
            lemmatizer.lemmatize(word, get_wordnet_pos(tag))
            for word, tag in pos_tag(words)
        ]

    if ngram <= 1:
        return ' '.join(words)

    # Sliding windows; yields nothing when there are fewer than `ngram` words.
    windows = (
        words[start:start + ngram]
        for start in range(len(words) - ngram + 1)
    )
    return ' '.join(' '.join(window) for window in windows)


def load_lyrics(csv_path: Path) -> pd.Series:
    """Read the ``Lyric`` column of a CSV file, dropping missing entries.

    Args:
        csv_path: Location of the CSV file.

    Returns:
        The non-null ``Lyric`` values, indexed ``0..n-1``.

    Raises:
        ValueError: If the file has no ``Lyric`` column.
    """
    df = pd.read_csv(csv_path)
    if 'Lyric' not in df.columns:
        raise ValueError('Expected column "Lyric" in dataset')
    # Dropping rows leaves gaps in the row labels. Renumber so that label i
    # is also position i, matching arrays later built from this Series.
    return df['Lyric'].dropna().reset_index(drop=True)


def embed_lyrics(lyrics: list[str], model_name: str) -> np.ndarray:
    """Encode lyrics with a SentenceTransformer model.

    Args:
        lyrics: Texts to embed.
        model_name: Model identifier passed to ``SentenceTransformer``.

    Returns:
        The encoder output converted to a NumPy array.
    """
    encoder = SentenceTransformer(model_name)
    return np.array(encoder.encode(lyrics, show_progress_bar=True))


def run_kmeans(data: np.ndarray, k: int):
    """Cluster ``data`` with K-Means and score the result.

    Args:
        data: Feature matrix to cluster.
        k: Number of clusters.

    Returns:
        ``(labels, score)``: the cluster assignment per row and the
        silhouette score of that assignment.
    """
    model = KMeans(n_clusters=k, random_state=42)  # fixed seed
    assignments = model.fit_predict(data)
    return assignments, silhouette_score(data, assignments)


def run_dbscan(data: np.ndarray, eps: float, min_samples: int):
    """Cluster ``data`` with DBSCAN and score the non-noise points.

    Args:
        data: Feature matrix to cluster.
        eps: Neighbourhood radius passed to ``DBSCAN``.
        min_samples: Core-point threshold passed to ``DBSCAN``.

    Returns:
        ``(labels, score)``: ``labels`` is the full assignment, including
        ``-1`` for noise. ``score`` is the silhouette score over the clustered
        (non-noise) points, or NaN when those points do not form a scorable
        clustering.
    """
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(data)

    # DBSCAN labels noise as -1. Noise is not a cluster, so it is left out of
    # the score rather than making the whole run unscorable.
    clustered = labels != -1
    n_clustered = int(clustered.sum())
    n_clusters = len(set(labels[clustered]))

    # silhouette_score requires 2 <= n_clusters <= n_samples - 1.
    if 2 <= n_clusters < n_clustered:
        score = silhouette_score(data[clustered], labels[clustered])
    else:
        score = float('nan')
    return labels, score


def run_agglomerative(data: np.ndarray, k: int):
    """Cluster ``data`` with Ward-linkage agglomerative clustering.

    Args:
        data: Feature matrix to cluster.
        k: Number of clusters.

    Returns:
        ``(labels, score)``: the cluster assignment per row and the
        silhouette score of that assignment.
    """
    model = AgglomerativeClustering(n_clusters=k, linkage='ward')
    assignments = model.fit_predict(data)
    return assignments, silhouette_score(data, assignments)


def elbow_plot(data: np.ndarray, k_range: range, out_path: Path):
    """Save an elbow plot of K-Means SSE against the number of clusters.

    Fits one K-Means model (seed 42) per value in ``k_range`` and plots each
    model's ``inertia_``.

    Args:
        data: Feature matrix to cluster.
        k_range: Cluster counts to evaluate.
        out_path: Destination of the saved figure.
    """
    inertia_per_k = [
        KMeans(n_clusters=n_clusters, random_state=42).fit(data).inertia_
        for n_clusters in k_range
    ]

    plt.figure()
    axes = plt.gca()
    axes.plot(list(k_range), inertia_per_k, marker='o')
    axes.set_xlabel('Number of clusters (k)')
    axes.set_ylabel('SSE')
    axes.set_title('Elbow Method for K-Means')
    plt.savefig(out_path)
    plt.close()


def scatter_plot(data: np.ndarray, labels: np.ndarray, out_path: Path):
    """Save a 2-D PCA scatter plot of ``data`` with one series per label.

    Args:
        data: Feature matrix; projected onto its first two principal
            components (seed 42).
        labels: Cluster label for each row of ``data``.
        out_path: Destination of the saved figure.
    """
    coords = PCA(n_components=2, random_state=42).fit_transform(data)

    # Sort the labels so series order (and so colour and legend order) is
    # stable; plain set iteration is not guaranteed to be sorted.
    unique_labels = sorted(set(labels))

    fig = plt.figure()
    try:
        for lab in unique_labels:
            idx = labels == lab
            plt.scatter(coords[idx, 0], coords[idx, 1], label=str(lab), s=10)
        # A legend with many entries would swamp the plot.
        if len(unique_labels) <= 10:
            plt.legend()
        plt.xlabel('PC1')
        plt.ylabel('PC2')
        plt.tight_layout()
        plt.savefig(out_path)
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)


def summarise_dataset(lyrics: pd.Series) -> dict:
    """Compute basic size statistics for a collection of lyrics.

    Length is the number of whitespace-separated words in each lyric.

    Args:
        lyrics: One lyric string per entry.

    Returns:
        Dict with keys ``num_songs``, ``min_length``, ``max_length`` and
        ``avg_length``.
    """
    words_per_song = lyrics.str.split().str.len()
    stats = {'num_songs': len(lyrics)}
    stats['min_length'] = words_per_song.min()
    stats['max_length'] = words_per_song.max()
    stats['avg_length'] = words_per_song.mean()
    return stats


In [None]:
# --- Run configuration -------------------------------------------------------
# Figures are written here; the directory is created on first run.
output_dir = Path('outputs')
output_dir.mkdir(exist_ok=True)

# Input data and embedding model.
csv_path = Path('musicLyrics.csv')
model_name = 'all-MiniLM-L6-v2'

# Text normalisation options passed to preprocess().
preprocess_method = 'lemma'  # or 'stem'
ngram_size = 1


In [None]:
# Load the lyrics and report the corpus size and mean length.
lyrics = load_lyrics(csv_path)
summary = summarise_dataset(lyrics)
print(
    f"Loaded {summary['num_songs']} songs",
    f"Average length: {summary['avg_length']:.1f} words",
    sep='\n',
)


In [None]:
# Normalise every lyric with the configured method, showing a progress bar.
processed = [
    preprocess(song_text, method=preprocess_method, ngram=ngram_size)
    for song_text in tqdm(lyrics, desc='Preprocess')
]


In [None]:
# Embed the preprocessed lyrics with the configured model.
embeddings = embed_lyrics(lyrics=processed, model_name=model_name)


In [None]:
# Elbow plot for k = 2..7.
elbow_plot(
    data=embeddings,
    k_range=range(2, 8),
    out_path=output_dir / 'kmeans_elbow.png',
)


In [None]:
# K-Means with two clusters.
k_labels, k_score = run_kmeans(data=embeddings, k=2)
scatter_plot(
    data=embeddings,
    labels=k_labels,
    out_path=output_dir / 'kmeans_scatter.png',
)

# DBSCAN; the scatter plot is only written when a score was produced.
db_labels, db_score = run_dbscan(data=embeddings, eps=0.5, min_samples=5)
if not np.isnan(db_score):
    scatter_plot(
        data=embeddings,
        labels=db_labels,
        out_path=output_dir / 'dbscan_scatter.png',
    )

# Agglomerative clustering with three clusters.
ag_labels, ag_score = run_agglomerative(data=embeddings, k=3)
scatter_plot(
    data=embeddings,
    labels=ag_labels,
    out_path=output_dir / 'agg_scatter.png',
)


In [None]:
# Report the silhouette score of each clustering run.
print('\nSilhouette Scores:')
print(f'  K-Means (k=2): {k_score:.3f}')
print(f'  DBSCAN: {db_score:.3f}')
print(f'  Agglomerative (k=3): {ag_score:.3f}')

# Collect up to two example lyrics (truncated to 120 characters) per K-Means
# cluster.
examples = defaultdict(list)
n_clusters = len(set(k_labels))
for lyric, lab in zip(lyrics, k_labels):
    if len(examples[lab]) < 2:
        examples[lab].append(lyric[:120] + '...')
    # Stop early only once every cluster - not just those seen so far - has
    # its two examples; otherwise later clusters would be left out.
    if len(examples) == n_clusters and all(len(v) >= 2 for v in examples.values()):
        break

print('\nCluster Examples (K-Means):')
for lab, exs in sorted(examples.items()):
    print(f'Cluster {lab}')
    for ex in exs:
        print(f'  - {ex}')
