# Data Loading and Preprocessing

In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
# Load the lyrics table from the working directory; later cells read its
# 'Hip Hop' / 'Pop' / 'Rock' flag columns and its 'Lyric' text column.
df = pd.read_csv('DF_3Genres_Lyrics_En.csv')

In [None]:
# (rows, columns) of the freshly loaded frame.
df.shape

In [None]:
# Number of missing values in each column.
df.isnull().sum()

In [None]:
# Preview the first rows of the frame.
df.head()

In [None]:
# Column labels available in the frame.
df.columns

In [None]:
# Collapse the three genre flag columns into one categorical label.
def get_genre(row):
    """Return the genre label for one row of genre flags.

    Parameters
    ----------
    row : mapping-like
        Must support ``row['Hip Hop']``, ``row['Pop']`` and ``row['Rock']``.

    Returns
    -------
    str
        ``'hip hop'``, ``'pop'`` or ``'rock'`` for the first flag (checked in
        that order) that equals 1, otherwise ``'unknown'``.
    """
    flag_to_label = (
        ('Hip Hop', 'hip hop'),
        ('Pop', 'pop'),
        ('Rock', 'rock'),
    )
    # Order matters: when several flags are set, the earliest one wins.
    for flag_column, label in flag_to_label:
        if row[flag_column] == 1:
            return label
    # No flag set (the original author notes this should not occur in this dataset).
    return 'unknown'

# Flag columns that are folded into the single 'genre' label and then removed.
genre_flag_cols = ['Hip Hop', 'Pop', 'Rock']

# Guarded so the cell can be re-run: once the flag columns are gone, calling
# get_genre / drop again would raise KeyError.
if set(genre_flag_cols).issubset(df.columns):
    df['genre'] = df.apply(get_genre, axis=1)
    # Drop the original genre columns (rebinding instead of mutating in place).
    df = df.drop(columns=genre_flag_cols)
elif 'genre' not in df.columns:
    # Neither the raw flags nor a previously derived label: nothing to work with.
    raise KeyError(
        f"Expected columns {genre_flag_cols} or an existing 'genre' column; "
        f"found {list(df.columns)}"
    )

display(df.head())

# Exploratory Data Analysis

In [None]:
import plotly.express as px

# Tally songs per genre and expose the result as a two-column frame
# ('genre', 'count') so plotly can address both by name.
genre_counts = (
    df['genre']
    .value_counts()
    .reset_index()
    .set_axis(['genre', 'count'], axis=1)
)

# Pie chart of the genre tally.
fig = px.pie(
    genre_counts,
    names='genre',
    values='count',
    title='Distribution of Music Genres',
)
fig.show()

In [None]:
from wordcloud import WordCloud

# Concatenate every lyric into one corpus string. Missing lyrics are dropped and
# the rest coerced to str first, because str.join raises TypeError on NaN/float.
all_lyrics = " ".join(df['Lyric'].dropna().astype(str))

# Build the word cloud image from the whole corpus.
wordcloud = WordCloud(width=800, height=400, background_color='white').generate(all_lyrics)

# Render the cloud on an explicit Axes with the axis decorations hidden.
cloud_fig, cloud_ax = plt.subplots(figsize=(10, 5))
cloud_ax.imshow(wordcloud, interpolation='bilinear')
cloud_ax.axis('off')
plt.show()

In [None]:
from collections import Counter
# Count how many rows carry each value of the 'genre' column.
Counter(df['genre'])

# Trial of Combinations

In [None]:

from dataclasses import dataclass
from typing import List, Tuple, Dict, Any

from sklearn.model_selection import StratifiedKFold, cross_validate
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import make_scorer, f1_score, accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.naive_bayes import ComplementNB
from sklearn.linear_model import LogisticRegression, SGDClassifier, RidgeClassifier, PassiveAggressiveClassifier
from sklearn.svm import LinearSVC, SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neighbors import NearestCentroid
from sklearn.base import BaseEstimator, TransformerMixin
from joblib import dump

# Optional dependencies: each flag records whether the import succeeded.
HAVE_XGB = False
try:
    from xgboost import XGBClassifier
except Exception:
    # Any exception raised by the import (not only a missing package) leaves the flag False.
    pass
else:
    HAVE_XGB = True

HAVE_SBERT = False
try:
    from sentence_transformers import SentenceTransformer
except Exception:
    # Same policy as above: any import-time failure leaves the flag False.
    pass
else:
    HAVE_SBERT = True

# =========================
# CONFIG
# =========================
# --- Data ---
DATA_PATH = "songs.csv"   # <- change to your file
TEXT_COL = "Lyric"        # column used as the input text
TARGET_COL = "genre"      # column used as the class label

# --- Modelling ---
SBERT_MODEL_NAME = "all-mpnet-base-v2"  # change if you want
CV_FOLDS = 5
RANDOM_STATE = 42
N_JOBS = -1

# --- Artifacts ---
SAVE_DIR = "artifacts"
os.makedirs(SAVE_DIR, exist_ok=True)  # no error if the directory already exists
BEST_MODEL_PATH, LABEL_ENCODER_PATH = (
    os.path.join(SAVE_DIR, artifact_file)
    for artifact_file in ("best_pipeline.joblib", "label_encoder.joblib")
)

# =========================
# UTIL: SBERT vectorizer as sklearn transformer
# =========================
class SBERTVectorizer(BaseEstimator, TransformerMixin):
    """Scikit-learn transformer that embeds texts with a SentenceTransformer model.

    Parameters
    ----------
    model_name : str
        Name handed to ``SentenceTransformer(...)`` when the model is loaded.
    batch_size : int
        Forwarded to ``model.encode(batch_size=...)``.
    normalize : bool
        Forwarded to ``model.encode(normalize_embeddings=...)``.

    Attributes
    ----------
    model : SentenceTransformer or None
        The loaded encoder; ``None`` until it is first needed.
    """

    def __init__(self, model_name=SBERT_MODEL_NAME, batch_size=64, normalize=True):
        self.model_name = model_name
        self.batch_size = batch_size
        self.normalize = normalize
        self.model = None

    def _ensure_model(self):
        """Load the encoder on first use and return it.

        Raises
        ------
        ImportError
            If the model must be loaded but sentence-transformers could not be imported.
        """
        if self.model is None:
            if not HAVE_SBERT:
                raise ImportError("sentence-transformers not installed. pip install sentence-transformers")
            self.model = SentenceTransformer(self.model_name)
        return self.model

    def fit(self, X, y=None):
        """Make sure the encoder is loaded; ``X`` and ``y`` are not otherwise used.

        Returns
        -------
        self
        """
        self._ensure_model()
        return self

    def transform(self, X):
        """Encode the texts in ``X`` and return the embeddings as a float32 array.

        The encoder is loaded here if ``fit`` has not run yet, so calling
        ``transform`` first no longer fails on ``None.encode``.
        """
        encoder = self._ensure_model()
        embeddings = encoder.encode(
            list(X),  # materialise so any iterable of texts is accepted
            batch_size=self.batch_size,
            show_progress_bar=False,
            normalize_embeddings=self.normalize
        )
        return np.asarray(embeddings, dtype=np.float32)

# =========================
# DATA
# =========================
# NOTE(review): this rebinds `df` to whatever DATA_PATH points at, replacing any
# frame prepared earlier in the notebook — confirm that file already contains TARGET_COL.
df = pd.read_csv(DATA_PATH)
# Rows without text or without a label cannot be used for training.
df = df.dropna(subset=[TEXT_COL, TARGET_COL])

# Hold out 20% of the rows. The split is stratified on the label so both parts
# keep the class proportions (and every class reaches the training part, which
# the label encoder below is fitted on), and it is seeded from RANDOM_STATE
# like the rest of the notebook instead of a separate literal.
df_train, df_test = train_test_split(
    df,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=df[TARGET_COL].astype(str),
)
X_text = df_train[TEXT_COL].astype(str).values
y_str  = df_train[TARGET_COL].astype(str).values

# Map string labels to integer class ids.
le = LabelEncoder()
y = le.fit_transform(y_str)

# Save label encoder now (classes are final)
dump(le, LABEL_ENCODER_PATH)

# =========================
# SCORERS & CV
# =========================
# Metrics computed on every fold; cross_validate reports them as "test_<key>".
scorers = dict(
    f1_macro=make_scorer(f1_score, average="macro"),
    accuracy=make_scorer(accuracy_score),
)

# Shuffled, seeded stratified splitter shared by all evaluations.
cv = StratifiedKFold(
    n_splits=CV_FOLDS,
    shuffle=True,
    random_state=RANDOM_STATE,
)

# =========================
# VECTORIZERS
# =========================
def tfidf_words():
    """Return a new TfidfVectorizer with ngram_range (1, 2), min_df=2, max_df=0.9."""
    settings = {"ngram_range": (1, 2), "min_df": 2, "max_df": 0.9}
    return TfidfVectorizer(**settings)

def tfidf_chars():
    """Return a new character-analyzer TfidfVectorizer with ngram_range (3, 5)."""
    # character n-grams shine on noisy lyrics
    return TfidfVectorizer(
        analyzer="char",
        ngram_range=(3, 5),
        min_df=2,
        lowercase=True,
    )

def count_words():
    """Return a new CountVectorizer with ngram_range (1, 2) and min_df=2."""
    settings = {"ngram_range": (1, 2), "min_df": 2}
    return CountVectorizer(**settings)

def sbert_vec():
    """Return a new SBERTVectorizer configured with SBERT_MODEL_NAME."""
    embedder = SBERTVectorizer(model_name=SBERT_MODEL_NAME)
    return embedder

# =========================
# CLASSIFIERS (with small, sane grids)
# =========================
def linear_svc(C=1.0):
    """Return a new LinearSVC built with the given ``C``."""
    classifier = LinearSVC(C=C)
    return classifier

def logreg(C=2.0):
    """Return a new LogisticRegression (saga solver, max_iter=2000) with the given ``C``."""
    # saga handles L1/L2; we keep default penalty='l2'
    settings = {"C": C, "max_iter": 2000, "solver": "saga", "n_jobs": N_JOBS}
    return LogisticRegression(**settings)

def sgd(loss="hinge", alpha=1e-4):
    """Return a new seeded SGDClassifier with the given ``loss`` and ``alpha``."""
    return SGDClassifier(
        loss=loss,
        alpha=alpha,
        random_state=RANDOM_STATE,
    )

def ridge(alpha=1.0):
    """Return a new seeded RidgeClassifier with the given ``alpha``."""
    return RidgeClassifier(
        alpha=alpha,
        random_state=RANDOM_STATE,
    )

def pa(C=1.0):
    """Return a new seeded PassiveAggressiveClassifier with the given ``C``."""
    return PassiveAggressiveClassifier(
        C=C,
        random_state=RANDOM_STATE,
    )

def cnb():
    """Return a new ComplementNB with default settings."""
    classifier = ComplementNB()
    return classifier

def rbf_svm(C=2.0, gamma="scale"):
    """Return a new RBF-kernel SVC with the given ``C`` and ``gamma``."""
    settings = {"kernel": "rbf", "C": C, "gamma": gamma}
    return SVC(**settings)

def knn(k=7):
    """Return a new KNeighborsClassifier with ``k`` neighbours and the cosine metric."""
    settings = {"n_neighbors": k, "metric": "cosine"}
    return KNeighborsClassifier(**settings)

def xgb_small():
    """Return a new XGBClassifier with fixed settings, or None when HAVE_XGB is False."""
    if HAVE_XGB:
        settings = dict(
            n_estimators=400,
            max_depth=6,
            learning_rate=0.05,
            subsample=0.9,
            colsample_bytree=0.8,
            tree_method="hist",
            eval_metric="mlogloss",
            random_state=RANDOM_STATE,
            n_jobs=N_JOBS,
        )
        return XGBClassifier(**settings)
    # xgboost could not be imported: callers are expected to check HAVE_XGB first.
    return None

# =========================
# COMBOS
# Each entry: (name, pipeline_factory)
# The factory returns a ready sklearn Pipeline
# =========================
def make_sparse_pipeline(vectorizer, clf):
    """Return a Pipeline with steps "vec" (vectorizer) then "clf" (classifier)."""
    stages = [("vec", vectorizer), ("clf", clf)]
    return Pipeline(stages)

def make_sparse_svd_tree(vectorizer, clf):
    """Return a Pipeline "vec" -> "svd" -> "clf" with a 300-component TruncatedSVD."""
    # Trees don't love high-dim sparse; reduce with SVD
    reducer = TruncatedSVD(n_components=300, random_state=RANDOM_STATE)
    return Pipeline([("vec", vectorizer), ("svd", reducer), ("clf", clf)])

def make_dense_pipeline(vectorizer, clf, scale=False):
    """Return a Pipeline "vec" -> ["scaler"] -> "clf".

    The StandardScaler step named "scaler" is included only when ``scale`` is truthy.
    """
    optional_scaler = [("scaler", StandardScaler())] if scale else []
    return Pipeline([("vec", vectorizer), *optional_scaler, ("clf", clf)])

# Ordered registry of (display name, unfitted pipeline) pairs to evaluate.
combos: List[Tuple[str, Pipeline]] = []

# --- Sparse text: TF-IDF words ---
# NOTE(review): recent scikit-learn releases appear to restrict NearestCentroid's
# `metric` to "euclidean"/"manhattan" — confirm "cosine" is accepted by the
# installed version.
combos.extend([
    ("tfidf_words + LinearSVC", make_sparse_pipeline(tfidf_words(), linear_svc(C=1.0))),
    ("tfidf_words + LogisticReg", make_sparse_pipeline(tfidf_words(), logreg(C=2.0))),
    ("tfidf_words + SGD(hinge)", make_sparse_pipeline(tfidf_words(), sgd(loss="hinge", alpha=1e-4))),
    ("tfidf_words + SGD(log)", make_sparse_pipeline(tfidf_words(), sgd(loss="log_loss", alpha=1e-4))),
    ("tfidf_words + Ridge", make_sparse_pipeline(tfidf_words(), ridge(alpha=1.0))),
    ("tfidf_words + PassiveAggressive", make_sparse_pipeline(tfidf_words(), pa(C=1.0))),
    ("tfidf_words + ComplementNB", make_sparse_pipeline(tfidf_words(), cnb())),
    ("tfidf_words + NearestCentroid", make_sparse_pipeline(tfidf_words(), NearestCentroid(metric="cosine"))),
])

# --- Sparse text: TF-IDF chars ---
combos.extend([
    ("tfidf_chars + LinearSVC", make_sparse_pipeline(tfidf_chars(), linear_svc(C=1.0))),
    ("tfidf_chars + LogisticReg", make_sparse_pipeline(tfidf_chars(), logreg(C=2.0))),
    ("tfidf_chars + Ridge", make_sparse_pipeline(tfidf_chars(), ridge(alpha=1.0))),
])

# --- Sparse text: Count (for NB) ---
combos.append(
    ("count_words + ComplementNB", make_sparse_pipeline(count_words(), cnb()))
)

# --- Sparse -> Trees via SVD (optional XGB if available) ---
if HAVE_XGB:
    combos.extend([
        ("tfidf_words + SVD + XGB", make_sparse_svd_tree(tfidf_words(), xgb_small())),
        ("tfidf_chars + SVD + XGB", make_sparse_svd_tree(tfidf_chars(), xgb_small())),
    ])

# --- Dense embeddings: SBERT (if available) ---
if HAVE_SBERT:
    combos.extend([
        ("SBERT + LogisticReg", make_dense_pipeline(sbert_vec(), logreg(C=2.0), scale=True)),
        ("SBERT + RBF SVM", make_dense_pipeline(sbert_vec(), rbf_svm(C=2.0), scale=True)),
        ("SBERT + kNN(7)", make_dense_pipeline(sbert_vec(), knn(k=7), scale=False)),
    ])
    # The SBERT + XGB combo needs both optional packages.
    if HAVE_XGB:
        combos.append(
            ("SBERT + XGB", make_dense_pipeline(sbert_vec(), xgb_small(), scale=False))
        )

# =========================
# RUN ALL COMBOS (cross-validate)
# =========================
# One summary dict per combo that cross-validated successfully.
results: List[Dict[str, Any]] = []

print(f"Running {len(combos)} combos with {CV_FOLDS}-fold Stratified CV...")
for name, pipe in combos:
    print(f"\n▶ {name}")
    try:
        # error_score="raise": without it a fold that fails to fit is scored as
        # NaN, the combo's mean becomes NaN, and it still lands on the
        # leaderboard instead of reaching the "Skipped" handler below.
        cvres = cross_validate(
            pipe, X_text, y,
            scoring=scorers, cv=cv, n_jobs=N_JOBS, return_train_score=False,
            error_score="raise"
        )
    except Exception as e:
        # Best effort by design: report the failure and move on to the next combo.
        print(f"   ⚠️ Skipped due to error: {e}")
    else:
        fold_f1 = cvres["test_f1_macro"]
        fold_acc = cvres["test_accuracy"]
        res = {
            "name": name,
            "f1_macro_mean": float(np.mean(fold_f1)),
            "f1_macro_std":  float(np.std(fold_f1)),
            "acc_mean":      float(np.mean(fold_acc)),
            "acc_std":       float(np.std(fold_acc))
        }
        print(f"   F1(macro): {res['f1_macro_mean']:.4f} ± {res['f1_macro_std']:.4f} | "
              f"Acc: {res['acc_mean']:.4f} ± {res['acc_std']:.4f}")
        results.append(res)

# =========================
# PICK BEST & REFIT ON FULL DATA
# =========================
# Nothing to rank if every combo was skipped.
if not results:
    raise RuntimeError("No successful runs. Check dependencies and data.")

# Rank by macro-F1 first, accuracy second (both descending).
res_df = pd.DataFrame(results).sort_values(
    by=["f1_macro_mean", "acc_mean"],
    ascending=False,
)
print("\n===== Leaderboard (top 10) =====")
print(res_df.head(10).to_string(index=False))

best_name = res_df.iloc[0]["name"]
print(f"\n🏆 Best combo: {best_name}")

# First registered pipeline whose name matches the winner (None if no match).
best_pipe = next(
    (candidate for label, candidate in combos if label == best_name),
    None,
)

print("Fitting best pipeline on full dataset...")
best_pipe.fit(X_text, y)

# Persist the fitted winner; the label encoder was saved earlier to its own path.
dump(best_pipe, BEST_MODEL_PATH)
print(f"Saved best pipeline to: {BEST_MODEL_PATH}")
print(f"Saved label encoder to: {LABEL_ENCODER_PATH}")



In [None]:

# Stand-alone re-run of the selection step: rank whatever is in `results`,
# refit the winner and save it.
# Guard first, as the preceding cell does: with no successful runs the sort
# below would fail with an unhelpful KeyError on the missing score columns.
if not results:
    raise RuntimeError("No successful runs. Check dependencies and data.")

# Rank by macro-F1 first, accuracy second (both descending).
res_df = pd.DataFrame(results).sort_values(by=["f1_macro_mean", "acc_mean"], ascending=False)
print("\n===== Leaderboard (top 10) =====")
print(res_df.head(10).to_string(index=False))

best_name = res_df.iloc[0]["name"]
print(f"\n🏆 Best combo: {best_name}")

# Look the winning pipeline up by name in the registry.
best_pipe = None
for name, pipe in combos:
    if name == best_name:
        best_pipe = pipe
        break

# `results` and `combos` can drift apart if cells were re-run out of order;
# fail with a clear message rather than AttributeError on None.fit.
if best_pipe is None:
    raise RuntimeError(f"Best combo {best_name!r} is not present in `combos`.")

print("Fitting best pipeline on full dataset...")
best_pipe.fit(X_text, y)

dump(best_pipe, BEST_MODEL_PATH)
print(f"Saved best pipeline to: {BEST_MODEL_PATH}")
print(f"Saved label encoder to: {LABEL_ENCODER_PATH}")

In [None]:
# Plot the model performances
import matplotlib.pyplot as plt
import seaborn as sns

fig, ax = plt.subplots(1, 2, figsize=(15, 6))

# One entry per panel: axes, mean column, std column, x-label, title, extra bar styling.
panels = [
    (ax[0], 'f1_macro_mean', 'f1_macro_std', 'Macro F1-score',
     'Model Performance (Macro F1-score)', {}),
    (ax[1], 'acc_mean', 'acc_std', 'Accuracy',
     'Model Performance (Accuracy)', {'color': 'orange'}),
]

for panel_ax, mean_col, std_col, x_label, panel_title, bar_style in panels:
    # Horizontal bars per model, with the fold standard deviation as error bars.
    panel_ax.barh(res_df['name'], res_df[mean_col], xerr=res_df[std_col], capsize=5, **bar_style)
    panel_ax.set_xlabel(x_label)
    panel_ax.set_title(panel_title)
    panel_ax.invert_yaxis()  # To show the best performing model at the top

plt.tight_layout()
plt.show()

# Parameter Tuning Using GridSearch

In [None]:
from sklearn.model_selection import GridSearchCV
from joblib import load

# Reload the pipeline saved by the model-selection step, via the shared path
# constant rather than a second hard-coded copy of it.
# NOTE: joblib.load unpickles its input — only load artifacts this notebook wrote.
pipe = load(BEST_MODEL_PATH)

# Candidate search space.
# NOTE(review): these n-gram ranges mirror the character-level vectorizer's
# (3, 5) setting; if the winning "vec" step is word-level they would mean
# 3-6 word phrases — confirm they suit the selected pipeline.
candidate_grid = {
    "vec__ngram_range": [(3,5), (4,6)],
    "clf__C": [0.5, 1, 2, 5],
}

# Keep only parameters the loaded pipeline actually exposes: GridSearchCV raises
# on unknown names, and not every registered combo has `ngram_range` or `C`.
available_params = pipe.get_params(deep=True)
param_grid = {
    param: values
    for param, values in candidate_grid.items()
    if param in available_params
}
ignored_params = sorted(set(candidate_grid) - set(param_grid))
if ignored_params:
    print(f"Ignoring parameters not exposed by the best pipeline: {ignored_params}")
if not param_grid:
    raise ValueError(
        "None of the candidate parameters apply to the best pipeline; "
        "define a grid for its steps before tuning."
    )

# Reuse the shared splitter and job count so the score is comparable with the leaderboard.
grid = GridSearchCV(pipe, param_grid, cv=cv, scoring="f1_macro", n_jobs=N_JOBS)
grid.fit(X_text, y)

print("Best Params:", grid.best_params_)
print("Best CV F1 Macro:", grid.best_score_)