In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from typing import List, Tuple

In [None]:
def load_dataset(path: str) -> np.ndarray:
    """Load dataset from CSV; generate synthetic data if not available."""
    
    csv_path = Path(path)
    if csv_path.exists():
        try:
            df = pd.read_csv(csv_path)
            # if file is a git-lfs pointer, it will only have few lines/columns
            if df.shape[1] > 1:
                return df.values.astype(float)
        except Exception:
            pass
    # fallback synthetic data
    rng = np.random.default_rng(0)
    X = np.vstack([
        rng.normal(loc=-2, scale=0.5, size=(100, 2)),
        rng.normal(loc=0, scale=0.5, size=(100, 2)),
        rng.normal(loc=2, scale=0.5, size=(100, 2)),
    ])
    print("Warning: Using synthetic dataset with three clusters.")
    return X

In [None]:
def initialize_centroids(X: np.ndarray, k: int, rng: np.random.Generator, method: str) -> np.ndarray:
    if method == "random":
        indices = rng.choice(X.shape[0], size=k, replace=False)
        return X[indices]
    elif method == "kmeans++":
        centroids = []
        # choose first centroid randomly
        centroids.append(X[rng.integers(0, X.shape[0])])
        for _ in range(1, k):
            dist_sq = np.min(np.linalg.norm(X[:, None, :] - np.array(centroids)[None, :, :], axis=2) ** 2, axis=1)
            probs = dist_sq / dist_sq.sum()
            centroids.append(X[rng.choice(X.shape[0], p=probs)])
        return np.array(centroids)
    else:
        raise ValueError(f"Unknown init method: {method}")

In [None]:
def assign_clusters(X: np.ndarray, centroids: np.ndarray) -> np.ndarray:
    distances = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
    return np.argmin(distances, axis=1)

In [None]:
def compute_centroids(X: np.ndarray, labels: np.ndarray, k: int) -> np.ndarray:
    centroids = np.zeros((k, X.shape[1]))
    for i in range(k):
        cluster_points = X[labels == i]
        if len(cluster_points) == 0:
            centroids[i] = X[np.random.randint(0, X.shape[0])]
        else:
            centroids[i] = cluster_points.mean(axis=0)
    return centroids

In [None]:
def inertia(X: np.ndarray, labels: np.ndarray, centroids: np.ndarray) -> float:
    return np.sum((X - centroids[labels]) ** 2)

In [None]:
def kmeans(X: np.ndarray, k: int, init: str, max_iters: int = 300, tol: float = 1e-4, seed: int = None) -> Tuple[np.ndarray, np.ndarray, int, List[float]]:
    rng = np.random.default_rng(seed)
    centroids = initialize_centroids(X, k, rng, init)
    history = []
    for it in range(max_iters):
        labels = assign_clusters(X, centroids)
        current_inertia = inertia(X, labels, centroids)
        history.append(current_inertia)
        new_centroids = compute_centroids(X, labels, k)
        shift = np.linalg.norm(new_centroids - centroids)
        centroids = new_centroids
        if shift < tol:
            break
    labels = assign_clusters(X, centroids)
    history.append(inertia(X, labels, centroids))
    return centroids, labels, it + 1, history

In [None]:
data_path = Path("CSC2042S-Assignment1-Data/cleaned_data.csv")
X = load_dataset(data_path)
k = 3
runs = 5

### 3. Convergence Criteria
This section explores different stopping rules for the K-means algorithm.

In [None]:
def kmeans_convergence(
    X: np.ndarray,
    k: int,
    init: str,
    max_iters: int = 300,
    centroid_tol: float | None = 1e-4,
    inertia_tol: float | None = None,
    seed: int | None = None,
) -> Tuple[np.ndarray, np.ndarray, int, List[float], List[float]]:
    """K-means with configurable convergence criteria.
    
    Returns centroids, labels, iterations, inertia history and centroid-shift history."""
    
    rng = np.random.default_rng(seed)
    centroids = initialize_centroids(X, k, rng, init)
    history = []
    shift_history = []
    prev_inertia: float | None = None
    
    for it in range(max_iters):
        labels = assign_clusters(X, centroids)
        current_inertia = inertia(X, labels, centroids)
        history.append(current_inertia)
        
        new_centroids = compute_centroids(X, labels, k)
        shift = np.linalg.norm(new_centroids - centroids)
        shift_history.append(float(shift))
        
        if centroid_tol is not None and shift < centroid_tol:
            centroids = new_centroids
            break
            
        if prev_inertia is not None and inertia_tol is not None:
            if abs(prev_inertia - current_inertia) < inertia_tol:
                centroids = new_centroids
                break
                
        centroids = new_centroids
        prev_inertia = current_inertia
    else:
        labels = assign_clusters(X, centroids)
        history.append(inertia(X, labels, centroids))
        return centroids, labels, max_iters, history, shift_history
    
    labels = assign_clusters(X, centroids)
    history.append(inertia(X, labels, centroids))
    return centroids, labels, it + 1, history, shift_history

In [None]:
def plot_convergence(history: List[float], shifts: List[float], title: str, filename: str):
    """Plot inertia and centroid shift over iterations."""
    iters = range(len(history))
    
    plt.figure(figsize=(8,4))
    
    plt.subplot(1,2,1)
    plt.plot(iters, history, marker='o')
    plt.xlabel('Iteration')
    plt.ylabel('Inertia')
    
    plt.subplot(1,2,2)
    plt.plot(range(1, len(shifts)+1), shifts, marker='o')
    plt.xlabel('Iteration')
    plt.ylabel('Centroid shift')
    
    plt.suptitle(title)
    plt.tight_layout()
    plt.savefig(filename)
    plt.close()

In [None]:
# Evaluate different convergence criteria
Path("figures").mkdir(exist_ok=True)

configs = [
    ("max_iter_10", {"max_iters": 10, "centroid_tol": None, "inertia_tol": None}),
    ("centroid_tol_1e-2", {"centroid_tol": 1e-2}),
    ("inertia_tol_1e-2", {"inertia_tol": 1e-2}),
]

for name, params in configs:
    c, lbls, iters, hist, shifts = kmeans_convergence(X, k, init="kmeans++", **params)
    print(f"{name}: iterations={iters}, final inertia={hist[-1]:.3f}")
    plot_convergence(hist, shifts, title=name.replace('_', ' '), filename=f"figures/{name}_convergence.png")