# HW07 — Unsupervised Clustering (KMeans + DBSCAN)


In [1]:
import json
import math
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer

from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score
from sklearn.neighbors import NearestNeighbors

RANDOM_STATE = 42
BASE = Path('.')
DATA_DIR = BASE / 'data'
ART_DIR = BASE / 'artifacts'
FIG_DIR = ART_DIR / 'figures'
LABELS_DIR = ART_DIR / 'labels'

for d in [DATA_DIR, ART_DIR, FIG_DIR, LABELS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

DATASETS = [
    'S07-hw-dataset-01.csv',
    'S07-hw-dataset-02.csv',
    'S07-hw-dataset-03.csv',
]
print('Datasets:', DATASETS)

Datasets: ['S07-hw-dataset-01.csv', 'S07-hw-dataset-02.csv', 'S07-hw-dataset-03.csv']


## Утилиты

In [2]:
def load_dataset(path: Path):
    df = pd.read_csv(path)
    if 'sample_id' not in df.columns:
        raise ValueError('Ожидался столбец sample_id')
    sample_id = df['sample_id'].copy()
    X = df.drop(columns=['sample_id'])
    return df, sample_id, X

def internal_metrics(X_scaled: np.ndarray, labels: np.ndarray, noise_label: int = -1):
    labels = np.asarray(labels)
    has_noise = np.any(labels == noise_label)
    noise_fraction = float(np.mean(labels == noise_label)) if has_noise else 0.0

    if has_noise:
        mask = labels != noise_label
        X_eval = X_scaled[mask]
        labels_eval = labels[mask]
    else:
        X_eval = X_scaled
        labels_eval = labels

    n_clusters = len(set(labels_eval.tolist()))
    if n_clusters < 2:
        return {
            'silhouette': float('nan'),
            'davies_bouldin': float('nan'),
            'calinski_harabasz': float('nan'),
            'noise_fraction': noise_fraction,
            'n_clusters_eval': int(n_clusters),
            'n_points_eval': int(len(X_eval)),
        }

    return {
        'silhouette': float(silhouette_score(X_eval, labels_eval)),
        'davies_bouldin': float(davies_bouldin_score(X_eval, labels_eval)),
        'calinski_harabasz': float(calinski_harabasz_score(X_eval, labels_eval)),
        'noise_fraction': noise_fraction,
        'n_clusters_eval': int(n_clusters),
        'n_points_eval': int(len(X_eval)),
    }

def pca_scatter(X_scaled: np.ndarray, labels: np.ndarray, title: str, outpath: Path):
    Z = PCA(n_components=2, random_state=RANDOM_STATE).fit_transform(X_scaled)
    plt.figure(figsize=(6,4))
    plt.scatter(Z[:,0], Z[:,1], c=labels, s=10)
    plt.title(title)
    plt.xlabel('PC1'); plt.ylabel('PC2')
    plt.tight_layout()
    plt.savefig(outpath, dpi=160)
    plt.close()

def plot_curve(xs, ys, title, xlabel, ylabel, outpath: Path):
    plt.figure(figsize=(6,4))
    plt.plot(xs, ys, marker='o')
    plt.title(title)
    plt.xlabel(xlabel); plt.ylabel(ylabel)
    plt.tight_layout()
    plt.savefig(outpath, dpi=160)
    plt.close()

def k_distance_eps_candidates(X_scaled: np.ndarray, k: int = 5, n_candidates: int = 7):
    nbrs = NearestNeighbors(n_neighbors=k).fit(X_scaled)
    dists, _ = nbrs.kneighbors(X_scaled)
    kth = np.sort(dists[:, -1])
    percentiles = np.linspace(15, 85, n_candidates)
    eps = np.unique(np.round(np.percentile(kth, percentiles), 3))
    return eps.tolist(), kth

## Эксперимент: KMeans (k=2..12) и DBSCAN (eps, min_samples)

In [3]:
K_RANGE = list(range(2, 13))
MIN_SAMPLES_GRID = [3, 5, 8, 12]

metrics_summary = {}
best_configs = {}

for fname in DATASETS:
    path = DATA_DIR / fname
    df_raw, sample_id, X_df = load_dataset(path)

    # --- ЯВНЫЙ препроцессинг (обязательный для distance-based)
    # Поддерживаем и числовые, и категориальные признаки (на этих датасетах категориальных нет, но код готов под dataset-04).
    num_cols = X_df.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in X_df.columns if c not in num_cols]

    numeric_pipe = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    categorical_pipe = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('ohe', OneHotEncoder(handle_unknown='ignore')),
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_pipe, num_cols),
            ('cat', categorical_pipe, cat_cols),
        ],
        remainder='drop'
    )

    Xs = preprocessor.fit_transform(X_df)
    Xs = np.asarray(Xs)  # на всякий случай приводим к ndarray

    ds_key = fname.replace('.csv','')
    metrics_summary[ds_key] = {}

    # --- KMeans
    km_sils = []
    best_k = None
    best_km_labels = None
    best_km_metrics = None

    for k in K_RANGE:
        km = KMeans(n_clusters=k, random_state=RANDOM_STATE, n_init=20)
        labels = km.fit_predict(Xs)
        m = internal_metrics(Xs, labels)
        metrics_summary[ds_key][f'KMeans(k={k})'] = m
        km_sils.append(m['silhouette'])
        if not math.isnan(m['silhouette']) and (best_km_metrics is None or m['silhouette'] > best_km_metrics['silhouette']):
            best_k = k
            best_km_labels = labels
            best_km_metrics = m

    plot_curve(K_RANGE, km_sils,
              title=f'{ds_key}: KMeans silhouette vs k',
              xlabel='k', ylabel='silhouette',
              outpath=FIG_DIR / f'{ds_key}_kmeans_silhouette_vs_k.png')

    # --- DBSCAN
    eps_candidates, kth = k_distance_eps_candidates(Xs, k=5, n_candidates=7)

    plt.figure(figsize=(6,4))
    plt.plot(np.arange(len(kth)), kth, marker='.', linewidth=1)
    plt.title(f'{ds_key}: k-distance plot (k=5)')
    plt.xlabel('points sorted'); plt.ylabel('5-NN distance')
    plt.tight_layout()
    plt.savefig(FIG_DIR / f'{ds_key}_k_distance_plot.png', dpi=160)
    plt.close()

    db_best_metrics = None
    db_best_labels = None
    db_best_params = None

    for eps in eps_candidates:
        for ms in MIN_SAMPLES_GRID:
            db = DBSCAN(eps=eps, min_samples=ms, n_jobs=1)
            labels = db.fit_predict(Xs)
            m = internal_metrics(Xs, labels, noise_label=-1)
            metrics_summary[ds_key][f'DBSCAN(eps={eps},min_samples={ms})'] = m
            if math.isnan(m['silhouette']):
                continue
            if (db_best_metrics is None
                or m['silhouette'] > db_best_metrics['silhouette'] + 1e-12
                or (abs(m['silhouette'] - db_best_metrics['silhouette']) <= 1e-12 and m['noise_fraction'] < db_best_metrics['noise_fraction'])):
                db_best_metrics = m
                db_best_labels = labels
                db_best_params = {'eps': float(eps), 'min_samples': int(ms)}

    # --- Выбор лучшего
    chosen_method = 'KMeans'
    chosen_params = {'k': int(best_k), 'random_state': RANDOM_STATE, 'n_init': 20}
    chosen_labels = best_km_labels
    chosen_metrics = best_km_metrics

    if db_best_metrics is not None:
        if math.isnan(best_km_metrics['silhouette']) or db_best_metrics['silhouette'] > best_km_metrics['silhouette'] + 0.02:
            chosen_method = 'DBSCAN'
            chosen_params = db_best_params
            chosen_labels = db_best_labels
            chosen_metrics = db_best_metrics

    best_configs[ds_key] = {
        'chosen_method': chosen_method,
        'params': chosen_params,
        'selection_criterion': 'maximize silhouette (DBSCAN: on non-noise points), choose DBSCAN if silhouette improves by > 0.02',
        'metrics': chosen_metrics,
        'dbscan_eps_candidates_from_k_distance_percentiles': eps_candidates,
    }

    # PCA scatter for chosen
    pca_scatter(Xs, chosen_labels, f'{ds_key}: best {chosen_method} PCA(2D)', FIG_DIR / f'{ds_key}_best_pca2d.png')

    # labels в artifacts/labels/
    pd.DataFrame({'sample_id': sample_id, 'cluster_label': chosen_labels}).to_csv(LABELS_DIR / f'labels_{ds_key}.csv', index=False)

# Сохраняем JSON
(ART_DIR / 'metrics_summary.json').write_text(json.dumps(metrics_summary, ensure_ascii=False, indent=2), encoding='utf-8')
(ART_DIR / 'best_configs.json').write_text(json.dumps(best_configs, ensure_ascii=False, indent=2), encoding='utf-8')

print('Done!')
print('Artifacts:', list(ART_DIR.iterdir()))
print('Labels:', list(LABELS_DIR.iterdir()))

Done!
Artifacts: [PosixPath('artifacts/labels'), PosixPath('artifacts/best_configs.json'), PosixPath('artifacts/figures'), PosixPath('artifacts/labels_S07-hw-dataset-01.csv'), PosixPath('artifacts/labels_S07-hw-dataset-02.csv'), PosixPath('artifacts/labels_S07-hw-dataset-03.csv'), PosixPath('artifacts/metrics_summary.json')]
Labels: [PosixPath('artifacts/labels/labels_S07-hw-dataset-01.csv'), PosixPath('artifacts/labels/labels_S07-hw-dataset-02.csv'), PosixPath('artifacts/labels/labels_S07-hw-dataset-03.csv')]


## Итоговая таблица по лучшим решениям

In [4]:
rows = []
for ds_key, cfg in best_configs.items():
    m = cfg['metrics']
    rows.append({
        'dataset': ds_key,
        'method': cfg['chosen_method'],
        'params': cfg['params'],
        'silhouette': m['silhouette'],
        'davies_bouldin': m['davies_bouldin'],
        'calinski_harabasz': m['calinski_harabasz'],
        'noise_fraction': m['noise_fraction'],
        'n_clusters_eval': m['n_clusters_eval'],
    })

summary_df = pd.DataFrame(rows).sort_values('dataset')
summary_df

Unnamed: 0,dataset,method,params,silhouette,davies_bouldin,calinski_harabasz,noise_fraction,n_clusters_eval
0,S07-hw-dataset-01,KMeans,"{'k': 2, 'random_state': 42, 'n_init': 20}",0.52164,0.68533,11786.954623,0.0,2
1,S07-hw-dataset-02,DBSCAN,"{'eps': 0.096, 'min_samples': 12}",0.743035,0.354264,427.534217,0.994875,3
2,S07-hw-dataset-03,KMeans,"{'k': 3, 'random_state': 42, 'n_init': 20}",0.315545,1.157726,6957.16264,0.0,3
