# In [1]:
import numpy as np 
import math

class DBSCAN:
    """Density-based clustering of points (DBSCAN).

    A point is a *core point* when at least ``min_pts`` points (itself
    included) lie within distance ``eps`` of it.  Clusters are grown from
    core points; points reachable from no core point keep the label
    ``UNLABELLED_DATA``.
    """

    # Label carried by points that end up in no cluster.
    UNLABELLED_DATA = -1

    # Class-level defaults; they double as the constructor's default arguments.
    metrics = 'euclidean'
    eps = 0.5
    min_pts = 5
    available_metrics = ['euclidean', 'manhattan']

    def __init__(self, min_pts=min_pts, eps=eps, metrics=metrics):
        """Validate and store the clustering parameters.

        Parameters
        ----------
        min_pts : minimum neighbourhood size (the point itself counts) for
            a point to be treated as a core point.  Must be > 0.
        eps : neighbourhood radius.  Must be > 0.
        metrics : name of the distance function; one of
            ``available_metrics``.

        Raises
        ------
        ValueError
            If ``eps`` or ``min_pts`` is not positive, or ``metrics`` is
            not a supported name.
        """
        if eps <= 0:
            raise ValueError('eps must be higher than 0')
        if min_pts <= 0:
            raise ValueError('min_pts must be higher than 0')
        if metrics not in self.available_metrics:
            raise ValueError(self.__unknown_metrics_message(metrics))

        self.min_pts = min_pts
        self.eps = eps
        self.metrics = metrics

    def __unknown_metrics_message(self, metrics):
        """Build the error text used whenever an unsupported metric is seen."""
        return "No metrics '{}'. Available metrics {}".format(
            metrics, self.available_metrics)

    def __euclidean_distance(self, point_a, point_b):
        """Return the square root of the summed squared coordinate differences."""
        return np.sqrt(sum((a - b) * (a - b) for a, b in zip(point_a, point_b)))

    def __manhattan_distance(self, point_a, point_b):
        """Return the sum of the absolute coordinate differences."""
        return sum(abs(a - b) for a, b in zip(point_a, point_b))

    def __distance(self, point_a, point_b, metrics=metrics):
        """Return the distance between two points under the named metric.

        Raises
        ------
        ValueError
            If the points have different lengths, or ``metrics`` is not a
            supported name (previously this case silently returned None).
        """
        if len(point_a) != len(point_b):
            raise ValueError("feature length doesn't same")
        if metrics == 'euclidean':
            return self.__euclidean_distance(point_a, point_b)
        if metrics == 'manhattan':
            return self.__manhattan_distance(point_a, point_b)
        raise ValueError(self.__unknown_metrics_message(metrics))

    def __find_neighbours(self, data):
        """Return, for every point index, the ascending list of indices
        (itself included) whose distance to it is at most ``eps``."""
        size_data = len(data)
        neighbours = [[] for _ in range(size_data)]
        for i in range(size_data):
            # Both metrics are symmetric, so each pair is measured once and
            # recorded in both neighbour lists.
            for j in range(i, size_data):
                if self.__distance(data[i], data[j], self.metrics) <= self.eps:
                    neighbours[i].append(j)
                    if j != i:
                        neighbours[j].append(i)
        return neighbours

    def fit_predict(self, data):
        """Cluster ``data`` and return one label per point.

        Parameters
        ----------
        data : indexable sequence of equal-length points (each point an
            iterable of numbers).

        Returns
        -------
        numpy.ndarray
            Array of ``len(data)`` labels.  Clusters are numbered from 0 in
            the order their first core point appears in ``data``; points in
            no cluster keep ``UNLABELLED_DATA``.

        Raises
        ------
        ValueError
            If two points have different lengths or the configured metric
            is not supported.
        """
        size_data = len(data)
        neighbours = self.__find_neighbours(data)

        # Every point starts out unlabelled.
        result = np.full(size_data, self.UNLABELLED_DATA)

        curr_label = 0
        for i in range(size_data):
            # Only an unlabelled core point (>= min_pts neighbours) can
            # start a new cluster.
            if result[i] != self.UNLABELLED_DATA:
                continue
            if len(neighbours[i]) < self.min_pts:
                continue

            # Depth-first expansion from the seed point.
            pending = [i]
            while pending:
                idx = pending.pop()
                if result[idx] != self.UNLABELLED_DATA:
                    continue
                result[idx] = curr_label
                # Only core points spread the label further; a non-core
                # point that was reached is labelled but not expanded.
                if len(neighbours[idx]) >= self.min_pts:
                    pending.extend(
                        n for n in neighbours[idx]
                        if result[n] == self.UNLABELLED_DATA
                    )
            curr_label += 1

        return result