In [1]:
import numpy as np

In [4]:
class KMeans:
    def __init__(self, n_clusters=3, max_iters=100, random_state=None):
        self.n_clusters = n_clusters
        self.max_iters = max_iters
        self.random_state = random_state
        self.centroids = None
        
    def fit(self, X):
        np.random.seed(self.random_state)
        
        # Initialize centroids randomly
        idx = np.random.choice(len(X), self.n_clusters, replace=False)
        print('idx:', idx)
        self.centroids = X[idx]
        print('centroids:', self.centroids)
        
        for _ in range(self.max_iters):
            # Assign points to nearest centroid
            distances = self._calculate_distances(X)
            print('distances:', distances)
            labels = np.argmin(distances, axis=0)
            print('labels:', labels)
            
            # Update centroids
            new_centroids = np.array([
                X[labels == k].mean(axis=0)
                for k in range(self.n_clusters)
            ])
            print('new_centroids:', new_centroids)
            
            # Check for convergence
            if np.all(self.centroids == new_centroids):
                break
            
            self.centroids = new_centroids
        
        return labels
    
    def predict(self, X):
        distances = self._calculate_distances(X)
        return np.argmin(distances, axis=0)
    
    def _calculate_distances(self, X):
        """
        Calculate distances between each point in X and all centroids.
        """
        n_samples = X.shape[0]
        distances = np.zeros((self.n_clusters, n_samples))
        
        for i in range(n_samples):
            diff = self.centroids - X[i]
            distances[:, i] = np.linalg.norm(diff, axis=1)
        
        return distances

In [7]:
def test_kmeans():
    # Test case 1: Simple 2D dataset
    X1 = np.array([[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11]])
    kmeans = KMeans(n_clusters=2, random_state=42)
    labels1 = kmeans.fit(X1)
    assert len(np.unique(labels1)) == 2, "Test case 1 failed: Incorrect number of clusters"
    
    # Test case 2: Larger random dataset
    np.random.seed(42)
    X2 = np.vstack((np.random.randn(100, 2) * 0.5 + np.array([2, 2]),
                    np.random.randn(100, 2) * 0.5 + np.array([-2, -2]),
                    np.random.randn(100, 2) * 0.5 + np.array([2, -2])))
    kmeans = KMeans(n_clusters=3, random_state=42)
    labels2 = kmeans.fit(X2)
    assert len(np.unique(labels2)) == 3, "Test case 2 failed: Incorrect number of clusters"
    
    # Test case 3: Predict on new data
    X_new = np.array([[0, 0], [5, 5]])
    predicted_labels = kmeans.predict(X_new)
    assert len(predicted_labels) == 2, "Test case 3 failed: Incorrect number of predictions"
    
    print("All test cases passed!")

test_kmeans()

All test cases passed!


In [6]:
class KMeans:
    def __init__(self, n_clusters=3, max_iters=100, random_state=None):
        self.n_clusters = n_clusters
        self.max_iters = max_iters
        self.random_state = random_state
        self.centroids = None
        
    def fit(self, X):
        np.random.seed(self.random_state)
        
        # Initialize centroids randomly
        idx = np.random.choice(len(X), self.n_clusters, replace=False)
        self.centroids = X[idx]
        
        for _ in range(self.max_iters):
            # Assign points to nearest centroid using the optimized distance function
            distances = self._compute_distances(X)
            labels = np.argmin(distances, axis=1)
            
            # Update centroids
            new_centroids = np.array([X[labels == k].mean(axis=0) for k in range(self.n_clusters)])
            
            # Check for convergence
            if np.all(self.centroids == new_centroids):
                break
            
            self.centroids = new_centroids
        
        return labels
    
    def predict(self, X):
        distances = self._compute_distances(X)
        return np.argmin(distances, axis=1)

    def _compute_distances(self, X):
        # Using broadcasting to compute distances efficiently    
        return np.sqrt(((X[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]) ** 2).sum(axis=2)) 