In [46]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import math
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import precision_score, recall_score, f1_score, precision_recall_fscore_support
from sklearn.tree import DecisionTreeClassifier
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.preprocessing import StandardScaler

### Part 1: Cleveland Heart Disease Dataset

In [47]:
def load_cleveland_data(file_path):
    """Load a Cleveland CSV and return standardized features with binary labels.

    Every column is coerced to numeric (entries that cannot be parsed become
    NaN) and rows containing any NaN are dropped. The ``num`` column is then
    collapsed into a binary target by capping it at 1, and the remaining
    columns are standardized with a ``StandardScaler`` fitted on this file.

    Note that the scaler is fitted on whatever file is passed in, so two
    different files are each scaled by their own mean and variance.

    Parameters
    ----------
    file_path : str or path-like
        CSV file that contains a ``num`` column.

    Returns
    -------
    X : numpy.ndarray
        Standardized feature matrix (all columns except the target).
    y : numpy.ndarray
        Integer labels, ``num`` capped at 1.
    feature_names : pandas.Index
        Column names matching the columns of ``X``.
    """
    data = pd.read_csv(file_path)

    # Coerce before deriving the target: capping a string-typed 'num' column
    # with min(x, 1) would raise TypeError.
    for col in data.columns:
        data[col] = pd.to_numeric(data[col], errors='coerce')
    data = data.dropna()

    # Cap at 1 and cast to int so the labels are usable with bincount-based
    # voting even when pandas parsed the column as float.
    y = data['num'].clip(upper=1).astype(int)
    X = data.drop(columns=['num', 'disease'], errors='ignore')

    X_scaled = np.asarray(StandardScaler().fit_transform(X))

    return X_scaled, y.values, X.columns


In [48]:
class kNN(BaseEstimator, ClassifierMixin):
    """Brute-force k-nearest-neighbours classifier using Euclidean distance.

    Parameters
    ----------
    k : int, default=3
        Number of nearest training samples that vote on each prediction.
    feature_mask : array-like or None, default=None
        Column selector applied to ``X`` in both ``fit`` and ``predict``
        (used as ``X[:, feature_mask]``). ``None`` keeps every column.

    Attributes
    ----------
    X_ : numpy.ndarray
        Training samples after the feature mask has been applied.
    y_ : numpy.ndarray
        Training labels as passed to ``fit``.
    classes_ : numpy.ndarray
        Sorted unique labels seen during ``fit``.
    """

    def __init__(self, k=3, feature_mask=None):
        self.k = k
        self.feature_mask = feature_mask

    def fit(self, X, y):
        """Store the (masked) training data and encode the labels.

        Raises
        ------
        ValueError
            If ``k`` is not a positive integer.
        """
        X, y = check_X_y(X, y)
        if not isinstance(self.k, (int, np.integer)) or self.k < 1:
            raise ValueError(f"k must be a positive integer, got {self.k!r}")
        if self.feature_mask is not None:
            X = X[:, self.feature_mask]
        self.X_ = X
        self.y_ = y
        # Encode labels as 0..n_classes-1 so voting works for any label type
        # (floats such as 0.0/1.0, negative ints, ...), not only for
        # non-negative integers that np.bincount accepts directly.
        self.classes_, self._y_encoded = np.unique(y, return_inverse=True)
        return self

    def predict(self, X):
        """Predict a class label for every row of ``X``.

        Raises
        ------
        ValueError
            If the number of (masked) features differs from the training data.
        """
        check_is_fitted(self)
        X = check_array(X)
        if self.feature_mask is not None:
            X = X[:, self.feature_mask]
        if X.shape[1] != self.X_.shape[1]:
            raise ValueError(
                f"X has {X.shape[1]} features after masking, "
                f"but kNN was fitted with {self.X_.shape[1]}"
            )
        return np.array([self._predict(x) for x in X])

    def _predict(self, x):
        """Return the majority label among the k training samples nearest to ``x``."""
        # Distances to all training rows in one vectorised step.
        distances = np.sqrt(np.sum((self.X_ - x) ** 2, axis=1))
        k_indices = np.argsort(distances)[:self.k]
        votes = np.bincount(self._y_encoded[k_indices], minlength=len(self.classes_))
        # argmax picks the first maximum, so ties go to the smallest class.
        return self.classes_[np.argmax(votes)]

In [49]:
def cross_validate(X, y, k, feature_mask, n_splits=10):
    """Score a kNN model with shuffled K-fold cross-validation.

    A fresh ``kNN(k=k, feature_mask=feature_mask)`` is fitted on the training
    part of each fold and scored on the remaining part.

    Returns
    -------
    f1_scores, precisions, recalls : list of float
        One entry per fold, in fold order.
    """
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=42)

    per_fold = []
    for fit_idx, eval_idx in splitter.split(X):
        classifier = kNN(k=k, feature_mask=feature_mask)
        classifier.fit(X[fit_idx], y[fit_idx])

        actual = y[eval_idx]
        predicted = classifier.predict(X[eval_idx])

        per_fold.append((
            f1_score(actual, predicted),
            precision_score(actual, predicted),
            recall_score(actual, predicted),
        ))

    f1_scores = [fold[0] for fold in per_fold]
    precisions = [fold[1] for fold in per_fold]
    recalls = [fold[2] for fold in per_fold]
    return f1_scores, precisions, recalls

In [50]:
def feature_importance(X, y, feature_names):
    """Rank features by the importances of a decision tree fitted on ``(X, y)``.

    Returns a ``pandas.Series`` indexed by ``feature_names`` and sorted from
    most to least important.
    """
    tree = DecisionTreeClassifier(random_state=42)
    tree.fit(X, y)
    ranked = pd.Series(tree.feature_importances_, index=feature_names)
    return ranked.sort_values(ascending=False)

In [51]:
def plot_feature_separability(ax, X, y, feature_name, feature_names):
    """Overlay the per-class distributions of one feature on ``ax``.

    Parameters
    ----------
    ax : matplotlib Axes
        Axes to draw on.
    X : numpy.ndarray
        2-D feature matrix whose columns follow ``feature_names``.
    y : numpy.ndarray
        Labels aligned with the rows of ``X``; rows with label 0 and label 1
        are plotted.
    feature_name : str
        Name of the column to plot.
    feature_names : sequence of str
        Column names of ``X``.

    Raises
    ------
    ValueError
        If ``feature_name`` is not present in ``feature_names``.
    """
    # Get the index of the feature
    feature_index = list(feature_names).index(feature_name)

    # Separate the feature values for each class
    feature_values_0 = X[y == 0, feature_index]
    feature_values_1 = X[y == 1, feature_index]

    # Plot normalised histograms: stat='density' makes the y-axis match its
    # 'Density' label (the default stat is a raw count) and keeps the two
    # classes comparable when they have different sample sizes.
    sns.histplot(feature_values_0, kde=True, stat='density', color='blue', alpha=0.5,
                 label='No Heart Disease', ax=ax)
    sns.histplot(feature_values_1, kde=True, stat='density', color='red', alpha=0.5,
                 label='Heart Disease', ax=ax)

    ax.set_title(f'Distribution of {feature_name}')
    ax.set_xlabel(feature_name)
    ax.set_ylabel('Density')
    ax.legend()

def plot_multiple_features(X, y, feature_names, selected_features, save_dir='plots', filename='feature_distributions.png'):
    """Plot per-class distributions for several features in a grid and save it.

    The grid has at most three columns. The figure is written to
    ``save_dir/filename`` (the directory is created if needed), shown, and
    then closed.

    Raises
    ------
    ValueError
        If ``selected_features`` is empty.
    """
    selected_features = list(selected_features)
    n_features = len(selected_features)
    if n_features == 0:
        # Previously this surfaced as a ZeroDivisionError in the grid maths.
        raise ValueError("selected_features must contain at least one feature name")

    # Calculate grid dimensions
    n_cols = min(3, n_features)  # Max 3 columns
    n_rows = math.ceil(n_features / n_cols)

    # squeeze=False always yields a 2-D array of axes, so no special case is
    # needed for a single subplot.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5*n_cols, 4*n_rows), squeeze=False)
    try:
        fig.suptitle('Feature Distributions for Each Class', fontsize=16)
        axes = axes.ravel()

        for ax, feature in zip(axes, selected_features):
            plot_feature_separability(ax, X, y, feature, feature_names)

        # Remove any unused subplots
        for unused_ax in axes[n_features:]:
            fig.delaxes(unused_ax)

        fig.tight_layout()

        os.makedirs(save_dir, exist_ok=True)

        # Save through the figure object rather than pyplot's "current figure"
        full_path = os.path.join(save_dir, filename)
        fig.savefig(full_path, dpi=300, bbox_inches='tight')
        plt.show()
    finally:
        # Close even when plotting or saving fails so figures do not pile up.
        plt.close(fig)

In [52]:
def elbow(X, y, k_range, feature_mask, save_dir='plots', filename='elbow_plot.png'):
    """Plot weighted F1 against k for a kNN model on a single 80/20 split.

    ``X`` is reduced to the columns selected by ``feature_mask``, split once
    with ``random_state=42``, and a ``kNN`` is fitted and scored for every
    value in ``k_range``. The resulting curve is saved to
    ``save_dir/filename``, shown, and closed.
    """
    # Mask first; the models below are then built without a mask of their own.
    X_train, X_test, y_train, y_test = train_test_split(
        X[:, feature_mask], y, test_size=0.2, random_state=42
    )

    def _score(n_neighbors):
        model = kNN(k=n_neighbors)
        model.fit(X_train, y_train)
        predicted = model.predict(X_test)
        precision, recall, f1, _ = precision_recall_fscore_support(
            y_test, predicted, average='weighted'
        )
        return precision, recall, f1

    scores = [_score(n_neighbors) for n_neighbors in k_range]
    _, _, f1_scores = zip(*scores)

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(k_range, f1_scores)
    ax.set_xlabel('k')
    ax.set_ylabel('F1 Score')
    ax.set_title('Elbow Plot: F1 Score vs k')
    ax.grid(True)
    fig.tight_layout()

    # Make sure the output directory exists before writing the image
    os.makedirs(save_dir, exist_ok=True)
    fig.savefig(os.path.join(save_dir, filename))
    plt.show()
    plt.close(fig)
    

In [None]:
# Read and standardize the Cleveland data
X, y, feature_names = load_cleveland_data('data/cleveland.csv')

# Rank the features with a decision tree to guide feature selection
importance_results = feature_importance(X, y, feature_names)
print("Feature Importance:", importance_results, sep="\n")

In [None]:
# Features kept for the kNN model
selected_features = ['thal', 'cp', 'ca', 'thalach', 'age']

# Visual check of how well each selected feature separates the two classes
plot_multiple_features(
    X, y, feature_names, selected_features,
    save_dir='plots/part1',
)

# Boolean column selector: True where a column name is in selected_features
feature_mask = np.isin(feature_names, selected_features)

In [None]:
# Sweep k from 1 to 249 on the selected features and save the F1-vs-k curve
elbow(
    X, y,
    k_range=range(1, 250),
    feature_mask=feature_mask,
    save_dir='plots/part1',
    filename='elbow_plot.png',
)

In [None]:
# Set k based on our elbow plot
k = 150

# Perform cross-validation
f1_scores, precisions, recalls = cross_validate(X, y, k, feature_mask)

# Print results
print(f"Selected features: {selected_features}")
print(f"k value: {k}")
print("\nCross-validation results:")
# Walk over the scores that were actually returned instead of assuming
# exactly 10 folds, so this keeps working if n_splits changes.
for fold, (fold_precision, fold_recall, fold_f1) in enumerate(
        zip(precisions, recalls, f1_scores), start=1):
    print(f"Fold {fold}:")
    print(f"  Precision: {fold_precision:.3f}")
    print(f"  Recall: {fold_recall:.3f}")
    print(f"  F1 Score: {fold_f1:.3f}")

print("\nMean scores:")
print(f"Precision: {np.mean(precisions):.3f} (+/- {np.std(precisions):.3f})")
print(f"Recall: {np.mean(recalls):.3f} (+/- {np.std(recalls):.3f})")
print(f"F1 Score: {np.mean(f1_scores):.3f} (+/- {np.std(f1_scores):.3f})")

# Train a model for the test set
final_model = kNN(k=k, feature_mask=feature_mask)
final_model.fit(X, y)

In [None]:
def predict_challenge(challenge_file, model):
    """Score ``model`` on a challenge CSV.

    The file is loaded with ``load_cleveland_data`` and the model's
    predictions are compared with the file's labels.

    NOTE(review): ``load_cleveland_data`` fits a new ``StandardScaler`` on the
    file it loads, so the challenge features are standardized with the
    challenge file's own statistics rather than those of the training data.

    Returns
    -------
    tuple of float
        ``(f1, precision, recall)``.
    """
    X_challenge, y_challenge, _ = load_cleveland_data(challenge_file)
    y_pred = model.predict(X_challenge)
    metrics = (f1_score, precision_score, recall_score)
    return tuple(metric(y_challenge, y_pred) for metric in metrics)

# Evaluate the final model on the sample test dataset
challenge = predict_challenge('data/cleveland-test-sample.csv', final_model)
print(f"\nF1 Score on challenge dataset: {challenge[0]:.3f}")
print(f"Precision on challenge dataset: {challenge[1]:.3f}")
print(f"Recall on challenge dataset: {challenge[2]:.3f}")

### Part 2: CDC BRFSS 2015 Diabetes Dataset

In [58]:
def load_diabetes_data(file_path):
    """Load a diabetes CSV and return standardized features with integer labels.

    Every column is coerced to numeric (entries that cannot be parsed become
    NaN) and rows containing any NaN are dropped. ``Diabetes_binary`` is the
    target; all other columns are standardized with a ``StandardScaler``
    fitted on this file.

    Parameters
    ----------
    file_path : str or path-like
        CSV file that contains a ``Diabetes_binary`` column.

    Returns
    -------
    X : numpy.ndarray
        Standardized feature matrix.
    y : numpy.ndarray
        Labels cast to ``int``.
    feature_names : pandas.Index
        Column names matching the columns of ``X``.
    """
    data = pd.read_csv(file_path)

    for col in data.columns:
        data[col] = pd.to_numeric(data[col], errors='coerce')
    data = data.dropna()

    X = data.drop('Diabetes_binary', axis=1)
    # Cast to int: if the CSV stores the target as 0.0/1.0 the column is
    # parsed as float, and np.bincount-based voting rejects float labels.
    y = data['Diabetes_binary'].astype(int)

    X_scaled = np.asarray(StandardScaler().fit_transform(X))

    return X_scaled, y.values, X.columns

In [None]:
# Read and standardize the diabetes sample, then hold out 20% for testing
X, y, feature_names = load_diabetes_data('data/diabetes_sample.csv')
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Rank the features using the training split only
importance_results = feature_importance(X_train, y_train, feature_names)
print("Feature Importance:", importance_results, sep="\n")

In [None]:
# Features kept for the kNN model
selected_features = [
    'BMI', 'GenHlth', 'PhysHlth', 'Age', 'HighBP',
    'Sex', 'HvyAlcoholConsump', 'HighChol', 'Smoker', 'CholCheck',
]

# Visual check of how well each selected feature separates the two classes
plot_multiple_features(
    X, y, feature_names, selected_features,
    save_dir='plots/part2',
)

# Boolean column selector: True where a column name is in selected_features
feature_mask = np.isin(feature_names, selected_features)

In [None]:
# Tune k on the training split only: X_test/y_test are the held-out rows used
# for the final evaluation and must not influence the choice of k.
elbow(X_train, y_train, range(1, 250), feature_mask, 'plots/part2', 'elbow_plot.png')

In [None]:
# Set k based on our elbow plot
k = 5

# Perform cross-validation on the training split only, so the held-out
# X_test/y_test rows stay unseen until the final evaluation.
f1_scores, precisions, recalls = cross_validate(X_train, y_train, k, feature_mask)

# Print results
print(f"Selected features: {selected_features}")
print(f"k value: {k}")
print("\nCross-validation results:")
# Walk over the scores that were actually returned instead of assuming
# exactly 10 folds, so this keeps working if n_splits changes.
for fold, (fold_precision, fold_recall, fold_f1) in enumerate(
        zip(precisions, recalls, f1_scores), start=1):
    print(f"Fold {fold}:")
    print(f"  Precision: {fold_precision:.3f}")
    print(f"  Recall: {fold_recall:.3f}")
    print(f"  F1 Score: {fold_f1:.3f}")

print("\nMean scores:")
print(f"Precision: {np.mean(precisions):.3f} (+/- {np.std(precisions):.3f})")
print(f"Recall: {np.mean(recalls):.3f} (+/- {np.std(recalls):.3f})")
print(f"F1 Score: {np.mean(f1_scores):.3f} (+/- {np.std(f1_scores):.3f})")

# Train a model for the test set. Fit on the training split only: fitting on
# the full X would put every test row into the model's memory, where it is
# its own nearest neighbour, and inflate the test scores.
final_model = kNN(k=k, feature_mask=feature_mask)
final_model.fit(X_train, y_train)

In [None]:
# Score the final model on the held-out split.
# NOTE(review): these numbers are only an unbiased estimate if final_model
# was fitted without the X_test rows.
final_predictions = final_model.predict(X_test)
test_f1, test_precision, test_recall = (
    metric(y_test, final_predictions)
    for metric in (f1_score, precision_score, recall_score)
)

print("\nFinal model performance on test set:")
print(f"F1 Score: {test_f1:.3f}")
print(f"Precision: {test_precision:.3f}")
print(f"Recall: {test_recall:.3f}")