In [171]:
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split,StratifiedKFold
from numba import njit,prange
from joblib import Parallel, delayed
from itertools import product
from sklearn.preprocessing import RobustScaler
from sklearn.decomposition import PCA
import pickle

In [172]:
# Load the dataset from 'creditcard.csv'; the path is relative, so the file
# must sit in the notebook's current working directory.
df=pd.read_csv('creditcard.csv')

In [173]:
# Take the 'Class' column of the frame's correlation matrix and sort it from
# the most positive to the most negative correlation with the target.
# NOTE(review): this is computed on the full frame, i.e. before the
# train/test split that happens later in the notebook.
target_correlations=df.corr()['Class'].sort_values(ascending=False)

In [158]:
# Display the ranking; 'Class' itself is listed first with a correlation of 1.0.
target_correlations

Class     1.000000
V11       0.154876
V4        0.133447
V2        0.091289
V21       0.040413
V19       0.034783
V20       0.020090
V8        0.019875
V27       0.017580
V28       0.009536
Amount    0.005632
V26       0.004455
V25       0.003308
V22       0.000805
V23      -0.002685
V15      -0.004223
V13      -0.004570
V24      -0.007221
Time     -0.012323
V6       -0.043643
V5       -0.094974
V9       -0.097733
V1       -0.101347
V18      -0.111485
V7       -0.187257
V3       -0.192961
V16      -0.196539
V10      -0.216883
V12      -0.260593
V14      -0.302544
V17      -0.326481
Name: Class, dtype: float64

In [174]:
# Keep eleven V-columns as features, plus the target column.
# In the recorded correlation output these are exactly the columns whose
# absolute correlation with 'Class' exceeds 0.1.
# NOTE(review): that ranking was computed on the full dataset before the
# train/test split, so the selection has already seen the held-out rows.
selected_columns=['V11','V4','V1','V18','V7','V3','V16','V10','V12','V14','V17']
df_clean=df[selected_columns+['Class']]

In [175]:
# Separate the label vector from the feature matrix; both become NumPy arrays.
y=df_clean['Class'].to_numpy()
X=df_clean.drop(columns=['Class']).to_numpy()

In [176]:
# Hold out 20% of the rows for testing. The split is stratified on y and uses
# a fixed seed so it is repeatable.
X_train,X_test,y_train,y_test=train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=77,
    stratify=y,
)

In [177]:
# Fit the robust scaler on the training rows only, then apply that same fitted
# transform to both the training and the test features.
scaler=RobustScaler().fit(X_train)
X_train_s,X_test_s=scaler.transform(X_train),scaler.transform(X_test)

In [179]:
@njit(parallel=True)
def euclidean_distance(X_train, X_test):
    """Return the Euclidean distance from every test row to every training row.

    Parameters
    ----------
    X_train : 2-D array, one training sample per row.
    X_test : 2-D array, one test sample per row, with the same number of
        columns as ``X_train``.

    Returns
    -------
    2-D float array of shape ``(num_test, num_train)`` where entry ``[i, j]``
    is the distance between ``X_test[i]`` and ``X_train[j]``.

    Raises
    ------
    ValueError
        If the two inputs do not have the same number of columns.
    """
    num_train, num_features = X_train.shape
    num_test = X_test.shape[0]
    if X_test.shape[1] != num_features:
        raise ValueError("X_train and X_test must have the same number of features")

    distances = np.zeros((num_test, num_train))

    # Test rows are independent, so they are spread across threads. The inner
    # loops stay scalar on purpose: the array expression
    # `(X_train - X_test[i]) ** 2` would allocate two
    # (num_train, num_features) temporaries for every single test row.
    for i in prange(num_test):
        for j in range(num_train):
            squared_sum = 0.0
            for f in range(num_features):
                diff = X_train[j, f] - X_test[i, f]
                squared_sum += diff * diff
            distances[i, j] = np.sqrt(squared_sum)

    return distances

# Define a function to predict labels based on the distances
@njit(parallel=True)
def predict_labels(distances, y_train, k, threshold):
    """Label each test row by a thresholded vote of its nearest training rows.

    Parameters
    ----------
    distances : 2-D array of shape ``(num_test, num_train)``; entry ``[i, j]``
        is the distance between test row ``i`` and training row ``j``.
    y_train : 1-D array of training labels; only entries equal to 1 count as
        positive votes.
    k : int, number of nearest neighbours to consult (must be >= 1). If it
        exceeds the number of training rows, all training rows are used.
    threshold : float, fraction of the consulted neighbours that must be
        positive; the prediction is 1 only when the positive count is
        strictly greater than ``neighbours * threshold``.

    Returns
    -------
    1-D int32 array of length ``num_test`` containing 0 or 1.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")

    num_test, num_train = distances.shape
    # Never ask for more neighbours than there are training rows, and compare
    # the vote against the number of neighbours actually consulted.
    n_neighbors = min(k, num_train)
    votes_needed = n_neighbors * threshold

    predictions = np.zeros(num_test, dtype=np.int32)

    # Rows are independent, so they are processed in parallel. The vote is
    # counted with a plain scalar loop so that no array expression or
    # reduction has to run inside the parallel region.
    for i in prange(num_test):
        order = np.argsort(distances[i])
        positives = 0
        for j in range(n_neighbors):
            if y_train[order[j]] == 1:
                positives += 1
        if positives > votes_needed:
            predictions[i] = 1
        else:
            predictions[i] = 0

    return predictions

# Define a class for the k-Nearest Neighbors classifier
class KNNClassifier:
    """Brute-force k-nearest-neighbours classifier for binary (0/1) labels.

    A test sample is predicted as 1 when more than a fraction ``threshold`` of
    its ``k`` nearest training samples (Euclidean distance) are labelled 1.
    Distances and votes are computed by the module-level helpers
    ``euclidean_distance`` and ``predict_labels``.
    """

    def __init__(self, k=3, threshold=0.5):
        """Store the neighbour count ``k`` and the vote ``threshold``."""
        self.k = k
        self.threshold = threshold

    def fit(self, X_train, y_train):
        """Memorise the training data (no model is actually trained).

        Parameters
        ----------
        X_train : 2-D array-like, one sample per row.
        y_train : 1-D array-like of labels, one per row of ``X_train``.

        Returns
        -------
        self, so calls can be chained.

        Raises
        ------
        ValueError
            If ``X_train`` is not 2-D or the row counts do not match.
        """
        X_train = np.asarray(X_train)
        y_train = np.asarray(y_train)
        if X_train.ndim != 2:
            raise ValueError("X_train must be a 2-D array (samples x features)")
        if y_train.shape[0] != X_train.shape[0]:
            raise ValueError("X_train and y_train must contain the same number of samples")
        self.X_train = X_train
        self.y_train = y_train
        return self

    def predict(self, X_test, batch_size=100):
        """Predict a 0/1 label for every row of ``X_test``.

        The test rows are processed ``batch_size`` at a time so that the
        (batch x training-rows) distance matrix stays small.

        Raises
        ------
        ValueError
            If ``batch_size`` is smaller than 1.
        AttributeError
            If ``fit`` has not been called yet.
        """
        # A negative step would make the loop below empty and silently return
        # an all-zero prediction vector, so reject it up front.
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        if not hasattr(self, "X_train") or not hasattr(self, "y_train"):
            raise AttributeError("call fit() before predict()")

        X_test = np.asarray(X_test)
        num_samples = X_test.shape[0]
        predictions = np.zeros(num_samples, dtype=np.int32)

        for start in range(0, num_samples, batch_size):
            stop = min(start + batch_size, num_samples)
            distances = euclidean_distance(self.X_train, X_test[start:stop])
            predictions[start:stop] = predict_labels(
                distances, self.y_train, self.k, self.threshold
            )

        return predictions

    def evaluate(self, X_test, y_test):
        """Predict on ``X_test`` and store accuracy, precision and recall.

        The results are kept on the instance and read back through
        ``predictions()`` and ``metrics()``; nothing is returned.

        Raises
        ------
        ValueError
            If ``y_test`` does not have one label per row of ``X_test``.
        """
        # Flatten so that a column vector of labels cannot broadcast against
        # the 1-D predictions into an (n, n) comparison.
        y_true = np.asarray(y_test).ravel()
        y_pred = self.predict(X_test)
        if y_true.shape[0] != y_pred.shape[0]:
            raise ValueError("X_test and y_test must contain the same number of samples")

        self._predictions = y_pred
        # An empty test set scores 0.0, matching the precision/recall fallbacks.
        self._accuracy = float(np.mean(y_pred == y_true)) if y_true.size else 0.0

        predicted_positive = y_pred == 1
        actual_positive = y_true == 1
        true_positives = int(np.sum(predicted_positive & actual_positive))
        false_positives = int(np.sum(predicted_positive & (y_true == 0)))
        false_negatives = int(np.sum((y_pred == 0) & actual_positive))

        # Precision and recall fall back to 0.0 when their denominator is zero.
        flagged = true_positives + false_positives
        relevant = true_positives + false_negatives
        self._precision = true_positives / flagged if flagged > 0 else 0.0
        self._recall = true_positives / relevant if relevant > 0 else 0.0

    def _require_evaluated(self):
        """Raise a descriptive AttributeError when evaluate() has not run yet."""
        if not hasattr(self, "_predictions"):
            raise AttributeError("call evaluate() before reading predictions or metrics")

    def predictions(self):
        """Return the predictions stored by the last ``evaluate`` call."""
        self._require_evaluated()
        return self._predictions

    def metrics(self):
        """Return ``[accuracy, precision, recall]`` from the last ``evaluate`` call."""
        self._require_evaluated()
        return np.array([self._accuracy, self._precision, self._recall])

In [127]:
# Cross-validation splitter: 3 stratified, shuffled folds with a fixed seed.
kfold=StratifiedKFold(n_splits=3,shuffle=True,random_state=98)
# Candidate neighbour counts: 5, 10, 15, 20, 25.
k_array=np.arange(5,30,5)
# Empty placeholder array; nothing visible in this notebook fills it yet.
hyper_parameter_search=np.array([])


In [168]:
# Build the classifier with its constructor defaults written out explicitly.
model=KNNClassifier(k=3,threshold=0.5)

In [169]:
# Store the scaled training data in the model, then score it on the scaled
# test set. evaluate() returns nothing; it keeps the predictions and the
# accuracy/precision/recall values on the model object.
model.fit(X_train_s,y_train)
model.evaluate(X_test_s,y_test)

In [170]:
# Show the predictions stored by the evaluate() call in the previous cell.
# NOTE(review): the recorded output holds -875836469 (0xCBCBCBCB read as int32)
# instead of 0/1 labels, and this cell's execution count (170) is lower than
# that of the cell defining the classifier (179). Treat the recorded output as
# stale/invalid and regenerate it with Restart Kernel -> Run All.
model.predictions()

array([         0, -875836469, -875836469, ..., -875836469, -875836469,
       -875836469], dtype=int32)