In [None]:
import numpy as np
from sklearn.neighbors import NearestNeighbors
from collections import deque

class DBSCAN:
    """Density-based spatial clustering (DBSCAN).

    A point is a *core point* when its eps-neighborhood holds at least
    ``min_samples`` points. The neighborhood is obtained by querying the
    fitted data with the point itself, so the point is normally counted in
    its own neighborhood. Clusters are grown outwards from core points;
    non-core points reachable from a core point become border points of that
    cluster, and everything else is noise.

    Attributes:
        eps: Neighborhood radius.
        min_samples: Minimum neighborhood size for a point to be a core point.
        labels_: After ``fit``, an integer array with one entry per sample:
            ``0, 1, ...`` for cluster membership and ``-1`` for noise.
            ``None`` before ``fit`` is called.
    """

    # Internal label states. "Unvisited" must be distinct from "noise":
    # a noise point may later be absorbed as a border point, but it is
    # already known not to be a core point and must not be expanded.
    _NOISE = -1
    _UNVISITED = -2

    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps  # neighborhood radius
        self.min_samples = min_samples  # min neighborhood size of a core point
        self.labels_ = None  # cluster labels, populated by fit()

    def fit(self, X):
        """Cluster the rows of ``X`` and store the result in ``labels_``.

        Args:
            X: Array-like of shape (n_samples, n_features).

        Returns:
            ``self``, so calls can be chained.
        """
        X = np.asarray(X)
        n_samples = X.shape[0]
        self.labels_ = np.full(n_samples, self._UNVISITED, dtype=int)
        if n_samples == 0:
            # Nothing to cluster; avoid fitting a neighbor index on no data.
            return self

        # Radius-neighbors index used for every eps-neighborhood query.
        neighbors_model = NearestNeighbors(radius=self.eps).fit(X)

        cluster_id = 0
        for i in range(n_samples):
            if self.labels_[i] != self._UNVISITED:
                # Already classified as noise or claimed by a cluster.
                continue

            # eps-neighborhood of point i
            neighbors = neighbors_model.radius_neighbors(
                [X[i]], return_distance=False)[0]

            if len(neighbors) < self.min_samples:
                # Provisionally noise; a later cluster may still claim it
                # as a border point.
                self.labels_[i] = self._NOISE
            else:
                # i is a core point: grow a new cluster from it.
                self._expand_cluster(
                    X, i, neighbors, cluster_id, neighbors_model)
                cluster_id += 1
        return self

    def _expand_cluster(self, X, index, neighbors, cluster_id, neighbors_model):
        """Grow cluster ``cluster_id`` breadth-first from a core point.

        Args:
            X: Data array the neighbor model was fitted on.
            index: Index of the seed core point.
            neighbors: Indices of the seed's eps-neighborhood.
            cluster_id: Label to assign to every point reached.
            neighbors_model: Object exposing
                ``radius_neighbors(points, return_distance=False)`` that
                returns one index array per query point.
        """
        labels = self.labels_
        labels[index] = cluster_id
        queue = deque(neighbors)

        while queue:
            current_point = queue.popleft()
            state = labels[current_point]

            if state == self._NOISE:
                # Previously found to be non-core: it joins the cluster as a
                # border point but is not expanded.
                labels[current_point] = cluster_id
                continue
            if state != self._UNVISITED:
                # Already belongs to a cluster (this one or an earlier one).
                # This check also discards duplicate queue entries.
                continue

            labels[current_point] = cluster_id

            # eps-neighborhood of the current point
            current_neighbors = neighbors_model.radius_neighbors(
                [X[current_point]], return_distance=False)[0]

            if len(current_neighbors) >= self.min_samples:
                # Core point: every still-unclaimed neighbor (unvisited or
                # noise, both negative) is density-reachable.
                queue.extend(n for n in current_neighbors if labels[n] < 0)

In [None]:
from sklearn.datasets import make_moons
import matplotlib.pyplot as plt

# Build a two-moons toy dataset (300 points, light Gaussian noise, fixed seed).
X, _ = make_moons(n_samples=300, noise=0.05, random_state=42)

# Fit the clustering model; fit() returns the estimator itself.
dbscan = DBSCAN(eps=0.2, min_samples=5).fit(X)

# Scatter plot of the samples, colored by their assigned cluster label.
plt.scatter(
    X[:, 0],
    X[:, 1],
    c=dbscan.labels_,
    cmap='viridis',
)
plt.title("DBSCAN Clustering")
plt.show()