# setup

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objects as go
import psycopg
from kneed import KneeLocator
from pgvector.psycopg import register_vector
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

In [None]:
# Open the Postgres connection shared by every query in this notebook.
# Autocommit is requested at connect time, and the pgvector adapter is
# registered on the connection before the cursor is created.
conn = psycopg.connect(
    host="veld_embeddings_platform_run_sql_server",
    port="5432",
    dbname="postgres_db",
    user="postgres_user",
    password="postgres_password",
    autocommit=True,
)
register_vector(conn)
cursor = conn.cursor()
# Sanity check: print the server version to confirm the connection works.
print(cursor.execute("SELECT version();").fetchone())

# load_from_db

In [None]:
def load_from_db(lemma_list, limit=None):
    """Fetch the stored token embeddings for the given lemmas.

    Uses the module-level ``cursor``. Rows come from the
    ``embeddings__dbmdz__bert_base_german_cased__test`` table joined with
    ``sentences`` and are ordered by sentence id.

    Args:
        lemma_list: Lemmas to match (bound to ``lemma = ANY(%s)``).
        limit: Value bound to the ``LIMIT`` clause; ``None`` is passed through
            unchanged.

    Returns:
        Tuple ``(labels, sentences, embeddings)`` of numpy arrays with one
        entry per returned row. Each label has the form
        ``lemma:<lemma>__sent:<sentence_id>__token:<token_index>``.
    """
    query = (
        "SELECT e.lemma, e.sentence_id, e.token_index, s.text, e.embedding"
        " FROM embeddings__dbmdz__bert_base_german_cased__test AS e JOIN sentences AS s ON e.sentence_id = s.sentence_id"
        " WHERE lemma = ANY(%s) ORDER BY sentence_id LIMIT %s;"
    )
    cursor.execute(query, (lemma_list, limit))
    records = cursor.fetchall()

    # Column order follows the SELECT list: lemma, sentence_id, token_index, text, embedding.
    labels = np.array(
        [
            "lemma:" + lemma + "__sent:" + str(sent_id) + "__token:" + str(tok_idx)
            for lemma, sent_id, tok_idx, _, _ in records
        ]
    )
    sentences = np.array([record[3] for record in records])
    embeddings = np.array([record[4] for record in records])
    print("results from db:", len(embeddings))
    return labels, sentences, embeddings


# Load all occurrences of the five example lemmas; the resulting arrays feed the cells below.
labels, sentences, embeddings = load_from_db(["Frau", "Mann", "Haus", "Küche", "Werkstatt"])

# calculate_tsne

In [None]:
def calculate_tsne(embeddings, n_components=2, perpexity=5):
    """Project embeddings to a low-dimensional space with t-SNE.

    Args:
        embeddings: Sequence/array with one embedding per row.
        n_components: Number of output dimensions.
        perpexity: Requested t-SNE perplexity. (The misspelled name is kept
            so existing keyword callers keep working.) t-SNE needs the
            perplexity to be smaller than the number of samples, so the
            value is capped at ``len(embeddings) - 1``.

    Returns:
        The array returned by ``TSNE.fit_transform``.

    Raises:
        ValueError: If fewer than two embeddings are given.
    """
    n_samples = len(embeddings)
    if n_samples < 2:
        raise ValueError(
            f"t-SNE needs at least 2 embeddings, got {n_samples}"
        )
    # Small query results (fewer rows than the requested perplexity) would
    # otherwise be rejected by TSNE.
    effective_perplexity = min(perpexity, n_samples - 1)
    embeddings_tsne = TSNE(
        n_components=n_components, perplexity=effective_perplexity, random_state=42
    ).fit_transform(embeddings)
    return embeddings_tsne


# t-SNE projection of the loaded embeddings, using the function's default arguments.
embeddings_tsne = calculate_tsne(embeddings)

# calculate_pca

In [None]:
def calculate_pca(embeddings, n_components=2):
    """Project embeddings onto their first principal components.

    Args:
        embeddings: Array with one embedding per row.
        n_components: Number of principal components to keep. This was
            previously ignored in favour of a hard-coded 2.

    Returns:
        The array returned by ``PCA.fit_transform``.
    """
    pca = PCA(n_components=n_components)
    embeddings_pca = pca.fit_transform(embeddings)
    return embeddings_pca


# PCA projection of the loaded embeddings, using the function's default arguments.
embeddings_pca = calculate_pca(embeddings)

# calculate_kmeans

In [None]:
def calculate_kmeans(embeddings, n_clusters=None, max_k=5, title=None, show_knee_chart=True):
    """Fit k-means, optionally choosing k with the elbow method.

    Args:
        embeddings: Points to cluster, one per row.
        n_clusters: Fixed number of clusters. When falsy, models for
            k = 1..max_k are fitted and the elbow found by ``KneeLocator``
            on the inertia curve selects the model.
        max_k: Largest k tried by the elbow search. Capped at the number of
            points, because k-means cannot fit more clusters than samples.
        title: Title of the elbow chart.
        show_knee_chart: Whether to plot inertia against k (elbow search only).

    Returns:
        The fitted ``KMeans`` model.

    Raises:
        ValueError: If the elbow search has no valid k to try (no points, or
            ``max_k`` below 1).
    """
    if n_clusters:
        embeddings_kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(embeddings)
    else:
        max_k = min(max_k, len(embeddings))
        if max_k < 1:
            raise ValueError("elbow search needs at least one embedding and max_k >= 1")
        k_iter = range(1, max_k + 1)
        embeddings_kmeans_list = [
            KMeans(n_clusters=k, n_init=10, random_state=0).fit(embeddings) for k in k_iter
        ]
        inertia = [model.inertia_ for model in embeddings_kmeans_list]
        knee = KneeLocator(k_iter, inertia, curve="convex", direction="decreasing")
        # KneeLocator reports None when the curve has no detectable elbow;
        # fall back to a single cluster instead of failing on `None - 1`.
        elbow_found = knee.knee is not None
        best_k = knee.knee if elbow_found else 1
        if not elbow_found:
            print("no elbow found; falling back to k=1")
        embeddings_kmeans = embeddings_kmeans_list[best_k - 1]
        if show_knee_chart:
            line_label = (
                f"Elbow at k={best_k}" if elbow_found else f"No elbow found, using k={best_k}"
            )
            plt.plot(k_iter, inertia, marker="o")
            plt.axvline(x=best_k, color="r", linestyle="--", label=line_label)
            plt.xlabel("k")
            plt.ylabel("Inertia")
            plt.title(title)
            # Without this call the axvline label is never displayed.
            plt.legend()
            plt.xticks(k_iter)
            plt.xlim(1, max_k)
            plt.show()
    print("embeddings_kmeans.cluster_centers_.shape:", embeddings_kmeans.cluster_centers_.shape)
    return embeddings_kmeans


# Cluster both projections; n_clusters=None triggers the elbow search over k = 1..10.
embeddings_kmeans_tsne = calculate_kmeans(embeddings_tsne, n_clusters=None, max_k=10, title="tsne")
embeddings_kmeans_pca = calculate_kmeans(embeddings_pca, n_clusters=None, max_k=10, title="pca")

# show_plot

In [None]:
def show_plot(labels, sentences, embeddings, kmeans=None, title=None, color_by=None):
    """Show an interactive plotly scatter plot of projected embeddings.

    Args:
        labels: Per-point label strings. The part before the first ``"__"``
            is used as the lemma when ``color_by="lemma"``.
        sentences: Per-point sentence texts; truncated to 110 characters for
            the hover box.
        embeddings: Array whose columns 0 and 1 are plotted as x and y.
        kmeans: Optional fitted clustering exposing ``labels_`` and
            ``cluster_centers_``. When given, the cluster id is appended to
            each label and the centroids are drawn as black crosses.
        title: Figure title.
        color_by: ``"lemma"``, ``"kmeans"`` or ``None`` (single colour).

    Raises:
        ValueError: If ``color_by`` is not one of the supported values, or if
            ``color_by="kmeans"`` is requested without a ``kmeans`` model.
    """
    # Validate up front so bad arguments fail before any plotting work is done.
    if color_by and color_by not in ("lemma", "kmeans"):
        raise ValueError('arg `color_by` must be either `"lemma"`, `"kmeans"`, or `None`')
    if color_by == "kmeans" and kmeans is None:
        raise ValueError('arg `kmeans` is required when `color_by` is `"kmeans"`')

    has_kmeans = kmeans is not None
    if has_kmeans:
        labels = np.array(
            [
                label + "__centroid:" + str(cluster_id)
                for label, cluster_id in zip(labels, kmeans.labels_)
            ]
        )

    if color_by == "lemma":
        # Give every distinct lemma a sequential integer id in order of first appearance.
        lemma_ids = {}
        color_id_list = [
            lemma_ids.setdefault(label.split("__")[0], len(lemma_ids)) for label in labels
        ]
    elif color_by == "kmeans":
        color_id_list = kmeans.labels_
    else:
        color_id_list = [0 for _ in embeddings]

    # A colour scale is only used when there is more than one group to tell apart.
    if len(set(color_id_list)) > 1:
        color_dict = dict(
            color=color_id_list,
            colorscale="rainbow",
        )
    else:
        color_dict = dict(color="red")

    sentences = np.array([s[:110] for s in sentences])

    fig = go.Figure(
        data=go.Scatter(
            x=embeddings[:, 0],
            y=embeddings[:, 1],
            mode="markers",
            customdata=np.stack((labels, sentences), axis=1),
            marker=color_dict,
            hovertemplate="x: %{x}<br>y: %{y}<br>label: %{customdata[0]}<br>sentence: %{customdata[1]}<extra></extra>",
            showlegend=False,
        ),
        layout=go.Layout(
            title=title,
            width=800,
            height=800,
            xaxis=dict(showgrid=False),
            yaxis=dict(showgrid=False),
        ),
    )
    if has_kmeans:
        for i, (cx, cy) in enumerate(kmeans.cluster_centers_):
            kmeans_label = f"Centroid {i}"
            fig.add_scatter(
                x=[cx],
                y=[cy],
                mode="markers+text",
                marker=dict(
                    color="black",
                    size=20,
                    symbol="x",
                ),
                text=["<b>" + kmeans_label + "</b>"],
                textposition="top center",
                showlegend=False,
                hovertemplate="x: %{x}<br>y: %{y}<br>" + kmeans_label + "<extra></extra>",
            )
    fig.show()


# Plot the PCA projection together with its k-means centroids, coloured by lemma.
show_plot(
    labels=labels,
    sentences=sentences,
    embeddings=embeddings_pca,
    kmeans=embeddings_kmeans_pca,
    color_by="lemma",
)

# query_and_plot

In [None]:
def query_and_plot(lemma_list, color_by="lemma", show_knee_chart=True):
    """Run the whole pipeline for a list of lemmas and show both plots.

    Loads the embeddings, projects them with t-SNE and PCA, clusters each
    projection with the elbow search (k up to 10), then shows the t-SNE plot
    followed by the PCA plot.

    Args:
        lemma_list: Lemmas to query.
        color_by: Passed through to ``show_plot``.
        show_knee_chart: Passed through to ``calculate_kmeans``.
    """
    labels, sentences, embeddings = load_from_db(lemma_list)

    # Insertion order (t-SNE first, PCA second) drives the order of every step below.
    projections = {
        "tsne": calculate_tsne(embeddings),
        "pca": calculate_pca(embeddings),
    }
    clusterings = {
        name: calculate_kmeans(
            points,
            n_clusters=None,
            max_k=10,
            title=name + " elbow",
            show_knee_chart=show_knee_chart,
        )
        for name, points in projections.items()
    }
    for name, points in projections.items():
        show_plot(
            labels=labels,
            sentences=sentences,
            embeddings=points,
            kmeans=clusterings[name],
            title=name + ": " + str(lemma_list),
            color_by=color_by,
        )


# End-to-end run for the five example lemmas, coloured by lemma, with the elbow charts suppressed.
query_and_plot(
    lemma_list=["Frau", "Mann", "Haus", "Küche", "Werkstatt"],
    color_by="lemma",
    show_knee_chart=False,
)

# analyses

In [None]:
# Single lemma, coloured by k-means cluster (elbow charts shown by default).
query_and_plot(["Frau"], color_by="kmeans")

In [None]:
# Two lemmas in one plot, coloured by lemma.
query_and_plot(["Frau", "Mann"], color_by="lemma")

In [None]:
# Single lemma, coloured by k-means cluster.
# NOTE(review): presumably chosen because the word has several senses — confirm.
query_and_plot(["Schloss"], color_by="kmeans")

In [None]:
# Single lemma, coloured by k-means cluster.
# NOTE(review): presumably chosen because the word has several senses — confirm.
query_and_plot(["Band"], color_by="kmeans")

# various experiments

**NOTE: Do not run the "experimental cells" below and then re-run the "production cells" above. The experimental cells have not been checked for overwriting variables that the production cells depend on.**

## kmeans

### kmeans from scratch

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.datasets import make_blobs

# Seed NumPy's global RNG; init_centroids below draws from it via np.random.choice.
np.random.seed(42)

In [None]:
# Number of clusters used by the from-scratch k-means cells below.
k = 4

In [None]:
# Synthetic test data: 300 samples drawn around 4 centres with a fixed random_state.
data_test, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.9, random_state=0)
print(data_test.shape)

In [None]:
def init_centroids(data, k, debug_print=False):
    """Pick ``k`` distinct rows of ``data`` as the starting centroids.

    Rows are drawn without replacement from NumPy's global random state.

    Args:
        data: Array with one point per row.
        k: Number of centroids to pick.
        debug_print: When true, print the shape and values of the picks.

    Returns:
        Array holding the ``k`` selected rows.
    """
    n_points = data.shape[0]
    chosen_rows = np.random.choice(n_points, size=k, replace=False)
    starting_centroids = data[chosen_rows]
    if not debug_print:
        return starting_centroids
    print("-- init_centroids --")
    print(starting_centroids.shape)
    print(starting_centroids)
    return starting_centroids


# Smoke-test the initialisation on the blob data with debug output enabled.
centroids_test = init_centroids(data_test, k, True)

In [None]:
def assign_centroid(data, centroids, debug_print=False):
    """Return, for every point, the index of its nearest centroid.

    Args:
        data: Array with one point per row.
        centroids: Array with one centroid per row.
        debug_print: When true, print intermediate values for the first point.

    Returns:
        1-D array of centroid indices, one per row of ``data``.
    """
    # Broadcasting gives a (points, centroids, features) array of offsets;
    # the norm over the last axis turns it into a distance table.
    offsets = data[:, np.newaxis] - centroids
    distances = np.linalg.norm(offsets, axis=2)
    nearest = distances.argmin(axis=1)
    if not debug_print:
        return nearest
    first_point = data[0]
    print("-- assign_centroid --")
    print("distances.shape:", distances.shape)
    print("distances[0]:", distances[0])
    print("data[0]:", first_point)
    print("centroids:", centroids)
    print(
        "np.linalg.norm(data[0] - centroids, axis=1):",
        np.linalg.norm(first_point - centroids, axis=1),
    )
    print("data_centroids_indices[0]:", nearest[0])
    return nearest


# Smoke-test the assignment step against the initial centroids with debug output enabled.
data_centroids_indices_test = assign_centroid(data_test, centroids_test, True)

In [None]:
def update_centroids(data, centroids, data_centroids_indices, debug_print=False):
    """Move every centroid to the mean of the points assigned to it.

    The number of clusters is taken from ``centroids`` itself rather than
    from a module-level ``k``, so the function gives correct results for any
    cluster count. A centroid with no assigned points keeps its previous
    position instead of becoming NaN.

    Args:
        data: Array with one point per row.
        centroids: Current centroids, one per row.
        data_centroids_indices: For each row of ``data``, the index of the
            centroid it is assigned to.
        debug_print: When true, print the old and new centroids.

    Returns:
        Float array of updated centroids with the same shape as ``centroids``.
    """
    centroids_new = np.array(centroids, dtype=float)
    for i in range(len(centroids)):
        members = data[data_centroids_indices == i]
        # The mean of an empty selection is NaN, so leave such a centroid where it is.
        if len(members) > 0:
            centroids_new[i] = members.mean(axis=0)
    if debug_print:
        print("-- update_centroids --")
        print("centroids:", centroids)
        print("centroids_new:", centroids_new)
    return centroids_new


# Smoke-test one update step with debug output enabled.
centroids_new_test = update_centroids(data_test, centroids_test, data_centroids_indices_test, True)

In [None]:
def have_centroids_converged(centroids, centroids_new, threshold, debug_print=False):
    """Report whether every centroid moved by less than ``threshold``.

    Args:
        centroids: Centroids before the update, one per row.
        centroids_new: Centroids after the update, same shape.
        threshold: Strict upper bound on the per-centroid Euclidean shift.
        debug_print: When true, print the shifts and the verdict.

    Returns:
        NumPy boolean that is true only if all shifts are below ``threshold``.
    """
    shifts = np.linalg.norm(centroids - centroids_new, axis=1)
    converged = (shifts < threshold).all()
    if not debug_print:
        return converged
    print("-- have_centroids_converged --")
    print("distance:", shifts)
    print("have_centroids_converged:", converged)
    return converged


# Smoke-test the convergence check with debug output enabled. The threshold is
# passed explicitly: the earlier positional `True` landed in `threshold`
# (i.e. a threshold of 1) and left `debug_print` off.
centroids_converged_test = have_centroids_converged(
    centroids_test, centroids_new_test, threshold=1e-4, debug_print=True
)

In [None]:
def kmeans_main(data, k, threshold=1e-1, debug_print=False, max_iter=300):
    """Run k-means from scratch until the centroids stop moving.

    Alternates assignment and centroid-update steps. The loop ends once every
    centroid moved by less than ``threshold`` in an update, or after
    ``max_iter`` rounds. The cap guarantees termination even if a centroid
    turns NaN, in which case the convergence test can never become true.

    Args:
        data: Array with one point per row.
        k: Number of clusters.
        threshold: Convergence bound on the per-centroid shift.
        debug_print: Forwarded to every helper; also prints start/end markers.
        max_iter: Maximum number of assignment/update rounds (at least 1).

    Returns:
        Tuple ``(centroids, data_centroids_indices)``. The assignments are the
        ones computed in the last round, i.e. against the centroids as they
        were before that round's update.

    Raises:
        ValueError: If ``max_iter`` is smaller than 1.
    """
    if max_iter < 1:
        raise ValueError("max_iter must be at least 1")
    if debug_print:
        print("-- kmeans_main: start --")
    centroids = init_centroids(data, k, debug_print)
    for _ in range(max_iter):
        data_centroids_indices = assign_centroid(data, centroids, debug_print)
        centroids_new = update_centroids(data, centroids, data_centroids_indices, debug_print)
        centroids_converged = have_centroids_converged(
            centroids, centroids_new, threshold, debug_print
        )
        centroids = centroids_new
        if centroids_converged:
            break
    else:
        # Reached only when the loop ran out of rounds without a break.
        print(f"kmeans_main: no convergence after {max_iter} iterations")
    if debug_print:
        print("-- kmeans_main: end --")
        print(centroids, data_centroids_indices)
    return centroids, data_centroids_indices


# Full from-scratch run on the blob data with a tighter threshold than the
# function default; the results are plotted in the next cell.
centroids, data_centroids_indices = kmeans_main(
    data=data_test, k=4, threshold=1e-4, debug_print=False
)

In [None]:
# Scatter the blob data coloured by assigned cluster and overlay the centroids
# produced by kmeans_main as red crosses with a legend entry each.
plt.figure(figsize=(8, 6))
plt.scatter(data_test[:, 0], data_test[:, 1], c=data_centroids_indices, cmap="viridis", s=30)
handles = []
labels_for_legend = []
for i, centroid in enumerate(centroids):
    x, y = centroid
    handles.append(plt.scatter(x, y, color="red", s=200, marker="X"))
    labels_for_legend.append(f"Centroid {i}: ({x:.2f}, {y:.2f})")
plt.title("K-Means from scratch")
plt.legend(handles, labels_for_legend, title="Centroids", loc="best")
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# Reference run with scikit-learn on the same blob data, for comparison with
# the from-scratch result.
kmeans = KMeans(n_clusters=4, random_state=42)
kmeans.fit(data_test)
centroids = kmeans.cluster_centers_
# Stored as `cluster_labels` rather than `labels` so the label array loaded
# from the database is not overwritten.
cluster_labels = kmeans.labels_

plt.figure(figsize=(8, 6))
plt.scatter(data_test[:, 0], data_test[:, 1], c=cluster_labels, cmap="viridis", s=30)
handles = []
labels_for_legend = []
for i, (x, y) in enumerate(centroids):
    handle = plt.scatter(x, y, color="red", s=200, marker="X")
    handles.append(handle)
    labels_for_legend.append(f"Centroid {i}: ({x:.2f}, {y:.2f})")
plt.legend(handles, labels_for_legend, title="Centroids", loc="best")
plt.title("K-Means Clustering with sklearn")
plt.show()

### kmeans visualizations

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# matplotlib version of the cluster visualisation on freshly generated blobs.
data_test, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.9, random_state=0)
print(data_test.shape)

kmeans = KMeans(n_clusters=4, random_state=42)
kmeans.fit(data_test)
centroids = kmeans.cluster_centers_
# Stored as `cluster_labels` rather than `labels` so the label array loaded
# from the database is not overwritten.
cluster_labels = kmeans.labels_

plt.figure(figsize=(8, 6))
plt.scatter(data_test[:, 0], data_test[:, 1], c=cluster_labels, cmap="viridis", s=30)
handles = []
labels_for_legend = []
for i, (x, y) in enumerate(centroids):
    handle = plt.scatter(x, y, color="red", s=200, marker="X")
    handles.append(handle)
    labels_for_legend.append(f"Centroid {i}: ({x:.2f}, {y:.2f})")
plt.legend(handles, labels_for_legend, title="Centroids", loc="best")
plt.title("matplotlib")
plt.show()

In [None]:
import numpy as np
import plotly.graph_objects as go
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# plotly version of the cluster visualisation on freshly generated blobs.
data_test, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.9, random_state=0)
print(data_test.shape)

kmeans = KMeans(n_clusters=4, random_state=0).fit(data_test)
# Stored as `cluster_labels` rather than `labels` so the label array loaded
# from the database is not overwritten.
cluster_labels = kmeans.labels_
centroids = kmeans.cluster_centers_

fig = go.Figure(
    data=go.Scatter(
        x=data_test[:, 0],
        y=data_test[:, 1],
        mode="markers",
        marker=dict(
            color=cluster_labels,
            colorscale="rainbow",
        ),
        hovertemplate="x: %{x}<br>y: %{y}<extra></extra>",
        showlegend=False,
    ),
    layout=go.Layout(
        title="plotly",
        width=800,
        height=800,
        xaxis=dict(showgrid=False),
        yaxis=dict(showgrid=False),
    ),
)
# One extra trace per centroid so each can carry its own text label.
for i, (cx, cy) in enumerate(centroids):
    fig.add_scatter(
        x=[cx],
        y=[cy],
        mode="markers+text",
        marker=dict(
            color="red",
            size=20,
            symbol="x",
        ),
        text=[f"Centroid {i}"],
        textposition="top center",
        showlegend=False,
        hovertemplate="x: %{x}<br>y: %{y}<extra></extra>",
    )

fig.show()

### elbow method

In [None]:
import matplotlib.pyplot as plt
from kneed import KneeLocator
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# Sample data
X, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.6, random_state=0)

inertia = []
K = range(1, 11)

# The loop variable is deliberately not called `k`, so the module-level `k`
# set in the from-scratch section is not overwritten.
for k_candidate in K:
    model = KMeans(n_clusters=k_candidate, n_init=10, random_state=0)
    model.fit(X)
    inertia.append(model.inertia_)

# Find elbow point
knee = KneeLocator(K, inertia, curve="convex", direction="decreasing")

# Plot
plt.plot(K, inertia, marker="o")
plt.axvline(x=knee.knee, color="r", linestyle="--", label=f"Elbow at k={knee.knee}")
plt.xlabel("k")
plt.ylabel("Inertia")
plt.title("Elbow Method with Automatic Detection")
plt.legend()
plt.xticks(K)
plt.xlim(1, 10)
plt.show()
print(f"Optimal number of clusters (Elbow): {knee.knee}")

## snippets to be removed

In [None]:
def show_plot_pca(labels, embeddings, title=None):
    """Plot a 2-component PCA projection of the embeddings with matplotlib.

    Each point is drawn in blue and annotated with its label. Labels and
    points are paired positionally; pairing stops at the shorter of the two.

    Args:
        labels: Text annotations, one per embedding.
        embeddings: Array with one embedding per row.
        title: Plot title.
    """
    points = PCA(n_components=2).fit_transform(embeddings)
    plt.figure(figsize=(8, 6))
    plt.scatter(points[:, 0], points[:, 1], color="blue", alpha=0.7)
    for (px, py), text in zip(points, labels):
        plt.text(px, py, text)
    plt.title(title)
    plt.show()


# Uses the module-level `labels` and `embeddings`, whatever they hold when this cell runs.
show_plot_pca(labels, embeddings)

In [None]:
def show_plot_tsne(labels, embeddings, title=None):
    """Plot a 2-component t-SNE projection of the embeddings with matplotlib.

    Uses a fixed perplexity of 5 and random_state of 42. Each point is drawn
    in blue and annotated with the label at the same index.

    Args:
        labels: Text annotations, one per embedding.
        embeddings: Array with one embedding per row.
        title: Plot title.
    """
    projector = TSNE(n_components=2, perplexity=5, random_state=42)
    points = projector.fit_transform(embeddings)
    xs = points[:, 0]
    ys = points[:, 1]
    plt.figure(figsize=(8, 6))
    plt.scatter(xs, ys, c="blue", alpha=0.7)
    # Index by label position, so a label without a matching point raises.
    for idx, text in enumerate(labels):
        plt.text(points[idx, 0], points[idx, 1], text)
    plt.title(title)
    plt.show()


# Uses the module-level `labels` and `embeddings`, whatever they hold when this cell runs.
show_plot_tsne(labels, embeddings)