In [19]:
import numpy as np
from scipy.spatial import distance
from scipy.spatial.distance import cdist
from scipy.spatial import ConvexHull
from scipy.spatial.distance import euclidean
from scipy.spatial import Voronoi
import kneed
from kneed import KneeLocator
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
from scipy.spatial import distance
from scipy.io import arff
import pandas as pd
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt
from scipy.spatial import ConvexHull, Voronoi, distance
import numpy as np
from scipy.stats import multivariate_normal

In [2]:
!pip install kneed

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kneed
  Downloading kneed-0.8.3-py3-none-any.whl (10 kB)
Installing collected packages: kneed
Successfully installed kneed-0.8.3


In [16]:
def euclidean_distance(point1, point2):
    return np.sqrt(np.sum((point1 - point2)**2))

class Cluster:
    def __init__(self, centroid):
        self.centroid = centroid
        self.points = []
        self.radius = 0.0
    
    def update_centroid(self):
        self.centroid = np.mean(self.points, axis=0)
    
    def add_point(self, point):
        self.points.append(point)
    
    def update_radius(self, distance):
        if distance > self.radius:
            self.radius = distance


def k_star_algorithm(K_star, K, dataset, N):
    # Step 1: Arbitrary choose K_star initial centers
    centers = dataset[np.random.choice(len(dataset), K_star, replace=False)]

    clusters = [Cluster(center) for center in centers]

    for point in dataset:
        distances = [euclidean_distance(point, cluster.centroid) for cluster in clusters]
        closest_cluster_index = np.argmin(distances)
        closest_cluster = clusters[closest_cluster_index]
        closest_cluster.add_point(point)

    while True:
        converged = True

        for idx, cluster in enumerate(clusters):
            cluster_set = [c for c in clusters if c != cluster]
            updated_points = []

            for point in cluster.points:
                distances = [euclidean_distance(point, c.centroid) for c in cluster_set]
                closest_cluster_index = np.argmin(distances)
                closest_cluster = cluster_set[closest_cluster_index]
                distance_to_closest_cluster = distances[closest_cluster_index]
                if distance_to_closest_cluster < euclidean_distance(point, cluster.centroid):

                    if distance_to_closest_cluster > cluster.radius + closest_cluster.radius + abs(cluster.radius * closest_cluster.radius):
                        continue  # Skip moving point if Lemma 4 condition is not satisfied

                    closest_cluster.add_point(point)

                    if distance_to_closest_cluster > closest_cluster.radius:
                      closest_cluster.radius = distance_to_closest_cluster
                    
                    # update m'
                    closest_cluster.centroid = closest_cluster.centroid + (euclidean_distance(point, closest_cluster.centroid)/(len(closest_cluster.points)+1))
                    cluster.centroid = cluster.centroid + (euclidean_distance(point, cluster.centroid)/(len(cluster.points)-1))

                elif euclidean_distance(point, cluster.centroid) > cluster.radius:
                    cluster.radius = euclidean_distance(point, cluster.centroid)
                

        if converged:
          if len(clusters) > K:
            clusters = top_n(clusters, N)
          else:
            break

    return clusters



def top_n(clusters, N):
    distances = []

    for i in range(len(clusters)):
        for j in range(i+1, len(clusters)):
            distance = euclidean_distance(clusters[i].centroid, clusters[j].centroid)
            distances.append((distance, i, j))

    distances.sort()
    selected_clusters = set()
    merged_clusters = []
    for distance, i, j in distances[:N]:
        if i not in selected_clusters and j not in selected_clusters:

            merged_clusters.append(clusters[i].points + clusters[j].points)
            selected_clusters.add(i)
            selected_clusters.add(j)
    
    for i in range(len(clusters)):
        if i not in selected_clusters:
            merged_clusters.append(clusters[i].points)


    # form new clusters
    new_clusters = []
    for new_cluster_points in merged_clusters:
        new_cluster = Cluster(np.mean(new_cluster_points, axis=0))
        new_cluster.points = new_cluster_points
        new_clusters.append(new_cluster)

    return new_clusters

In [17]:
class MyDBSCAN:
    def __init__(self, data):
        self.data = data

    def calculate_hull(self):
        hull = ConvexHull(self.data)
        return hull

    def calculate_voronoi(self):
        voronoi = Voronoi(self.data)
        return voronoi

    def calculate_interiors(self, hull, voronoi):
        v_vertices = np.array(voronoi.vertices)
        hull_points = np.array(self.data[hull.vertices])
        interiors = []
        for i in range(len(v_vertices)):
            new_array = np.append(hull_points, [v_vertices[i]], axis=0)
            convex_hull_to_test = ConvexHull(new_array)

            # check if the array is unchanged
            new_hull_points = np.array(new_array[convex_hull_to_test.vertices])
            if np.array_equal(new_hull_points, hull_points):
                interiors.append(v_vertices[i])
        return interiors

    def calculate_eps_ec(self, interiors):
        possible_centers = np.array(interiors)
        distances = distance.cdist(possible_centers, self.data, metric='euclidean')
        min_distances = [min(distances[i]) for i in range(len(distances))]
        radius = np.sort(min_distances)
        elbow = KneeLocator(range(len(radius)), radius, curve='convex', direction='increasing')
        eps_ec = radius[elbow.knee]
        return eps_ec

    def find_best_min_points(self, eps_ec):
        best_scores = {}
        for i in range(2, 100):
            dbscan = DBSCAN(eps=eps_ec, min_samples=i)
            labels = dbscan.fit_predict(self.data)
            if len(set(labels)) > 1:
                score = silhouette_score(self.data, labels)
                best_scores[score] = [i, eps_ec]
            else:
                continue
        best_min_points = max(best_scores.keys())
        return best_scores[best_min_points]

    def assign_labels(self, best_min_points):
        dbscan = DBSCAN(eps=best_min_points[1], min_samples=best_min_points[0])
        labels = dbscan.fit_predict(self.data)
        noise = []
        unique_labels = np.unique(labels)
        centroids = []
        for label in unique_labels:
            if label != -1:
                centroid = np.mean(self.data[labels == label], axis=0)
                centroids.append((label, centroid))
        for i in range(len(labels)):
            distance_centr = []
            if labels[i] == -1:
                temp = self.data[i]
                for j in range(len(centroids)):
                    d = distance.euclidean(temp, centroids[j][1])
                    distance_centr.append((d, centroids[j][0]))
                sorted_distance = sorted(distance_centr, key=lambda x: x[0])
                labels[i] = sorted_distance[0][1]
        return labels

    def run(self):
        hull = self.calculate_hull()
        voronoi = self.calculate_voronoi()
        interiors = self.calculate_interiors(hull, voronoi)
        eps_ec = self.calculate_eps_ec(interiors)
        best_min_points = self.find_best_min_points(eps_ec)
        labels = self.assign_labels(best_min_points)
        return labels


# Usage
data = X  # Assuming X is defined
dbscan = MyDBSCAN(data)
labels = dbscan.run()

In [18]:
class GMM:
    def __init__(self, n_components, max_iter=1000):
        self.n_components = n_components
        self.max_iter = max_iter
        self.comp_names = [index for index in range(self.n_components)]
        self.pi = [1/self.n_components for comp in range(self.n_components)]

    def multivariate_normal(self, X, mean_vector, covariance_matrix):
        return multivariate_normal(mean_vector, covariance_matrix).pdf(X)

    def e_step(self, X, means, covariances, phis):
        num_gaussians = len(means)
        W = np.zeros((num_gaussians, len(X)))
        for i in range(num_gaussians):
            W[i] = self.multivariate_normal(X, means[i], covariances[i])
        W = W / W.sum(axis=0)
        return W

    def m_step(self, X, W):
        num_gaussians = len(W)
        m, n = X.shape
      
        phis = np.zeros(num_gaussians)
        means = np.zeros((num_gaussians, n))
        covariances = np.zeros((num_gaussians, n, n))
      
        for j in range(num_gaussians):
            phis[j] = np.sum(W[j]) / m
            means[j] = np.sum(W[j][:, None] * X, axis=0) / np.sum(W[j])
            covariances[j] = np.dot((W[j][:, None] * (X - means[j])).T, (X - means[j])) / np.sum(W[j])
        return phis, means, covariances

    def fit(self, X):
        new_X = np.array_split(X, self.n_components)
        self.mean_vector = X[np.random.choice(X.shape[0], size=self.n_components, replace=False)]
        self.covariance_matrices = np.ones((self.n_components, X.shape[1], X.shape[1])) * np.identity(X.shape[1])

        for iteration in range(self.max_iter):
            W = self.e_step(X, self.mean_vector, self.covariance_matrices, self.pi)
            self.pi, self.mean_vector, self.covariance_matrices = self.m_step(X, W)

    def predict(self, X):
        '''
        The predicting function
        :param X: 2-d array numpy array
        The data on which we must predict the clusters
        '''
        probas = []
        for n in range(len(X)):
            probas.append([self.multivariate_normal(X[n], self.mean_vector[k], self.covariance_matrices[k])
                           for k in range(self.n_components)])
        cluster = []
        for proba in probas:
            cluster.append(self.comp_names[proba.index(max(proba))])
        return cluster
