In [None]:
# Loading necessary modules
import numpy as np
import os 
import os.path as osp
import itertools
import astropy.io.fits as fits
from sklearn import metrics
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pprint import pprint
# The seaborn style sheets were renamed to 'seaborn-v0_8-*' in matplotlib 3.6
# and the old names were later removed; use whichever name this installation
# provides so the notebook runs on both old and new matplotlib.
plt.style.use('seaborn-v0_8-ticks' if 'seaborn-v0_8-ticks' in plt.style.available
              else 'seaborn-ticks')
import warnings
from sklearn.utils import resample

### Metrics
# from sources.ml_f1 import*
from sources.ml_precision import*
# from sources.recall_review import*
from sklearn import metrics
from sklearn.metrics import accuracy_score as accuracy
from sklearn.metrics import f1_score as f1
from sklearn.metrics import recall_score as recall
from sklearn.metrics import precision_score as precision
from sklearn.model_selection import train_test_split
from scipy.spatial import distance

# ML models  
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
import xgboost
from xgboost import plot_importance

In [None]:
# Load the source catalogue; the path is relative to the current working directory.
catalogue = pd.read_csv('old-test-train/raw_data_original.csv')


In [None]:
# Separate the raw class labels from the feature columns.
labels = catalogue['class_labels']
X = catalogue.drop(columns=['class_labels'])

# Encode the target class as integer codes. `pd.factorize` numbers the classes
# in order of first appearance, so `clas` holds the actual code -> label mapping.
# NOTE(review): the mapping was previously described as 0 = agn, 1 = notagn,
# 2 = no class -- confirm against `clas` for this catalogue.
y, clas = pd.factorize(labels)
y_target = pd.DataFrame({'class_labels': y})

# Feature table with the encoded target appended as the last column.
data = pd.concat([X, y_target], axis=1)

In [None]:
# Number of rows in the encoded table.
len(data)

In [None]:
# Separate classes
class_counts = data['class_labels'].value_counts()
majority_class = class_counts.idxmax()
minority_class = class_counts.idxmin()

df_majority = data[data['class_labels'] == majority_class]
df_minority = data[data['class_labels'] == minority_class]

# Downsample majority class and keep the removed samples
df_majority_downsampled = resample(df_majority,
                                   replace=False,    # sample without replacement
                                   n_samples=len(df_minority),  # to match minority class
                                   random_state=42)  # reproducible results

# Get the removed samples by finding the difference between original and downsampled
removed_samples = df_majority[~df_majority.index.isin(df_majority_downsampled.index)]

# Combine minority class with downsampled majority class
df_balanced = pd.concat([df_majority_downsampled, df_minority])

# Split back into X and y
X_balanced = df_balanced.drop('class_labels', axis=1)
y_balanced = df_balanced['class_labels']
X_removed = removed_samples.drop('class_labels', axis=1)
y_removed = removed_samples['class_labels']

In [None]:
# Report the class counts of the full table, of the balanced table, and of the
# majority-class rows that were left out by the downsampling step.
print("The Original Classes: ", class_counts)
print("The Balanced Classes: ", df_balanced['class_labels'].value_counts())
print('Removed sample: ', removed_samples['class_labels'].value_counts())


In [None]:
# Display the balanced table (downsampled majority rows plus all minority rows).
df_balanced

In [None]:
# Display the majority-class rows that were not drawn into the balanced table.
removed_samples

In [None]:
## random forest (RF)
# Candidate hyper-parameter values.

# Number of trees in the forest
n_estimators = [50, 100, 150]

# Number of features to consider at every split
max_features = [2, 3]

# Maximum number of levels in a tree
max_depth = [5, 10]

# Minimum number of samples required to split a node (currently not part of any grid)
min_samples_split = [2, 5]

# Minimum number of samples required at each leaf node (currently not part of any grid)
min_samples_leaf = [1, 3]

# Method of selecting samples for training each tree (currently not part of any grid)
bootstrap = [True, False]

# Broader grid, kept for reference only. It used to be assigned to `rf_par` and
# was then immediately overwritten, so it never reached the models; it now has
# its own name to make that explicit.
rf_par_full = {'n_estimators': n_estimators,
               'max_features': max_features,
               'max_depth': max_depth}

rf_model = RandomForestClassifier(random_state=1)

# Grid actually handed to the training loop: only the number of trees is varied.
# NOTE(review): switch to `rf_par_full` if the wider search was intended.
rf_par = dict(n_estimators=n_estimators)

In [None]:
# KNN model (library defaults; the values below are supplied separately as a grid)
knn_model = KNeighborsClassifier()

## KNN parameters
# Candidate values for the neighbour count, the Minkowski power `p`
# (1 = Manhattan, 2 = Euclidean under the default metric) and the vote weighting.
# NOTE(review): how the grid is searched is decided inside the metric helper
# that receives it -- not visible here.
knn_par = {'n_neighbors' : [5, 10, 15], 'p':[1, 2], 'weights' : ['uniform', 'distance'] }


In [None]:
# Set up models and parameters for the training loop.
# `models` holds [estimator, short name] pairs and `parameters` holds the
# matching grids; the two lists are paired positionally, so keep the same order.

models = [[knn_model, 'knn'], [rf_model, 'rf']]

parameters = [ knn_par, rf_par]

In [None]:
# Results container: filled by the training loop, keyed first by model name and
# then by a "<train fraction>, F<feature-set number>" string.
ml_dicts = {}

In [None]:
# List of feature sets; each inner list names the columns selected from the
# train/validation/test frames for one run.
features = [['qir', 'class_star', 'log(S8/S45)','log(S58/S36)', 'Mstar', 'log(S45/S36)']]

In [None]:
# `splits` are the validation fractions passed to train_test_split; `tr_sizes`
# are the complementary training fractions used to label the results. The two
# lists are paired positionally.
splits = [0.8, 0.6, 0.4, 0.2]
tr_sizes = [0.2, 0.4, 0.6, 0.8]

# The test set is the set of majority-class rows left out when balancing.
X_test = X_removed
y_test = y_removed

# Loop through the ML models coupled with their hyper-parameter grids
# (the same splits are used for every feature set).
for (model, model_name), par in zip(models, parameters):
    key0 = str(model_name)
    print(key0)
    ml_dicts[key0] = {}  # top-level key: one entry per machine-learning model

    for s, tr in zip(splits, tr_sizes):
        X_train, X_vald, y_train, y_vald = train_test_split(
            X_balanced, y_balanced,
            test_size=s, random_state=1, stratify=y_balanced, shuffle=True)

        # Feature sets are numbered from 1 in the result keys.
        for i, f in enumerate(features, start=1):
            xtr = X_train[f]
            xva = X_vald[f]
            xte = X_test[f]

            # Active metric helper: precision. To switch metric, call the
            # matching helper here and enable its import at the top of the notebook.
            # results = get_f1_ml (model, par, xtr, y_train, xva, y_vald, xte, y_test)
            results = get_precision_ml(model, par, xtr, y_train, xva, y_vald, xte, y_test, split=s)
            # results = get_recall_ml (model, par, xtr, y_train, xva, y_vald, xte, y_test)

            # Key used to reference the results of this (train fraction, feature set) run.
            key = str(tr) + ", F" + str(i)

            # NOTE: the 'tot_f1_*' key names are historical; they hold whatever
            # metric the helper above returns and are kept unchanged because
            # later cells read them.
            ml_dicts[key0][key] = {
                'tot_f1_vald': results[0],
                'tot_f1_test': results[1],
                'jack_train': results[2],
                'jack_vald': results[3],
                'jack_test': results[4],
            }


import json


def _json_default(obj):
    """Convert NumPy arrays/scalars, which `json` cannot encode on its own."""
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.generic):
        return obj.item()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


# Serialise before opening the file so a serialisation error cannot leave a
# truncated output file behind.
ml_dicts_json = json.dumps(ml_dicts, default=_json_default)
with open('knn_rf_comparison_original.txt', 'w') as file:
    file.write(ml_dicts_json)

In [None]:
def _jack_sd(samples):
    """Call `jack_SD` with a zero array of matching length and return its first element."""
    return jack_SD(np.zeros(len(samples)), samples)[0]


# One entry per model: [run keys, validation scores, test scores,
# validation errors, test errors].
arr_all = []
for _model, d in zip(models, ml_dicts.keys()):
    model_results = ml_dicts[d]
    f1_arr_vald, f1_arr_test = [], []
    sd_vald_arr, sd_arr = [], []

    for key, entry in model_results.items():
        # Total validation / test scores for this run.
        f1_arr_vald.append(entry['tot_f1_vald'])
        f1_arr_test.append(entry['tot_f1_test'])

        sd_train = _jack_sd(entry['jack_train'])
        sd_vald = _jack_sd(entry['jack_vald'])
        sd_test = _jack_sd(entry['jack_test'])

        # Combine the train spread in quadrature with the validation / test spread.
        sd_vald_arr.append(np.sqrt(np.array(sd_train**2) + np.array(sd_vald**2)))
        sd_arr.append(np.sqrt(np.array(sd_train**2) + np.array(sd_test**2)))

    arr_all.append([list(model_results.keys()), f1_arr_vald, f1_arr_test, sd_vald_arr, sd_arr])


In [None]:
import csv

# Per-model summaries: index 0 is KNN and index 1 is RF (the order of `models`).
# A dedicated name is used so the `data` DataFrame built earlier is not
# overwritten, which would break re-running the balancing cell.
summary = arr_all

# Output file. NOTE(review): the file name mentions "recall" while the training
# loop currently calls the precision helper -- confirm the name is still right.
csv_path = 'normalised/recall_balanced_13_july_2.csv'

# Define headers
headers = [
    "Train Fraction",
    "KNN Validation Score", "KNN Test Score", "KNN Val Error", "KNN Test Error",
    "RF Validation Score", "RF Test Score", "RF Val Error", "RF Test Error"
]

# Extract data
train_fractions = [0.2, 0.4, 0.6, 0.8]
knn_val_scores = summary[0][1]
knn_test_scores = summary[0][2]
knn_val_errors = summary[0][3]
knn_test_errors = summary[0][4]
rf_val_scores = summary[1][1]
rf_test_scores = summary[1][2]
rf_val_errors = summary[1][3]
rf_test_errors = summary[1][4]

# Write to CSV: one header row, then one row per train fraction.
with open(csv_path, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(headers)
    for i, frac in enumerate(train_fractions):
        writer.writerow([
            frac,
            knn_val_scores[i], knn_test_scores[i], knn_val_errors[i], knn_test_errors[i],
            rf_val_scores[i], rf_test_scores[i], rf_val_errors[i], rf_test_errors[i]
        ])

# Report the file that was actually written.
print(f"CSV file '{csv_path}' has been created.")

In [None]:
# Display the per-model summary lists built from `ml_dicts`.
arr_all

In [None]:
# Per-model summaries: index 0 is KNN and index 1 is RF (the order of `models`).
# A dedicated name is used so the `data` DataFrame built earlier is not overwritten.
plot_summary = arr_all

# Extract x-axis values (fraction of train size)
x = [0.2, 0.4, 0.6, 0.8]

# Prepare data for KNN and Random Forest
knn_val_scores = plot_summary[0][1]
knn_test_scores = plot_summary[0][2]
knn_val_errors = plot_summary[0][3]
knn_test_errors = plot_summary[0][4]

rf_val_scores = plot_summary[1][1]
rf_test_scores = plot_summary[1][2]
rf_val_errors = plot_summary[1][3]
rf_test_errors = plot_summary[1][4]

# Plot settings
plt.figure(figsize=(16, 8))
plt.xlabel('Fraction of Train Size', fontsize=18)
# NOTE(review): the label says F1 while the training loop currently calls the
# precision helper -- confirm which metric this figure is meant to show.
plt.ylabel('F1 Score', fontsize=18)
plt.xticks(x, fontsize=18)
plt.yticks(fontsize=18)

# Validation scores with error bars, one series per model.
plt.errorbar(x, knn_val_scores, yerr=knn_val_errors, fmt='s', label='kNN', capsize=5, color='blue')
plt.errorbar(x, rf_val_scores, yerr=rf_val_errors, fmt='o', label='RF', capsize=5, color='green')

# Axis range, grid and layout. The grid is configured once here; an earlier
# call with a different alpha was overridden by this one and has been dropped.
plt.ylim(0.8, 1.)
plt.grid(True, linestyle='--', alpha=0.4)
plt.tight_layout()
plt.legend(loc="lower right", fontsize=16)
plt.show()

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

# True labels of the majority-class rows left out when balancing; by
# construction they all carry the same (majority) class code.
y_true = y_removed
# Saved predictions, read back as DataFrames.
# NOTE(review): assumes each file has a single prediction column aligned
# row-for-row with `y_removed` -- confirm against the code that wrote them.
y_pred = pd.read_csv('normalised/yPred_SFGs.csv')
# The number in the file name is the validation split, so 0.2 corresponds to a
# train size of 0.8 and vice versa.
y_pred_knn_08 = pd.read_csv('normalised/KNeighborsClassifier()0.2_yPred_SFGs.csv')
y_pred_knn_02 = pd.read_csv('normalised/KNeighborsClassifier()0.8_yPred_SFGs.csv')

# Calculate metrics. The results get their own names so the `precision`,
# `recall` and `f1` function aliases imported at the top of the notebook are
# not replaced by floats.
precision_val = precision_score(y_true, y_pred)
recall_val = recall_score(y_true, y_pred)
f1_val = f1_score(y_true, y_pred)

print(f"Precision: {precision_val:.4f}")  # TP / (TP + FP)
print(f"Recall: {recall_val:.4f}")       # TP / (TP + FN)
print(f"F1 Score: {f1_val:.4f}")         # 2 * (precision * recall) / (precision + recall)

In [None]:
# Distribution of the predicted labels in the two kNN prediction files.
# y_pred.value_counts()
print("02 Train Size: ",y_pred_knn_02.value_counts())
print("08 Train Size: ",y_pred_knn_08.value_counts())

In [None]:
def man_confusion_matrix(cm, classes, 
                        name = '',
                        normalize=False,
                        title='Confusion Matrix',
                        cmap=plt.cm.Greens):
    """
    Print and plot a confusion matrix with a value written in every cell.

    Parameters
    ----------
    cm : 2-D array-like
        Confusion matrix. Rows are drawn as true labels and columns as
        predicted labels.
    classes : sequence
        Tick labels, one per row/column of `cm`.
    name : str, optional
        Path passed to `plt.savefig`. When empty (the default) no file is saved.
    normalize : bool, optional
        If True, each row is divided by its sum before printing and plotting.
        Rows whose sum is zero are left as all zeros instead of becoming NaN.
    title : str, optional
        Figure title.
    cmap : matplotlib colormap, optional
        Colormap used for the image.
    """
    cm = np.asarray(cm)

    # Normalise first so the image, the printed matrix, the annotations and the
    # text-colour threshold all refer to the same values.
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True).astype(float)
        cm = np.divide(cm, row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title, fontsize=26)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, fontsize=20,rotation=45)
    plt.yticks(tick_marks, classes, fontsize=20)

    # Integer counts are printed as integers; anything else with two decimals.
    fmt = 'd' if (not normalize and np.issubdtype(cm.dtype, np.integer)) else '.2f'
    # Cells above half the maximum get white text so it stays readable on dark cells.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black",
                 fontsize=28)

    plt.ylabel('True Label', fontsize=20)
    plt.xlabel('Predicted Label', fontsize=20)
    # Lay out after the axis labels are set so they are taken into account.
    plt.tight_layout()
    # Only save when a target path was given; the default empty name is not a valid path.
    if name:
        plt.savefig(name)
    plt.show()

In [None]:
# Confusion matrix of the held-out labels against the predictions loaded from
# 'normalised/yPred_SFGs.csv'.
# NOTE(review): which classifier produced that file is not visible here -- confirm.
cm_test = metrics.confusion_matrix( y_true, y_pred)
np.set_printoptions(precision=2)

# Plot normalized confusion matrix
# NOTE(review): assumes label code 0 is 'AGN' and code 1 is 'SFG' -- confirm
# against the mapping returned by pd.factorize.
plt.figure(figsize=(11,11))
man_confusion_matrix(cm_test,classes=['AGN','SFG'], name = 'cm_sfgs', normalize=True,
                      title='Normalized confusion matrix')

In [None]:
# Confusion matrix of the held-out labels against the kNN predictions for the
# 0.2 train size (file written with split 0.8).
cm_test = metrics.confusion_matrix( y_true, y_pred_knn_02)
np.set_printoptions(precision=2)

# Plot normalized confusion matrix
# NOTE(review): assumes label code 0 is 'AGN' and code 1 is 'SFG' -- confirm
# against the mapping returned by pd.factorize.
plt.figure(figsize=(11,11))
man_confusion_matrix(cm_test,classes=['AGN','SFG'], name = 'cm_sfgs_02', normalize=True,
                      title='02 Train Size confusion matrix')

In [None]:
# Confusion matrix of the held-out labels against the kNN predictions for the
# 0.8 train size (file written with split 0.2).
cm_test = metrics.confusion_matrix( y_true, y_pred_knn_08)
np.set_printoptions(precision=2)

# Plot normalized confusion matrix
# NOTE(review): assumes label code 0 is 'AGN' and code 1 is 'SFG' -- confirm
# against the mapping returned by pd.factorize.
plt.figure(figsize=(11,11))
man_confusion_matrix(cm_test,classes=['AGN','SFG'], name = 'cm_sfgs_08', normalize=True,
                      title='08 Train Size confusion matrix')