<a href="https://colab.research.google.com/github/mansimar11/Asteroid_Spectra_ml_dl/blob/main/ml_svm_binary_search_multiclass.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import standard libraries
import os

# Import installed libraries
import numpy as np
import pandas as pd
import sklearn

In [None]:
# Mount the Google Drive, where the files and models are stored, when running on Google Colab.
# Outside of Colab all paths are resolved relative to the current working directory.
try:
    from google.colab import drive
except ModuleNotFoundError:
    # The google.colab package is not installed: work locally
    core_path = ""
else:
    # Only the import is guarded by the try block, so an error raised while mounting the drive is
    # not mistaken for "not running on Colab"
    drive.mount('/gdrive')
    core_path = "/gdrive/MyDrive/Colab/asteroid_taxonomy/"

Mounted at /gdrive


In [None]:
# Read the pickled level 2 asteroid data that is stored below the core path.
# NOTE: unpickling executes code embedded in the file, so only load pickles from a trusted source.
asteroids_pkl_path = os.path.join(core_path, "data/lvl2/", "asteroids.pkl")
asteroids_df = pd.read_pickle(asteroids_pkl_path)

In [None]:
# Binary classification schema: main group "X" is labelled 1, every other main group 0
asteroids_df.loc[:, "Class"] = (asteroids_df["Main_Group"] == "X").astype("int64")

In [None]:
# Features: the "Reflectance_norm550nm" column of each asteroid's spectrum, one row per asteroid.
# Labels: the binary class of each asteroid.
asteroids_X = np.array(
    [spectrum_df["Reflectance_norm550nm"].tolist() for spectrum_df in asteroids_df["SpectrumDF"]]
)
asteroids_y = np.array(asteroids_df["Class"].tolist())

In [None]:
from sklearn.model_selection import StratifiedShuffleSplit

# A single training / test split with a ratio of 0.8 / 0.2. The stratified splitter is needed to
# preserve the ratio of the classes in both subsets!
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)

# With n_splits=1 the splitter yields exactly one pair of index arrays
train_index, test_index = next(sss.split(asteroids_X, asteroids_y))
X_train, X_test = asteroids_X[train_index], asteroids_X[test_index]
y_train, y_test = asteroids_y[train_index], asteroids_y[test_index]

In [None]:
# Verify that the unbalanced class ratio is the same in the training and in the test subset
train_positive_ratio = sum(y_train) / len(X_train)
test_positive_ratio = sum(y_test) / len(X_test)
print(f"Ratio of positive training classes: {round(train_positive_ratio, 2)}")
print(f"Ratio of positive test classes: {round(test_positive_ratio, 2)}")

Ratio of positive training classes: 0.18
Ratio of positive test classes: 0.18


In [None]:
# Class weighting: the inverse of the fraction of positive training samples, truncated to an int
positive_fraction = sum(y_train) / len(X_train)
positive_class_weight = int(1.0 / positive_fraction)
print(f"Positive Class weightning: {positive_class_weight}")

Positive Class weightning: 5


In [None]:
from sklearn import preprocessing

# Create the StandardScaler (mean 0, standard deviation 1) and fit it on the training data only
scaler = preprocessing.StandardScaler()
scaler.fit(X_train)

# Apply the fitted scaler to the training data
X_train_scaled = scaler.transform(X_train)

In [None]:
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.svm import SVC
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.metrics import classification_report, accuracy_score
import numpy as np

# NOTE(review): this hard-coded weight replaces the value computed from the class ratio in an
# earlier cell -- confirm that the override is intended
positive_class_weight = 2

# Pipeline of a StandardScaler followed by an SVM classifier with a radial basis function (RBF)
# kernel
wclf = make_pipeline(StandardScaler(), SVC(kernel='rbf', class_weight={1: positive_class_weight}, C=100))

# Specify the number of folds for cross-validation
num_folds = 5

# Create a StratifiedKFold object to ensure class balance in each fold
kf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

# Perform the cross-validation on the *unscaled* training data: the pipeline then re-fits its
# scaler on the training part of every fold. Passing data that was already scaled with the
# statistics of the complete training set would leak information from the validation folds into
# the preprocessing.
cross_val_scores = cross_val_score(wclf, X_train, y_train, cv=kf, scoring='accuracy')

# The final model is still fitted on the pre-scaled training data, because the prediction on the
# test set is made with test data that has been transformed by the externally fitted scaler
wclf.fit(X_train_scaled, y_train)

# Display the cross-validation scores
print("Cross-validation scores:", cross_val_scores)

# Calculate and display the mean accuracy
mean_accuracy = np.mean(cross_val_scores)
print("Mean accuracy:", mean_accuracy)

Cross-validation scores: [0.97209302 0.97663551 0.95794393 0.97663551 0.97663551]
Mean accuracy: 0.9719886981091067


In [None]:
# Scale the testing data with the scaler that was fitted on the training data ...
X_test_scaled = scaler.transform(X_test)

# ... and perform a prediction with the fitted classifier
y_test_pred = wclf.predict(X_test_scaled)

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix of the true test labels against the predicted test labels
conf_mat = confusion_matrix(y_test, y_test_pred)
print(conf_mat)

# Unpack the 2 x 2 matrix row by row:
#     - first row:  true negative (tn), false positive (fp)
#     - second row: false negative (fn), true positive (tp)
(tn, fp), (fn, tp) = conf_mat

[[217   4]
 [  2  45]]


In [None]:
# Recall = tp / (tp + fn): the ratio of correctly classified X Class spectra, considering the
# false negatives
recall_score = round(sklearn.metrics.recall_score(y_true=y_test, y_pred=y_test_pred), 3)
print(f"Recall Score: {recall_score}")

# Precision = tp / (tp + fp): the ratio of correctly classified X Class spectra, considering the
# false positives
precision_score = round(sklearn.metrics.precision_score(y_true=y_test, y_pred=y_test_pred), 3)
print(f"Precision Score: {precision_score}")

# F1: a score that combines recall and precision
f1_score = round(sklearn.metrics.f1_score(y_true=y_test, y_pred=y_test_pred), 3)
print(f"F1 Score: {f1_score}")

Recall Score: 0.957
Precision Score: 0.918
F1 Score: 0.938


In [None]:
# Naive baseline: a randomly permuted copy of the original labels. The random generator is seeded
# so that the baseline score is reproducible from run to run; the original labels stay untouched
# because permutation() returns a new array.
asteroids_random_y = np.random.default_rng(seed=0).permutation(asteroids_y)

In [None]:
# F1 score of the random classifier, i.e. of the shuffled labels against the original labels.
# Please note: theoretically this baseline should have been computed before the training! It is
# done afterwards only to keep the "storytelling" simple.
f1_score_naive = round(sklearn.metrics.f1_score(y_true=asteroids_y, y_pred=asteroids_random_y), 3)
print(f"Naive F1 Score: {f1_score_naive}")

Naive F1 Score: 0.194


In [None]:
# Import standard libraries
import os

# Import installed libraries
import numpy as np
import pandas as pd
import sklearn

from sklearn import preprocessing
from sklearn import svm
from sklearn.model_selection import GridSearchCV

In [None]:
# Read the pickled level 2 asteroid data again, so that this section starts from unmodified data.
# NOTE: unpickling executes code embedded in the file, so only load pickles from a trusted source.
asteroids_pkl_path = os.path.join(core_path, "data/lvl2/", "asteroids.pkl")
asteroids_df = pd.read_pickle(asteroids_pkl_path)

In [None]:
# Binary classification schema: main group "X" is labelled 1, every other main group 0
asteroids_df.loc[:, "Class"] = (asteroids_df["Main_Group"] == "X").astype("int64")

In [None]:
# Features: the "Reflectance_norm550nm" column of each asteroid's spectrum, one row per asteroid.
# Labels: the binary class of each asteroid.
asteroids_X = np.array(
    [spectrum_df["Reflectance_norm550nm"].tolist() for spectrum_df in asteroids_df["SpectrumDF"]]
)
asteroids_y = np.array(asteroids_df["Class"].tolist())

In [None]:
# In this example we create a single test-training split with a ratio of 0.8 / 0.2. The random
# state is fixed (to the same value as in the first split of this notebook), otherwise every run
# draws a different split and none of the scores below can be reproduced.
from sklearn.model_selection import StratifiedShuffleSplit
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)

# Create a simple, single train / test split (the loop runs exactly once for n_splits=1)
for train_index, test_index in sss.split(asteroids_X, asteroids_y):

    X_train, X_test = asteroids_X[train_index], asteroids_X[test_index]
    y_train, y_test = asteroids_y[train_index], asteroids_y[test_index]

# Class weighting: the inverse of the fraction of positive training samples, truncated to an int
positive_fraction = sum(y_train) / len(X_train)
positive_class_weight = int(1.0 / positive_fraction)

In [None]:
# Fit the StandardScaler (mean 0, standard deviation 1) on the training data and transform the
# training data with it
scaler = preprocessing.StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)

# Candidates of the grid search: the same three values of C for the linear and the RBF kernel
param_grid = [{'C': [1, 10, 100], 'kernel': [kernel_name]} for kernel_name in ('linear', 'rbf')]

# The SVM classifier, with the positive class weighted by the factor computed from the class ratio
svc = svm.SVC(class_weight={1: positive_class_weight})

# Grid search with 5-fold cross-validation, scored by the F1 score ...
wclf = GridSearchCV(estimator=svc, param_grid=param_grid, scoring='f1', cv=5, verbose=3)

# ... and the training itself!
wclf.fit(X_train_scaled, y_train)

Fitting 5 folds for each of 6 candidates, totalling 30 fits
[CV 1/5] END ................C=1, kernel=linear;, score=0.541 total time=   0.2s
[CV 2/5] END ................C=1, kernel=linear;, score=0.492 total time=   0.1s
[CV 3/5] END ................C=1, kernel=linear;, score=0.511 total time=   0.2s
[CV 4/5] END ................C=1, kernel=linear;, score=0.565 total time=   0.2s
[CV 5/5] END ................C=1, kernel=linear;, score=0.532 total time=   0.1s
[CV 1/5] END ...............C=10, kernel=linear;, score=0.589 total time=   0.7s
[CV 2/5] END ...............C=10, kernel=linear;, score=0.504 total time=   1.0s
[CV 3/5] END ...............C=10, kernel=linear;, score=0.523 total time=   1.1s
[CV 4/5] END ...............C=10, kernel=linear;, score=0.587 total time=   0.9s
[CV 5/5] END ...............C=10, kernel=linear;, score=0.557 total time=   0.7s
[CV 1/5] END ..............C=100, kernel=linear;, score=0.587 total time=   7.4s
[CV 2/5] END ..............C=100, kernel=linear;,

In [None]:
# Optional: take the best estimator found by the grid search as the final classifier
final_clf = wclf.best_estimator_

In [None]:
# Scale the testing data with the scaler that was fitted on the training data ...
X_test_scaled = scaler.transform(X_test)

# ... and perform a prediction with the final classifier
y_test_pred = final_clf.predict(X_test_scaled)

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix of the true test labels against the predicted test labels
conf_mat = confusion_matrix(y_test, y_test_pred)
print(conf_mat)

# Unpack the 2 x 2 matrix row by row:
#     - first row:  true negative (tn), false positive (fp)
#     - second row: false negative (fn), true positive (tp)
(tn, fp), (fn, tp) = conf_mat

[[218   3]
 [  3  44]]


In [None]:
# Recall = tp / (tp + fn): the ratio of correctly classified X Class spectra, considering the
# false negatives
recall_score = round(sklearn.metrics.recall_score(y_true=y_test, y_pred=y_test_pred), 3)
print(f"Recall Score: {recall_score}")

# Precision = tp / (tp + fp): the ratio of correctly classified X Class spectra, considering the
# false positives
precision_score = round(sklearn.metrics.precision_score(y_true=y_test, y_pred=y_test_pred), 3)
print(f"Precision Score: {precision_score}")

# F1: a score that combines recall and precision
f1_score = round(sklearn.metrics.f1_score(y_true=y_test, y_pred=y_test_pred), 3)
print(f"F1 Score: {f1_score}")

Recall Score: 0.936
Precision Score: 0.936
F1 Score: 0.936


In [None]:
# Read the pickled level 2 asteroid data once more for the multiclass example.
# NOTE: unpickling executes code embedded in the file, so only load pickles from a trusted source.
asteroids_pkl_path = os.path.join(core_path, "data/lvl2/", "asteroids.pkl")
asteroids_df = pd.read_pickle(asteroids_pkl_path)

In [None]:
# Features: the "Reflectance_norm550nm" column of each asteroid's spectrum, one row per asteroid.
# Labels: this time the main group of each asteroid instead of a binary class.
asteroids_X = np.array(
    [spectrum_df["Reflectance_norm550nm"].tolist() for spectrum_df in asteroids_df["SpectrumDF"]]
)
asteroids_y = np.array(asteroids_df["Main_Group"].tolist())

In [None]:
# In this example we create a single test-training split with a ratio of 0.8 / 0.2. The random
# state is fixed (to the same value as in the first split of this notebook), otherwise every run
# draws a different split and none of the scores below can be reproduced.
from sklearn.model_selection import StratifiedShuffleSplit
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)

# Create a simple, single train / test split (the loop runs exactly once for n_splits=1)
for train_index, test_index in sss.split(asteroids_X, asteroids_y):

    X_train, X_test = asteroids_X[train_index], asteroids_X[test_index]
    y_train, y_test = asteroids_y[train_index], asteroids_y[test_index]

# Class weightings: for every class in the training labels the inverse of its fraction of the
# training samples, truncated to an int
weight_dict = {
    ast_type: int(1.0 / (len(y_train[y_train == ast_type]) / len(y_train)))
    for ast_type in np.unique(y_train)
}

In [None]:
# f1_score is imported here (again) as a function, because an earlier cell re-uses the name for a
# number
from sklearn.metrics import make_scorer, f1_score

# Fit the StandardScaler (mean 0, standard deviation 1) on the training data and transform the
# training data with it
scaler = preprocessing.StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)

# Candidates of the grid search: 25 logarithmically spaced values of C between 10^0 and 10^3.5
# for the linear and the RBF kernel
param_grid = [
    {'C': np.logspace(0, 3.5, 25), 'kernel': [kernel_name]} for kernel_name in ('linear', 'rbf')
]

# The SVM classifier with the per-class weights
svc = svm.SVC(class_weight=weight_dict)

# Grid search with 5-fold cross-validation. The F1 score with the "weighted" average is wrapped in
# a make_scorer function. And ...
weighted_f1_scorer = make_scorer(f1_score, average="weighted")
wclf = GridSearchCV(estimator=svc, param_grid=param_grid, scoring=weighted_f1_scorer, cv=5, verbose=3)

# ... the training itself!
wclf.fit(X_train_scaled, y_train)

Fitting 5 folds for each of 50 candidates, totalling 250 fits
[CV 1/5] END ..............C=1.0, kernel=linear;, score=0.810 total time=   0.1s
[CV 2/5] END ..............C=1.0, kernel=linear;, score=0.846 total time=   0.1s
[CV 3/5] END ..............C=1.0, kernel=linear;, score=0.840 total time=   0.1s
[CV 4/5] END ..............C=1.0, kernel=linear;, score=0.840 total time=   0.1s
[CV 5/5] END ..............C=1.0, kernel=linear;, score=0.845 total time=   0.1s
[CV 1/5] END C=1.3990503141372939, kernel=linear;, score=0.822 total time=   0.1s
[CV 2/5] END C=1.3990503141372939, kernel=linear;, score=0.853 total time=   0.1s
[CV 3/5] END C=1.3990503141372939, kernel=linear;, score=0.844 total time=   0.1s
[CV 4/5] END C=1.3990503141372939, kernel=linear;, score=0.828 total time=   0.1s
[CV 5/5] END C=1.3990503141372939, kernel=linear;, score=0.853 total time=   0.1s
[CV 1/5] END C=1.9573417814876604, kernel=linear;, score=0.822 total time=   0.1s
[CV 2/5] END C=1.9573417814876604, kernel

In [None]:
# Take the best estimator found by the grid search and print its kernel and its settings
final_clf = wclf.best_estimator_
best_kernel = final_clf.kernel

print(f"Kernel with the best result: {best_kernel}")
print(f"SVM information: {final_clf}")

Kernel with the best result: rbf
SVM information: SVC(C=20.53525026457146, class_weight={'C': 3, 'Other': 8, 'S': 2, 'X': 5})


In [None]:
# Scale the testing data with the scaler that was fitted on the training data ...
X_test_scaled = scaler.transform(X_test)

# ... and perform a prediction with the final classifier
y_test_pred = final_clf.predict(X_test_scaled)

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix of the true against the predicted test labels; the rows and columns follow
# the order of the class labels listed here
class_labels = ["C", "S", "X", "Other"]
conf_mat = confusion_matrix(y_test, y_test_pred, labels=class_labels)

print(conf_mat)

[[ 78   0   1   0]
 [  0 106   0   4]
 [  1   0  46   1]
 [  0   0   2  29]]


In [None]:
# A combined score: the F1 score of the multiclass prediction, using the "weighted" average
f1_score = round(sklearn.metrics.f1_score(y_true=y_test, y_pred=y_test_pred, average="weighted"), 3)
print(f"F1 Score: {f1_score}")

F1 Score: 0.967
