<a href="https://colab.research.google.com/github/richirey75/Data-Mining-CS4990/blob/main/CS4990GroupAssgn3DBScanModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##DBScan From scratch

In [3]:
import numpy as np

def dbscan(data, columns, eps, min_samples):
    """
    Perform DBSCAN clustering on the specified columns of a dataset.

    Parameters:
      data        : Iterable containing data instances.
      columns     : List of column indices to use for clustering.
      eps         : Radius to consider neighbors.
      min_samples : Minimum number of neighbors to be a core point.

    Returns:
      List of clusters (each cluster is a list of data instances) and a list of noise points.
    """

    # Create a data subset based on specified columns for clustering
    # Changed line: Use advanced indexing with columns list
    D = np.array([instance for instance in data])
    D = D[:, columns]
    labels = [0] * len(D)  # Initialize all labels to 0 (unvisited)
    C = 0  # Cluster ID

    for P in range(len(D)):
        if labels[P] != 0:  # Skip if the point has already been processed
            continue

        NeighborPts = region_query(D, P, eps)

        # Check if the point is a core point
        if len(NeighborPts) < min_samples:
            labels[P] = -1  # Mark as noise
        else:
            C += 1  # Increment cluster ID
            grow_cluster(D, labels, P, NeighborPts, C, eps, min_samples)

    # Organize points into clusters and noise
    clusters = [[] for _ in range(C)]
    noise = []
    for i, label in enumerate(labels):
        if label == -1:
            noise.append(data[i])
        else:
            clusters[label - 1].append(data[i])

    return clusters, noise
def grow_cluster(D, labels, P, NeighborPts, C, eps, min_samples):
    """
    Grow a cluster from the seed point `P`.

    Parameters:
      D           : Data subset with relevant columns
      labels      : List of point labels
      P           : Seed point index
      NeighborPts : List of neighbors of the seed point
      C           : Cluster ID
      eps         : Radius to consider neighbors
      min_samples : Minimum number of neighbors to be a core point
    """
    labels[P] = C  # Assign the cluster label to the seed point

    i = 0
    while i < len(NeighborPts):
        Pn = NeighborPts[i]

        if labels[Pn] == -1:
            labels[Pn] = C  # Convert noise to border point

        elif labels[Pn] == 0:
            labels[Pn] = C
            PnNeighborPts = region_query(D, Pn, eps)
            if len(PnNeighborPts) >= min_samples:
                NeighborPts += PnNeighborPts

        i += 1


def region_query(D, P, eps):
    """
    Find neighbors within `eps` distance from point `P`.

    Parameters:
      D   : Data subset with relevant columns
      P   : Index of the point in question
      eps : Radius to consider neighbors

    Returns:
      List of indices of points within `eps` distance of `P`.
    """
    neighbors = []
    for Pn in range(len(D)):
        if np.linalg.norm(D[P] - D[Pn]) < eps:
            neighbors.append(Pn)

    return neighbors


##Comparision with sklearn's DBScan

In [4]:

import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN as sklearn_DBSCAN
from sklearn.preprocessing import StandardScaler


def test_dbscan():
    """
    Tests the DBSCAN implementation against scikit-learn's DBSCAN.
    """

    # Load your dataset
    data = pd.read_csv("combined_tracks_info_ArijitSingh_KK.csv")  # Replace with your file name

    # Select the relevant numerical columns for clustering
    # You may need to adjust these columns based on your dataset.
    columns_to_cluster = [
        "danceability", "energy", "loudness", "speechiness", "acousticness",
        "instrumentalness", "liveness", "valence", "tempo"
    ]

    X = data[columns_to_cluster].values

    # Scale the data for better performance
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # DBSCAN parameters
    eps = 0.5
    min_samples = 5

    # Run your DBSCAN implementation
    clusters_custom, noise_custom = dbscan(
        data=[list(row) for row in X_scaled],
        columns=list(range(len(columns_to_cluster))),
        eps=eps,
        min_samples=min_samples
    )

    # Run scikit-learn's DBSCAN
    dbscan_sklearn = sklearn_DBSCAN(eps=eps, min_samples=min_samples)
    labels_sklearn = dbscan_sklearn.fit_predict(X_scaled)

    # Compare the results
    clusters_sklearn = [[] for _ in range(len(set(labels_sklearn)) - (1 if -1 in labels_sklearn else 0))]
    noise_sklearn = []
    for i, label in enumerate(labels_sklearn):
      if label == -1:
        noise_sklearn.append(X_scaled[i])
      else:
        clusters_sklearn[label].append(X_scaled[i])


    # You can implement more detailed comparison here
    # For example, compare the number of clusters, noise points, etc.
    print("Number of clusters (custom):", len(clusters_custom))
    print("Number of noise points (custom):", len(noise_custom))

    print("Number of clusters (scikit-learn):", len(clusters_sklearn))
    print("Number of noise points (scikit-learn):", len(noise_sklearn))


if __name__ == "__main__":
    test_dbscan()

Number of clusters (custom): 13
Number of noise points (custom): 399
Number of clusters (scikit-learn): 13
Number of noise points (scikit-learn): 399


In [21]:

import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN as sklearn_DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import adjusted_rand_score


def dbscan(data, columns, eps, min_samples):
    """
    Perform DBSCAN clustering on the specified columns of a dataset.

    Parameters:
      data        : Iterable containing data instances.
      columns     : List of column indices to use for clustering.
      eps         : Radius to consider neighbors.
      min_samples : Minimum number of neighbors to be a core point.

    Returns:
      List of clusters (each cluster is a list of data instances) and a list of noise points.
    """

    # Create a data subset based on specified columns for clustering
    # Changed line: Use advanced indexing with columns list
    D = np.array([instance for instance in data])
    D = D[:, columns]
    labels = [0] * len(D)  # Initialize all labels to 0 (unvisited)
    C = 0  # Cluster ID

    for P in range(len(D)):
        if labels[P] != 0:  # Skip if the point has already been processed
            continue

        NeighborPts = region_query(D, P, eps)

        # Check if the point is a core point
        if len(NeighborPts) < min_samples:
            labels[P] = -1  # Mark as noise
        else:
            C += 1  # Increment cluster ID
            grow_cluster(D, labels, P, NeighborPts, C, eps, min_samples)

    # Organize points into clusters and noise
    clusters = [[] for _ in range(C)]
    noise = []
    for i, label in enumerate(labels):
        if label == -1:
            noise.append(data[i])
        else:
            clusters[label - 1].append(data[i])

    return clusters, noise
def grow_cluster(D, labels, P, NeighborPts, C, eps, min_samples):
    """
    Grow a cluster from the seed point `P`.

    Parameters:
      D           : Data subset with relevant columns
      labels      : List of point labels
      P           : Seed point index
      NeighborPts : List of neighbors of the seed point
      C           : Cluster ID
      eps         : Radius to consider neighbors
      min_samples : Minimum number of neighbors to be a core point
    """
    labels[P] = C  # Assign the cluster label to the seed point

    i = 0
    while i < len(NeighborPts):
        Pn = NeighborPts[i]

        if labels[Pn] == -1:
            labels[Pn] = C  # Convert noise to border point

        elif labels[Pn] == 0:
            labels[Pn] = C
            PnNeighborPts = region_query(D, Pn, eps)
            if len(PnNeighborPts) >= min_samples:
                NeighborPts += PnNeighborPts

        i += 1


def region_query(D, P, eps):
    """
    Find neighbors within `eps` distance from point `P`.

    Parameters:
      D   : Data subset with relevant columns
      P   : Index of the point in question
      eps : Radius to consider neighbors

    Returns:
      List of indices of points within `eps` distance of `P`.
    """
    neighbors = []
    for Pn in range(len(D)):
        if np.linalg.norm(D[P] - D[Pn]) < eps:
            neighbors.append(Pn)

    return neighbors




def test_dbscan():
    """
    Tests the DBSCAN implementation against scikit-learn's DBSCAN.
    """

    # Load your dataset
    data = pd.read_csv("combined_tracks_info_ArijitSingh_KK.csv")  # Replace with your file name

    # Select the relevant numerical columns for clustering
    # You may need to adjust these columns based on your dataset.
    columns_to_cluster = [
        "danceability", "energy", "loudness", "speechiness", "acousticness",
        "instrumentalness", "liveness", "valence", "tempo"
    ]

    X = data[columns_to_cluster].values

    # Scale the data for better performance
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # DBSCAN parameters
    eps = 0.8
    min_samples = 7

    # Run your DBSCAN implementation
    clusters_custom, noise_custom = dbscan(
        data=[list(row) for row in X_scaled],
        columns=list(range(len(columns_to_cluster))),
        eps=eps,
        min_samples=min_samples
    )

    # Run scikit-learn's DBSCAN
    dbscan_sklearn = sklearn_DBSCAN(eps=eps, min_samples=min_samples)
    labels_sklearn = dbscan_sklearn.fit_predict(X_scaled)

    # Compare the results
    clusters_sklearn = [[] for _ in range(len(set(labels_sklearn)) - (1 if -1 in labels_sklearn else 0))]
    noise_sklearn = []
    labels_custom = [-1] * len(X_scaled)
    for i, cluster in enumerate(clusters_custom):
        for point in cluster:
          index = np.where((X_scaled == point).all(axis=1))[0][0]
          labels_custom[index] = i

    for i, label in enumerate(labels_sklearn):
      if label == -1:
        noise_sklearn.append(X_scaled[i])
      else:
        clusters_sklearn[label].append(X_scaled[i])


    # Calculate Adjusted Rand Index (ARI) to evaluate cluster similarity
    ari = adjusted_rand_score(labels_sklearn, labels_custom)
    print("Adjusted Rand Index:", ari)


if __name__ == "__main__":
    test_dbscan()

Adjusted Rand Index: 0.5798440646164023
