# Libraries

In [1]:
import numpy as np
from scipy.io import arff
from sklearn import preprocessing as prepro
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline
from sklearn.model_selection import train_test_split
from collections import Counter
from sklearn.metrics import f1_score
from sklearn.model_selection import KFold

# Dataset

https://openml.org/search?type=data&sort=runs&status=active&id=37

In [2]:
!wget https://api.openml.org/data/v1/download/37/diabetes.arff

--2024-11-04 13:06:06--  https://api.openml.org/data/v1/download/37/diabetes.arff
Resolving api.openml.org (api.openml.org)... 131.155.11.11
Connecting to api.openml.org (api.openml.org)|131.155.11.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 37419 (37K) [text/plain]
Saving to: ‘diabetes.arff’


2024-11-04 13:06:07 (360 KB/s) - ‘diabetes.arff’ saved [37419/37419]



In [3]:
data = arff.loadarff('diabetes.arff')
df = pd.DataFrame(data[0])
print(df.head())

   preg   plas  pres  skin   insu  mass   pedi   age               class
0   6.0  148.0  72.0  35.0    0.0  33.6  0.627  50.0  b'tested_positive'
1   1.0   85.0  66.0  29.0    0.0  26.6  0.351  31.0  b'tested_negative'
2   8.0  183.0  64.0   0.0    0.0  23.3  0.672  32.0  b'tested_positive'
3   1.0   89.0  66.0  23.0   94.0  28.1  0.167  21.0  b'tested_negative'
4   0.0  137.0  40.0  35.0  168.0  43.1  2.288  33.0  b'tested_positive'


In [4]:
df['class'].replace([b'tested_negative',b'tested_positive'], [0,1], inplace=True)
print(df.head())

   preg   plas  pres  skin   insu  mass   pedi   age  class
0   6.0  148.0  72.0  35.0    0.0  33.6  0.627  50.0      1
1   1.0   85.0  66.0  29.0    0.0  26.6  0.351  31.0      0
2   8.0  183.0  64.0   0.0    0.0  23.3  0.672  32.0      1
3   1.0   89.0  66.0  23.0   94.0  28.1  0.167  21.0      0
4   0.0  137.0  40.0  35.0  168.0  43.1  2.288  33.0      1


In [5]:
df.isnull().sum()

Unnamed: 0,0
preg,0
plas,0
pres,0
skin,0
insu,0
mass,0
pedi,0
age,0
class,0


In [6]:
scaler = prepro.MinMaxScaler().fit(df)
normalized_attr = scaler.transform(df)
normalized_df = pd.DataFrame(normalized_attr, columns=df.columns)
print(normalized_df.head())

       preg      plas      pres      skin      insu      mass      pedi  \
0  0.352941  0.743719  0.590164  0.353535  0.000000  0.500745  0.234415   
1  0.058824  0.427136  0.540984  0.292929  0.000000  0.396423  0.116567   
2  0.470588  0.919598  0.524590  0.000000  0.000000  0.347243  0.253629   
3  0.058824  0.447236  0.540984  0.232323  0.111111  0.418778  0.038002   
4  0.000000  0.688442  0.327869  0.353535  0.198582  0.642325  0.943638   

        age  class  
0  0.483333    1.0  
1  0.166667    0.0  
2  0.183333    1.0  
3  0.000000    0.0  
4  0.200000    1.0  


In [71]:
#normalized_df.plot(kind='box', subplots=True, layout=(3,3), sharex=False, sharey=False)
#plt.show()

In [70]:
#normalized_df.hist()
#plt.show()

# KNN

In [9]:
# kernel functions
def uniform(d, h):
    return np.where(np.abs(d / h)<= 0.5, 1, 0)

def triangular(d, h):
    return np.where(np.abs(d) <= h, 1 - np.abs(d / h), 0)

def epanechnikov(d, h):
    return np.where(np.abs(d) <= h, 3 / 4 * (1 - (d / h) ** 2), 0)

def gaussian(d, h):
    return 1/np.sqrt(2 * np.pi)*np.exp(-0.5*(d / h)**2)

In [10]:
class KNN:
    def __init__(self, k=3, kernel=uniform, window='fixed'):
        self.k = k
        self.kernel = kernel
        self.window = window

    def fit(self, X, y):
        self.X_train = np.asarray(X)
        self.y_train = np.asarray(y)

    def predict(self, X):
        X = np.asarray(X)
        predictions = []

        for x in X:
            # Calculate distances
            distances = np.linalg.norm(self.X_train - x, axis=1)

            # window size
            if self.window == 'fixed': # the window size is constant for all points.
                size = np.mean(distances)
            else: # the window size can change depending on the local data distribution.
                size = np.partition(distances, self.k)[self.k]  # k-th smallest distance

            # Apply kernel weights
            weights = self.kernel(distances, size)

            # Get the indices of the k nearest neighbors
            k_indices = np.argsort(distances)[:self.k]
            k_weights = weights[k_indices]
            # Extract the labels of the k nearest neighbors
            k_labels = self.y_train[k_indices]
            k_labels = np.array(k_labels).flatten()
            # Weighted voting (most common class label)
            label_sum = np.bincount(k_labels, weights=k_weights, minlength=2)
            predicted_label = np.argmax(label_sum)
            predictions.append(predicted_label)

        return np.array(predictions)

Test

In [23]:
train, test = train_test_split(normalized_df,test_size=0.3,random_state=0,stratify=normalized_df['class'])

train_X = train[train.columns[:8]]
train_Y = train[train.columns[8:]]

test_X = test[test.columns[:8]]
test_Y = test[test.columns[8:]]

X = normalized_df[normalized_df.columns[:8]]
Y = normalized_df['class']

train_X = train_X.to_numpy().astype(float)
train_Y = train_Y.to_numpy().astype(int).flatten()
test_X = test_X.to_numpy().astype(float)
test_Y = test_Y.to_numpy().astype(int).flatten()
X = X.to_numpy().astype(float)
Y = Y.to_numpy().astype(int)

knn = KNN()
knn.fit(train_X, train_Y)

predictions = knn.predict(test_X)

print("Predictions:", predictions)

Predictions: [0 0 0 1 0 0 1 0 0 0 1 0 1 0 1 0 0 0 1 1 0 0 1 0 1 0 0 0 1 1 1 1 0 0 0 1 0
 0 0 0 0 0 1 1 1 1 1 1 0 0 0 0 0 0 1 0 0 1 0 0 0 0 1 0 0 1 1 0 0 0 0 0 0 1
 1 0 0 0 1 0 0 0 0 0 1 0 0 0 0 1 0 0 0 0 1 0 0 0 0 0 0 1 1 1 0 1 1 0 1 0 0
 0 0 0 0 1 0 1 0 0 0 0 0 1 1 0 0 1 0 0 1 0 1 1 0 0 0 0 1 0 0 0 0 1 0 0 1 1
 1 0 1 1 0 0 0 1 0 1 1 0 1 1 1 0 1 0 0 0 0 0 0 0 1 0 0 0 1 1 1 1 1 0 0 1 0
 0 0 0 0 1 0 0 0 0 1 0 0 1 0 0 0 0 0 1 1 0 0 0 0 0 0 1 0 0 0 1 1 0 0 0 0 0
 0 0 0 0 0 0 0 0 0]


Tuning

In [None]:
# Cross-validation function
def cross_validate(model, X, y, folds=6):
    kfold = KFold(n_splits=folds, shuffle=True, random_state=22) # k=folds, split the data into k equal parts
    f1_scores = []
    for train_index, valid_index in kfold.split(X):
      # Splitting Data
        X_train, X_test = X[train_index], X[valid_index]
        y_train, y_test = y[train_index], y[valid_index]

        model.fit(X_train, y_train)
        f1_scores.append(f1_score(y_test, model.predict(X_test)))

    return np.mean(f1_scores)

# hyperparameter_tuning
def hyperparameter_tuning_knn(X, y):
    best_score, best_params, scores = 0, None, []
    kernels = [uniform, triangular, epanechnikov, gaussian]
    windows = ['fixed', 'neighbor']
    k_values = [i for i in range(1,50,2)]

    for kernel in kernels:
        for window in windows:
            for k in k_values:
                model = KNN(k=k, kernel=kernel, window=window)
                score = cross_validate(model, X, y)
                scores.append({'k': k, 'kernel': kernel.__name__, 'window': window, 'f1': float(score)})

                if score > best_score:
                    best_score = score
                    best_params = {'k': k, 'kernel': kernel.__name__, 'window': window}

    return best_params, best_score, scores

In [None]:
best_params, best_score, scores = hyperparameter_tuning_knn(X, Y)

print("Best hyperparameters:", best_params)
print("Best F1 score:", best_score)

Best hyperparameters: {'k': 15, 'kernel': 'triangular', 'window': 'fixed'}
Best F1 score: 0.5995206609860827


# Gradient Descent

Gradient descent is an optimization algorithm used to minimize the cost function in machine learning models. It works by iteratively adjusting the model parameters in the direction of the negative gradient of the cost function.

• Batch Gradient Descent: The entire dataset is used to compute the gradients once per iteration.

• Stochastic Gradient Descent: The model updates its parameters after calculating the gradient for each individual sample.

• Mini-Batch Gradient Descent: The dataset is shuffled at the beginning of each iteration, and gradients are computed using small batches of data.

In [21]:
class GradientDescent:
    def __init__(self, learning_rate=0.01, n_iterations=1000, method='batch', batch_size=32):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.method = method
        self.batch_size = batch_size
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        # Number of samples and features
        n_samples, n_features = X.shape

        # Initialize weights and bias
        self.weights = np.zeros(n_features)
        self.bias = 0

        # Gradient Descent
        for _ in range(self.n_iterations):
            if self.method == 'batch':
                # Calculate the linear model prediction for all data
                y_predicted = np.dot(X, self.weights) + self.bias

                # Compute gradients
                dw = (1 / n_samples) * np.dot(X.T, (y_predicted - y))
                db = (1 / n_samples) * np.sum(y_predicted - y)

                # Update weights and bias
                self.weights -= self.learning_rate * dw
                self.bias -= self.learning_rate * db

            elif self.method == 'stochastic':
                # Update for each sample
                for i in range(n_samples):
                    y_predicted = np.dot(X[i], self.weights) + self.bias

                    # Compute gradients for the current sample
                    dw = (y_predicted - y[i]) * X[i]
                    db = y_predicted - y[i]

                    # Update weights and bias
                    self.weights -= self.learning_rate * dw
                    self.bias -= self.learning_rate * db

            elif self.method == 'mini-batch':
              # Shuffle the data
                indices = np.random.permutation(n_samples)
                for i in range(0, n_samples, self.batch_size):
                    batch_indices = indices[i:i + self.batch_size]
                    X_batch = X[batch_indices]
                    y_batch = y[batch_indices]
                    predictions = X_batch.dot(self.weights)
                    errors = predictions - y_batch.flatten()
                    gradient = (X_batch.T.dot(errors)) / len(y_batch)
                    self.weights -= self.learning_rate * gradient

    def predict(self, X):
        return np.dot(X, self.weights) + self.bias

Test

In [22]:
model = GradientDescent(learning_rate=0.01, n_iterations=100, method='mini-batch', batch_size=2)
model.fit(train_X, train_Y)

# Make predictions
predictions = model.predict(test_X)
predictions = (predictions > 0.5).astype(int)
print("Predictions:", predictions)

Predictions: [0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 1 0 0 1 1 0 0 1 0 0 1 0 0 0 0 1 1 1 0 0 1 0
 0 0 0 0 0 1 1 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1
 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 1 1 0 1 0 0 0 0 0
 0 0 0 0 1 0 0 0 0 0 1 0 1 0 0 1 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0
 0 0 1 1 0 0 0 1 0 0 1 0 1 0 0 0 1 0 1 0 0 1 0 0 0 0 0 0 0 1 1 0 1 0 0 0 0
 0 0 0 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0
 0 0 0 0 1 0 0 1 0]


Tuning

In [None]:
# Cross-validation function
def cross_validate(model, X, y, folds=6):
    kfold = KFold(n_splits=folds, shuffle=True, random_state=22) # k=folds, split the data into k equal parts
    f1_scores = []
    for train_index, valid_index in kfold.split(X):
      # Splitting Data
        X_train, X_test = X[train_index], X[valid_index]
        y_train, y_test = y[train_index], y[valid_index]

        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        predictions = (predictions > 0.5).astype(int)
        f1_scores.append(f1_score(y_test, predictions))

    return np.mean(f1_scores)

# hyperparameter_tuning
def hyperparameter_tuning_GD(X, y):
    best_score, best_params, scores = 0, None, []
    methods = ['mini-batch', 'stochastic', 'batch']
    learning_rates = [0.01, 0.1]
    batch_sizes = [8, 16, 32]

    for method in methods:
        for batch_size in batch_sizes:
            for learning_rate in learning_rates:
                model = GradientDescent(learning_rate=learning_rate, method=method, batch_size=batch_size)
                score = cross_validate(model, X, y)
                scores.append({'learning_rate': learning_rate, 'method': method, 'batch_size':batch_size, 'f1': float(score)})

                if score > best_score:
                    best_score = score
                    best_params = {'learning_rate': learning_rate, 'method': method, 'batch_size':batch_size}

    return best_params, best_score, scores

In [None]:
best_params, best_score, scores = hyperparameter_tuning_GD(X, Y)

print("Best hyperparameters:", best_params)
print("Best F1 score:", best_score)

Best hyperparameters: {'learning_rate': 0.01, 'method': 'stochastic', 'batch_size': 8}
Best F1 score: 0.6423380716726421


# SVD

In [24]:
class SVD:
    def __init__(self, n_components):
        self.n_components = n_components
        self.U = None  # Left singular vectors
        self.S = None  # Singular values (as a diagonal matrix)
        self.Vt = None  # Right singular vectors (transposed)
        self.coef = None  # Coefficients for the linear classifier

    def fit(self, X, y):
        # Center the data
        X_centered = X - np.mean(X, axis=0)

        # Compute SVD
        self.U, self.S, self.Vt = np.linalg.svd(X_centered, full_matrices=False)

        # Select the top n_components
        self.U = self.U[:, :self.n_components]
        self.S = np.diag(self.S[:self.n_components])
        self.Vt = self.Vt[:self.n_components, :]

        # Fit a linear regression model to the transformed data
        # We'll use the normal equation for linear regression
        self.coef = np.linalg.pinv(self.U) @ y  # Use pseudo-inverse for linear regression

    def predict(self, X):
        X_centered = X - np.mean(X, axis=0)
        U_new = X_centered @ self.Vt.T[:, :self.n_components]

        # Using the linear model to make predictions
        return (U_new @ self.coef > 0).astype(int)

Testing

In [25]:
model = SVD(n_components=20)
model.fit(train_X, train_Y)

# Make predictions
predictions = model.predict(test_X).flatten()
print("Predictions:", predictions)

Predictions: [1 0 0 1 0 0 1 0 0 0 1 0 1 0 1 1 0 0 1 1 0 1 1 0 1 1 0 0 1 1 1 1 0 0 1 1 0
 1 0 1 0 0 1 1 0 1 1 1 1 0 0 0 0 0 1 0 0 1 0 0 1 0 1 0 0 1 1 0 0 0 0 0 0 1
 1 0 1 0 1 0 1 0 0 0 0 0 1 0 0 1 1 1 0 1 1 0 0 0 0 0 1 1 1 1 0 1 1 0 0 0 1
 1 1 0 0 1 0 1 0 0 1 1 1 1 1 1 1 1 0 0 1 0 1 1 0 0 1 1 0 1 0 0 1 0 1 0 1 1
 1 0 1 1 0 1 0 1 1 1 1 0 1 1 1 1 1 0 1 0 1 1 0 0 1 0 0 0 1 1 1 1 1 1 0 1 0
 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0
 0 0 0 1 1 0 0 1 0]


Tuning

In [None]:
# Cross-validation function
def cross_validate(model, X, y, folds=6):
    kfold = KFold(n_splits=folds, shuffle=True, random_state=22) # k=folds, split the data into k equal parts
    f1_scores = []
    for train_index, valid_index in kfold.split(X):
      # Splitting Data
        X_train, X_test = X[train_index], X[valid_index]
        y_train, y_test = y[train_index], y[valid_index]

        model.fit(X_train, y_train)
        predictions = model.predict(X_test).flatten()
        f1_scores.append(f1_score(y_test, predictions))

    return np.mean(f1_scores)

# hyperparameter_tuning
def hyperparameter_tuning_SVD(X, y):
    best_score, best_params, scores = 0, None, []
    components = [i for i in range(1,20)]

    for n_components in components:
        model = SVD(n_components=n_components)
        score = cross_validate(model, X, y)
        scores.append({'n_components': n_components, 'f1': float(score)})

        if score > best_score:
            best_score = score
            best_params = {'n_components': n_components}

    return best_params, best_score, scores

In [None]:
best_params, best_score, scores = hyperparameter_tuning_SVD(X, Y)

print("Best hyperparameters:", best_params)
print("Best F1 score:", best_score)

Best hyperparameters: {'n_components': 8}
Best F1 score: 0.6758723253703632


# AdaBoost

1. Define a Weak Classifier: a decision stump as weak classifier.

2. Implement the AdaBoost Algorithm: iteratively train weak classifiers and adjust their weights based on their performance.

In [68]:
class DecisionStump:
    def fit(self, X, y, weights):
        n_samples, n_features = X.shape
        best_stump = {}
        min_error = float('inf')

        for feature_index in range(n_features):
            thresholds = np.unique(X[:, feature_index])
            for threshold in thresholds:
                for inequality in ['lt', 'gt']:
                    predictions = self.predict(X, feature_index, threshold, inequality)
                    errors = predictions != y
                    weighted_error = np.dot(weights, errors)

                    if weighted_error < min_error:
                        min_error = weighted_error
                        best_stump['feature_index'] = feature_index
                        best_stump['threshold'] = threshold
                        best_stump['inequality'] = inequality

        return best_stump

    def predict(self, X, feature_index, threshold, inequality):
        predictions = np.ones(X.shape[0])
        if inequality == 'lt':
            predictions[X[:, feature_index] <= threshold] = -1
        else:
            predictions[X[:, feature_index] > threshold] = -1
        return predictions

class AdaBoost:
    def __init__(self, n_estimators=50):
        self.n_estimators = n_estimators
        self.alphas = []
        self.stumps = []

    def fit(self, X, y):
        # Convert y from {0, 1} to {-1, 1}
        y = np.where(y == 0, -1, 1)

        n_samples = X.shape[0]
        weights = np.ones(n_samples) / n_samples  # Initialize weights

        for _ in range(self.n_estimators):
            stump = DecisionStump()
            best_stump = stump.fit(X, y, weights)
            predictions = stump.predict(X, best_stump['feature_index'], best_stump['threshold'], best_stump['inequality'])

            # Calculate the error and alpha (weight of the weak classifier)
            errors = predictions != y
            error_rate = np.dot(weights, errors)

            # Avoid division by zero
            if error_rate == 0:
                error_rate = 1e-10

            alpha = 0.5 * np.log((1 - error_rate) / error_rate)
            self.alphas.append(alpha)
            self.stumps.append(best_stump)

            # Update weights
            weights *= np.exp(-alpha * y * predictions)
            weights /= np.sum(weights)  # Normalize to sum to 1

    def predict(self, X):
        final_predictions = np.zeros(X.shape[0])
        for alpha, stump in zip(self.alphas, self.stumps):
            # Create a new instance of DecisionStump for prediction
            stump_instance = DecisionStump()
            predictions = stump_instance.predict(X, stump['feature_index'], stump['threshold'], stump['inequality'])
            final_predictions += alpha * predictions

        return np.where(final_predictions > 0, 1, 0)  # Convert back to {0, 1}


Testing

In [69]:
 # Initialize and fit AdaBoost
ada_boost = AdaBoost(n_estimators=1)
ada_boost.fit(train_X, train_Y)

 # Predictions
predictions = ada_boost.predict(test_X)
print("Predictions:", predictions.astype(int))

Predictions: [1 0 0 1 0 0 0 0 0 0 0 0 1 0 1 1 0 0 1 1 0 1 1 0 1 1 0 0 1 0 1 0 0 0 0 1 0
 0 0 1 0 0 1 1 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 0 0 1 1 0 0 0 0 0 0 0
 1 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0
 0 1 0 0 0 0 1 0 0 0 1 0 1 0 1 1 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 0 0 0 0 1 0
 0 0 1 1 0 1 0 1 0 1 1 0 1 1 1 0 1 0 0 0 0 0 0 0 1 0 0 0 0 1 1 0 1 0 0 1 0
 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0
 0 0 0 0 1 0 0 1 0]


Tuning

In [64]:
# Cross-validation function
def cross_validate(model, X, y, folds=6):
    kfold = KFold(n_splits=folds, shuffle=True, random_state=22) # k=folds, split the data into k equal parts
    f1_scores = []
    for train_index, valid_index in kfold.split(X):
      # Splitting Data
        X_train, X_test = X[train_index], X[valid_index]
        y_train, y_test = y[train_index], y[valid_index]

        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        f1_scores.append(f1_score(y_test, predictions))

    return np.mean(f1_scores)

# hyperparameter_tuning
def hyperparameter_tuning_Ada(X, y):
    best_score, best_params, scores = 0, None, []
    clf = [i for i in range(1,50)]

    for n_estimators in clf:
        model = AdaBoost(n_estimators=n_estimators)
        score = cross_validate(model, X, y)
        scores.append({'n_estimators':n_estimators, 'f1': float(score)})

        if score > best_score:
            best_score = score
            best_params = {'n_estimators': n_estimators}

    return best_params, best_score, scores

In [65]:
best_params, best_score, scores = hyperparameter_tuning_Ada(X, Y)

print("Best hyperparameters:", best_params)
print("Best F1 score:", best_score)

Best hyperparameters: {'n_estimators': 49}
Best F1 score: 0.6903474039074696
