In [1]:
import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
import os
import pandas as pd
from sklearn.model_selection import KFold

In [2]:
class CustomKNNRegressor:
    """k-nearest-neighbour regressor whose targets are intervals.

    Features are standardised, the ``n_neighbors`` closest training rows are
    looked up for every query row, and the targets of those neighbours are
    reduced to a single prediction by ``custom_func``.

    Parameters
    ----------
    margin : float, default 0
        Amount by which each interval is shrunk on both sides (lower bound
        raised, upper bound lowered) in ``getting_best_mu`` and
        ``hinge_error``.
    n_neighbors : int, default 5
        Number of neighbours retrieved per query row.
    custom_func : callable or None, default None
        Called with the array of the neighbours' targets and must return the
        prediction for that query row. ``None`` selects ``getting_best_mu``.
    """

    def __init__(self, margin=0, n_neighbors=5, custom_func=None):
        self.n_neighbors = n_neighbors
        self.custom_func = custom_func if custom_func is not None else self.getting_best_mu
        self.margin = margin
        # Both are created in fit().
        self.knn = None
        self.scaler = None

    def fit(self, X, y):
        """Standardise ``X``, index it for neighbour search and store ``y``.

        With the default aggregation ``y`` must be indexable as
        ``y[:, 0]`` (lower bounds) and ``y[:, 1]`` (upper bounds).
        Returns ``self``.
        """
        self.scaler = StandardScaler()  # Normalize features
        X_scaled = self.scaler.fit_transform(X)
        self.X = X_scaled
        # Coerce to an ndarray so the integer-array lookup in predict() also
        # works when y is passed as a list or a pandas object.
        self.y = np.asarray(y)
        self.knn = NearestNeighbors(n_neighbors=self.n_neighbors)
        self.knn.fit(X_scaled)
        return self

    def predict(self, X_test):
        """Return one prediction per row of ``X_test`` as a numpy array."""
        X_test_scaled = self.scaler.transform(X_test)  # Normalize test data
        _, indices = self.knn.kneighbors(X_test_scaled)
        return np.array([self.custom_func(self.y[neighbors]) for neighbors in indices])

    def getting_best_mu(self, intervals):
        """Pick the interval endpoint with the smallest total squared hinge loss.

        Parameters
        ----------
        intervals : array-like of shape (n, 2)
            Rows of ``[lower, upper]``; bounds may be ``-inf`` / ``inf``.

        Returns
        -------
        The finite endpoint (after the margin adjustment) minimising the sum
        over all intervals of ``max(0, lower - mu)**2 + max(0, mu - upper)**2``.
        Ties resolve to the smallest endpoint. If no bound is finite (for
        example every interval is ``(-inf, inf)``) there is no candidate and
        ``0.0`` is returned as a fallback.
        """
        # Adjust intervals with the margin (this also creates a copy, so the
        # caller's array is never modified by the swap below).
        intervals = np.asarray(intervals) + np.array([self.margin, -self.margin])

        # Ensure intervals are valid: if the margin pushed the lower bound
        # above the upper bound, swap the two.
        mask = intervals[:, 1] < intervals[:, 0]
        intervals[mask] = intervals[mask][:, ::-1]

        # Candidate predictions are the distinct finite endpoints.
        # NOTE(review): the minimiser of a sum of squared hinge losses can lie
        # strictly between two endpoints (e.g. [0, 1] and [3, 4] -> mu = 2);
        # confirm that restricting the search to endpoints is intended.
        endpoints = np.unique(intervals[np.isfinite(intervals)])
        if endpoints.size == 0:
            # np.argmin would raise ValueError on an empty candidate set.
            return 0.0
        y_min, y_max = intervals[:, 0], intervals[:, 1]

        # Loss matrix of shape (n_intervals, n_endpoints), summed per endpoint.
        lower_loss = np.maximum(0, y_min[:, None] - endpoints) ** 2  # mu below lower bound
        upper_loss = np.maximum(0, endpoints - y_max[:, None]) ** 2  # mu above upper bound
        losses = np.sum(lower_loss + upper_loss, axis=0)

        # Find the mu that minimizes the loss
        return endpoints[np.argmin(losses)]

    def relu(self, x):
        """Element-wise ``max(0, x)``."""
        return np.maximum(0, x)

    def hinge_error(self, y_pred, y_low, y_up):
        """Element-wise squared hinge error of ``y_pred`` against ``[y_low, y_up]``.

        ``self.margin`` is applied here, so callers should pass the raw
        (unshifted) bounds.
        """
        y_pred, y_low, y_up = np.asarray(y_pred), np.asarray(y_low), np.asarray(y_up)
        below = self.relu(y_low - y_pred + self.margin) ** 2
        above = self.relu(y_pred - y_up + self.margin) ** 2
        return below + above

In [3]:
# Root directory that holds one sub-directory per dataset.
folder_path = '../../data'
# Every entry of the root that is itself a directory is treated as a dataset;
# plain files sitting next to the dataset folders are skipped.
datasets = [
    entry
    for entry in os.listdir(folder_path)
    if os.path.isdir(os.path.join(folder_path, entry))
]

In [4]:
N_INNER_SPLITS = 5          # folds of the inner cross-validation used to pick K
CV_RANDOM_STATE = 42        # seed for shuffling the inner folds
OUTPUT_DIR = 'predictions'  # per-fold prediction files are written here


def _select_best_k(X_train, y_train, n_splits=N_INNER_SPLITS, random_state=CV_RANDOM_STATE):
    """Choose the number of neighbours by inner cross-validation.

    Candidate values are ``1 .. int(sqrt(n_train)) - 1`` (at least ``K=1`` is
    always tried). For each candidate the summed squared hinge error on the
    validation part is averaged over the inner folds; the candidate with the
    smallest average is returned, ties going to the smallest K.
    """
    n_train = X_train.shape[0]
    # max(2, ...) keeps the range non-empty for very small training sets,
    # where int(sqrt(n_train)) would otherwise be <= 1.
    candidate_ks = range(1, max(2, int(np.sqrt(n_train))))

    # KFold cannot make more folds than there are rows and needs at least 2.
    n_splits = min(n_splits, n_train)
    if n_splits < 2:
        return 1

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    # With an integer seed the split is the same on every call, so compute it once.
    splits = list(kf.split(X_train))

    fold_errors = {}
    for K in candidate_ks:
        total_fold_error = 0
        for train_idx, val_idx in splits:
            X_subtrain, X_val = X_train[train_idx], X_train[val_idx]
            y_subtrain, y_val = y_train[train_idx], y_train[val_idx]

            knn = CustomKNNRegressor(n_neighbors=K)
            knn.fit(X_subtrain, y_subtrain)
            y_val_pred = knn.predict(X_val)

            # hinge_error() applies the margin itself, so the raw bounds are
            # passed; shifting them here as well would count the margin twice.
            total_fold_error += np.sum(knn.hinge_error(y_val_pred, y_val[:, 0], y_val[:, 1]))

        # Average over the inner folds
        fold_errors[K] = total_fold_error / n_splits

    return min(fold_errors, key=fold_errors.get)


for dataset in datasets:
    dataset_dir = os.path.join(folder_path, dataset)
    folds_df = pd.read_csv(os.path.join(dataset_dir, 'folds.csv'))
    features_df = pd.read_csv(os.path.join(dataset_dir, 'features.csv')).astype(np.float32)
    target_df = pd.read_csv(os.path.join(dataset_dir, 'targets.csv')).astype(np.float32)

    # Process folds sequentially
    test_folds = sorted(np.unique(folds_df['fold']))
    predictions = []
    for test_fold in test_folds:
        # Split data into training and test sets
        train_indices = folds_df[folds_df['fold'] != test_fold].index
        test_indices = folds_df[folds_df['fold'] == test_fold].index

        # Filter the DataFrames by index
        X_train = features_df.loc[train_indices].values
        X_test = features_df.loc[test_indices].values
        y_train = target_df.loc[train_indices].values

        # Pick K on the training part only, then refit on all of it
        best_K = _select_best_k(X_train, y_train)
        final_knn = CustomKNNRegressor(n_neighbors=best_K)
        final_knn.fit(X_train, y_train)

        # Predict on the test set
        target_mat_pred = final_knn.predict(X_test)
        predictions.append(pd.DataFrame({'pred': target_mat_pred}))

    # Save predictions for each fold; create the output folder on first use
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    for test_fold, prediction in zip(test_folds, predictions):
        prediction.to_csv(f"{OUTPUT_DIR}/{dataset}.{test_fold}.csv", index=False)