In [None]:
# Requirements
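# %pip install pymysql python-dotenv pandas transformers torch sentence-transformers scikit-learn numpy openai
# (os, json and typing are part of the Python standard library and must not be pip-installed;
#  the `dotenv` module is provided by the `python-dotenv` package.)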

# Sentiment Analysis for Spotify Playlist

## Exploratory Data Analysis

In [None]:
import libraries.db as db
import pandas as pd

We retrieve the data from the database

In [None]:
# Open a connection only for the duration of the fetch. The finally clause
# releases it even when the query raises, so a failed run does not leak it.
conn = db.create_connection()
try:
    tracks = db.fetch_tracks_dataset(conn)
finally:
    conn.close()
print(len(tracks))

We build the dataframe to better analyze the data.

In [None]:
# Name the columns once, then build the frame from the fetched rows.
# NOTE(review): assumes each row of `tracks` has these four fields in this order — confirm against db.fetch_tracks_dataset.
track_columns = ["track_spotify_id", "artist_name", "title", "keywords"]
df = pd.DataFrame(tracks, columns=track_columns)
df.head(10)

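Now we are going to create a dictionary that maps every keyword to a canonical representative, so that near-synonyms are normalized to the same term. Keywords are embedded and clustered hierarchically; the `distance_threshold` controls how aggressive the grouping is: the lower the threshold, the finer the clusters (more, smaller groups), and the higher the threshold, the larger the groups. In this case, I set it to 0.4.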

In [None]:
import itertools
import json
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering

# ========= 1) Create a unique keywords list =========
# Flatten every row's keywords, then lower-case and collapse internal whitespace so
# that spelling variants such as "Broken  Heart" / "broken heart" become one entry.
# NOTE(review): assumes each df['keywords'] cell is a list of strings; if the cells were
# serialized strings, chain.from_iterable would iterate characters — TODO confirm.
all_kws = list(itertools.chain.from_iterable(df['keywords']))
unique_kws = sorted({
    " ".join(str(k).lower().split())
    for k in all_kws
    if isinstance(k, str) and k.strip()
})

print(f"Total unique keywords: {len(unique_kws)}")

# ========= 2) Embeddings =========
# One embedding per unique keyword, computed with normalize_embeddings=True.
model = SentenceTransformer('all-MiniLM-L6-v2')
emb = model.encode(unique_kws, normalize_embeddings=True)

# ========= 3) Hierarchical cluster =========
# n_clusters=None lets distance_threshold decide how many clusters are produced.
clu = AgglomerativeClustering(
    n_clusters=None,
    metric='cosine',
    linkage='average',
    distance_threshold=0.4  # tune the threshold: smaller = finer clusters
)
labels = clu.fit_predict(emb)

# ========= 4) Build the clusters =========
# Map cluster label -> list of member keywords (members stay in sorted order).
clusters = {}
for kw, lab in zip(unique_kws, labels):
    clusters.setdefault(lab, []).append(kw)


# ========= 5) Choose canonical representative per cluster =========
def pick_rep(words):
    """Pick the canonical keyword for one cluster.

    Heuristic: prefer the keyword with the fewest whitespace-separated tokens;
    on a tie, the one with the fewest characters. Any remaining tie keeps the
    input order (the sort is stable).
    """
    def _rank(word):
        return len(word.split()), len(word)

    ordered = list(words)
    ordered.sort(key=_rank)
    return ordered[0]


# Map every keyword of a cluster to that cluster's representative.
suggested_norm = {}
for members in clusters.values():
    suggested_norm.update(dict.fromkeys(members, pick_rep(members)))

# ========= 6) Save suggested dictionary =========
with open("keyword_norm_suggested.json", "w", encoding="utf-8") as out_file:
    json.dump(suggested_norm, out_file, indent=2, ensure_ascii=False)

# Preview the first 20 mappings without materializing the whole item list.
print("Example of normalized (First 20):")
for original_kw, canonical_kw in itertools.islice(suggested_norm.items(), 20):
    print(f"{original_kw} -> {canonical_kw}")


Count unique normalized keywords (the canonical representatives). If the number of suggested words is considered optimal compared to the number of unique words, we can proceed; otherwise, we adjust the threshold.

In [None]:
# Distinct canonical representatives; compare against the raw vocabulary size
# to judge how much the clustering threshold compressed the keyword set.
unique_norm = set(suggested_norm.values())
print(f"{len(unique_kws)} unique words - suggested words {len(unique_norm)}")

We keep the original `keywords` column and add a new column, `normalized_keywords`, which holds the normalized keywords of each track as a list without duplicates.

In [None]:
def apply_normalization_array(df: pd.DataFrame, norm_dict: dict) -> pd.DataFrame:
    """
    Return a copy of `df` with an added 'normalized_keywords' column.

    Each keyword is lower-cased and has its whitespace collapsed (the same
    canonical form used when the normalization dictionary keys were built),
    then mapped through `norm_dict`; keywords missing from the dictionary are
    kept in their canonical form.

    Args:
        df: DataFrame with a 'keywords' column whose cells are lists of strings.
        norm_dict: mapping canonical keyword -> representative keyword.

    Returns:
        A new DataFrame; the input frame is not modified. Each cell of
        'normalized_keywords' is a de-duplicated list in first-occurrence order
        (deterministic across runs). Cells that are not a list/tuple/set
        (e.g. NaN) yield an empty list; non-string items are ignored.
    """

    def _normalize(kws):
        if not isinstance(kws, (list, tuple, set)):
            return []
        # A dict is used as an ordered set: it de-duplicates while keeping order.
        seen = {}
        for k in kws:
            if not isinstance(k, str):
                continue
            # Collapse internal whitespace too, so the lookup key matches the
            # form of the dictionary keys (a plain .strip() would miss them).
            key = " ".join(k.lower().split())
            if key:
                seen.setdefault(norm_dict.get(key, key), None)
        return list(seen)

    df_copy = df.copy()
    df_copy["normalized_keywords"] = df_copy["keywords"].apply(_normalize)
    return df_copy


# Apply the suggested dictionary and preview raw vs. normalized keywords side by side.
df_norm = apply_normalization_array(df, suggested_norm)
df_norm[["title", "artist_name", "keywords", "normalized_keywords"]].head(5)

We can see the top keywords, and if we want, we can go back to the dictionary creation step to re-normalize with another threshold (this is the key).

In [None]:
import itertools
from collections import Counter


def top_normalized_keywords(df: pd.DataFrame, top_n: int = 20) -> pd.DataFrame:
    """
    Rank normalized keywords by how often they occur across all tracks.

    Args:
        df: DataFrame whose 'normalized_keywords' cells are iterables of keywords.
        top_n: number of most frequent keywords to return.

    Returns:
        DataFrame with columns 'keyword' and 'count', most frequent first.
    """
    frequencies = Counter(
        keyword
        for row_keywords in df["normalized_keywords"]
        for keyword in row_keywords
    )
    ranking = frequencies.most_common(top_n)
    return pd.DataFrame(ranking, columns=["keyword", "count"])


# Inspect the 20 most frequent normalized keywords to judge the clustering threshold.
top_keywords_df = top_normalized_keywords(df_norm, top_n=20)
top_keywords_df


To simplify the process, we will save the DataFrame with normalized keywords into a CSV file.

In [None]:
# NOTE(review): list-valued columns are written to CSV as text, so they must be
# parsed back into lists after pd.read_csv (see ensure_keywords_list / ensure_list_column).
df_norm.to_csv("tracks_with_normalized_keywords.csv", index=False, encoding="utf-8")

We also save our normalized keywords in the database to ensure persistence.

In [None]:
def update_normalized_keywords(conn, df_norm):
    """
    Persist each track's normalized keywords into the Tracks table.

    Args:
        conn: open DB connection exposing cursor()/commit()/rollback().
        df_norm: DataFrame with columns 'track_spotify_id' and
            'normalized_keywords' (JSON-serializable values, e.g. list[str]).

    Raises:
        Exception: any database error is re-raised after the transaction is
            rolled back, matching update_track_emotions, so a failed update is
            not mistaken for a successful one.

    NOTE(review): this statement targets `Tracks` while update_track_emotions
    targets `tracks`; table names can be case-sensitive in MySQL — confirm which
    spelling matches the schema.
    """
    # Build the parameter tuples first so a non-serializable row fails before any SQL is sent.
    data = [
        (json.dumps(norm_kw), track_id)
        for norm_kw, track_id in zip(df_norm["normalized_keywords"], df_norm["track_spotify_id"])
    ]
    sql = """
                UPDATE Tracks
                SET normalized_keywords = %s
                WHERE spotify_id = %s
            """
    try:
        with conn.cursor() as cursor:
            cursor.executemany(sql, data)
        conn.commit()
    except Exception as e:
        conn.rollback()
        print("Error al actualizar:", e)
        raise
    print(f"{len(data)} filas actualizadas correctamente.")


# Close the connection even when the update raises, so a failed run does not leak it.
conn = db.create_connection()
try:
    update_normalized_keywords(conn, df_norm)
finally:
    conn.close()

## Sentiment Analysis

Utility functions

In [48]:
import os
from transformers import pipeline
import json
import ast
import pandas as pd
from collections import defaultdict


# -------------------------
# Utilities
# -------------------------
def load_df_if_needed(df: pd.DataFrame | None = None,
                      csv_path: str = "tracks_with_normalized_keywords.csv") -> pd.DataFrame:
    """
    Returns a DataFrame ready for processing.
    If df is None OR df does not contain 'normalized_keywords', tries to load from CSV.
    """
    if df is None or "normalized_keywords" not in df.columns:
        if not os.path.exists(csv_path):
            raise FileNotFoundError(
                f"DataFrame is missing and '{csv_path}' was not found. "
                "Provide a DataFrame with columns: track_spotify_id, artist_name, title, keywords (list)."
            )
        df = pd.read_csv(csv_path)
    return df


def ensure_keywords_list(df: pd.DataFrame, col: str = "keywords") -> pd.DataFrame:
    """
    Return a copy of `df` in which column `col` holds a Python list[str] per row.

    Handles:
      - Lists / tuples (already parsed)
      - JSON lists: ["love","party"]
      - Python repr lists: ['love', 'party']
      - JSON-encoded strings that wrap a list (double-encoded JSON)
      - Bracketed but unparsable text: [love, party]
      - Comma-separated fallbacks: love, party
      - NaN/None -> []

    Items that are None/NaN are dropped instead of becoming the literal
    keywords "None"/"nan"; every kept item is stripped and empty ones removed.

    Raises:
        KeyError: `col` is not a column of `df`.
    """
    if col not in df.columns:
        raise KeyError(f"Column '{col}' not found in DataFrame.")

    # Errors json.loads / ast.literal_eval raise on malformed input.
    parse_errors = (ValueError, SyntaxError, TypeError, RecursionError, MemoryError)

    def _is_missing(value) -> bool:
        return value is None or (isinstance(value, float) and pd.isna(value))

    def _clean(items, scalars_only: bool = False):
        # Stringify and strip items, dropping missing and blank entries.
        cleaned = []
        for item in items:
            if _is_missing(item):
                continue
            if scalars_only and not isinstance(item, (str, int, float)):
                continue
            text = str(item).strip()
            if text:
                cleaned.append(text)
        return cleaned

    def _strip_outer_quotes(s: str) -> str:
        # Remove a single pair of wrapping quotes if present: '"[...]"' or "'[...]'"
        if len(s) >= 2 and ((s[0] == s[-1] == '"') or (s[0] == s[-1] == "'")):
            return s[1:-1]
        return s

    def _parse_text(s: str, depth: int = 0):
        s = s.strip()
        if not s:
            return []

        # Double-encoded JSON: the text is itself a JSON string whose value is
        # the list text. Decoding it first resolves the inner escapes, which
        # merely stripping the outer quotes cannot do.
        if depth < 2 and len(s) >= 2 and s[0] == s[-1] == '"':
            try:
                inner = json.loads(s)
            except parse_errors:
                inner = None
            if isinstance(inner, str) and inner.strip().startswith("["):
                return _parse_text(inner, depth + 1)

        unwrapped = _strip_outer_quotes(s).strip()

        # Try JSON first, then a Python literal (handles single-quoted lists).
        for loader in (json.loads, ast.literal_eval):
            for candidate in dict.fromkeys((s, unwrapped)):
                if not (candidate.startswith("[") and candidate.endswith("]")):
                    continue
                try:
                    value = loader(candidate)
                except parse_errors:
                    continue
                if isinstance(value, (list, tuple)):
                    return _clean(value)

        # Fallback: comma-separated text. Drop enclosing brackets so an
        # unparsable "[love, party]" does not leak "[" / "]" into keywords.
        body = s
        if unwrapped.startswith("[") and unwrapped.endswith("]"):
            body = unwrapped[1:-1]
        parts = [p.strip().strip("'").strip('"') for p in body.split(",")]
        return [p for p in parts if p]

    def _parse(x):
        # Already a sequence
        if isinstance(x, (list, tuple)):
            return _clean(x, scalars_only=True)

        # Missing
        if _is_missing(x):
            return []

        # String-like
        if isinstance(x, str):
            return _parse_text(x)

        # Anything else: only accept it if its string form is a list literal.
        try:
            s = str(x).strip()
            if s.startswith("[") and s.endswith("]"):
                value = ast.literal_eval(s)
                if isinstance(value, list):
                    return _clean(value)
        except parse_errors:
            pass
        return []

    out = df.copy()
    out[col] = out[col].apply(_parse)
    return out


# -------------------------
# Emotion analysis (model-based, no lexicon)
# -------------------------
# Lazily-created emotion classifier: a DistilBERT student model trained on GoEmotions.
# Kept in a module-level cache so the model is loaded only once per kernel.
_emotion_pipe = None


def get_emotion_pipeline_multilabel():
    """
    Return the shared GoEmotions text-classification pipeline, creating it on first use.

    The pipeline is configured with top_k=None so that every label is returned
    with its score for each input, and with truncation enabled.

    NOTE(review): the per-track scores produced downstream sum to ~1 (see the
    "~= 100" check on the wide table), which indicates a softmax over labels
    rather than independent sigmoid scores for this checkpoint. Pass
    function_to_apply="sigmoid" if independent per-label probabilities are
    wanted — confirm before changing, since stored percentages depend on it.
    """
    global _emotion_pipe
    if _emotion_pipe is None:
        # `return_all_scores` is deprecated and redundant here: top_k=None
        # already requests all labels, so it is no longer passed.
        _emotion_pipe = pipeline(
            "text-classification",
            model="joeddav/distilbert-base-uncased-go-emotions-student",
            top_k=None,  # return all labels with scores
            truncation=True
        )
    return _emotion_pipe


# 2) Utility: classify a list of keywords IN BATCH, then aggregate per label
def analyze_emotion_from_keywords_multilabel(
        keywords: list[str],
        min_score_threshold: float = 0.15,  # keep labels with mean score >= threshold
        top_k: int = 5,  # return top-k emotions after threshold
        batch_size: int = 32,
        max_keywords: int | None = 100  # cap to avoid very long batches
) -> list[dict]:
    """
    Multi-label emotion analysis:
      - Classifies each keyword separately (batched).
      - Averages scores per emotion across keywords.
      - Filters by threshold and returns top_k labels.

    Returns a list of dicts: [{"label": "...", "score": 0.xx}, ...]
    """
    if not keywords:
        return []

    # Clean and cap keywords
    kws = [str(k).strip() for k in keywords if isinstance(k, str) and str(k).strip()]
    if not kws:
        return []
    if max_keywords is not None:
        kws = kws[:max_keywords]

    pipe = get_emotion_pipeline_multilabel()

    # Batch inference
    all_scores = pipe(kws, batch_size=batch_size)  # list of list[{"label","score"}]

    # Aggregate scores per label (mean over keywords)
    # Initialize label space from the first result
    if not all_scores or not all_scores[0]:
        return []

    label_scores = defaultdict(list)
    labels = [d["label"] for d in all_scores[0]]
    for per_kw in all_scores:
        for d in per_kw:
            label_scores[d["label"]].append(float(d["score"]))

    mean_scores = {lab: (sum(vals) / len(vals)) for lab, vals in label_scores.items()}

    # Threshold and sort
    filtered = [{"label": lab, "score": sc} for lab, sc in mean_scores.items() if sc >= min_score_threshold]
    filtered.sort(key=lambda x: x["score"], reverse=True)

    # Take top_k (if threshold filters too much and nothing remains, fall back to top_k without threshold)
    if not filtered:
        fallback = [{"label": lab, "score": sc} for lab, sc in mean_scores.items()]
        fallback.sort(key=lambda x: x["score"], reverse=True)
        return fallback[:top_k]
    return filtered[:top_k]


# 3) Attach emotions to your DataFrame using the ORIGINAL 'keywords' column
def attach_emotions_multilabel(df: pd.DataFrame, top_k: int = 5,
                               min_score_threshold: float = 0.15) -> pd.DataFrame:
    """
    Return a copy of `df` with an added 'emotions' column.

    Each cell of 'emotions' is the aggregated result of
    analyze_emotion_from_keywords_multilabel for that row's 'keywords' value.

    Args:
        df: DataFrame with a 'keywords' column (list of keywords per row).
        top_k: maximum number of emotions kept per row.
        min_score_threshold: minimum mean score for an emotion to be kept;
            defaults to 0.15, the value previously hard-coded here.
    """
    df_out = df.copy()
    df_out["emotions"] = df_out["keywords"].apply(
        lambda kws: analyze_emotion_from_keywords_multilabel(
            kws,
            min_score_threshold=min_score_threshold,
            top_k=top_k
        )
    )
    return df_out

We are going to perform sentiment analysis. To do this, we’ll use a pre-trained model: joeddav/distilbert-base-uncased-go-emotions-student. This model will help us extract the emotions from our keywords. We can define how many emotions we want to retrieve by setting the top_k parameter in our attach_emotions_multilabel function. Depending on this value, we can decide how many emotions to visualize and how deep we want the analysis to be.

In [49]:
# If you already have a DataFrame named `df`, you can pass it directly to
# attach_emotions_multilabel(df, top_k=...). Here the frame is always reloaded
# from CSV (df=None), which expects at least: track_spotify_id, artist_name, title, keywords.
# NOTE(review): this rebinds the global `df` created in the EDA section.
try:
    df = load_df_if_needed(df=None, csv_path="tracks_with_normalized_keywords.csv")
except FileNotFoundError as e:
    print(e)
    raise

# Ensure keywords are lists, then attach emotions
df = ensure_keywords_list(df, col="keywords")
df = attach_emotions_multilabel(df, top_k=28)  # Top K depends on how many emotions the dataset can describe.

# Inspect result (emotions column contains up to top_k emotions with scores per track)
df[["track_spotify_id", "artist_name", "title", "emotions"]].head(10)

Device set to use mps:0


Unnamed: 0,track_spotify_id,artist_name,title,emotions
0,00AoRZ8103mVeOVfpTfGuR,Luny Tunes,Mayor Que Yo 3,"[{'label': 'desire', 'score': 0.14666313482448..."
1,00BuKLSAFkaEkaVAgIMbeA,Lady Gaga,Telephone,"[{'label': 'excitement', 'score': 0.0790549959..."
2,00i0O74dXdaKKdCrqHnfXm,Ricky Martin,La Mordidita (feat. Yotuel),"[{'label': 'desire', 'score': 0.08667167080566..."
3,00iQcGMeC6agUvBjHdkAAM,La Oreja de Van Gogh,Un Cuento Sobre el Agua,"[{'label': 'caring', 'score': 0.07104973383247..."
4,017PF4Q3l4DBUiWoXk4OWT,Dua Lipa,Break My Heart,"[{'label': 'sadness', 'score': 0.0907404598547..."
5,01uqI4H13Gsd8Lyl1EYd8H,Macklemore & Ryan Lewis,Same Love (feat. Mary Lambert),"[{'label': 'realization', 'score': 0.104344944..."
6,01YedSX5OYtSCWdO1DRhvY,La Oreja de Van Gogh,20 de Enero,"[{'label': 'realization', 'score': 0.062199747..."
7,02bKaAG61tMw9c63fzKXal,Alex Ubago,Sin miedo a nada (feat. Amaia Montero),"[{'label': 'desire', 'score': 0.10555011378601..."
8,02d1E4NRuh7OEQO4vCb9PD,Lady Gaga,Marry The Night,"[{'label': 'realization', 'score': 0.068049886..."
9,02JIdsrod3BYucThfUFDUX,Camille,Le Festin,"[{'label': 'excitement', 'score': 0.0745692684..."


We verify what we obtained.

In [50]:
# Select the rows whose emotions entry is a non-empty list (mask computed once),
# then take the first two of them as samples.
non_empty_emotions = df.loc[df["emotions"].map(lambda x: isinstance(x, list) and len(x) > 0), "emotions"]
first_non_empty, second = non_empty_emotions.iloc[0], non_empty_emotions.iloc[1]

# Pretty print as JSON for readability
for sample in (first_non_empty, second):
    print(json.dumps(sample, indent=2))


[
  {
    "label": "desire",
    "score": 0.1466631348244846
  },
  {
    "label": "caring",
    "score": 0.08240479482337833
  },
  {
    "label": "realization",
    "score": 0.07717188512906432
  },
  {
    "label": "excitement",
    "score": 0.06025317693129182
  },
  {
    "label": "admiration",
    "score": 0.05247069443576038
  },
  {
    "label": "pride",
    "score": 0.05070672715082765
  },
  {
    "label": "approval",
    "score": 0.04951061676256359
  },
  {
    "label": "love",
    "score": 0.04948227442800999
  },
  {
    "label": "curiosity",
    "score": 0.04367535659112036
  },
  {
    "label": "optimism",
    "score": 0.03690654546953738
  },
  {
    "label": "joy",
    "score": 0.03547618095763028
  },
  {
    "label": "relief",
    "score": 0.03178699493873864
  },
  {
    "label": "amusement",
    "score": 0.026313653052784502
  },
  {
    "label": "surprise",
    "score": 0.026229654159396886
  },
  {
    "label": "neutral",
    "score": 0.02188704744912684
  },
  

We will denormalize, create an appropriate table, and insert the extracted emotions for later analysis.

1. Discover unique emotions from df

In [51]:
import re


def all_emotions_from_df(df):
    """
    Collect the distinct emotion labels found in df['emotions'].

    Labels are stripped and lower-cased; blank labels and cells that are not
    lists are ignored. Returns the labels as a sorted list.
    """
    found = {
        str(entry.get("label", "")).strip().lower()
        for cell in df["emotions"]
        if isinstance(cell, list)
        for entry in cell
    }
    found.discard("")  # entries without a usable label
    return sorted(found)


def to_snake(s: str) -> str:
    """
    Convert a label to snake_case suitable for use as a MySQL column name.

    The text is stripped and lower-cased, every run of characters outside
    [a-z0-9] becomes a single underscore, and leading/trailing underscores
    are removed.
    """
    text = s.strip().lower()
    for pattern in (r"[^a-z0-9]+", r"_+"):
        text = re.sub(pattern, "_", text)
    return text.strip("_")


# Distinct labels found in df["emotions"], plus their snake_case form used as column names.
emotions = all_emotions_from_df(df)
emotion_cols = [to_snake(e) for e in emotions]
print(len(emotion_cols), "emotions discovered")


28 emotions discovered


 2. Build DDL for a wide, denormalized table

In [71]:
def build_create_table_sql(table: str, emotion_cols: list[str]) -> str:
    """
    Build a CREATE TABLE IF NOT EXISTS statement for the wide emotions table.

    The table has `track_spotify_id` as primary key plus one
    DECIMAL(7,4) NOT NULL DEFAULT 0 column per emotion. DECIMAL(7,4) keeps
    four decimals and three integer digits, so the full 0–100 percentage range
    fits (DECIMAL(6,4) tops out at 99.9999).

    Args:
        table: table name.
        emotion_cols: column names, one per emotion (may be empty).

    Raises:
        ValueError: an identifier is empty or contains a backtick, which would
            break out of the quoted identifier.
    """
    for identifier in (table, *emotion_cols):
        if not identifier or "`" in identifier:
            raise ValueError(f"Unsafe SQL identifier: {identifier!r}")

    # Assemble all definitions in one list so an empty emotion list still
    # produces valid SQL (no dangling comma).
    definitions = ["`track_spotify_id` VARCHAR(64) NOT NULL"]
    definitions.extend(f"`{c}` DECIMAL(7,4) NOT NULL DEFAULT 0" for c in emotion_cols)
    definitions.append("PRIMARY KEY (`track_spotify_id`)")
    body = ",\n  ".join(definitions)

    return (
        f"CREATE TABLE IF NOT EXISTS `{table}` (\n"
        f"  {body}\n"
        ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"
    )


# Build the DDL for the wide emotions table; it is executed later, in the bulk-upsert cell.
table_name = "track_emotions_wide"
ddl = build_create_table_sql(table_name, emotion_cols)

3. Pivot df to wide (percentages per emotion)

In [73]:
import pandas as pd


def df_to_emotion_wide(df: pd.DataFrame, emotions: list[str]) -> pd.DataFrame:
    """
    Pivot the per-track emotion lists into a wide DataFrame.

    Args:
        df: DataFrame with 'track_spotify_id' and 'emotions'
            (list of {"label", "score"} dicts per row; anything else counts as empty).
        emotions: emotion labels that become columns (snake_cased with to_snake).

    Returns:
        DataFrame with 'track_spotify_id' followed by one column per emotion,
        holding the score as a percentage rounded to 4 decimals; 0.0 where the
        emotion is absent. If a label appears more than once for a track, the
        highest value is kept. Labels outside `emotions` are ignored.
    """
    emotion_cols = [to_snake(e) for e in emotions]
    known_cols = set(emotion_cols)  # excludes the id column, so a label can never overwrite it
    cols = ["track_spotify_id"] + emotion_cols

    out_rows = []
    for track_id, cell in zip(df["track_spotify_id"], df["emotions"]):
        record = {"track_spotify_id": str(track_id).strip()}
        record.update(dict.fromkeys(emotion_cols, 0.0))
        for entry in (cell if isinstance(cell, list) else []):
            if not isinstance(entry, dict):
                continue
            col = to_snake(str(entry.get("label", "")).strip().lower())
            if col not in known_cols:
                continue
            # Coerce to float BEFORE scaling: multiplying first would repeat a
            # string score 100 times instead of scaling it.
            percentage = round(float(entry.get("score") or 0.0) * 100, 4)
            record[col] = max(record[col], percentage)
        out_rows.append(record)

    return pd.DataFrame(out_rows, columns=cols)


# Pivot to one column per emotion and sanity-check the first track.
# NOTE(review): the row total is ~100 only when all labels were kept for that
# track (top_k=28 and the threshold fallback); a track with labels above the
# threshold keeps only those, so its total can be lower — confirm this is intended.
wide_df = df_to_emotion_wide(df, emotions)
print(f"Value: {wide_df.iloc[0, 1:].sum()}")  # The value needs to be ~= 100
wide_df.head()


Value: 100.00020000000002


Unnamed: 0,track_spotify_id,admiration,amusement,anger,annoyance,approval,caring,confusion,curiosity,desire,...,love,nervousness,neutral,optimism,pride,realization,relief,remorse,sadness,surprise
0,00AoRZ8103mVeOVfpTfGuR,5.2471,2.6314,1.0984,1.8209,4.9511,8.2405,2.1029,4.3675,14.6663,...,4.9482,1.9162,2.1887,3.6907,5.0707,7.7172,3.1787,2.1446,1.4251,2.623
1,00BuKLSAFkaEkaVAgIMbeA,2.5483,4.9206,2.3158,4.3069,4.0989,6.2656,3.555,4.1534,4.7432,...,2.3739,2.8681,2.2161,2.6131,4.2846,4.9172,3.3866,2.7343,2.6041,2.4336
2,00i0O74dXdaKKdCrqHnfXm,2.4668,4.2057,2.0524,3.5313,4.0391,5.9327,2.2842,4.1299,8.6672,...,3.381,2.9953,2.067,3.8467,4.0853,5.7221,4.3536,2.4415,1.8666,2.9859
3,00iQcGMeC6agUvBjHdkAAM,3.1316,4.4714,1.6054,2.7343,3.8828,7.105,2.9283,5.0686,5.7196,...,4.888,3.0396,2.4263,6.8149,3.7328,5.413,5.2132,3.2846,2.7707,2.882
4,017PF4Q3l4DBUiWoXk4OWT,1.6237,2.1248,1.3385,1.8788,2.496,6.4593,3.7262,3.262,3.6006,...,4.1245,2.7741,1.6141,3.6729,2.0447,6.1625,2.7624,6.5137,9.074,2.2786


4. Bulk upsert into MySQL (denormalized table)

In [74]:
import libraries.db as db


def upsert_emotion_wide(conn, table: str, wide_df: pd.DataFrame):
    """
    Bulk UPSERT `wide_df` into a MySQL table.

    Issues one INSERT ... ON DUPLICATE KEY UPDATE per row (via executemany),
    updating every non-key column when the primary key already exists.

    Args:
        conn: open DB connection exposing cursor()/commit()/rollback().
        table: target table name.
        wide_df: frame whose first column is the primary key
            (track_spotify_id) and whose other columns are emotion values.
            An empty frame is a no-op.

    Raises:
        ValueError: `wide_df` has no column besides the key (the UPDATE clause
            would be empty and the statement invalid).
        Exception: database errors are re-raised after a rollback.
    """
    if wide_df.empty:
        return
    cols = list(wide_df.columns)  # first col must be track_spotify_id
    value_cols = cols[1:]  # skip PK
    if not value_cols:
        raise ValueError("wide_df has no emotion columns to upsert.")

    placeholders = ", ".join(["%s"] * len(cols))
    col_list = ", ".join(f"`{c}`" for c in cols)
    update_clause = ", ".join(f"`{c}`=VALUES(`{c}`)" for c in value_cols)

    sql = f"""
        INSERT INTO `{table}` ({col_list})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE {update_clause}
    """.strip()

    # itertuples(name=None) already yields plain tuples in column order.
    rows = list(wide_df.itertuples(index=False, name=None))
    try:
        with conn.cursor() as cur:
            cur.executemany(sql, rows)
        conn.commit()
    except Exception:
        # Match the other writers in this notebook: never leave a half-applied batch.
        conn.rollback()
        raise


def truncate_track_emotions_wide(conn, table: str = "track_emotions_wide"):
    """
    Best-effort TRUNCATE of the wide emotions table.

    A failure (for example when the table does not exist yet) does not raise:
    the transaction is rolled back and the error is printed, so the problem is
    visible instead of silently swallowed.

    Args:
        conn: open DB connection exposing cursor()/commit()/rollback().
        table: table to empty; defaults to the name previously hard-coded here.

    Raises:
        ValueError: `table` is empty or contains a backtick.
    """
    if not table or "`" in table:
        raise ValueError(f"Unsafe SQL identifier: {table!r}")
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"TRUNCATE TABLE `{table}`;")
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"Could not truncate `{table}` (it may not exist yet): {e}")


# Create the table first, then empty it, then load the fresh rows. Truncating
# before the CREATE fails on a first run, when the table does not exist yet.
# NOTE(review): CREATE TABLE IF NOT EXISTS does not alter an existing table, so a
# changed emotion set requires dropping or altering the table manually.
conn = db.create_connection()
try:
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()
    truncate_track_emotions_wide(conn)
    upsert_emotion_wide(conn, table_name, wide_df)
finally:
    # Release the connection even when one of the steps raises.
    conn.close()

We store the extracted emotions in the track table of the database for added reliability.

In [75]:
def update_track_emotions(conn, df):
    """
    Write each track's emotions into the 'emotions' field of the tracks table.

    - df must provide the columns track_spotify_id and emotions.
    - emotions (list[dict]) is stored as a JSON string; None is stored as "[]".
    - Rows are matched on tracks.spotify_id = df.track_spotify_id (stripped).

    Any failure rolls the transaction back, prints the error and re-raises it.
    """
    try:
        with conn.cursor() as cur:
            sql = """
                UPDATE tracks
                SET emotions = %s
                WHERE spotify_id = %s
            """
            # One (json_payload, spotify_id) tuple per row, in frame order
            data = [
                (
                    "[]" if emos is None else json.dumps(emos, ensure_ascii=False),
                    str(track_id).strip(),
                )
                for track_id, emos in zip(df["track_spotify_id"], df["emotions"])
            ]

            cur.executemany(sql, data)

        conn.commit()
        print(f"Updated emotions for {len(data)} tracks.")

    except Exception as e:
        conn.rollback()
        print("Error while updating emotions:", e)
        raise


# update_track_emotions re-raises on failure, so close the connection in a
# finally clause to avoid leaking it when the update fails.
conn = db.create_connection()
try:
    update_track_emotions(conn, df)
finally:
    conn.close()

Updated emotions for 911 tracks.


## Emotion Dictionary Creation

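In this step, we will **extract the emotion columns from `wide_df`** and use a language model (LLM) to place each emotion on a five-level polarity scale — **FP** (Full Positive), **MP** (Mid Positive), **N** (Neutral), **MN** (Mid Negative), **FN** (Full Negative) — and to assign it a representative **emoji**.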

With this information, we will build an **emotion dictionary** and store it in the database in the `emotions_dictionary` table (creating or truncating it beforehand).

In [58]:
import os
from dotenv import load_dotenv

# Load the variables from the .env file into the environment
load_dotenv()

# Report only whether the key is present; the key itself is never printed
print("✅ OPENAI_API_KEY found:", bool(os.getenv("OPENAI_API_KEY")))

✅ OPENAI_API_KEY found: True


In [59]:
# =========================
# Emotions Dictionary: wide_df -> LLM -> DB
# =========================
# Requirements:
#   - pip install openai python-dotenv pymysql
#   - a .env file with OPENAI_API_KEY=<your_key>
#   - a 'db' object with db.create_connection() -> PyMySQL connection
#   - the 'wide_df' DataFrame already in memory
#
# What it does:
#   1) Extracts the emotions from wide_df.columns (excluding 'track_spotify_id')
#   2) Asks the LLM for: emotion -> {emotion, normalize (FP|MP|N|MN|FN), emoji}
#   3) Creates/TRUNCATEs emotions_dictionary
#   4) Inserts the mapping into MySQL
#   5) Returns the mapping

import os
import re
import json
import time
from typing import List, Dict, Any, Optional

import pymysql
from pymysql.cursors import DictCursor

# 1) Load .env (best effort: skipped if python-dotenv is missing or loading fails)
try:
    from dotenv import load_dotenv

    load_dotenv()
except Exception:
    pass

# 2) OpenAI client (reads OPENAI_API_KEY from the environment)
from openai import OpenAI

_client = OpenAI()  # NOTE(review): with a missing key, recent openai SDK versions raise here at construction rather than on the first request — confirm for the installed version


# ---------- Utilidades ----------

def _parse_json_lenient(text: str) -> Any:
    """
    Intenta parsear JSON aunque el modelo devuelva texto extra o ```json fences.
    """
    if text is None:
        raise ValueError("Empty LLM response.")
    cleaned = text.strip()

    # quitar fences ```json ... ```
    cleaned = re.sub(r"^```(?:json|JSON)?\s*", "", cleaned)
    cleaned = re.sub(r"\s*```$", "", cleaned)

    # intento directo
    try:
        return json.loads(cleaned)
    except Exception:
        pass

    # buscar objeto {...}
    s, e = cleaned.find("{"), cleaned.rfind("}")
    if s != -1 and e != -1 and s < e:
        try:
            return json.loads(cleaned[s:e + 1])
        except Exception:
            pass

    # buscar array [...]
    s, e = cleaned.find("["), cleaned.rfind("]")
    if s != -1 and e != -1 and s < e:
        try:
            return json.loads(cleaned[s:e + 1])
        except Exception:
            pass

    raise ValueError(f"Could not parse JSON from LLM output. Got:\n{cleaned[:500]}")


def _canon_norm(label: str) -> str:
    """
    Normaliza etiquetas a Positive|Neutral|Negative (acepta inglés/español/minúsculas).
    """
    if not label:
        return "Neutral"
    l = label.strip().lower()
    if l.startswith(("pos", "poz", "positivo")):
        return "Positive"
    if l.startswith(("neu", "neutro")):
        return "Neutral"
    if l.startswith(("neg", "negativo")):
        return "Negative"
    return "Neutral"


# ---------- LLM ----------

def llm_classify_emotions_with_openai(
        emotions: List[str],
        model: str = "gpt-4o-mini",
        temperature: float = 0.0,
        max_retries: int = 3,
        dry_run: bool = False,
        debug_print: bool = False,
) -> List[Dict[str, Any]]:
    """
    Ask the chat-completions API to classify each emotion and pick an emoji.

    Args:
        emotions: emotion names to classify; an empty list returns [].
        model: chat model name.
        temperature: sampling temperature.
        max_retries: total number of attempts (must be >= 1).
        dry_run: when True, skip the API and return every emotion as "N" / "❓".
        debug_print: when True, print the (truncated) raw reply of a failed attempt.

    Returns:
        [{"emotion": str, "normalize": code, "emoji": str}, ...] where code is
        one of FP (Full Positive), MP (Mid Positive), N (Neutral),
        MN (Mid Negative), FN (Full Negative). Codes are upper-cased and any
        value outside that set is replaced by "N". Rows without an emotion
        name or emoji are skipped.

    Raises:
        ValueError: `max_retries` < 1 (previously the function silently
            returned None in that case).
        Exception: the error of the last attempt, once all attempts failed.
    """
    if not emotions:
        return []
    if dry_run:
        return [{"emotion": e, "normalize": "N", "emoji": "❓"} for e in emotions]
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    allowed_codes = {"FP", "MP", "N", "MN", "FN"}

    system_msg = '''
    You are a precise JSON generator.
    For each input emotion, return a JSON array of objects with the following keys:
    - "emotion": the emotion name,
    - "normalize": one of ["FP" (Full Positive), "MP" (Mid Positive), "N" (Neutral), "MN" (Mid Negative), "FN" (Full Negative)],
    - "emoji": a single representative emoji.

    Return ONLY a valid JSON array, with no explanations and no markdown.
    '''
    user_msg = "Emotions:\n" + json.dumps(emotions, ensure_ascii=False)

    last_text: Optional[str] = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = _client.chat.completions.create(
                model=model,
                temperature=temperature,
                messages=[
                    {"role": "system", "content": system_msg},
                    {"role": "user", "content": user_msg},
                ],
                max_tokens=1200,
            )
            text = resp.choices[0].message.content
            last_text = text
            parsed = _parse_json_lenient(text)

            # Accept a bare list or an object wrapping it under "result"
            if isinstance(parsed, dict) and "result" in parsed and isinstance(parsed["result"], list):
                items = parsed["result"]
            elif isinstance(parsed, list):
                items = parsed
            else:
                raise ValueError("Parsed JSON is not a list nor an object with 'result'.")

            out: List[Dict[str, Any]] = []
            for row in items:
                if not isinstance(row, dict):
                    continue
                emo = str(row.get("emotion", "")).strip()
                norm = str(row.get("normalize", "")).strip().upper()
                emoji = str(row.get("emoji", "")).strip()
                if norm not in allowed_codes:
                    # Keep the row but never store a code outside the documented set.
                    norm = "N"
                if emo and emoji:
                    out.append({"emotion": emo, "normalize": norm, "emoji": emoji})

            if not out:
                raise ValueError("Empty mapping after validation.")
            return out

        except Exception:
            if debug_print and last_text:
                print("LLM RAW OUTPUT (truncated):\n", last_text[:800])
            if attempt == max_retries:
                raise
            # Linear back-off before the next attempt
            time.sleep(0.7 * attempt)


# ---------- DB ----------

def create_and_truncate_emotions_dictionary():
    """
    Create the emotions_dictionary table if it does not exist, then empty it (TRUNCATE).

    Opens its own connection via db.create_connection() and always attempts to
    close it. Errors from the SQL statements propagate to the caller.
    """
    conn = db.create_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS emotions_dictionary (
                    emotion   VARCHAR(128) PRIMARY KEY,
                    normalize VARCHAR(20) NOT NULL,
                    emoji     VARCHAR(16) NOT NULL
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
            """)
            cur.execute("TRUNCATE TABLE emotions_dictionary;")
        conn.commit()
    finally:
        try:
            conn.close()
        except Exception:
            # Closing is best effort, but only ordinary errors are ignored:
            # a bare `except:` would also swallow KeyboardInterrupt/SystemExit.
            pass


def insert_emotions_dictionary(rows: List[Dict[str, Any]]):
    """
    Insert rows into emotions_dictionary.

    Args:
        rows: [{"emotion": ..., "normalize": ..., "emoji": ...}, ...];
            an empty list is a no-op and opens no connection.

    Raises:
        Exception: database errors are re-raised after a rollback, so a failed
            batch is never left half-applied.
    """
    if not rows:
        return
    params = [(r["emotion"], r["normalize"], r["emoji"]) for r in rows]
    conn = db.create_connection()
    try:
        try:
            with conn.cursor() as cur:
                cur.executemany(
                    """
                INSERT INTO emotions_dictionary (emotion, normalize, emoji)
                VALUES (%s, %s, %s)
                """,
                    params
                )
            conn.commit()
        except Exception:
            conn.rollback()
            raise
    finally:
        try:
            conn.close()
        except Exception:
            # Closing is best effort; a bare `except:` would also swallow
            # KeyboardInterrupt/SystemExit.
            pass


# ---------- Orquestación ----------

def build_and_store_emotions_dictionary_with_llm(
        wide_df,
        llm_model: str = "gpt-4o-mini",
        temperature: float = 0.0,
        dry_run_llm: bool = False,
        verbose: bool = False,
) -> List[Dict[str, Any]]:
    """
    Build the emotion dictionary with the LLM and store it in the database.

    Steps:
      1) Take the columns of wide_df (excluding 'track_spotify_id') as emotions.
      2) Call the LLM to classify each one and assign an emoji.
      3) Keep only entries that match a column (case-insensitive), restoring the
         exact column name; the first entry wins if the LLM repeats an emotion,
         so the insert cannot hit a duplicate primary key after the truncate.
      4) Create/TRUNCATE emotions_dictionary and insert the entries.

    Returns:
        The list of rows that were inserted.

    Raises:
        ValueError: wide_df has no emotion columns, or the LLM returned no
            entry matching a column.
    """
    # 1) Emotion names come from the frame's headers
    all_cols = list(wide_df.columns)
    emotions_in_df = [c for c in all_cols if str(c).lower() != "track_spotify_id"]
    if verbose:
        print("Emotions from wide_df:", emotions_in_df)

    if not emotions_in_df:
        raise ValueError("No emotion columns found in wide_df (other than 'track_spotify_id').")

    # 2) LLM -> mapping
    mapping = llm_classify_emotions_with_openai(
        emotions=emotions_in_df,
        model=llm_model,
        temperature=temperature,
        dry_run=dry_run_llm,
        debug_print=verbose,
    )

    # 3) Align with the exact headers (the LLM may change letter case) and de-duplicate
    by_lower_original = {str(c).lower(): c for c in emotions_in_df}
    by_column: Dict[Any, Dict[str, Any]] = {}
    for item in mapping:
        column = by_lower_original.get(str(item["emotion"]).strip().lower())
        if column is None or column in by_column:
            continue
        by_column[column] = {
            "emotion": column,
            "normalize": item["normalize"],
            "emoji": item["emoji"],
        }
    cleaned = list(by_column.values())

    if not cleaned:
        raise ValueError("LLM produced no valid emotions present in wide_df.")

    missing = [c for c in emotions_in_df if c not in by_column]
    if missing and verbose:
        # Surface emotions the LLM skipped instead of dropping them silently.
        print("Emotions missing from the LLM response:", missing)

    # 4) Persist in the DB
    create_and_truncate_emotions_dictionary()
    insert_emotions_dictionary(cleaned)

    return cleaned


In [60]:
result = build_and_store_emotions_dictionary_with_llm(
    wide_df,
    llm_model="gpt-4o-mini",
    temperature=0.2,
    dry_run_llm=False,  # set to True to test without calling the API
    verbose=True  # prints the columns, and the raw LLM output when an attempt fails
)
print(f"Inserted {len(result)} emotions into emotions_dictionary")
result

Emotions from wide_df: ['admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring', 'confusion', 'curiosity', 'desire', 'disappointment', 'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear', 'gratitude', 'grief', 'joy', 'love', 'nervousness', 'neutral', 'optimism', 'pride', 'realization', 'relief', 'remorse', 'sadness', 'surprise']
Inserted 28 emotions into emotions_dictionary


[{'emotion': 'admiration', 'normalize': 'FP', 'emoji': '😍'},
 {'emotion': 'amusement', 'normalize': 'FP', 'emoji': '😂'},
 {'emotion': 'anger', 'normalize': 'FN', 'emoji': '😠'},
 {'emotion': 'annoyance', 'normalize': 'MN', 'emoji': '😒'},
 {'emotion': 'approval', 'normalize': 'FP', 'emoji': '👍'},
 {'emotion': 'caring', 'normalize': 'FP', 'emoji': '❤️'},
 {'emotion': 'confusion', 'normalize': 'N', 'emoji': '😕'},
 {'emotion': 'curiosity', 'normalize': 'MP', 'emoji': '🤔'},
 {'emotion': 'desire', 'normalize': 'FP', 'emoji': '😍'},
 {'emotion': 'disappointment', 'normalize': 'MN', 'emoji': '😞'},
 {'emotion': 'disapproval', 'normalize': 'FN', 'emoji': '👎'},
 {'emotion': 'disgust', 'normalize': 'FN', 'emoji': '🤢'},
 {'emotion': 'embarrassment', 'normalize': 'MN', 'emoji': '😳'},
 {'emotion': 'excitement', 'normalize': 'FP', 'emoji': '🎉'},
 {'emotion': 'fear', 'normalize': 'FN', 'emoji': '😨'},
 {'emotion': 'gratitude', 'normalize': 'FP', 'emoji': '🙏'},
 {'emotion': 'grief', 'normalize': 'FN', 'emo

# Clustering

We will now apply data clustering to generate groups that will serve as playlists, built around key aspects of related songs

In [None]:
import json, ast
import pandas as pd


def ensure_list_column(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """
    Return a copy of `df` in which every cell of column `col` is a list[str].

    Handles:
      - real Python lists, tuples and sets (items are stringified and trimmed)
      - missing values (None, NaN, pd.NA) -> []
      - Python repr lists with single quotes: "['a', 'b']"
      - JSON lists: ["a","b"]
      - bracketed but unparsable text: "[a, b]" (split on commas, quotes removed)
      - plain comma-separated text: a, b
      - any other scalar -> single-item list with its string form

    Empty items (after trimming) and None items are dropped.

    Args:
        df: input DataFrame; it is not modified.
        col: name of the column to normalise.

    Returns:
        A copy of `df` with `col` replaced by lists of strings.

    Raises:
        KeyError: if `col` is not a column of `df`.
    """

    def _clean(items) -> list:
        # Stringify and trim every item; skip None and blank entries.
        cleaned = []
        for item in items:
            if item is None:
                continue
            text = str(item).strip()
            if text:
                cleaned.append(text)
        return cleaned

    def _parse(x):
        if isinstance(x, (list, tuple, set, frozenset)):
            return _clean(x)
        # pd.NA is not a float, so it needs its own check next to None/NaN.
        if x is None or x is pd.NA or (isinstance(x, float) and pd.isna(x)):
            return []
        if isinstance(x, str):
            s = x.strip()
            if not s:
                return []
            if s.startswith("[") and s.endswith("]"):
                # 1) Python literal (single quotes), then 2) JSON (double quotes).
                for loader in (ast.literal_eval, json.loads):
                    try:
                        v = loader(s)
                    except Exception:
                        # Not parseable by this loader; try the next strategy.
                        continue
                    if isinstance(v, list):
                        return _clean(v)
                # 3) Fallback: split the bracket content by comma and drop quotes.
                inner = s[1:-1]
                parts = [p.strip().strip("'").strip('"') for p in inner.split(",")]
                return [p for p in parts if p]
            # 4) Plain comma-separated text.
            return [t.strip() for t in s.split(",") if t.strip()]
        # Last resort: any other scalar becomes a single-item list.
        text = str(x).strip()
        return [text] if text else []

    out = df.copy()
    out[col] = out[col].apply(_parse)
    return out

We check that the normalized_keywords column is of type list.

In [None]:
# Parse the keywords column so that every cell holds a list[str]
df = ensure_list_column(df, col="normalized_keywords")

# Sanity check: print the Python type and the first five keywords of the first row
first_keywords = df.loc[df.index[0], "normalized_keywords"]
print(type(first_keywords), first_keywords[:5])

This script implements an end-to-end pipeline to group songs into **thematic playlists** using **semantic keyword embeddings** and clustering with **KMeans**.

1. Build embeddings per track
- Each song has a set of *normalized keywords*.
- Using a *Sentence Transformers* model (`all-MiniLM-L6-v2`), vector embeddings are generated for those keywords.
- A *mean pooling* operation (averaging) is applied to obtain a single dense vector representing the entire track.

2. Clustering with KMeans
- The embeddings of all tracks are clustered into *k* groups using KMeans.
- The **Silhouette Score** is computed to quickly evaluate clustering quality.

3. Attach clusters back to the DataFrame
- A new column `cluster_emb` is added to the original DataFrame, indicating the cluster assignment for each track.

4. Cluster interpretability
- The most frequent keywords per cluster are extracted, producing a top-N list that summarizes the common themes of each group.
- Representative “Artist — Title” examples are sampled for each cluster, making it easier to inspect them manually.

In [None]:
import numpy as np
import pandas as pd
from typing import List
from collections import Counter, defaultdict
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score


# ---------------------------
# 1) Build embeddings per track
# ---------------------------
def compute_track_embeddings(
        df: pd.DataFrame,
        keywords_col: str = "normalized_keywords",
        model_name: str = "all-MiniLM-L6-v2",
        normalize: bool = True,
        max_keywords: int | None = None
) -> tuple[np.ndarray, List[int], SentenceTransformer]:
    """
    Build one dense vector per track by mean-pooling its keyword embeddings.

    Rows whose keyword cell is not a non-empty list, or that hold no non-blank
    string keywords, are skipped.

    Args:
        df: DataFrame holding the keyword lists.
        keywords_col: column containing the list of keywords per track.
        model_name: name passed to SentenceTransformer.
        normalize: forwarded to `encode` as `normalize_embeddings`.
        max_keywords: if given, only the first `max_keywords` keywords are encoded.

    Returns:
        - X: matrix with one row per embedded track
        - valid_idx: positional row numbers (0-based) of the embedded tracks in df
        - model: the SentenceTransformer instance that was used

    Raises:
        ValueError: if no row produced an embedding.
    """
    encoder = SentenceTransformer(model_name)
    track_vectors = []
    kept_positions = []

    for position, raw_keywords in enumerate(df[keywords_col].tolist()):
        # Only non-empty lists can be embedded.
        if not (isinstance(raw_keywords, list) and raw_keywords):
            continue

        cleaned = [
            str(kw).strip()
            for kw in raw_keywords
            if isinstance(kw, str) and str(kw).strip()
        ]
        if not cleaned:
            continue
        if max_keywords is not None:
            cleaned = cleaned[:max_keywords]

        keyword_vectors = encoder.encode(cleaned, normalize_embeddings=normalize)
        if isinstance(keyword_vectors, list):
            keyword_vectors = np.asarray(keyword_vectors)

        # Mean pooling: average over the keyword axis to get one vector per track.
        track_vectors.append(keyword_vectors.mean(axis=0))
        kept_positions.append(position)

    if not track_vectors:
        raise ValueError("No tracks produced embeddings. Check 'normalized_keywords' column.")

    return np.vstack(track_vectors), kept_positions, encoder


# ---------------------------
# 2) KMeans clustering
# ---------------------------
def cluster_tracks_embeddings(
        X: np.ndarray,
        k: int = 8,
        random_state: int = 42
) -> tuple[np.ndarray, KMeans, float]:
    """
    Cluster the embedding matrix X with KMeans.

    Args:
        X: embedding matrix, one row per track.
        k: number of clusters requested from KMeans.
        random_state: seed forwarded to KMeans for reproducible results.

    Returns:
      - labels: cluster assignment per row in X
      - kmeans: fitted model
      - sil: silhouette score (for quick quality check), or NaN when the score
        is undefined for the produced labelling
    """
    km = KMeans(n_clusters=k, random_state=random_state, n_init="auto")
    labels = km.fit_predict(X)

    # silhouette_score is only defined for 2 <= n_labels <= n_samples - 1; outside
    # that range it raises, so report NaN instead (e.g. one cluster, or every
    # sample alone in its own cluster).
    n_labels = len(set(labels))
    sil = silhouette_score(X, labels) if 1 < n_labels < len(labels) else np.nan
    return labels, km, sil


# ---------------------------
# 3) Attach clusters back to df
# ---------------------------
def attach_clusters_to_df(
        df: pd.DataFrame,
        labels: np.ndarray,
        valid_idx: List[int],
        cluster_col: str = "cluster_emb"
) -> pd.DataFrame:
    """
    Add the cluster labels to a copy of df at the row positions used for embeddings.

    The resulting column always uses the nullable integer dtype "Int64": rows
    without an embedding hold <NA>, labelled rows hold integer cluster ids.
    (Previously the column stayed float64 whenever every row had a label.)

    Args:
        df: original DataFrame; it is not modified.
        labels: one cluster label per entry of `valid_idx`.
        valid_idx: positional (0-based) row numbers in `df` that received a label.
        cluster_col: name of the column to create or overwrite.

    Returns:
        A copy of `df` with the `cluster_col` column added.

    Raises:
        ValueError: if `labels` and `valid_idx` differ in length.
    """
    if len(labels) != len(valid_idx):
        raise ValueError(
            f"labels ({len(labels)}) and valid_idx ({len(valid_idx)}) must have the same length"
        )

    out = df.copy()
    # Fill by position so that duplicate index labels cannot misroute a label.
    values = np.full(len(out), np.nan)
    values[np.asarray(valid_idx, dtype=int)] = labels
    out[cluster_col] = pd.Series(values, index=out.index).astype("Int64")
    return out


# ---------------------------
# 4) Cluster summaries (human-readable)
# ---------------------------
def top_keywords_per_cluster(
        df_with_clusters: pd.DataFrame,
        cluster_col: str = "cluster_emb",
        keywords_col: str = "normalized_keywords",
        top_n: int = 12
) -> dict[int, List[tuple[str, int]]]:
    """
    Summarise each cluster by its `top_n` most frequent keywords.

    Only list cells are considered, and within them only non-blank strings.
    Rows without a cluster id are ignored.

    Returns:
        {cluster_id: [(keyword, count), ...]} with clusters in ascending order.
    """
    cluster_ids = df_with_clusters[cluster_col]
    per_cluster = {}
    for cluster_id in sorted(cluster_ids.dropna().unique()):
        members = df_with_clusters[cluster_ids == cluster_id]
        counts = Counter()
        for keyword_list in members[keywords_col]:
            if not isinstance(keyword_list, list):
                continue
            for keyword in keyword_list:
                if isinstance(keyword, str) and keyword.strip():
                    counts[keyword] += 1
        per_cluster[int(cluster_id)] = counts.most_common(top_n)
    return per_cluster


def sample_titles_per_cluster(
        df_with_clusters: pd.DataFrame,
        cluster_col: str = "cluster_emb",
        n_samples: int = 5
) -> dict[int, List[str]]:
    """
    Collect the first `n_samples` "Artist — Title" strings of every cluster.

    Reads the `artist_name` and `title` columns; rows without a cluster id
    are ignored.

    Returns:
        {cluster_id: ["Artist — Title", ...]} with clusters in ascending order.
    """
    cluster_ids = df_with_clusters[cluster_col]
    samples = {}
    for cluster_id in sorted(cluster_ids.dropna().unique()):
        first_rows = df_with_clusters[cluster_ids == cluster_id].head(n_samples)
        titles = []
        for _, track in first_rows.iterrows():
            titles.append(f"{track['artist_name']} — {track['title']}")
        samples[int(cluster_id)] = titles
    return samples


# ---------------------------
# 5) End-to-end usage
# ---------------------------
# df is your DataFrame with columns:
# - track_spotify_id
# - artist_name
# - title
# - normalized_keywords (list[str])

# Build embeddings (one mean-pooled vector per track that has keywords)
X, valid_idx, st_model = compute_track_embeddings(
    df, keywords_col="normalized_keywords", model_name="all-MiniLM-L6-v2", normalize=True
)

# Cluster (choose k based on your dataset size; start with 6–12)
labels, kmeans, sil = cluster_tracks_embeddings(X, k=15, random_state=42)
print("Silhouette:", sil)

# Attach cluster labels back to df
df_emb = attach_clusters_to_df(df, labels, valid_idx, cluster_col="cluster_emb")

# Inspect: a bare expression in the middle of a cell is never rendered,
# so the preview is shown explicitly with IPython's display().
display(df_emb[["track_spotify_id", "artist_name", "title", "cluster_emb"]].head(10))

# Top keywords per cluster (for interpretation)
cluster_keywords = top_keywords_per_cluster(df_emb, cluster_col="cluster_emb", keywords_col="normalized_keywords",
                                            top_n=20)
for cid, tops in cluster_keywords.items():
    print(f"\nCluster {cid} top keywords:")
    print(", ".join([f"{w}({c})" for w, c in tops]))

# Optional: sample titles per cluster
examples = sample_titles_per_cluster(df_emb, cluster_col="cluster_emb", n_samples=10)
for cid, ex in examples.items():
    print(f"\nCluster {cid} examples:")
    for s in ex:
        print(" -", s)


Here we’ll do something interesting: we will send our clusters with their top keywords to the AI so that it can return a name and an engaging description, which we will use to build our playlists.

In [None]:
import os, json, re
import pandas as pd
from openai import OpenAI

# Module-level OpenAI client used by name_clusters_with_llm below; the API key is
# read from the OPENAI_API_KEY environment variable (os.getenv returns None if unset).
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


def build_cluster_payload(df_with_clusters: pd.DataFrame,
                          cluster_col: str = "cluster_emb",
                          keywords_col: str = "normalized_keywords",
                          title_cols=("artist_name", "title"),
                          top_k_keywords: int = 15,
                          max_examples: int = 5) -> dict:
    """
    Prepares a payload per cluster:
      {cluster_id: {"keywords": [...], "examples": ["Artist — Title", ...]}}

    Args:
        df_with_clusters: DataFrame with a cluster id column and a keyword-list column.
        cluster_col: column holding the cluster id; rows with a missing id are ignored.
        keywords_col: column holding the list of keywords per track.
        title_cols: columns joined with " — " to build each example string.
            Examples are only produced when all of these columns exist.
        top_k_keywords: number of most frequent keywords kept per cluster.
        max_examples: maximum number of example strings per cluster.

    Returns:
        dict keyed by int cluster id. "keywords" holds lower-cased, trimmed
        keywords ordered by frequency; "examples" holds up to `max_examples`
        strings taken from the first rows of the cluster (empty list when the
        title columns are unavailable).
    """
    title_cols = list(title_cols or ())
    can_build_examples = (
        bool(title_cols)
        and max_examples > 0
        and all(c in df_with_clusters.columns for c in title_cols)
    )

    payload = {}
    for cid in sorted(df_with_clusters[cluster_col].dropna().unique()):
        sub = df_with_clusters[df_with_clusters[cluster_col] == cid]

        # Top keywords by simple frequency inside the cluster.
        kw_series = sub[keywords_col].explode()
        kw_series = kw_series[kw_series.notna()].astype(str).str.strip().str.lower()
        # Blank keywords carry no meaning and must not occupy a top-k slot.
        kw_series = kw_series[kw_series != ""]
        top = (kw_series.value_counts().head(top_k_keywords).index.tolist()
               if not kw_series.empty else [])

        examples = []
        if can_build_examples:
            rows = sub[title_cols].head(max_examples).itertuples(index=False, name=None)
            examples = [" — ".join(str(v) for v in row) for row in rows]

        payload[int(cid)] = {"keywords": top, "examples": examples}
    return payload


def name_clusters_with_llm(cluster_payload: dict,
                           model: str = "gpt-4.1-mini",
                           temperature: float = 0.2) -> dict:
    """
    Sends cluster summaries to the LLM and returns:
      {cluster_id: {"name": str, "description": str}}

    Args:
        cluster_payload: {cluster_id: {"keywords": [...]}}; only the "keywords"
            entry of each cluster is put into the prompt.
        model: model name passed to `client.responses.create`.
        temperature: sampling temperature passed to the same call.

    Returns:
        dict keyed by cluster id (converted to int when possible). Entries
        without a non-empty name, or whose value is not a JSON object, are
        skipped. An unparsable reply yields an empty dict.

    The reply is parsed leniently: if it is not pure JSON (for example wrapped
    in a Markdown code fence or surrounded by text), the span from the first
    "{" to the last "}" is parsed instead.
    """
    # Build prompt
    items = []
    for cid, data in cluster_payload.items():
        kws = ", ".join(data["keywords"])
        items.append(
            f"Cluster {cid}:\n"
            f"- Top keywords: {kws or '(none)'}\n"
        )
    clusters_block = "\n\n".join(items)

    prompt = (
        "You are an expert music curator. Name each cluster of songs concisely.\n"
        "Rules:\n"
        "- Provide a short, human-friendly name (≤ 4 words) and a 1–2 sentence description.\n"
        "- Avoid artist names and proper nouns; focus on themes/moods.\n"
        "- Prefer genre/vibe/emotion terms (e.g., 'Dancefloor Energy', 'Melancholic Love').\n"
        "- Keep names in Title Case, English only.\n"
        "Return ONLY a JSON object mapping cluster id to an object with fields 'name' and 'description'.\n\n"
        "Example output format:\n"
        "{\n"
        "  \"0\": {\"name\": \"Late-Night Romance\", \"description\": \"Sensual, intimate themes with slow dance vibes.\"},\n"
        "  \"1\": {\"name\": \"Party Anthems\", \"description\": \"High-energy club tracks centered on dancing and celebration.\"}\n"
        "}\n\n"
        "Clusters to label:\n"
        f"{clusters_block}\n\n"
        "Now return ONLY the JSON."
    )

    resp = client.responses.create(model=model, input=prompt, temperature=temperature)
    text = (resp.output_text or "").strip()

    def parse_json_maybe(s: str) -> dict:
        """Parse `s` as a JSON object, tolerating code fences and surrounding text."""
        try:
            parsed = json.loads(s)
        except ValueError:
            # Take everything between the first "{" and the last "}" so that
            # trailing fences or commentary do not break parsing.
            start, end = s.find("{"), s.rfind("}")
            if start == -1 or end < start:
                return {}
            try:
                parsed = json.loads(s[start:end + 1])
            except ValueError:
                return {}
        return parsed if isinstance(parsed, dict) else {}

    # Normalize keys to ints if possible
    out = {}
    for k, v in parse_json_maybe(text).items():
        if not isinstance(v, dict):
            # A malformed entry must not abort the remaining clusters.
            continue
        try:
            cid = int(k)
        except (TypeError, ValueError):
            cid = k
        name = str(v.get("name") or "").strip()
        desc = str(v.get("description") or "").strip()
        if name:
            out[cid] = {"name": name, "description": desc}
    return out


def attach_cluster_names(df_with_clusters: pd.DataFrame,
                         labels_map: dict,
                         cluster_col: str = "cluster_emb") -> pd.DataFrame:
    """
    Return a copy of the DataFrame with 'cluster_name' and 'cluster_desc' columns.

    Both columns are looked up in `labels_map` (the LLM result) by the integer
    cluster id; rows with a missing cluster id, or an id absent from the map,
    get None.
    """

    def _lookup_for(field: str):
        # Build the per-row lookup for one field of the labels_map entries.
        def _lookup(cluster_id):
            if pd.isna(cluster_id):
                return None
            return labels_map.get(int(cluster_id), {}).get(field)

        return _lookup

    named = df_with_clusters.copy()
    named["cluster_name"] = named[cluster_col].map(_lookup_for("name"))
    named["cluster_desc"] = named[cluster_col].map(_lookup_for("description"))
    return named


In [None]:
# Step 1: summarise every cluster of the clustered DataFrame
# (expects the 'cluster_emb' and 'normalized_keywords' columns)
payload = build_cluster_payload(
    df_emb,
    cluster_col="cluster_emb",
    keywords_col="normalized_keywords",
    top_k_keywords=20,
    max_examples=5,
)

# Step 2: ask the LLM for a name and description per cluster
labels_map = name_clusters_with_llm(payload, model="gpt-4.1-mini", temperature=0.2)

# Step 3: attach the names to the DataFrame and show one row per cluster
df_named = attach_cluster_names(df_emb, labels_map, cluster_col="cluster_emb")
(
    df_named[["cluster_emb", "cluster_name", "cluster_desc"]]
    .drop_duplicates()
    .sort_values("cluster_emb")
    .head(20)
)



We update the CSV to streamline the process.

In [None]:
# Persist the clustered DataFrame (df_emb) as a UTF-8 CSV without the index column.
df_emb.to_csv("tracks_with_normalized_keywords.csv", index=False, encoding="utf-8")

In this step, we connected our DataFrame (`df_named`) with the database to properly manage clusters and link them to tracks.

In [None]:
import pymysql
import pandas as pd


def sync_clusters_and_update_tracks(conn, df_named: pd.DataFrame):
    """
    Rebuild the `clusters` table from df_named and relink `tracks` to it,
    without using TRUNCATE (FK-friendly).

    Steps:
    1) UPDATE tracks SET cluster_id = NULL        # break FK references safely
    2) DELETE FROM clusters;                      # clear parent table
    3) ALTER TABLE clusters AUTO_INCREMENT = 1;   # optional reset
    4) INSERT unique (name, description) from df_named
    5) SELECT id, name FROM clusters -> name→id map
    6) UPDATE tracks.cluster_id by matching spotify_id (df) to tracks.spotify_id (db)

    Args:
        conn: open DB connection exposing cursor(), commit() and rollback()
            (a pymysql connection is presumably expected, since step 5 asks
            for pymysql.cursors.DictCursor).
        df_named: DataFrame with the columns `cluster_name`, `cluster_desc`
            and `track_spotify_id`. Rows without a cluster name are ignored.

    Raises:
        ValueError: if df_named lacks one of the required columns.
        Exception: any error raised while talking to the database is re-raised
            after conn.rollback() has been called.

    The connection is committed on success; it is not closed here.

    NOTE(review): in MySQL, ALTER TABLE causes an implicit commit, so steps 1-2
    would already be committed when step 3 runs and the rollback below could not
    undo them. Confirm against the target database before relying on atomicity.
    """

    required_cols = {"cluster_name", "cluster_desc", "track_spotify_id"}
    missing = required_cols - set(df_named.columns)
    if missing:
        raise ValueError(f"df_named is missing required columns: {missing}")

    # One row per distinct cluster name (the first description seen is kept).
    clusters_df = (
        df_named[["cluster_name", "cluster_desc"]]
        .dropna(subset=["cluster_name"])
        .drop_duplicates(subset=["cluster_name"])
        .rename(columns={"cluster_name": "name", "cluster_desc": "description"})
    )
    # Track -> cluster name pairs for every track that received a name.
    track_map_df = df_named.dropna(subset=["cluster_name"])[["track_spotify_id", "cluster_name"]]

    try:
        with conn.cursor() as cur:
            # 1) Break FK references in child table
            cur.execute("UPDATE tracks SET cluster_id = NULL;")

            # 2) Clear parent table without TRUNCATE (FK-safe)
            cur.execute("DELETE FROM clusters;")

            # 3) Optionally reset AUTO_INCREMENT
            cur.execute("ALTER TABLE clusters AUTO_INCREMENT = 1;")

            # 4) Insert new clusters (names trimmed; a missing description becomes "")
            cur.executemany(
                "INSERT INTO clusters (name, description) VALUES (%s, %s)",
                [(str(r["name"]).strip(),
                  str(r["description"]).strip() if pd.notna(r["description"]) else "")
                 for _, r in clusters_df.iterrows()]
            )

        # 5) Build name -> id map from the freshly inserted rows
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute("SELECT id, name FROM clusters;")
            rows = cur.fetchall()
            name_to_id = {row["name"]: row["id"] for row in rows}

        # 6) Update tracks with cluster_id
        with conn.cursor() as cur:
            update_sql = """
                UPDATE tracks
                SET cluster_id = %s
                WHERE spotify_id = %s
            """
            data = []
            for _, r in track_map_df.iterrows():
                # Trim the name the same way as in step 4 so the lookup matches.
                name = str(r["cluster_name"]).strip()
                track_id = str(r["track_spotify_id"]).strip()
                cid = name_to_id.get(name)
                # Skip tracks whose cluster name was not found or whose id is blank.
                if cid is not None and track_id:
                    data.append((cid, track_id))
            if data:
                cur.executemany(update_sql, data)

        conn.commit()
        # "Updated tracks" counts the UPDATE statements sent, not the rows matched.
        print(f"Inserted clusters: {len(clusters_df)} | Updated tracks: {len(data)}")

    except Exception:
        conn.rollback()
        raise


# Sync the named clusters into the database.
conn = db.create_connection()
try:
    sync_clusters_and_update_tracks(conn, df_named)
finally:
    # Always release the connection, even when the sync raises.
    conn.close()

# Show Results

## Useful Functions

We’ll start with functions that will help us get what we want. In this case, we’re going to create a function that retrieves the overall sentiment of all the songs along with the specific data, taking the top 5 most representative songs for each emotion.
We’ll also be able to see it by album, and even by individual song if we want. Additionally, we’ll obtain the suggested playlists along with their main emotion.

#### 1. Get the overall emotion of the entire Spotify library

In [93]:
from typing import Dict, List, Any
from pymysql.cursors import DictCursor


def get_emotions_from_dictionary() -> list[str]:
    """
    Step 1: Fetch all emotions from emotions_dictionary.

    Opens its own connection through db.create_connection() and always
    attempts to close it afterwards.

    Returns:
        list[str]: values of the `emotion` column, in the order the query returns them.

    Raises:
        ValueError: if the table returns no rows.

    Note: a function with this same name is defined again in the
    "Fetch clusters" cell below; the definition executed last is the one in effect.
    """
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute("SELECT emotion FROM emotions_dictionary")
            emotions = [r["emotion"] for r in cur.fetchall()]
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close: a failing close must not hide the query result
            # or error, but KeyboardInterrupt/SystemExit are no longer swallowed.
            pass

    if not emotions:
        raise ValueError("No emotions found in emotions_dictionary")

    return emotions


def build_avg_emotions_sql(emotions: list[str]) -> str:
    """
    Step 2: Build a dynamic SQL query with AVG(col) for each emotion.
    Note: we use aliases equal to the original emotion names.

    Column names cannot be sent as bound parameters, so each name is quoted
    with backticks and any backtick inside it is doubled. That way a name
    coming from the dictionary table cannot break out of the identifier.

    Args:
        emotions: column names of track_emotions_wide to average.

    Returns:
        A single SELECT statement over track_emotions_wide.

    Raises:
        ValueError: if `emotions` is empty (the SELECT list would be invalid SQL).
    """
    if not emotions:
        raise ValueError("emotions must contain at least one column name")

    def _quote(identifier) -> str:
        # Backtick-quote an identifier, escaping embedded backticks by doubling.
        return "`" + str(identifier).replace("`", "``") + "`"

    avg_exprs = [f"AVG({_quote(emo)}) AS {_quote(emo)}" for emo in emotions]
    return f"SELECT {', '.join(avg_exprs)} FROM track_emotions_wide"


def execute_avg_emotions_query(sql: str) -> dict:
    """
    Step 3: Execute the generated SQL query and return the results
    with keys = original emotion names.

    Args:
        sql: statement to run; only the first row of the result is read.

    Returns:
        dict mapping each column of that row to a float (None stays None).
        An empty dict is returned when the query yields no row.
    """
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql)
            row = cur.fetchone()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    if not row:
        # No row at all: return an empty mapping instead of failing on None.
        return {}

    return {k: float(v) if v is not None else None for k, v in row.items()}


def fetch_avg_per_emotion() -> dict:
    """
    Return the average of every emotion column as a dict.

    Chains the three helpers: read the emotion names from the dictionary
    table, build the AVG(...) query for them, then execute that query.
    """
    return execute_avg_emotions_query(
        build_avg_emotions_sql(get_emotions_from_dictionary())
    )


def fetch_emotion_normalize_mapping() -> Dict[str, str]:
    """
    Step 1: Query emotions_dictionary and return a mapping:
      { emotion_lower: normalize_code }

    Works case-insensitive by lowering all emotion names. Rows whose emotion
    or normalize value is NULL are skipped, so the mapping never contains a
    literal "None" code.

    Raises:
        ValueError: if the table returns no rows.
    """
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute("SELECT emotion, normalize FROM emotions_dictionary")
            rows = cur.fetchall()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    if not rows:
        raise ValueError("emotions_dictionary is empty or missing.")

    # Build a dict where keys are lowercase emotion names and values are normalize codes
    mapping: Dict[str, str] = {}
    for r in rows:
        emotion, normalize = r["emotion"], r["normalize"]
        if emotion is None or normalize is None:
            continue
        mapping[str(emotion).strip().lower()] = str(normalize).strip()
    return mapping


def group_avg_emotions_by_normalize(avg_per_emotion: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Step 2: Receive a dict with per-emotion averages, e.g.:
      {'admiration': 3.2, 'anger': 1.8, ...}

    Group those averages by their normalize category from emotions_dictionary.
    Emotions with a None or non-numeric average, and emotions that are not in
    the dictionary, are ignored. Matching is case-insensitive.

    Returns a list of dicts with:
      - normalize: category (FP/MP/N/MN/FN or Positive/Neutral/Negative)
      - sum_avg: sum of averages of all emotions in this group
      - mean_avg: mean of those averages (sum_avg / emotion_count)
      - emotion_count: how many emotions were grouped

    The list follows the preferred category order, then any other category
    alphabetically.

    Raises:
        ValueError: if avg_per_emotion is not a non-empty dict.
    """
    if not isinstance(avg_per_emotion, dict) or not avg_per_emotion:
        raise ValueError("avg_per_emotion must be a non-empty dict like {'admiration': 3.2, ...}")

    # Get mapping from emotions to normalize codes
    mapping = fetch_emotion_normalize_mapping()  # {emotion_lower: normalize}
    groups: Dict[str, List[float]] = {}

    # Match emotions to normalize groups
    for emo_name, avg_val in avg_per_emotion.items():
        if avg_val is None:
            continue
        try:
            v = float(avg_val)
        except (TypeError, ValueError):
            continue
        norm = mapping.get(str(emo_name).strip().lower())
        if not norm:
            # Emotion not found in dictionary → skip
            continue
        groups.setdefault(norm, []).append(v)

    # Preferred order (if you are using 5-level system); fallback: alphabetical
    preferred_order = ["FP", "MP", "N", "MN", "FN", "Positive", "Neutral", "Negative"]
    ordered_norms = [n for n in preferred_order if n in groups] + \
                    [n for n in sorted(groups.keys()) if n not in preferred_order]

    results: List[Dict[str, Any]] = []
    for norm in ordered_norms:
        vals = groups[norm]
        s = sum(vals)
        c = len(vals)  # always >= 1: a group only exists once a value was appended
        results.append({
            "normalize": norm,
            "sum_avg": s,
            "mean_avg": s / c,
            "emotion_count": c,
        })

    return results

In [127]:
from typing import List, Dict, Any
from pymysql.cursors import DictCursor

def fetch_emotions_dictionary() -> List[Dict[str, Any]]:
    """
    Fetch all rows from emotions_dictionary, ordered by emotion.
    Returns a list of dicts with keys: emotion, normalize, emoji.

    The query now selects `normalize` as well, so the rows match this
    documented shape (it previously selected only emotion and emoji).
    """
    sql = "SELECT emotion, normalize, emoji FROM emotions_dictionary ORDER BY emotion"

    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    return rows


In [123]:
# Show the rows returned from emotions_dictionary.
fetch_emotions_dictionary()

[{'emotion': 'admiration', 'normalize': 'FP', 'emoji': '😍'},
 {'emotion': 'amusement', 'normalize': 'FP', 'emoji': '😂'},
 {'emotion': 'anger', 'normalize': 'FN', 'emoji': '😠'},
 {'emotion': 'annoyance', 'normalize': 'MN', 'emoji': '😒'},
 {'emotion': 'approval', 'normalize': 'FP', 'emoji': '👍'},
 {'emotion': 'caring', 'normalize': 'FP', 'emoji': '❤️'},
 {'emotion': 'confusion', 'normalize': 'N', 'emoji': '😕'},
 {'emotion': 'curiosity', 'normalize': 'MP', 'emoji': '🤔'},
 {'emotion': 'desire', 'normalize': 'FP', 'emoji': '😍'},
 {'emotion': 'disappointment', 'normalize': 'MN', 'emoji': '😞'},
 {'emotion': 'disapproval', 'normalize': 'FN', 'emoji': '👎'},
 {'emotion': 'disgust', 'normalize': 'FN', 'emoji': '🤢'},
 {'emotion': 'embarrassment', 'normalize': 'MN', 'emoji': '😳'},
 {'emotion': 'excitement', 'normalize': 'FP', 'emoji': '🎉'},
 {'emotion': 'fear', 'normalize': 'FN', 'emoji': '😨'},
 {'emotion': 'gratitude', 'normalize': 'FP', 'emoji': '🙏'},
 {'emotion': 'grief', 'normalize': 'FN', 'emo

#### 2. Fetch clusters

In [103]:
from typing import List, Dict, Any, Optional
from pymysql.cursors import DictCursor


# ---------- Helpers (reuse-safe) ----------

def _get_existing_tew_columns() -> List[str]:
    """
    Return the list of existing columns in track_emotions_wide
    (excluding track_spotify_id), preserving table order.

    The names are read from INFORMATION_SCHEMA.COLUMNS for the current
    database; the exclusion of track_spotify_id is case-insensitive.
    """
    conn = db.create_connection()
    sql = """
        SELECT COLUMN_NAME
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'track_emotions_wide'
        ORDER BY ORDINAL_POSITION
    """
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql)
            cols = [r["COLUMN_NAME"] for r in cur.fetchall()]
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    return [c for c in cols if c and c.lower() != "track_spotify_id"]


def get_emotions_from_dictionary() -> List[str]:
    """
    Step 1 for dynamic AVG building: fetch emotion column names from emotions_dictionary.

    Returns:
        List[str]: values of the `emotion` column, in the order the query returns them.

    Raises:
        ValueError: if the table returns no rows.

    Note: this redefines the function of the same name from the earlier
    "overall emotion" cell with the same query and checks.
    """
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute("SELECT emotion FROM emotions_dictionary")
            emotions = [r["emotion"] for r in cur.fetchall()]
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    if not emotions:
        raise ValueError("No emotions found in emotions_dictionary")
    return emotions


# ---------- 1) All tracks with their clusters, artist name, and album title ----------

def fetch_all_tracks_with_clusters() -> List[Dict[str, Any]]:
    """
    Return all tracks (expected ~990) with:
      - track_spotify_id, track_name
      - cluster_id, cluster_name
      - artist_spotify_id, artist_name
      - album_spotify_id, album_name

    Clusters, artists and albums are LEFT JOINed, so tracks without a cluster,
    artist or album are still returned with NULLs in those fields. Rows are
    ordered by artist name, album name and track spotify_id.
    """
    sql = """
        SELECT
          t.spotify_id          AS track_spotify_id,
          t.cluster_id          AS cluster_id,
          t.name                AS track_name,
          c.name                AS cluster_name,
          ar.spotify_id         AS artist_spotify_id,
          ar.name               AS artist_name,
          a.spotify_id          AS album_spotify_id,
          a.name                AS album_name
        FROM tracks t
        LEFT JOIN clusters c ON c.id = t.cluster_id
        LEFT JOIN artists  ar ON ar.spotify_id = t.artist_spotify_id
        LEFT JOIN albums   a  ON a.spotify_id  = t.album_spotify_id
        ORDER BY ar.name, a.name, t.spotify_id
    """
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    return rows


# ---------- 2) All tracks for a given cluster (with artist and album names) ----------

def fetch_tracks_by_cluster(cluster_id: int) -> List[Dict[str, Any]]:
    """
    Return tracks for a specific cluster with the same fields as
    fetch_all_tracks_with_clusters.

    Args:
        cluster_id: id of the cluster; passed to the query as a bound parameter.

    Returns:
        One dict per track of that cluster, ordered by artist name, album name
        and track spotify_id. Artist and album fields may be NULL (LEFT JOIN).
    """
    sql = """
        SELECT
          t.spotify_id          AS track_spotify_id,
          t.cluster_id          AS cluster_id,
          t.name                AS track_name,
          c.name                AS cluster_name,
          ar.spotify_id         AS artist_spotify_id,
          ar.name               AS artist_name,
          a.spotify_id          AS album_spotify_id,
          a.name                AS album_name
        FROM tracks t
        JOIN clusters c  ON c.id = t.cluster_id
        LEFT JOIN artists ar ON ar.spotify_id = t.artist_spotify_id
        LEFT JOIN albums  a  ON a.spotify_id  = t.album_spotify_id
        WHERE t.cluster_id = %s
        ORDER BY ar.name, a.name, t.spotify_id
    """
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql, (cluster_id,))
            rows = cur.fetchall()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    return rows


# ---------- 4) Per-cluster averages for every emotion column ----------

def build_cluster_avg_emotions_sql(emotions: List[str]) -> str:
    """
    Compose the per-cluster averages query.

    Only the requested emotions that are real columns of track_emotions_wide
    are kept. The statement joins track_emotions_wide with tracks on the
    spotify id and filters by `t.cluster_id = %s`, leaving the cluster id as
    a bound parameter for the caller.

    Raises:
        ValueError: if none of the requested emotions is an existing column.
    """
    table_columns = set(_get_existing_tew_columns())

    select_parts = []
    for name in emotions:
        if name in table_columns:
            select_parts.append(f"AVG(tew.`{name}`) AS `{name}`")

    if not select_parts:
        raise ValueError("None of the emotions exist in track_emotions_wide")

    select_list = ', '.join(select_parts)
    return f"""
        SELECT {select_list}
        FROM track_emotions_wide AS tew
        JOIN tracks AS t ON t.spotify_id = tew.track_spotify_id
        WHERE t.cluster_id = %s
    """


def fetch_cluster_avg_emotions(cluster_id: int) -> Dict[str, Optional[float]]:
    """
    Compute emotion-wise averages for a given cluster.
    - Uses emotions from emotions_dictionary
    - Validates columns against track_emotions_wide
    - Returns a dict: { 'admiration': 3.2, 'anger': 1.8, ... }

    Args:
        cluster_id: id of the cluster; passed to the query as a bound parameter.

    Returns:
        dict mapping each emotion to its average as float, or None where the
        database returned NULL. An empty dict is returned if the query yields
        no row.
    """
    emotions = get_emotions_from_dictionary()
    sql = build_cluster_avg_emotions_sql(emotions)

    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql, (cluster_id,))
            row = cur.fetchone()  # one row with AVG per emotion
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    if not row:
        # Defensive: avoid failing on None if the driver returns no row.
        return {}

    # Convert Decimal/None to float/None
    return {k: (float(v) if v is not None else None) for k, v in row.items()}


# ---------- Optional convenience: also return track_count for the cluster ----------

def fetch_cluster_avg_emotions_with_count(cluster_id: int) -> Dict[str, Any]:
    """
    Same as fetch_cluster_avg_emotions, but also returns 'track_count' for the cluster.

    Args:
        cluster_id: id of the cluster; passed to the query as a bound parameter.

    Returns:
        dict with 'cluster_id', 'track_count' (number of rows in tracks with
        that cluster_id) and one entry per emotion average. Emotion entries are
        applied last, so an emotion named like one of the two fixed keys would
        overwrite it.
    """
    # Averages
    emo_avgs = fetch_cluster_avg_emotions(cluster_id)

    # Track count
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute("SELECT COUNT(*) AS n FROM tracks WHERE cluster_id = %s", (cluster_id,))
            n = cur.fetchone()["n"]
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass

    out: Dict[str, Any] = {"cluster_id": cluster_id, "track_count": int(n)}
    out.update(emo_avgs)
    return out


In [121]:
# Show the emotion names currently stored in emotions_dictionary.
get_emotions_from_dictionary()

['admiration',
 'amusement',
 'anger',
 'annoyance',
 'approval',
 'caring',
 'confusion',
 'curiosity',
 'desire',
 'disappointment',
 'disapproval',
 'disgust',
 'embarrassment',
 'excitement',
 'fear',
 'gratitude',
 'grief',
 'joy',
 'love',
 'nervousness',
 'neutral',
 'optimism',
 'pride',
 'realization',
 'relief',
 'remorse',
 'sadness',
 'surprise']

#### 3. Fetch info

In [105]:
from typing import List, Dict, Any, Optional
from pymysql.cursors import DictCursor


# ------------------- Artists -------------------

def fetch_all_artists() -> List[Dict[str, Any]]:
    """
    Return all artists with their spotify_id and name, ordered by name.

    Opens its own connection through db.create_connection() and always
    attempts to close it afterwards.
    """
    sql = "SELECT spotify_id, name FROM artists ORDER BY name"
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close; KeyboardInterrupt/SystemExit still propagate.
            pass
    return rows


def fetch_artist_by_id(artist_spotify_id: str) -> Optional[Dict[str, Any]]:
    """
    Return a single artist by its spotify_id.

    Args:
        artist_spotify_id: Value compared against ``artists.spotify_id``;
            passed as a bound query parameter, never interpolated.

    Returns:
        Whatever ``cursor.fetchone()`` yields for the query: the row with
        ``spotify_id`` and ``name``, or presumably ``None`` when nothing
        matches (verify against the driver).
    """
    sql = "SELECT spotify_id, name FROM artists WHERE spotify_id = %s"
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql, (artist_spotify_id,))
            row = cur.fetchone()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close: never let a close failure hide the query
            # outcome. ``except Exception`` (instead of a bare ``except:``)
            # lets KeyboardInterrupt/SystemExit propagate.
            pass
    return row


# ------------------- Albums -------------------

def fetch_all_albums() -> List[Dict[str, Any]]:
    """
    Return all albums with their spotify_id and name.

    Opens a fresh connection through ``db.create_connection()``, runs a
    single SELECT ordered by ``name`` and closes the connection again.

    Returns:
        The fetched rows as a list (one mapping per album with the
        ``spotify_id`` and ``name`` columns); empty when there are no rows.
    """
    sql = "SELECT spotify_id, name FROM albums ORDER BY name"
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close: never let a close failure hide the query
            # outcome. ``except Exception`` (instead of a bare ``except:``)
            # lets KeyboardInterrupt/SystemExit propagate.
            pass
    # NOTE(review): the driver may hand back a non-list sequence for an
    # empty result - confirm for the installed version. list() keeps the
    # annotated return type either way.
    return list(rows)


def fetch_album_by_id(album_spotify_id: str) -> Optional[Dict[str, Any]]:
    """
    Return a single album by its spotify_id.

    Args:
        album_spotify_id: Value compared against ``albums.spotify_id``;
            passed as a bound query parameter, never interpolated.

    Returns:
        Whatever ``cursor.fetchone()`` yields for the query: the row with
        ``spotify_id`` and ``name``, or presumably ``None`` when nothing
        matches (verify against the driver).
    """
    sql = "SELECT spotify_id, name FROM albums WHERE spotify_id = %s"
    conn = db.create_connection()
    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(sql, (album_spotify_id,))
            row = cur.fetchone()
    finally:
        try:
            conn.close()
        except Exception:
            # Best-effort close: never let a close failure hide the query
            # outcome. ``except Exception`` (instead of a bare ``except:``)
            # lets KeyboardInterrupt/SystemExit propagate.
            pass
    return row


## FastAPI Service

We are going to set up a FastAPI service to provide the endpoints that will power a frontend, so you can visualize the lists created by the app and more information about your emotions. For this, I’ve built a small interface with Vue.

In [129]:
import threading, time
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware


# Create FastAPI app
app = FastAPI()

# CORS so that a browser frontend served from another origin can call the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # allow every origin
    # Wildcard origins/methods/headers must not be combined with credentialed
    # requests (see the FastAPI CORS docs). The routes registered below read
    # no cookies or auth headers, so credentials are switched off.
    # To enable them, list the frontend origin(s) explicitly instead of "*".
    allow_credentials=False,
    allow_methods=["*"],      # allow every HTTP method (GET, POST, etc.)
    allow_headers=["*"],      # allow every request header
)


@app.get("/")
def root():
    """Index route: answer with a fixed greeting payload."""
    greeting = "Hello World"
    return dict(message=greeting)

@app.get("/playlist-emotions")
def getNormalizeEmotions():
    """
    Serve the per-emotion averages after grouping them by normalized emotion.

    The result of fetch_avg_per_emotion() is passed through
    group_avg_emotions_by_normalize() and returned under the "emotions" key.
    """
    raw_averages = fetch_avg_per_emotion()
    grouped_averages = group_avg_emotions_by_normalize(raw_averages)
    return {"emotions": grouped_averages}


@app.get("/playlist-emotions/all")
def getAllEmotions():
    """
    Serve the ungrouped per-emotion averages together with the emotions dictionary.

    "emotions" carries the result of fetch_avg_per_emotion() and
    "dictionary" the result of fetch_emotions_dictionary().
    """
    # Averages are fetched first, then the dictionary (same order as the keys).
    payload = {"emotions": fetch_avg_per_emotion()}
    payload["dictionary"] = fetch_emotions_dictionary()
    return payload


# Global state to keep track of the server.
# Only initialise it the first time this cell runs: re-executing the cell
# while a server thread is alive must not throw away the handle, otherwise
# the running server can no longer be stopped and its port stays bound.
if "SERVER_STATE" not in globals():
    SERVER_STATE = {"server": None, "thread": None}


def start_server(host="127.0.0.1", port=8000, startup_timeout=5.0):
    """
    Start the Uvicorn server in a separate thread.

    Args:
        host: Interface to bind to.
        port: TCP port to bind to.
        startup_timeout: Maximum number of seconds to wait for Uvicorn to
            report that startup completed.

    Returns:
        None. The outcome is printed. The server and its thread are stored
        in SERVER_STATE only when startup actually succeeded.
    """
    running = SERVER_STATE["server"]
    if running is not None:
        worker = SERVER_STATE.get("thread")
        if worker is not None and worker.is_alive():
            # Report the address of the server that is really running,
            # not the arguments of this (rejected) call.
            print(f"⚠️ A server is already running at http://{running.config.host}:{running.config.port}")
            return
        # The recorded thread has exited (e.g. an earlier failed startup):
        # drop the stale handle so a new server can be started.
        SERVER_STATE.update({"server": None, "thread": None})

    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)

    # Daemon thread: it must not keep the kernel process alive on shutdown.
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    # Poll until Uvicorn flags startup as complete instead of sleeping a
    # fixed second; stop early if the thread dies (e.g. port already in use).
    deadline = time.monotonic() + startup_timeout
    while not server.started and thread.is_alive() and time.monotonic() < deadline:
        time.sleep(0.05)

    if not server.started:
        # Ask a possibly still-starting server to exit, and record nothing.
        server.should_exit = True
        print(f"❌ Server failed to start at http://{host}:{port}")
        return

    SERVER_STATE.update({"server": server, "thread": thread})
    print(f"🚀 Server running at http://{host}:{port}")


def stop_server(timeout=3):
    """
    Stop the server if it is running.

    Args:
        timeout: Seconds to wait for the server thread to finish after
            requesting shutdown.

    Returns:
        None. The outcome is printed. SERVER_STATE is cleared only once the
        server thread has actually finished.
    """
    server = SERVER_STATE.get("server")
    thread = SERVER_STATE.get("thread")
    if server is None:
        print("ℹ️ No server is currently running.")
        return

    # Uvicorn exits once it sees this flag set.
    server.should_exit = True
    if thread and thread.is_alive():
        thread.join(timeout=timeout)
        if thread.is_alive():
            # Still shutting down: keep the handle so that a later call can
            # wait again and start_server() does not try to reuse the port.
            print(f"⚠️ Server did not stop within {timeout}s; run stop_server() again to retry.")
            return

    SERVER_STATE.update({"server": None, "thread": None})
    print("🛑 Server stopped.")


In [130]:
# Launch the API in a background thread on the defaults (127.0.0.1:8000).
start_server()

INFO:     Started server process [33886]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


🚀 Server running at http://127.0.0.1:8000


## Stop Server

When you finish, run this cell to shut down the server.

In [128]:
# Ask the background Uvicorn server to exit and clear SERVER_STATE.
stop_server()

🛑 Server stopped.
