<a href="https://colab.research.google.com/github/sdbrgo/PERCEUL/blob/umap-hdbscan/PERCEUL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Libraries

In [None]:
!pip install hdbscan

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import joblib
import json

# pipeline
from sklearn.pipeline import Pipeline

# preprocesing
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.base import BaseEstimator, TransformerMixin # used to define NumericSelector(), which is used in preprocessing

# dimensionality reduction
from sklearn.decomposition import PCA
from umap import UMAP

# cluster validation
from sklearn.metrics import silhouette_score

# clustering
from sklearn.cluster import KMeans
import hdbscan

# Mount GDrive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Import Dataset

In [None]:
ds_name = ""
df = pd.read_csv(ds_name)

# Preprocessing

In [None]:
#=====================================================================
# a custom and dynamic function for selecting numeric columns only.
# will be used to make the pipeline
class NumericSelector(BaseEstimator, TransformerMixin):

    def fit(self, X, y=None):
        self.numeric_cols_ = X.select_dtypes(include=[float, int]).columns
        return self

    def transform(self, X):
        X_num = X[self.numeric_cols_]
        return X_num
#=====================================================================

si = SimpleImputer(strategy='median')
df_i = si.fit_transform(df)
df_i = pd.DataFrame(df_i, columns=df.columns)

ss = StandardScaler()
df_i_ss = ss.fit_transform(df_i)
df_p1 = pd.DataFrame(df_i_ss, columns=df.columns) # will undergo cluster exploration
df_p2 = df_p1.copy() # will undergo final clustering

# Dimensionality Reduction 1

In [None]:
umap_model = UMAP(n_components=2, random_state=42)
df_p1 = umap_model.fit_transform(df_p1)

# Cluster Exploration


In [None]:
clusterer1 = hdbscan.HDBSCAN(min_cluster_size=5, gen_min_span_tree=True)
clusterer1.fit(df_p1)

plt.figure(figsize=(10,8))
plt.scatter(df_p1.iloc[:, 0], df_p1.iloc[:, 1], c=clusterer1.labels_, cmap='Paired')
plt.title('HDBSCAN Clustering')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.show()

# Dimensionality Reduction 2

In [None]:
pca = PCA(n_components=2)
df_p2 = pca.fit_transform(df_p2)

# Final Clustering

In [None]:
#============================================================
# function to identify the best `k` value.
# put this inside `cluster_utils.py`
def choose_k(X_pca, k_range=(2, 12)):
    best_k = 2
    best_score = -1

    for k in range(k_range[0], k_range[1]):
        km = KMeans(n_clusters=k, random_state=42)
        labels = km.fit_predict(X_pca)
        score = silhouette_score(X_pca, labels)

        if score > best_score:
            best_score = score
            best_k = k

    return best_k
#============================================================

best_k = choose_k(df_p2) # pre-defined function

km_final = Kmeans(n_clusters=best_k, random_state=42)
labels = km_final.fit_predict(df_p2)

plt.figure(figsize=(10, 8))
plt.scatter(df_p2[:, 0], df_p2[:, 1], c=labels, s=15)
plt.title('K-Means Clustering')
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.tight_layout()

# Cluster Analysis

In [None]:
#============================================================
# place these functions in `cluster_utils.py` file
def compute_cluster_stats(X_processed, labels, feature_names):
    df = pd.DataFrame(X_processed, columns=feature_names)
    df['cluster'] = labels

    stats = {}

    for cluster_id in sorted(df['cluster'].unique()):
        cluster_data = df[df['cluster'] == cluster_id].drop(columns=['cluster'])

        stats[cluster_id] = {
            "count": len(cluster_data),
            "mean": cluster_data.mean().to_dict(),
            "median": cluster_data.median().to_dict(),
            "std": cluster_data.std().to_dict(),
            "min": cluster_data.min().to_dict(),
            "max": cluster_data.max().to_dict(),
            "range": (cluster_data.max() - cluster_data.min()).to_dict(),
        }

    return stats


def identify_extreme_features(X_processed, labels, feature_names, threshold=1.0):
    df = pd.DataFrame(X_processed, columns=feature_names)
    df['cluster'] = labels

    global_mean = df.drop(columns=['cluster']).mean()
    global_std = df.drop(columns=['cluster']).std()

    extremes = {}

    for cluster_id in sorted(df['cluster'].unique()):
        cluster_mean = df[df['cluster'] == cluster_id].drop(columns=['cluster']).mean()

        z_scores = ((cluster_mean - global_mean) / global_std).abs()

        extreme_features = z_scores[z_scores > threshold].sort_values(ascending=False)

        extremes[cluster_id] = {
            "features": extreme_features.index.tolist(),
            "z_scores": extreme_features.to_dict()
        }

    return extremes
#============================================================

cluster_stats = compute_cluster_stats(df_p2, labels, df_p2.columns)
extreme_features = identify_extreme_features(df_p2, labels, df_p2.columns)

# show cluster stats
for cluster_id in cluster_stats.keys():
    print(f"Cluster {cluster_id} Summary")

    print("\nExtreme Features:")
    print(json.dumps(extreme_features[cluster_id], indent=4))

    print("\nStatistics:")
    print(json.dumps(cluster_stats[cluster_id], indent=4))

    print("\n---\n")

# Creating the PERCEUL Pipeline

In [None]:
# pipeline for Cluster Exploration
exploration_pipeline = Pipeline(steps = [
    ('numeric_selector', NumericSelector()),
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('umap_model', UMAP(n_components=2, random_state=42))
])

# pipeline for Final Clustering (Production)
core_pipeline = Pipeline(steps = [
    ('numeric_selector', NumericSelector()),
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2))
])

# Saving the Pipeline

In [None]:
joblib.dump(exploration_pipeline, 'exploration_pipeline.pkl')
joblib.dump(core_pipeline, 'core_pipeline.pkl')