In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import math
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_20newsgroups
from collections import Counter,deque
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.datasets import fetch_openml
from collections import Counter
from sklearn.preprocessing import MinMaxScaler
from scipy.spatial.distance import cdist


In [2]:
fashion_mnist = fetch_openml('Fashion-MNIST', version=1)
X = fashion_mnist.data.astype('float32')
y = fashion_mnist.target.astype('int')


X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test) 

print(f"Training Data Shape: {X_train.shape}, Training Labels Shape: {y_train.shape}")
print(f"Test Data Shape: {X_test.shape}, Test Labels Shape: {y_test.shape}")

Training Data Shape: (56000, 784), Training Labels Shape: (56000,)
Test Data Shape: (14000, 784), Test Labels Shape: (14000,)


In [3]:
class DBScanCustom:
    def __init__(self,minPts,epsilon):
        self.minPts = minPts
        self.epsilon = epsilon
        self.labels = None
        self.dataset = None

    def preProcessing(self,dataset):
        distMatrix = cdist(dataset,dataset,metric='euclidean')       
        data = []
        for i in range(len(dataset)):
            neighbors = [str(j) for j in range(len(dataset)) if i!=j and distMatrix[i][j] <= self.epsilon]
            data.append([i] + [neighbors] + [len(neighbors)])
        self.dataset = data

    def expand(self,currPt,currClusterId):
        self.labels[currPt] = currClusterId
        queue = deque([currPt])
        visited = set()

        while queue:
            inQueue = queue.popleft()
            inQueueNeighborsCnt = self.dataset[inQueue][2]
            visited.add(int(inQueue))

            if inQueueNeighborsCnt>=self.minPts:
                for n in self.dataset[inQueue][1]:
                    n = int(n)
                    if n in visited:
                        continue
                    if self.labels[n] == -1 and self.dataset[n][2] >= self.minPts:
                        self.labels[n] = currClusterId
                        queue.append(n)
                    
    def runDBSCAN(self,dataset):  
        self.labels = [-1]*len(dataset)
        startClusterId = 0
        self.preProcessing(dataset)
 
        for i in range(len(self.dataset)):
            if self.labels[i] != -1:
                continue
            neighbors = self.dataset[i][1]
            if len(neighbors) >= self.minPts:
                startClusterId += 1
                self.expand(i,startClusterId)
            
        return self.labels
        
    

In [4]:
subset_size = 20000
indices = np.random.choice(len(X_train), subset_size, replace=False)
X_subset = X_train[indices]

dbscan = DBScanCustom(epsilon=4, minPts=17)
labels = dbscan.runDBSCAN(X_subset)

subset_df = pd.DataFrame(X_train[indices].copy())
subset_df['cluster'] = labels

In [5]:
subset_df['cluster'].value_counts()

cluster
-1    14901
 1     5055
 2       25
 3       16
 4        2
 5        1
Name: count, dtype: int64

In [14]:
import numpy as np
from sklearn.metrics import silhouette_score

valid_indices = np.where(np.array(labels) != -1)[0]

X_valid = X_subset[valid_indices]

labels_valid = np.array(labels)[valid_indices].flatten()

if len(np.unique(labels_valid)) > 1:
    score = silhouette_score(X_valid, labels_valid)
    print(f"Silhouette Score: {score}")
else:
    print("Silhouette Score cannot be computed with less than 2 clusters.")


Silhouette Score: 0.05161961540579796
