# LNG320 Gen Z Slang Similarity Analysis


In [None]:
%pip install -qU datasets pinecone-client "langchain==0.3.27" "langchain-core>=0.3.72,<1.0.0" langchain-pinecone umap-learn hdbscan scikit-learn plotly tqdm "threadpoolctl==3.5.0"


In [None]:
import os
from getpass import getpass
from typing import Any, Dict, List, Literal, Optional

import numpy as np
import pandas as pd
from datasets import load_dataset
from IPython.display import display
from pinecone import Pinecone
from tqdm import tqdm

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans, DBSCAN

import hdbscan

import plotly.express as px
import plotly.graph_objects as go

try:
    import umap  # type: ignore
except ImportError:  # pragma: no cover
    import umap.umap_ as umap  # fallback if namespace layout differs


In [None]:
# Ask for the Pinecone API key interactively only when the environment does
# not already hold a non-empty value; the key is then kept in os.environ.
if not os.environ.get("PINECONE_API_KEY"):
    os.environ["PINECONE_API_KEY"] = getpass("Enter your Pinecone API key: ")

# Client handle used by the later cells (index management, inference).
pc = Pinecone(
    api_key=os.environ["PINECONE_API_KEY"],
)


## Load and Prepare Dataset


In [None]:
# Load the train split of the slang dataset and work on an independent
# pandas copy of it.
raw_ds = load_dataset("MLBtrio/genz-slang-dataset", split="train")
df = raw_ds.to_pandas().copy()

# Lower-case and trim the column headers so the lookups below
# ("slang", "description") do not depend on the dataset's original casing.
df.columns = [name.lower().strip() for name in df.columns]
slang_column = "slang"

# Text that gets embedded: "<term> is a slang term that means <description>".
# Both parts are cast to str first, so missing values become their string form.
df["input_for_embedding"] = (
    df[slang_column]
    .astype(str)
    .str.cat(df["description"].astype(str), sep=" is a slang term that means ")
)

display(df.head())
print(f"Dataset shape: {df.shape}")


In [None]:
import uuid

# Derive a deterministic ID per row: uuid5 hashes the slang term under the DNS
# namespace, so the same term string always yields the same ID.
# NOTE(review): identical terms therefore share one ID — confirm the dataset
# has no duplicate terms if IDs must be unique.
df["id"] = (
    df[slang_column]
    .astype(str)
    .map(lambda term: str(uuid.uuid5(uuid.NAMESPACE_DNS, term)))
)
print("Sample IDs:")
display(df[["slang", "id"]].head())


## Pinecone Index Setup


In [None]:
index_name = "lng320-genz-slang"

# Names of the indexes currently visible to this client.
existing_indexes = {entry["name"] for entry in pc.list_indexes()}

if index_name in existing_indexes:
    print(f"Using existing index '{index_name}'")
else:
    # Create the index with the embedding model and field mapping given below.
    pc.create_index_for_model(
        name=index_name,
        cloud="aws",
        region="us-east-1",
        embed={
            "model": "llama-text-embed-v2",
            "field_map": {"text": "input_for_embedding"},
        },
    )
    print(f"Created index '{index_name}'")

# Handle used by the later cells for fetch and upsert calls.
index = pc.Index(index_name)


## Generate or Retrieve Embeddings


In [None]:
# Toggle for this cell:
#   True  -> embed every row's "input_for_embedding" text via Pinecone Inference.
#   False -> reuse vectors already stored in the index, looked up by row "id".
# Either way the result ends up in df["values"] (one vector per row).
GENERATE_EMBEDDINGS = False

if GENERATE_EMBEDDINGS:
    texts = df["input_for_embedding"].tolist()
    # NOTE(review): 96 is presumably the model's per-request input limit —
    # confirm against the Pinecone Inference docs before changing it.
    batch_size = 96
    embeddings: List[List[float]] = []

    for start in tqdm(range(0, len(texts), batch_size), desc="Embedding batches"):
        batch = texts[start : start + batch_size]
        embed_result = pc.inference.embed(
            model="llama-text-embed-v2",
            inputs=batch,
            parameters={"input_type": "passage"},
        )
        batch_embeddings = [item.values for item in embed_result.data]
        embeddings.extend(batch_embeddings)

    df["values"] = embeddings
    print(f"Generated {len(df)} embeddings")
else:
    fetched_vectors: Dict[str, List[float]] = {}
    batch_size = 200
    for start in tqdm(range(0, len(df), batch_size), desc="Fetching embeddings"):
        batch_ids = df["id"].iloc[start : start + batch_size].tolist()
        response = index.fetch(ids=batch_ids)
        fetched_vectors.update(
            {
                vector_id: vector["values"]
                for vector_id, vector in response.vectors.items()
            }
        )

    # Validate BEFORE building the column: indexing fetched_vectors with an
    # absent id would raise a bare KeyError and hide the actionable error
    # message below.
    missing = [vector_id for vector_id in df["id"] if vector_id not in fetched_vectors]
    if missing:
        raise RuntimeError(
            f"Missing vectors for {len(missing)} ids. Regenerate embeddings instead."
        )

    df["values"] = [fetched_vectors[vector_id] for vector_id in df["id"]]


In [None]:
# Only freshly generated embeddings need to be written back to the index.
if GENERATE_EMBEDDINGS:
    # One record per row: id, vector, and the embedded text kept as metadata.
    vectors = [
        {"id": vector_id, "values": vector_values, "metadata": {"text": text}}
        for vector_id, vector_values, text in zip(
            df["id"], df["values"], df["input_for_embedding"]
        )
    ]
    batch_size = 100
    # Send the records in fixed-size slices rather than in one request.
    for start in tqdm(
        range(0, len(vectors), batch_size),
        desc="Upserting to Pinecone",
    ):
        index.upsert(vectors=vectors[start : start + batch_size])
    print("Upserted embeddings to Pinecone")


In [None]:
# Stack the per-row vectors from df["values"] into a single 2-D array,
# one matrix row per DataFrame row.
embedding_matrix = np.vstack(df["values"].tolist())
print(f"Embedding matrix shape: {embedding_matrix.shape}")


### UMAP


In [None]:
# Project the embeddings down to two dimensions with UMAP, using cosine
# distance between vectors and a fixed seed.
umap_model = umap.UMAP(
    n_components=2,
    metric="cosine",
    random_state=42,
)
umap_coords = umap_model.fit_transform(embedding_matrix)

# Plotting frame: identifying columns plus the two UMAP coordinates.
viz_df = df[["id", "slang", "description"]].copy()
viz_df["umap_x"] = umap_coords[:, 0]
viz_df["umap_y"] = umap_coords[:, 1]

viz_df.head()

#### HDBSCAN


In [None]:
# Cluster the 2-D UMAP coordinates (not the raw embeddings) with HDBSCAN.
clusterer = hdbscan.HDBSCAN(
    metric="euclidean",
    cluster_selection_method="eom",
    min_cluster_size=5,
    min_samples=3,
)
cluster_labels = clusterer.fit_predict(umap_coords)

viz_df["cluster"] = cluster_labels
viz_df["cluster_prob"] = clusterer.probabilities_

# Label -1 is counted as noise and excluded from the cluster count.
n_clusters = len(set(cluster_labels) - {-1})
n_noise = np.sum(cluster_labels == -1)

print(f"Number of clusters: {n_clusters}")
print(f"Noise points: {n_noise} ({n_noise / len(cluster_labels) * 100:.1f}%)")

#### Cluster Analysis


In [None]:
# Number of terms per cluster label, ordered by label.
cluster_counts = viz_df["cluster"].value_counts().sort_index()
print("Cluster distribution:")
display(cluster_counts)

print("\nExemplar slang terms per cluster:")
# groupby iterates the labels in ascending order; for each one, show the five
# members with the highest membership probability.
for cluster_id, cluster_terms in viz_df.groupby("cluster", sort=True):
    top_terms = cluster_terms[["slang", "description", "cluster_prob"]].nlargest(
        5, "cluster_prob"
    )
    if cluster_id == -1:
        label = "Noise"
    else:
        label = f"Cluster {cluster_id}"
    print(f"\n{label} ({len(cluster_terms)} terms):")
    display(top_terms)


#### Cluster Visualization


In [None]:
# Human-readable legend entries: -1 becomes "Noise", others "Cluster <n>".
viz_df["cluster_label"] = [
    "Noise" if cluster_id == -1 else f"Cluster {cluster_id}"
    for cluster_id in viz_df["cluster"]
]

# Scatter of the UMAP projection, coloured by cluster label; hovering shows
# the term, its description and its membership probability.
fig = px.scatter(
    viz_df,
    x="umap_x",
    y="umap_y",
    color="cluster_label",
    title="UMAP + HDBSCAN Clustering of Gen Z Slang",
    hover_data=["slang", "description", "cluster_prob"],
    labels={"umap_x": "UMAP 1", "umap_y": "UMAP 2", "cluster_label": "Cluster"},
)

fig.update_traces(marker={"size": 8, "opacity": 0.7})
fig.update_layout(
    width=900,
    height=700,
    legend={"title": "Cluster", "orientation": "v"},
)

fig.show()
