In [142]:
import numpy as np
import pandas as pd

In [147]:
# Define the KNN class
class KNN:
    def __init__(self, k=3, distance_metric='euclidean'):
        self.k = k
        self.distance_metric = distance_metric

    def fit(self, X, y):
        # TODO: Implement the fit method
        # basically take the input data that it's gonna train with and fit it/store it to the object
        self.X_train = X
        self.y_train = y

    def predict(self, X):
        dist = self.compute_distance(X, self.X_train)
        neighbors = np.argsort(dist)[:, :self.k]
        neighbor_distances = np.take_along_axis(dist, neighbors, axis=1)
        knn_labels = self.y_train[neighbors]

    
        return np.array([self.classify(labels, distances) for labels, distances in zip(knn_labels, neighbor_distances)])

    def classify(self, labels, distances):
        # Returns the most common element in a list
        # Calculate weights based on distance (w = 1 / d^2)
        weights = 1 / (np.array(distances) ** 2)

        total_weight = np.sum(weights)
        weight1 = np.sum([weight for label, weight, in zip(labels, weights) if label == 1])
        return weight1 / total_weight

    
    def compute_distance(self, X1, X2):
        # TODO: Implement distance computation based on self.distance_metric
        # Hint: Use numpy operations for efficient computation
        if self.distance_metric == 'euclidean':
            return self.euclidean(X1, X2)
        elif self.distance_metric == 'minkowksi':
            return self.minkowski(X1, X2)
        elif self.distance_metric == 'cosine':
            return self.cosine(X1, X2)
        elif self.distance_metric == 'manhattan':
            return self.manhattan(X1, X2)
        elif self.distance_metric == 'chebyshev':
            return self.chebyshev(X1, X2)
        else:
            return ValueError("unknown distance metric")

    def euclidean(self, X1, X2):
        # euclidean
        distance = np.sqrt(np.sum((X1[:, np.newaxis, :] - X2[np.newaxis, :, :]) ** 2, axis=2))
        return distance
    
    def minkowski(self, X1, X2):
        p = 2 #??
        X1_expanded = X1[:, np.newaxis, :]
        X2_expanded = X2[np.newaxis, :, :]
        distance = np.power(np.sum(np.power(np.abs(X1_expanded - X2_expanded), p)), 1/p)
        return distance
    
    def cosine(self, X1, X2):
    # Compute dot product for each pair of points
        dot_product = np.sum(X1 * X2.T, axis=1)
        norm_X1 = np.linalg.norm(X1, axis=1)
        norm_X2 = np.linalg.norm(X2)
        return 1 - dot_product / (norm_X1 * norm_X2)

    def manhattan(self, X1, X2):
        X1_expanded = X1[:, np.newaxis, :]
        X2_expanded = X2[np.newaxis, :, :]
        distance = np.sum(np.abs(X1_expanded - X2_expanded), axis = 2)
        return distance

    def chebyshev(self, X1, X2):
        X1_expanded = X1[:, np.newaxis, :]
        X2_expanded = X2[np.newaxis, :, :]
        distance = np.max(np.abs(X1_expanded - X2_expanded))
        return distance


In [144]:


# Define data preprocessing function
def preprocess_data(train_path, test_path):
    # Load the datasets
    train_data = pd.read_csv(train_path)
    test_data = pd.read_csv(test_path)

    # TODO: Implement data preprocessing
    # Handle categorical variables, scale features, etc.
    train_data = train_data.drop(columns=['id', 'CustomerId', 'Surname'])
    X = train_data.drop('Exited', axis=1)
    y = train_data["Exited"]
    X_test = test_data.drop(columns=['id', 'CustomerId', 'Surname'])

    categorical_cols = ['Geography', 'Gender']
    numerical_cols = ['CreditScore', 'Age', 'Tenure', 'NumOfProducts', 'Balance', 'EstimatedSalary']

    for col in numerical_cols:
        X.fillna({col: X[col].mean()}, inplace=True)
        X_test.fillna({col: X_test[col].mean()}, inplace=True)

    for col in categorical_cols:
        X.fillna({col: X[col].mode()[0]}, inplace=True)
        X_test.fillna({col: X_test[col].mode()[0]}, inplace=True)

        X = pd.get_dummies(X, columns=[col], drop_first=True, dtype=float)
        X_test = pd.get_dummies(X_test, columns=[col], drop_first=True, dtype=float)

    X[numerical_cols] = (X[numerical_cols] - X[numerical_cols].mean()) / X[numerical_cols].std()
    X_test[numerical_cols] = (X_test[numerical_cols] - X_test[numerical_cols].mean()) / X_test[numerical_cols].std()

    X = X.to_numpy()
    y = y.to_numpy()
    X_test = X_test.to_numpy()

    return X, y, X_test

In [145]:

# Define cross-validation function
def cross_validate(X, y, knn, n_splits=5):
    # TODO: Implement cross-validation
    # Compute ROC AUC scores
    indices = np.arange(len(X))
    np.random.shuffle(indices)

    fold_size = len(X) // n_splits
    folds = [indices[i * fold_size:(i + 1) * fold_size] for i in range(n_splits)]

    auc_scores = []

    for i in range(n_splits):
        val_indices = folds[i]
        train_indices = np.concatenate([folds[j] for j in range(n_splits) if j != i])

        X_train, X_val = X[train_indices], X[val_indices]
        y_train, y_val = y[train_indices], y[val_indices]

        knn.fit(X_train, y_train)

        distances = knn.compute_distance(X_val, X_train)

        knn_indices = np.argsort(distances, axis=1)[:, :knn.k]

        k_nearest_labels = y_train[knn_indices]
        k_nearest_distances = np.take_along_axis(distances, knn_indices, axis=1)
        predictions = np.array([knn.classify(labels, dists) for labels, dists in zip(k_nearest_labels, k_nearest_distances)])

        min_distances = np.min(distances[np.arange(distances.shape[0])[:, np.newaxis], knn_indices], axis=1)

        scores = 1 / (min_distances + 1e-8)

        auc = calculate_roc_auc(y_val, scores)
        auc_scores.append(auc)

    return np.mean(auc_scores)

def calculate_roc_auc(y_true, y_scores):
    sorted_indices = np.argsort(y_scores)
    y_true_sorted = y_true[sorted_indices]

    tps = np.cumsum(y_true_sorted)
    fps = np.arange(1, len(y_true_sorted) + 1) - tps

    tpr = tps / tps[-1]
    fpr = fps / fps[-1]

    return np.trapz(tpr, fpr)

In [151]:
# Load and preprocess data
X, y, X_test = preprocess_data('./cs-506-predicting-customer-churn-using-knn/train.csv', './cs-506-predicting-customer-churn-using-knn/test.csv')

# Create and evaluate model
knn = KNN(k=5, distance_metric='euclidean')

# Perform cross-validation
cv_scores = cross_validate(X, y, knn)

print("Cross-validation scores:", cv_scores)

# TODO: hyperparamters tuning
# is this just by changign which distance method we use??
distance_metrics = ['euclidean', 'cosine', 'manhattan', 'chebyshev', 'minkowski']
k_values = range(1, 31, 10)

hi_score = 0
best_dist = ''
best_k = 0

# not workign so jsut use the optimal k instead for now
# for metric in distance_metrics:
#     #for k in k_values: --> found that this was my best k value so now i jsut plug to save time
#         print(metric)
#         knn = KNN(k = 20, distance_metric = metric)
#         knn.fit(X, y)
#         y_pred = knn.predict(X_test)
#         score = cross_validate(X, y, knn)
#         if score > hi_score:
#             hi_score = score
#             best_dist = metric
#             best_k = 20
#         print(score)
# print(f"Best distance metric: {best_dist}")
# print(f"Best k value: {best_k}")
# print(f"Best accuracy score: {hi_score:.4f}")

# TODO: Train on full dataset with optimal hyperparameters and make predictions on test set
knn = KNN(k = 20, distance_metric = 'euclidean')
knn.fit(X, y)
test_predictions = knn.predict(X_test)

# Save test predictions
pd.DataFrame({'id': pd.read_csv('./cs-506-predicting-customer-churn-using-knn/test.csv')['id'], 'Exited': test_predictions}).to_csv('submissions.csv', index=False)

SyntaxError: invalid syntax (173586180.py, line 44)