In [1]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import StratifiedKFold
from scipy.stats import mode
from sklearn.metrics import roc_auc_score
from collections import Counter

In [2]:
# Define the KNN class
class KNN:
    """Brute-force k-nearest-neighbours classifier.

    Parameters
    ----------
    k : int
        Number of neighbours that vote on each prediction. When ``k`` is larger
        than the number of training samples, every training sample votes.
    distance_metric : str
        ``'euclidean'`` or ``'manhattan'``.
    """

    def __init__(self, k=3, distance_metric='euclidean'):
        self.k = k
        self.distance_metric = distance_metric

    def fit(self, X, y):
        """Store the training samples and labels; returns ``self`` for chaining."""
        self.X_train = np.asarray(X)
        self.y_train = np.asarray(y)
        # Sorted distinct labels; they fix the column order of predict_proba.
        self.classes_ = np.unique(self.y_train)
        return self

    def predict(self, X):
        """Return the majority label among the nearest neighbours of each row of X.

        Vote ties are resolved in favour of the tied label whose neighbour is
        closest to the query point, so the result does not depend on the
        arbitrary ordering produced by the partial sort.
        """
        predictions = []
        for x in np.asarray(X):
            labels = self._nearest_labels(x)
            votes = Counter(labels)
            top_count = max(votes.values())
            # labels is ordered nearest-first, so the first label reaching the
            # top count is the one backed by the closest neighbour.
            predictions.append(next(label for label in labels if votes[label] == top_count))
        return np.array(predictions)

    def predict_proba(self, X):
        """Return the fraction of neighbours voting for each class.

        The result has shape ``(n_samples, n_classes)`` with columns ordered
        like ``self.classes_``.
        """
        classes = self.classes_.tolist()
        rows = []
        for x in np.asarray(X):
            labels = self._nearest_labels(x)
            rows.append([labels.count(cls) / len(labels) for cls in classes])
        # reshape keeps a well-formed (0, n_classes) result for empty input.
        return np.array(rows, dtype=float).reshape(-1, len(classes))

    def _nearest_labels(self, x):
        """Return the labels of the nearest neighbours of ``x``, nearest first.

        Raises
        ------
        ValueError
            If ``k`` is smaller than 1 or the training set is empty.
        """
        if self.k < 1:
            raise ValueError(f"k must be a positive integer, got {self.k}")
        distances = self.compute_distance(x, self.X_train)
        n_train = len(distances)
        if n_train == 0:
            raise ValueError("Cannot predict: the training set is empty")

        k = min(self.k, n_train)
        if k < n_train:
            # Partial sort: kth = k - 1 is always a valid index and guarantees
            # that the first k entries are the k smallest distances.
            nearest = np.argpartition(distances, k - 1)[:k]
        else:
            nearest = np.arange(n_train)
        # Order the selected neighbours by distance (index order on ties).
        nearest = np.sort(nearest)
        nearest = nearest[np.argsort(distances[nearest], kind='stable')]
        return self.y_train[nearest].tolist()

    def compute_distance(self, X1, X2):
        """Distance between ``X1`` and ``X2`` under ``self.distance_metric``.

        The inputs are broadcast against each other and the feature (last)
        axis is reduced, so passing one sample and a matrix of samples returns
        one distance per matrix row.

        Raises
        ------
        ValueError
            If ``self.distance_metric`` is not a supported metric.
        """
        # Cast to float so boolean or unsigned features subtract correctly.
        diff = np.asarray(X2, dtype=float) - np.asarray(X1, dtype=float)
        if self.distance_metric == 'euclidean':
            return np.sqrt(np.sum(diff ** 2, axis=-1))
        elif self.distance_metric == 'manhattan':
            return np.sum(np.abs(diff), axis=-1)
        else:
            raise ValueError(f"Unsupported distance metric: {self.distance_metric}")

In [3]:
# Tiny hand-made dataset used to smoke-test the classifier.
X_train = np.array([[1, 2], [2, 3], [3, 4]])
y_train = np.array([0, 1, 0])

X_test = np.array([[1, 2], [3, 3]])

# Build the KNN model with two neighbours (fit returns the model itself).
knn = KNN(k=2, distance_metric='euclidean').fit(X_train, y_train)

# Predict the new points.
predictions = knn.predict(X_test)
print(predictions)

# Same check with k larger than the number of training samples.
knn = KNN(k=5, distance_metric='euclidean').fit(X_train, y_train)
predictions = knn.predict(X_test)
print(predictions)

[0 1]
[0 0]


In [4]:
# Define data preprocessing function
def preprocess_data(train_path, test_path):
    """Load the train/test CSV files and return model-ready arrays.

    The 'Exited' column of the training file is the target; the 'id' column is
    dropped from both files. Object-typed columns of the training features are
    one-hot encoded, the test frame is aligned to the training columns, and all
    features are standardised with statistics learned on the training set.

    Returns
    -------
    (X, y, X_test) : scaled training features, training labels, scaled test
        features.
    """
    train_df = pd.read_csv(train_path)
    test_df = pd.read_csv(test_path)

    # Split the target from the features.
    y = train_df['Exited'].values
    train_features = train_df.drop(['Exited', 'id'], axis=1)
    test_features = test_df.drop(['id'], axis=1)

    # One-hot encode with pandas' get_dummies (instead of a LabelEncoder).
    object_columns = train_features.select_dtypes(include=['object']).columns
    train_encoded = pd.get_dummies(train_features, columns=object_columns)
    test_encoded = pd.get_dummies(test_features, columns=object_columns)

    # Give the test frame exactly the training columns, in the same order:
    # columns it lacks are filled with 0, columns unknown to training are dropped.
    test_encoded = test_encoded.reindex(columns=train_encoded.columns, fill_value=0)

    # Feature scaling: learn mean/std on the training set only.
    scaler = StandardScaler().fit(train_encoded)
    X = scaler.transform(train_encoded)
    X_test = scaler.transform(test_encoded)

    return X, y, X_test

In [5]:
# Define cross-validation function
def _validation_scores(model, X_val):
    """Return the scores to feed into ROC AUC for the rows of ``X_val``.

    Uses the second column of ``model.predict_proba`` when the model offers
    that method and reports exactly two classes; otherwise falls back to the
    hard labels returned by ``model.predict``.
    """
    predict_proba = getattr(model, 'predict_proba', None)
    if predict_proba is not None:
        proba = np.asarray(predict_proba(X_val))
        if proba.ndim == 2 and proba.shape[1] == 2:
            return proba[:, 1]
    return model.predict(X_val)


def cross_validate(X, y, knn, n_splits=5):
    """Stratified k-fold cross-validation scored with ROC AUC.

    Parameters
    ----------
    X, y : array-like
        Features and labels; converted with ``np.asarray`` before splitting.
    knn : object
        Model exposing ``fit(X, y)`` and ``predict(X)``, and optionally
        ``predict_proba(X)``. It is refitted on every fold.
    n_splits : int
        Number of folds.

    Returns
    -------
    numpy.ndarray
        One ROC AUC score per fold.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    # Fixed random_state keeps the shuffled folds reproducible between runs.
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    scores = []

    for train_idx, val_idx in skf.split(X, y):
        # Split the data for this fold.
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        # Train the model on the training part of the fold.
        knn.fit(X_train, y_train)

        # ROC AUC ranks samples by score, so prefer class probabilities over
        # hard 0/1 labels, which collapse the curve to a single threshold.
        y_score = _validation_scores(knn, X_val)
        scores.append(roc_auc_score(y_val, y_score))

    return np.array(scores)

In [None]:
# Read both CSV files and turn them into scaled feature matrices.
X, y, X_test = preprocess_data(train_path='train.csv', test_path='test.csv')

# Baseline model: five neighbours with Euclidean distance.
knn = KNN(distance_metric='euclidean', k=5)

# Score the baseline with cross-validation and report the per-fold results.
cv_scores = cross_validate(X, y, knn=knn)

print("Cross-validation scores:", cv_scores)

# Hyperparameter tuning
def tune_hyperparameters(X, y, k_values=None, metrics=None):
    """Pick the neighbour count with the best mean cross-validation score.

    Each candidate ``k`` is evaluated with ``cross_validate`` on a ``KNN``
    built with its default distance metric. ``k_values`` defaults to
    ``[3, 5, 7, 9, 11]``. ``metrics`` is accepted but not used by the search.

    Returns ``(best_k, best_score)``; both stay at 0 when no candidate scores
    above 0.
    """
    candidates = [3, 5, 7, 9, 11] if k_values is None else k_values

    best_k, best_score = 0, 0

    for k in candidates:
        print(f"Testing k={k}")
        fold_scores = cross_validate(X, y, KNN(k=k))
        mean_score = fold_scores.mean()

        print(f"Mean score: {mean_score:.4f}")

        # Only a strictly better mean replaces the current best.
        if not mean_score > best_score:
            continue
        best_k, best_score = k, mean_score

    print(f"\nBest k found: {best_k}")
    print(f"Best score: {best_score:.4f}")
    return best_k, best_score


# Train on the full dataset with the best k found by the search, then predict the test set.
# The tuning result is unpacked straight into best_k: the model below reads that
# name, which was previously never assigned (the tuple went into best_params).
best_k, best_score = tune_hyperparameters(X, y)
knn = KNN(k=best_k)
knn.fit(X, y)
test_predictions = knn.predict(X_test)

# Save test predictions; the ids are re-read from test.csv because
# preprocess_data drops the 'id' column.
pd.DataFrame({'id': pd.read_csv('test.csv')['id'], 'Exited': test_predictions}).to_csv('submissions.csv', index=False)