In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from scipy.stats import multivariate_t, multivariate_normal
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, silhouette_samples, adjusted_rand_score
from sklearn.base import clone
from sklearn.utils import check_random_state

In [2]:
# Definimos una función para formatear los números
def format_func(value, tick_number):
    return f'{value/1000:.1f}K'

def cluster_stability(X, est, n_iter=20, random_state=None):
    rng = np.random.RandomState(random_state)
    labels = []
    indices = []
    for i in range(n_iter):
        # Draw bootstrap samples and store indices
        sample_indices = rng.randint(0, X.shape[0], X.shape[0])
        indices.append(sample_indices)

        # Clone the estimator to make sure that we are fitting fresh models
        est = clone(est)

        # The hasattr() method returns checks if the estimator has a random_state attribute
        if hasattr(est, "random_state"):
            # Randomize estimator if possible
            est.random_state = rng.randint(1e5)
        
        # Creates a bootstrap sample
        X_bootstrap = X[sample_indices]

        # Fit the clustering model
        est.fit(X_bootstrap)

        # Store clustering outcome using original indices
        relabel = -np.ones(X.shape[0], dtype=np.int)
        relabel[sample_indices] = est.labels_
        labels.append(relabel)

    scores = []
    for l, i in zip(labels, indices):
        for k, j in zip(labels, indices):
            in_both = np.intersect1d(i, j)
            scores.append(adjusted_rand_score(l[in_both], k[in_both]))

    return np.mean(scores)

# Ejercicio 1

Probar el algoritmo utilizando distribuciones t-bivariadas.

## Muestras

In [None]:
# Observaciones
n = 50

# Medias
mu1 = np.array([2, 2])
mu2 = np.array([-1, -1])

# Covarianza
sigma = np.array([[1, 0], [0, 1]])

# Miramos el determinante de la matriz de covarianza. Como tiene que se semi-definida positiva, el determinante tiene que ser mayor que 0
det = np.linalg.det(sigma)
print('Determinante', det)

In [None]:
# Distribuciones
x = multivariate_t.rvs(loc=mu1, shape=sigma, size=n, random_state=42)
y = multivariate_t.rvs(loc=mu2, shape=sigma, size=n, random_state=42)

X = np.concatenate((x, y), axis=0)

## Modelo

In [None]:
# Inicializamos el modelo y lo ajustamos
kmeans = KMeans(n_clusters=2, max_iter=10, n_init=2, random_state=42)
y = kmeans.fit_predict(X)

In [None]:
# Miramos los centros de los clusters y la dispersión total intra-grupo
print('Centros:\n', kmeans.cluster_centers_)
print('\nDispersión total intra-grupo:', kmeans.inertia_.__format__('.2f'))

### Gráfico

Miramos los datos utilizando los clusters encontrados

In [None]:
# Existing scatter plot
figure = px.scatter(
    x=X[:, 0],
    y=X[:, 1],
    color=y.astype(str),
    color_discrete_map={'0': '#E65983', '1': '#2D3846'},
    size=[1] * X.shape[0],
)

# Update layout
figure.update_layout(
    title='K-Means Clustering with Additional Scatter Plot',
    title_font=dict(size=16, family='Arial', color='black', weight='bold'),
    xaxis_title='X1',
    yaxis_title='X2',
    plot_bgcolor='white',
    yaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
    xaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
)

figure.show()

## Validación

### Elbow method

Calculamos la dispersión total intra-grupo para distintos valores de $K$. El valor óptimo de $K$ es aquel a partir del cual la reducción en la dispersión total intra-grupo comienza a desacelerarse.

In [None]:
# Calculamos, para distintos valores de K, la dispersión total intra-grupo
inertia = []
for i in np.arange(1, 11):
    kmeans = KMeans(n_clusters=i, max_iter=10, n_init=2, random_state=42)
    y = kmeans.fit_predict(X)

    inertia.append(kmeans.inertia_)

figure = px.line(
    x=np.arange(1, 11),
    y=inertia,
    title='Elbow Method',
    labels={'x': 'K', 'y': 'Inertia'},
    line_shape='linear'
)

figure.update_layout(
    title='Elbow Method',
    title_font=dict(size=16, family='Arial', color='black', weight='bold'),
    xaxis_title='K',
    yaxis_title='Dispersión total intra-grupo',
    plot_bgcolor='white',
    yaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
    xaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray')
)

Probamos el algoritmo con $k=4$

In [None]:
kmeans = KMeans(n_clusters=4, max_iter=10, n_init=2, random_state=42)
y = kmeans.fit_predict(X)

In [None]:
# Existing scatter plot
figure = px.scatter(
    x=X[:, 0],
    y=X[:, 1],
    color=y.astype(str),
    color_discrete_map={'0': '#E65983', '1': '#2D3846', '2': '#F6AE2D', '3': '#3C7A89'},
    size=[1] * X.shape[0],
)

# Update layout
figure.update_layout(
    title='K-Means Clustering with Additional Scatter Plot',
    title_font=dict(size=16, family='Arial', color='black', weight='bold'),
    xaxis_title='X1',
    yaxis_title='X2',
    plot_bgcolor='white',
    yaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
    xaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
)

figure.show()

### Mean Silhouette

Calculamos el índice de Silhouette para cada observación. Esto nos da una idea de que tan compactos y separados están los clusters.

In [None]:
silhouette_values = silhouette_samples(X, y)

In [None]:
n_clusters = len(np.unique(y))
y_lower, y_upper = 0, 0
yticks = []

colors = ['#E65983', '#2D3846', '#F6AE2D', '#3C7A89']
fig, ax = plt.subplots(figsize=(10, 10))

for i in np.arange(n_clusters):
    cluster_silhouette_vals = silhouette_values[y == i]
    cluster_silhouette_vals.sort()
    y_upper += len(cluster_silhouette_vals)
    ax.barh(np.arange(y_lower, y_upper), cluster_silhouette_vals, edgecolor='white', height=1, color=colors[i])
    yticks.append((y_lower + y_upper) / 2)
    y_lower += len(cluster_silhouette_vals)

ax.axvline(x=np.mean(silhouette_values), color="black", linestyle="--")
ax.set_yticks(yticks, [f'Cluster {i}' for i in np.arange(n_clusters)])
ax.set_xlabel('Silhouette Coefficient')
ax.set_title('Silhouette Plot', loc='left', fontdict={'fontsize': 16, 'fontweight': 'bold'})

# Ejercicio 2

Modificando las matrices de covarianza encontrar un conjunto de datos que logre un mal desempeño del algoritmo. Intentar con una configuración de tres normales bivariadas.

## Muestras

In [None]:
# Observaciones
n = 50

# Medias
mu1 = np.array([2, 2])
mu2 = np.array([1, 1])
mu3 = np.array([0, 0])
means = np.array([mu1, mu2, mu3])

# Cambiamos la matriz de covarianza. Aumentamos la varianza de X2 variable y reducimos la de X1
sigma = np.array([[0.01, 0.09], [0.09, 1]])

# Miramos el determinante de la matriz de covarianza. Como tiene que se semi-definida positiva, el determinante tiene que ser mayor que 0
det = np.linalg.det(sigma)
print('Determinante', det)

In [None]:
# Distribuciones
x = multivariate_normal.rvs(mean=mu1, cov=sigma, size=n, random_state=42)
y = multivariate_normal.rvs(mean=mu2, cov=sigma, size=n, random_state=42)
z = multivariate_normal.rvs(mean=mu3, cov=sigma, size=n, random_state=42)

X = np.concatenate((x, y, z), axis=0)

In [None]:
# Existing scatter plot
figure = px.scatter(
    x=X[:, 0],
    y=X[:, 1],
    #color=X[:, 1].astype(str),
    #color_discrete_map={'0': '#E65983', '1': '#2D3846'},
    size=[1] * X.shape[0],
)

# Update layout
figure.update_layout(
    title='Distribution',
    title_font=dict(size=16, family='Arial', color='black', weight='bold'),
    xaxis_title='X1',
    yaxis_title='X2',
    plot_bgcolor='white',
    yaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
    xaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
)

figure.show()

## Modelo

In [None]:
# Ajustamos el modelo
kmeans = KMeans(n_clusters=3, max_iter=10, n_init=2, random_state=42)
y = kmeans.fit_predict(X)

In [None]:
# Miramos los centros de los clusters y la dispersión total intra-grupo
print('Centros:\n', kmeans.cluster_centers_)
print('\nDispersión total intra-grupo:', kmeans.inertia_.__format__('.2f'))

Lo que podemos observar en el siguiente gráfico es que, como el algoritmo utiliza la distancia euclídea, y como busca **minimizar** la dispersión **intra-grupo**, prefiere agrupar calculando las distancias horizontalmente y no verticalmente, como debería ser en realidad.

In [None]:
# Existing scatter plot
figure = px.scatter(
    x=X[:, 0],
    y=X[:, 1],
    color=y.astype(str),
    color_discrete_map={'0': '#E65983', '1': '#2D3846', '2': '#4FDFEF'},
    size=[1] * X.shape[0],
)

# Update layout
figure.update_layout(
    title='Distribution',
    title_font=dict(size=16, family='Arial', color='black', weight='bold'),
    xaxis_title='X1',
    yaxis_title='X2',
    plot_bgcolor='white',
    yaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
    xaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
)

# Set x-axis and y-axis limits
figure.update_xaxes(range=[-1, 3])
figure.update_yaxes(range=[-2, 5])

figure.show()

# Ejercicio 3

¿Es posible que algunos valores atípicos puedan quebrar el algoritmo? Probar agregando algunos outliers al conjunto de datos que usamos al comienzo.

## Muestras

In [None]:
# Observaciones
n = 50

# Medias
mu1 = np.array([2, 2])
mu2 = np.array([-1, -1])

# Covarianza
sigma = np.array([[1, 0], [0, 1]])

In [None]:
# Distribuciones
x = multivariate_t.rvs(loc=mu1, shape=sigma, size=n, random_state=42)
y = multivariate_t.rvs(loc=mu2, shape=sigma, size=n, random_state=42)

X = np.concatenate((x, y), axis=0)
X = np.concatenate((X, np.array([[1000, 1000]])), axis=0)

## Modelo

In [None]:
# Ajustamos el modelo
kmeans = KMeans(n_clusters=2, max_iter=10, n_init=2, random_state=42)
y = kmeans.fit_predict(X)

La idea es que, como el algoritmo busca minimizar la dispersión intra-grupo, prefiere clasificar todas las observaciones en un cluster y el outlier en otro, lo cual afecta el rendimiento del algoritmo significativamente.

In [None]:
# Existing scatter plot
figure = px.scatter(
    x=X[:, 0],
    y=X[:, 1],
    color=y.astype(str),
    color_discrete_map={'0': '#E65983', '1': '#2D3846', '2': '#4FDFEF'},
    size=[1] * X.shape[0],
)

# Update layout
figure.update_layout(
    title='K-Means Clustering',
    title_font=dict(size=16, family='Arial', color='black', weight='bold'),
    xaxis_title='X1',
    yaxis_title='X2',
    plot_bgcolor='white',
    yaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
    xaxis=dict(showgrid=True, gridcolor='LightGray', showline=True, linecolor='Black', zeroline=True, zerolinecolor='LightGray'),
)

figure.show()