### **Machine Learning Star Classification**
Classification of stars based on their spectral characteristics

**Authors:**
- *Stefano Quaggio 866504*
- *Stefano Andreotti 851596*
- *Alberto Varisco 866109*

**Classification models used:**
- <u>Neural Networks</u>
- <u>Support Vector Machine</u>
- <u>Decision Tree</u>

## <u>Initial Analysis</u>

In [None]:
# All libraries imports
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import keras

from sklearn.preprocessing import LabelEncoder, label_binarize,StandardScaler
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import recall_score, accuracy_score, classification_report, confusion_matrix, roc_curve, roc_auc_score
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn.multiclass import OneVsRestClassifier
from sklearn import svm,calibration

from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation
from tensorflow.keras.callbacks import EarlyStopping 
from keras.optimizers import Adam

from scipy import stats

#Diamo accesso al nostro google drive che conterrà il dataset che utilizzeremo in questo laboratorio
# from google.colab import drive

# drive.mount('/content/drive/')

In [None]:
def define_NN_model():
    nn_model=Sequential()
    nn_model.add(Dense(32, input_dim=input_features, activation='relu'))
    nn_model.add(Dense(32, activation='relu'))
    nn_model.add(Dropout(0.2))
    nn_model.add(Dense(3, activation='softmax'))
    nn_model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
    return nn_model

# accuracy_stratified: list of classification_report_results as dict
# returns dict like scores
def extractScoresFromClassificationReport(accuracy_stratified : list):
  scores = {
    'accuracy': [],
    'precision': [],
    'recall': [],
    'f1-score': [],
  }
  for index, val in enumerate(accuracy_stratified):
    scores['accuracy'].append(val['accuracy'])
    scores['precision'].append(val['macro avg']['precision'])
    scores['recall'].append(val['macro avg']['recall'])
    scores['f1-score'].append(val['macro avg']['f1-score'])
  return scores

# Imput list of data
# Returns tuple (confidence_interval, mean_value)
def calcConfidenceInterval(data : list):
  mean_value = np.mean(data)
  return (stats.t.interval(0.95, len(data)-1, loc=mean_value, scale=stats.sem(data)), mean_value)


# Parameter accuracy_stratified: list of classification_report returned as dict
# Run confidence_intervals on metrics output plot, legend print legend
def metricGraph(accuracy_stratified : list, legend : bool = True, title = "Metrics with Confidence Interval (0.95)") -> None:
  scores = extractScoresFromClassificationReport(accuracy_stratified)
  
  for index, key in enumerate(scores):
    val = scores[key]
    confidence_interval, mean_value = calcConfidenceInterval(val)
    plt.errorbar(index, mean_value, yerr=(confidence_interval[1] - confidence_interval[0])/2, fmt='o', label=key)
  # ticks on x axis with labels
  plt.xticks(range(0, len(scores.keys())), scores.keys())

  # Add labels and title
  plt.xlabel('Metrics')
  plt.ylabel('Values')
  plt.title(title)

  # Show the plot
  if legend:
    plt.legend()
  plt.show()

In [None]:
full_df = pd.read_csv('../dataset/star_classification.csv')

full_df.head()

In [None]:
full_df.info()
# Check number of missing values in columns
full_df.isnull().sum()

In [None]:
full_df.hist(figsize=(18,18))

In [None]:
# Initialize LabelEncoder object
label_encoder = LabelEncoder()

# Apply LabelEncoder on 'class' column (target) -> 0 = Galaxy, 1 = Quasar, 2 = Star
full_df['class'] = label_encoder.fit_transform(full_df['class'])
label_mapping = { 'Galaxy': 0, 'Quasar': 1, 'Star': 2 }

#Check distribution of target variable
sns.countplot(x = full_df['class'])
plt.show()

In [None]:
# Check correlation between features
plt.figure(figsize=(15,8))
sns.heatmap(full_df.corr(), annot=True, cmap='YlGnBu')

In [None]:
# Remove 'rerun_ID' column as it has only one value and is not useful for classification
if 'return_ID' in full_df:
  full_df.drop(['rerun_ID'], axis=1, inplace=True)

# Split dataset into train and test sets
x = full_df.drop(['class'], axis=1)
y = full_df['class']
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2)
scaler = StandardScaler()


## Rete neurale

In [None]:
# Feature scaling (standardization)
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

X_train

In [None]:
# Convert target variable to categorical, as it is a multi-class classification problem
y_train_neural = keras.utils.to_categorical(y_train)
y_test_neural = keras.utils.to_categorical(y_test)
print(X_test.shape, y_test_neural.shape)

print(y_train_neural)

In [None]:
input_features = X_train.shape[1]

neural_model=define_NN_model()

# Early stopping 
callback = EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)

# Train the model
history = neural_model.fit(X_train, y_train_neural, epochs=50, batch_size=100, verbose=1, validation_data=(X_test, y_test_neural), callbacks=[callback])

# Evaluate the model on the test set
_, train_acc = neural_model.evaluate(X_train, y_train_neural, verbose=0)
_, test_acc = neural_model.evaluate(X_test, y_test_neural, verbose=0)
print('Train: %.3f, Test: %.3f' % (train_acc, test_acc))

# plot loss during training
plt.figure(figsize=(18,8))
plt.subplot(121)
plt.title('Loss')
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='test')
plt.legend()
# plot accuracy during training
plt.subplot(122)
plt.title('Accuracy')
plt.plot(history.history['accuracy'], label='train')
plt.plot(history.history['val_accuracy'], label='test')
plt.legend()
plt.show()

In [None]:
# predict probabilities for test set
yhat_probs = neural_model.predict(X_test, verbose=0)
# predict crisp classes for test set
yhat_classes=np.argmax(yhat_probs,axis=1)
# reduce to 1d array
yhat_probs = yhat_probs[:, 0]

y_test_unidimension = [0 if val[0] else 1 if val[1] else 2 for val in y_test_neural]
    
print(classification_report(y_test_unidimension, yhat_classes, target_names=label_mapping.keys()))

In [None]:
keras.utils.plot_model(neural_model, show_shapes=True)

10-Fold

In [None]:
n_fold = 10
epochs = 30
folds = KFold(n_splits=n_fold, shuffle=True)

nn_k_fold_metrics = []

scaler = StandardScaler()

fold_neural_model=define_NN_model()
fold_neural_model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])


for n_fold, (train_idx, valid_idx) in enumerate(folds.split(x, y)):
    # split data with corss validation indexes
    X_t, X_valid = x.iloc[train_idx], x.iloc[valid_idx]
    X_t = scaler.fit_transform(X_t)
    X_valid = scaler.fit_transform(X_valid)
    y_t, y_valid = y[train_idx], y[valid_idx]
    y_valid = keras.utils.to_categorical(y_valid)
    y_t = keras.utils.to_categorical(y_t)

    # train model on fold
    history = fold_neural_model.fit(X_t, y_t, epochs=epochs, batch_size=100, verbose=1, validation_data=(X_valid, y_valid), callbacks=[callback])

    # predict probabilities for test set
    y_pred_fold_neural_model = fold_neural_model.predict(X_valid, verbose=0)
    # predict crisp classes for test set
    yhat_classes=np.argmax(y_pred_fold_neural_model, axis=1)
    # reduce to 1d array
    y_pred_fold_neural_model = y_pred_fold_neural_model[:, 0]

    y_test_unidimension = [0 if val[0] else 1 if val[1] else 2 for val in y_valid]
        
    nn_k_fold_metrics.append(classification_report(y_test_unidimension, yhat_classes, target_names=label_mapping.keys(), output_dict=True))

In [None]:
metricGraph(nn_k_fold_metrics)

## SVM

In [None]:
# Split dataset into train and test sets
x = full_df.drop(['class'], axis=1)
y = full_df['class']
# Feature scaling (standardization)
# NEEDED for LinearSVC
x_scaled = scaler.fit_transform(x)
x_scaled = pd.DataFrame(x_scaled)
X_train, X_test, y_train, y_test = train_test_split(x_scaled, y, test_size=0.2)

# Print class distribution over training set
print(f"#0: {np.sum(y_train == 0)}")
print(f"#1: {np.sum(y_train == 1)}")
print(f"#2: {np.sum(y_train == 2)}")

In [None]:
# Crea il classificatore SVM
svm_model =  OneVsRestClassifier(svm.LinearSVC(dual="auto", tol=1e-5, C=1))
# per avere probability_distribution sv = CalibratedClassifierCV(sv) (curva roc)
svm_model = calibration.CalibratedClassifierCV(svm_model) 

svm_model.fit(X_train, y_train)
y_pred_svm = svm_model.predict(X_test)

# calcolo dell'accuratezza
# label_mapping dict: Galaxy = 0, Quasar = 1, Star = 2
accuracy = accuracy_score(y_test, y_pred_svm)
print(f"Accuratezza: {accuracy}\n\n")
print(classification_report(y_test, y_pred_svm, target_names=label_mapping.keys()))

In [None]:
#binarize the y_values
y_pred_prob_svm = svm_model.predict_proba(X_test)
classes=np.unique(y_test)
y_test_binarized=label_binarize(y_test,classes=classes)

# roc curve for classes
fpr = {}
tpr = {}
thresh ={}
roc_auc = dict()

n_class = classes.shape[0]

for i in range(n_class):    
    fpr[i], tpr[i], thresh[i] = roc_curve(y_test_binarized[:,i], y_pred_prob_svm[:,i])
    roc_auc[i] = roc_auc_score(y_test_binarized[:,i], y_pred_prob_svm[:,i])
    
# plotting    
plt.plot(fpr[0], tpr[0],label='Galaxy vs Rest (AUC = %0.3f)'%(roc_auc[0]))
plt.plot(fpr[1], tpr[1], label='Quasar vs Rest (AUC = %0.3f)'%(roc_auc[1]))
plt.plot(fpr[2], tpr[2], label='Stars vs Rest (AUC = %0.3f)'%(roc_auc[2]))

plt.plot([0,1],[0,1],'b--')

plt.title('Multiclass ROC curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive rate')
plt.legend(loc='lower right')
plt.show()


In [None]:
conf_matrix_dt = confusion_matrix(y_test, y_pred_svm)
plt.figure(figsize=(3, 3))
sns.heatmap(conf_matrix_dt, annot=True, fmt='d', cmap="Blues", xticklabels=list(("Galaxy", "Quasar", "Star")), yticklabels=list(("Galaxy", "Quasar", "Star")))
plt.xlabel('Predicted')
plt.ylabel('Real')
plt.title('Confusion Matrix SVM')
plt.show()

K-fold

In [None]:
n_fold = 10
folds = KFold(n_splits=n_fold, shuffle=True)

svm_k_fold_metrics = []

for n_fold, (train_idx, valid_idx) in enumerate(folds.split(x_scaled, y)):
  # split data with corss validation indexes
  X_train, X_valid = x_scaled.iloc[train_idx], x_scaled.iloc[valid_idx]
  y_train, y_valid = y[train_idx], y[valid_idx]

  # train model on fold
  svm_model.fit(X_train, y_train)
  y_pred_svm_fold = svm_model.predict(X_valid)
  # save scores of fold
  svm_k_fold_metrics.append(classification_report(y_valid, y_pred_svm_fold, target_names=label_mapping.keys(), output_dict=True))

metricGraph(svm_k_fold_metrics)

## Decision Tree


In [None]:
# Pre-pruning
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import GridSearchCV

# Define parameters grid
param_grid = {'criterion': ['gini', 'entropy', 'log_loss'],
              'max_depth': [9, 11, 13, 15, 17],
              'min_samples_leaf': [20, 40, 60],
              'min_samples_split': [40, 60, 80]
              }

clf = DecisionTreeClassifier()
# Use 5-fold cross validation
grid_search = GridSearchCV(clf, param_grid, cv=5)
grid_search.fit(X_train, y_train)

print("Best Parameters: ", grid_search.best_params_)

In [None]:
# Decision Tree
decision_tree_model = DecisionTreeClassifier(random_state = 1000, criterion = 'entropy', max_depth=15, min_samples_leaf=20, min_samples_split=80, class_weight = {0:1, 1:1, 2:2})
decision_tree_model.fit(X_train, y_train)
y_pred_dt = decision_tree_model.predict(X_test)

dtree_score = recall_score(y_test, y_pred_dt, average='weighted')
print(dtree_score)

In [None]:
# Print Tree
fig, ax = plt.subplots(figsize=(150, 100))
plot_tree(decision_tree_model, filled=True, ax=ax)
plt.plot()

In [None]:
y_pred_prob_dt = decision_tree_model.predict_proba(X_test)
classes=np.unique(y_test)
y_test_binarized=label_binarize(y_test,classes=classes)

# ROC curve for classes
fpr = {}
tpr = {}
thresh ={}
roc_auc_dt = dict()
n_class = 3

for i in range(n_class):    
    fpr[i], tpr[i], thresh[i] = roc_curve(y_test_binarized[:,i], y_pred_prob_dt[:,i])
    roc_auc_dt[i] = roc_auc_score(y_test_binarized[:,i], y_pred_prob_dt[:,i])
    
# Plotting    
plt.plot(fpr[0], tpr[0],label='Galaxy vs Rest (AUC = %0.3f)'%(roc_auc_dt[0]))
plt.plot(fpr[1], tpr[1], label='Quasar vs Rest (AUC = %0.3f)'%(roc_auc_dt[1]))
plt.plot(fpr[2], tpr[2], label='Stars vs Rest (AUC = %0.3f)'%(roc_auc_dt[2]))

plt.plot([0,1],[0,1],'b--')

plt.title('Multiclass ROC curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive rate')
plt.legend(loc='lower right')
plt.show()

In [None]:
conf_matrix_dt = confusion_matrix(y_test, y_pred_dt)
plt.figure(figsize=(3, 3))
sns.heatmap(conf_matrix_dt, annot=True, fmt='d', cmap="Blues", xticklabels=list(("Galaxy", "Quasar", "Star")), yticklabels=list(("Galaxy", "Quasar", "Star")))
plt.xlabel('Predicted')
plt.ylabel('Real')
plt.title('Confusion Matrix Decision Tree')
plt.show()

In [None]:
# Complexity vs Accuracy
path = decision_tree_model.cost_complexity_pruning_path(X_train, y_train)

# Use different complexity values
ccp_alphas = path.ccp_alphas

# Train with different complexity
train_accuracy = []
test_accuracy = []
for complexity in ccp_alphas:
    clf = DecisionTreeClassifier(max_depth=3, ccp_alpha=complexity)
    clf.fit(X_train, y_train)
    train_accuracy.append(clf.score(X_train, y_train))
    test_accuracy.append(clf.score(X_test, y_test))

# Make plots
plt.plot(ccp_alphas, train_accuracy, label='Training Accuracy')
plt.plot(ccp_alphas, test_accuracy, label='Test Accuracy')
plt.xlabel('Complexity Parameter')
plt.ylabel('Accuracy')
plt.title('Accuracy vs. Complexity Parameter')
plt.xscale('log')
plt.legend()
plt.show()

In [None]:
# Post pruning
clfs = []
for ccp_alpha in ccp_alphas:
    clf = DecisionTreeClassifier(random_state=0, ccp_alpha=ccp_alpha)
    clf.fit(X_train, y_train)
    clfs.append(clf)
print(
    "Number of nodes in the last tree is: {} with ccp_alpha: {}".format(
        clfs[-1].tree_.node_count, ccp_alphas[-1]
    )
)

clfs = clfs[:-1]
complexity_values = ccp_alphas[:-1]

node_counts = [clf.tree_.node_count for clf in clfs]
depth = [clf.tree_.max_depth for clf in clfs]
fig, ax = plt.subplots(2, 1)
ax[0].plot(complexity_values, node_counts, marker="o", drawstyle="steps-post")
ax[0].set_xlabel("Alpha")
ax[0].set_ylabel("Node number")
ax[0].set_title("Node number vs Alpha")
ax[1].plot(complexity_values, depth, marker="o", drawstyle="steps-post")
ax[1].set_xlabel("Alpha")
ax[1].set_ylabel("Tree depth")
ax[1].set_title("Depth vs Alpha")
fig.tight_layout()

K-fold

In [None]:
n_fold = 10
folds = KFold(n_splits=n_fold, shuffle=True)

tree_k_fold_metrics = []
decision_tree_model = DecisionTreeClassifier()


for n_fold, (train_idx, valid_idx) in enumerate(folds.split(x, y)):
  # Split data with corss validation indexes
  X_train, X_valid = x_scaled.iloc[train_idx], x_scaled.iloc[valid_idx]
  y_train, y_valid = y[train_idx], y[valid_idx]

  # Train model on fold
  decision_tree_model.fit(X_train, y_train)
  y_pred_dt_fold = decision_tree_model.predict(X_valid)
  # Save scores of fold
  tree_k_fold_metrics.append(classification_report(y_valid, y_pred_dt_fold, target_names=label_mapping.keys(), output_dict=True))

metricGraph(tree_k_fold_metrics)

In [None]:
print(classification_report(y_test_unidimension, yhat_classes, target_names=label_mapping.keys()))

# Compare

In [None]:
nn_scores = extractScoresFromClassificationReport(nn_k_fold_metrics)
svm_scores = extractScoresFromClassificationReport(svm_k_fold_metrics)
tree_score = extractScoresFromClassificationReport(tree_k_fold_metrics)

for key in nn_scores:
  nn_confint, nn_mean_val = calcConfidenceInterval(nn_scores[key])
  plt.errorbar(0, nn_mean_val, yerr=(nn_confint[1] - nn_confint[0])/2, fmt='o', label='nn')

  svm_confint, svm_mean_val = calcConfidenceInterval(svm_scores[key])
  plt.errorbar(1, svm_mean_val, yerr=(svm_confint[1] - svm_confint[0])/2, fmt='o', label='svm')

  tree_confint, tree_mean_val = calcConfidenceInterval(tree_score[key])
  plt.errorbar(2, tree_mean_val, yerr=(tree_confint[1] - tree_confint[0])/2, fmt='o', label='tree')

  # ticks on x axis with labels
  plt.xticks(range(0, 3), ['nn', 'svm', 'tree'])

  # Add labels and title
  plt.xlabel('Metrics')
  plt.ylabel('Values')
  plt.title(f"Comparing {key} metric between models")
  # Show the plot
  plt.show()
