# KNNModel

In [1]:
import torch
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, roc_auc_score, r2_score, mean_squared_error
from collections import Counter
import numpy as np
import logging
from typing import Optional, Union, Dict, Any, Tuple

# Configure the root logger to emit INFO-level (and above) records, then
# create the module-level logger used by the estimator below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class KNearestNeighborsModel(BaseEstimator, ClassifierMixin, RegressorMixin):
    """
    A brute-force K-Nearest Neighbors (KNN) model implemented in PyTorch,
    compatible with scikit-learn. Supports both classification and regression
    and runs on the GPU when CUDA is available.

    Parameters:
        n_neighbors (int): Number of neighbors to use. Must satisfy
            1 <= n_neighbors <= number of training samples.
        weights (str): Weight function used in prediction. 'uniform' (every
            neighbor counts equally) or 'distance' (neighbors are weighted by
            the inverse of their distance). Honored by both predict and
            predict_proba.
        metric (str): Distance metric to use. Only 'euclidean' is implemented;
            any other value triggers a warning and Euclidean distance is used.
        p (int): Power parameter for the Minkowski metric. Not used; included for compatibility.
        algorithm (str): Algorithm used to compute the nearest neighbors. Not implemented; always uses brute force.
        leaf_size (int): Leaf size passed to BallTree or KDTree. Not used; included for compatibility.
        task (str): 'classification' or 'regression'.
        random_state (Optional[int]): Seed for reproducibility. When given, it seeds
            torch and numpy; the algorithm itself is deterministic.
        scaler (Optional[str]): Feature scaling method. 'standard' for StandardScaler or None.

    Attributes set by fit:
        X_train_ (torch.Tensor): Scaled training features, float32, on `device`.
        y_train_ (torch.Tensor): For classification, the index of each sample's
            class in `classes_` (long); for regression, the float32 targets.
        classes_ (np.ndarray): Sorted unique class labels (classification only).
        scaler_ (Optional[StandardScaler]): Fitted scaler when scaler='standard'.
        n_samples_, n_features_ (int): Shape of the training data.
        device (str): 'cuda' or 'cpu'.
    """

    def __init__(self,
                 n_neighbors: int = 5,
                 weights: str = 'uniform',
                 metric: str = 'euclidean',
                 p: int = 2,
                 algorithm: str = 'auto',
                 leaf_size: int = 30,
                 task: str = 'classification',
                 random_state: Optional[int] = None,
                 scaler: Optional[str] = 'standard'):
        self.n_neighbors = n_neighbors
        self.weights = weights
        self.metric = metric
        self.p = p
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.task = task
        self.random_state = random_state
        self.scaler = scaler
        # NOTE: the attributes below are kept for backward compatibility. Because
        # `is_fitted_` exists before fit(), it must NOT be used with
        # check_is_fitted(); the prediction path checks `X_train_` instead.
        self.is_fitted_ = False
        self.device = 'cpu'  # Default device
        self.scaler_ = None  # To store the scaler
        self.classes_ = None  # To store unique classes for classification

        if self.random_state is not None:
            torch.manual_seed(self.random_state)
            np.random.seed(self.random_state)

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'KNearestNeighborsModel':
        """
        Fit the KNN model by storing the (optionally scaled) training data.

        Parameters:
            X (np.ndarray): Training data of shape (n_samples, n_features).
            y (np.ndarray): Target values of shape (n_samples,).

        Returns:
            self

        Raises:
            ValueError: If task, weights, scaler or n_neighbors is invalid.
        """
        # Validate input
        X, y = check_X_y(X, y)
        self.n_samples_, self.n_features_ = X.shape

        # Validate hyper-parameters before doing any work
        self._validate_fit(X, y)

        # Determine device
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        logger.info("Using device: %s", self.device)

        # Feature Scaling (the scaler value itself was validated above)
        if self.scaler == 'standard':
            self.scaler_ = StandardScaler()
            X = self.scaler_.fit_transform(X)
            logger.info("StandardScaler applied to features.")
        else:
            self.scaler_ = None

        # Store training data as tensors on the appropriate device
        self.X_train_ = torch.tensor(X, dtype=torch.float32).to(self.device)
        if self.task == 'classification':
            # Encode labels as positions in classes_ so that arbitrary labels
            # (strings, negative or non-contiguous integers) are handled.
            self.classes_, encoded = np.unique(y, return_inverse=True)
            logger.info("Classes found: %s", self.classes_)
            self.y_train_ = torch.tensor(encoded.reshape(-1), dtype=torch.long).to(self.device)
        else:
            self.y_train_ = torch.tensor(y, dtype=torch.float32).to(self.device)

        self.is_fitted_ = True
        logger.info("KNN model fitted successfully.")
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict target values for samples in X.

        For classification the (optionally distance-weighted) vote decides the
        class; ties are resolved in favor of the class of the nearest neighbor
        among the tied classes. For regression the (optionally
        distance-weighted) mean of the neighbor targets is returned.

        Parameters:
            X (np.ndarray): Input data of shape (n_queries, n_features).

        Returns:
            np.ndarray: Predicted class labels or regression values.

        Raises:
            ValueError: If task or weights is invalid, or X has the wrong number of features.
        """
        if self.task not in ('classification', 'regression'):
            raise ValueError("Invalid task. Choose 'classification' or 'regression'.")

        distances, indices = self._kneighbors(X)
        neighbor_targets = self.y_train_[indices]  # (n_queries, k)

        if self.task == 'regression':
            if self.weights == 'uniform':
                # Simple average
                return torch.mean(neighbor_targets, dim=1).cpu().numpy()
            weights = self._neighbor_weights(distances)
            targets = neighbor_targets.cpu().numpy().astype(np.float64)
            weighted_mean = np.sum(weights * targets, axis=1) / np.sum(weights, axis=1)
            return weighted_mean.astype(np.float32)

        labels = neighbor_targets.cpu().numpy()  # encoded class indices
        scores = self._class_scores(labels, self._neighbor_weights(distances))
        # Score of each neighbor's own class, in nearest-first order. Picking the
        # first neighbor whose class reaches the maximum score breaks ties in
        # favor of the nearest neighbor.
        per_neighbor = np.take_along_axis(scores, labels, axis=1)
        best = per_neighbor.max(axis=1, keepdims=True)
        first_best = np.argmax(per_neighbor == best, axis=1)
        winners = labels[np.arange(labels.shape[0]), first_best]
        return self.classes_[winners]

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """
        Predict class probabilities for samples in X.

        Parameters:
            X (np.ndarray): Input data of shape (n_queries, n_features).

        Returns:
            np.ndarray: Predicted probabilities of shape (n_queries, n_classes);
            column j corresponds to classes_[j].

        Raises:
            AttributeError: If the task is not classification or classes are missing.
            ValueError: If weights is invalid or X has the wrong number of features.
        """
        if self.task != 'classification':
            raise AttributeError("predict_proba is only available for classification tasks.")
        check_is_fitted(self, 'X_train_')
        if self.classes_ is None:
            raise AttributeError("Classes not found. Ensure that the model is fitted properly.")

        distances, indices = self._kneighbors(X)
        labels = self.y_train_[indices].cpu().numpy()
        scores = self._class_scores(labels, self._neighbor_weights(distances))
        # Each row sums to k for uniform weights, or to the total weight otherwise.
        return scores / scores.sum(axis=1, keepdims=True)

    def get_params(self, deep: bool = True) -> Dict[str, Any]:
        """
        Get parameters for this estimator.

        Parameters:
            deep (bool): Accepted for scikit-learn compatibility; unused because
                no parameter is itself an estimator.

        Returns:
            Dict[str, Any]: Parameter names mapped to their values.
        """
        return {
            'n_neighbors': self.n_neighbors,
            'weights': self.weights,
            'metric': self.metric,
            'p': self.p,
            'algorithm': self.algorithm,
            'leaf_size': self.leaf_size,
            'task': self.task,
            'random_state': self.random_state,
            'scaler': self.scaler
        }

    def set_params(self, **params: Any) -> 'KNearestNeighborsModel':
        """
        Set the parameters of this estimator.

        Parameters:
            **params: Estimator parameters. Each key is assigned as an attribute
                without validation.

        Returns:
            self
        """
        for key, value in params.items():
            setattr(self, key, value)
        return self

    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """
        Compute the score of the model.
        For classification: accuracy as a percentage in [0, 100] (note that this
        differs from scikit-learn's usual [0, 1] convention).
        For regression: R² score.

        Parameters:
            X (np.ndarray): Test samples of shape (n_samples, n_features).
            y (np.ndarray): True labels or target values of shape (n_samples,).

        Returns:
            float: Score.

        Raises:
            ValueError: If task is invalid.
        """
        X = check_array(X)
        y = np.array(y)
        y_pred = self.predict(X)

        if self.task == 'classification':
            return accuracy_score(y, y_pred) * 100.0  # Percentage
        elif self.task == 'regression':
            return r2_score(y, y_pred)
        else:
            raise ValueError("Invalid task. Choose 'classification' or 'regression'.")

    def _validate_fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Validate the hyper-parameters against the training data.

        Parameters:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.

        Raises:
            ValueError: If task, weights, scaler or n_neighbors is invalid.
        """
        if self.task not in ['classification', 'regression']:
            raise ValueError("Invalid task. Choose 'classification' or 'regression'.")
        if self.weights not in ('uniform', 'distance'):
            raise ValueError("Unsupported weights. Choose 'uniform' or 'distance'.")
        if self.scaler is not None and self.scaler != 'standard':
            raise ValueError("Unsupported scaler. Choose 'standard' or None.")
        self._check_n_neighbors(X.shape[0])
        if self.metric != 'euclidean':
            # Not an error, to stay compatible with callers that pass other
            # values, but make the fallback visible instead of silent.
            logger.warning(
                "Metric %r is not implemented; Euclidean distance will be used.",
                self.metric,
            )

    def _check_n_neighbors(self, n_train: int) -> None:
        """Raise ValueError unless n_neighbors is an integer in [1, n_train]."""
        k = self.n_neighbors
        if isinstance(k, bool) or not isinstance(k, (int, np.integer)) or k < 1:
            raise ValueError(f"n_neighbors must be a positive integer, got {k!r}.")
        if k > n_train:
            raise ValueError(
                f"n_neighbors ({k}) cannot exceed the number of training samples ({n_train})."
            )

    def _scale(self, X: np.ndarray) -> np.ndarray:
        """Apply the scaling configured at fit time to query data."""
        if self.scaler == 'standard':
            if self.scaler_ is None:
                raise AttributeError("Scaler not found. Ensure the model is fitted with a scaler.")
            return self.scaler_.transform(X)
        if self.scaler is None:
            return X
        raise ValueError("Unsupported scaler. Choose 'standard' or None.")

    def _kneighbors(self, X: np.ndarray) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Find the n_neighbors nearest training samples for each query row.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: (distances, indices), both of shape
            (n_queries, n_neighbors), sorted from nearest to farthest.
        """
        check_is_fitted(self, 'X_train_')
        X = check_array(X)

        n_train, n_features = self.X_train_.shape
        if X.shape[1] != n_features:
            raise ValueError(
                f"X has {X.shape[1]} features, but the model was fitted with {n_features}."
            )
        # Re-checked here because set_params may have changed it after fit.
        self._check_n_neighbors(n_train)

        queries = torch.tensor(self._scale(X), dtype=torch.float32).to(self.device)

        with torch.no_grad():
            # ||x - t||^2 = ||x||^2 - 2 x.t + ||t||^2
            query_sq = torch.sum(queries ** 2, dim=1, keepdim=True)   # (n_queries, 1)
            train_sq = torch.sum(self.X_train_ ** 2, dim=1)           # (n_samples,)
            cross_term = torch.mm(queries, self.X_train_.T)           # (n_queries, n_samples)
            # Rounding can push the expansion slightly below zero for (near-)
            # identical points; clamp so sqrt never produces NaN.
            squared = torch.clamp(query_sq - 2 * cross_term + train_sq, min=0.0)
            distances = torch.sqrt(squared)
            nearest, indices = torch.topk(distances, int(self.n_neighbors), largest=False, sorted=True)
        return nearest, indices

    def _neighbor_weights(self, distances: torch.Tensor) -> np.ndarray:
        """Return per-neighbor voting weights (float64) for the configured `weights`."""
        dist = distances.cpu().numpy().astype(np.float64)
        if self.weights == 'uniform':
            return np.ones_like(dist)
        if self.weights == 'distance':
            # Handle zero distances by assigning a large (but finite) weight
            return 1.0 / np.where(dist == 0, 1e-10, dist)
        raise ValueError("Unsupported weights. Choose 'uniform' or 'distance'.")

    def _class_scores(self, labels: np.ndarray, weights: np.ndarray) -> np.ndarray:
        """Accumulate neighbor weights per class; returns (n_queries, n_classes)."""
        n_queries = labels.shape[0]
        scores = np.zeros((n_queries, len(self.classes_)), dtype=np.float64)
        rows = np.arange(n_queries)[:, None]
        # add.at accumulates correctly when a class index repeats within a row.
        np.add.at(scores, (rows, labels), weights)
        return scores

# Demonstration

In [2]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris, load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score, r2_score, mean_squared_error
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
import time


def classification_comparison():
    """
    Compare the custom KNN classifier with scikit-learn's on the Iris dataset.

    Both models are trained on the same 70/30 split (random_state=42) and
    their timing, accuracy and one-vs-rest ROC AUC are printed.

    Returns:
        tuple: (accuracy_custom, roc_auc_custom, accuracy_sklearn, roc_auc_sklearn),
        with accuracies expressed as percentages.
    """
    # Load Iris dataset
    iris = load_iris()
    X, y = iris.data, iris.target

    # Split into train and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    print("\n--- Custom KNearestNeighborsModel ---")
    # Initialize custom KNN
    custom_knn = KNearestNeighborsModel(
        n_neighbors=5,
        weights='uniform',
        metric='euclidean',
        task='classification',
        scaler='standard'
    )

    # Train custom KNN (perf_counter is monotonic and high-resolution, which
    # is what interval timing needs; time.time can jump with clock changes)
    start_time = time.perf_counter()
    custom_knn.fit(X_train, y_train)
    train_time_custom = time.perf_counter() - start_time
    print(f"Training Time: {train_time_custom:.4f} seconds")

    # Predict with custom KNN
    start_time = time.perf_counter()
    y_pred_custom = custom_knn.predict(X_test)
    y_pred_proba_custom = custom_knn.predict_proba(X_test)
    predict_time_custom = time.perf_counter() - start_time
    accuracy_custom = accuracy_score(y_test, y_pred_custom) * 100.0
    print(f"Prediction Time: {predict_time_custom:.4f} seconds")
    print(f"Accuracy: {accuracy_custom:.2f}%")

    # ROC AUC Score
    roc_auc_custom = roc_auc_score(y_test, y_pred_proba_custom, multi_class='ovr')
    print(f"ROC AUC: {roc_auc_custom:.4f}")

    print("\n--- scikit-learn KNeighborsClassifier ---")
    # Initialize scikit-learn KNN
    sklearn_knn = KNeighborsClassifier(
        n_neighbors=5,
        weights='uniform',
        metric='euclidean',
        p=2,
        algorithm='auto',
        leaf_size=30
    )

    # Train scikit-learn KNN
    start_time = time.perf_counter()
    sklearn_knn.fit(X_train, y_train)
    train_time_sklearn = time.perf_counter() - start_time
    print(f"Training Time: {train_time_sklearn:.4f} seconds")

    # Predict with scikit-learn KNN
    start_time = time.perf_counter()
    y_pred_sklearn = sklearn_knn.predict(X_test)
    y_pred_proba_sklearn = sklearn_knn.predict_proba(X_test)
    predict_time_sklearn = time.perf_counter() - start_time
    accuracy_sklearn = accuracy_score(y_test, y_pred_sklearn) * 100.0
    print(f"Prediction Time: {predict_time_sklearn:.4f} seconds")
    print(f"Accuracy: {accuracy_sklearn:.2f}%")

    # ROC AUC Score
    roc_auc_sklearn = roc_auc_score(y_test, y_pred_proba_sklearn, multi_class='ovr')
    print(f"ROC AUC: {roc_auc_sklearn:.4f}")

    return accuracy_custom, roc_auc_custom, accuracy_sklearn, roc_auc_sklearn

def regression_comparison():
    """
    Compare the custom KNN regressor with scikit-learn's on the Diabetes dataset.

    Both models are trained on the same 70/30 split (random_state=42) and
    their timing, R² and RMSE are printed.

    Returns:
        tuple: (r2_custom, rmse_custom, r2_sklearn, rmse_sklearn).
    """
    # Load Diabetes dataset
    diabetes = load_diabetes()
    X, y = diabetes.data, diabetes.target

    # Split into train and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    print("\n--- Custom KNearestNeighborsModel ---")
    # Initialize custom KNN
    custom_knn = KNearestNeighborsModel(
        n_neighbors=5,
        weights='uniform',
        metric='euclidean',
        task='regression',
        scaler='standard'
    )

    # Train custom KNN (perf_counter is monotonic and high-resolution, which
    # is what interval timing needs; time.time can jump with clock changes)
    start_time = time.perf_counter()
    custom_knn.fit(X_train, y_train)
    train_time_custom = time.perf_counter() - start_time
    print(f"Training Time: {train_time_custom:.4f} seconds")

    # Predict with custom KNN
    start_time = time.perf_counter()
    y_pred_custom = custom_knn.predict(X_test)
    predict_time_custom = time.perf_counter() - start_time
    r2_custom = r2_score(y_test, y_pred_custom)
    rmse_custom = np.sqrt(mean_squared_error(y_test, y_pred_custom))
    print(f"Prediction Time: {predict_time_custom:.4f} seconds")
    print(f"R² Score: {r2_custom:.4f}")
    print(f"RMSE: {rmse_custom:.4f}")

    print("\n--- scikit-learn KNeighborsRegressor ---")
    # Initialize scikit-learn KNN
    sklearn_knn = KNeighborsRegressor(
        n_neighbors=5,
        weights='uniform',
        metric='euclidean',
        p=2,
        algorithm='auto',
        leaf_size=30
    )

    # Train scikit-learn KNN
    start_time = time.perf_counter()
    sklearn_knn.fit(X_train, y_train)
    train_time_sklearn = time.perf_counter() - start_time
    print(f"Training Time: {train_time_sklearn:.4f} seconds")

    # Predict with scikit-learn KNN
    start_time = time.perf_counter()
    y_pred_sklearn = sklearn_knn.predict(X_test)
    predict_time_sklearn = time.perf_counter() - start_time
    r2_sklearn = r2_score(y_test, y_pred_sklearn)
    rmse_sklearn = np.sqrt(mean_squared_error(y_test, y_pred_sklearn))
    print(f"Prediction Time: {predict_time_sklearn:.4f} seconds")
    print(f"R² Score: {r2_sklearn:.4f}")
    print(f"RMSE: {rmse_sklearn:.4f}")

    return r2_custom, rmse_custom, r2_sklearn, rmse_sklearn

if __name__ == "__main__":
    # Run both benchmarks, then summarize the headline metrics side by side.
    print("=== Classification Comparison: Custom KNearestNeighborsModel vs. scikit-learn ===")
    accuracy_custom, roc_auc_custom, accuracy_sklearn, roc_auc_sklearn = classification_comparison()

    print("\n=== Regression Comparison: Custom KNearestNeighborsModel vs. scikit-learn ===")
    r2_custom, rmse_custom, r2_sklearn, rmse_sklearn = regression_comparison()

    # Create DataFrame
    metrics_df = pd.DataFrame({
        'Model': ['CUSTOM', 'SKLEARN'],
        'Accuracy': [accuracy_custom, accuracy_sklearn],
        'ROC AUC': [roc_auc_custom, roc_auc_sklearn],
        'R² Score': [r2_custom, r2_sklearn],
        'RMSE': [rmse_custom, rmse_sklearn]
    })
    print("\n")
    # `display` is injected only by IPython/Jupyter; fall back to print so the
    # script does not crash with NameError when run with plain Python.
    try:
        display(metrics_df)
    except NameError:
        print(metrics_df)

=== Classification Comparison: Custom KNearestNeighborsModel vs. scikit-learn ===

--- Custom KNearestNeighborsModel ---
Training Time: 0.2613 seconds
Prediction Time: 0.2146 seconds
Accuracy: 100.00%
ROC AUC: 1.0000

--- scikit-learn KNeighborsClassifier ---
Training Time: 0.0008 seconds
Prediction Time: 0.0040 seconds
Accuracy: 100.00%
ROC AUC: 1.0000

=== Regression Comparison: Custom KNearestNeighborsModel vs. scikit-learn ===

--- Custom KNearestNeighborsModel ---
Training Time: 0.0014 seconds
Prediction Time: 0.0097 seconds
R² Score: 0.3928
RMSE: 57.2515

--- scikit-learn KNeighborsRegressor ---
Training Time: 0.0007 seconds
Prediction Time: 0.0019 seconds
R² Score: 0.4031
RMSE: 56.7637




Unnamed: 0,Model,Accuracy,ROC AUC,R² Score,RMSE
0,CUSTOM,100.0,1.0,0.392821,57.251522
1,SKLEARN,100.0,1.0,0.403124,56.763702
