In [None]:
import os, sys

import pandas as pd
import numpy as np
import random
import seaborn as sns

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.ticker import MaxNLocator


from sklearn import preprocessing
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay

import scipy
from scipy import stats

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

In [None]:
"""
Reading in the spectroscopic and taxonomy data as well as the list of usable asteroids.
"""
asteroid_list = pd.read_excel('asteroids.xlsx')
spectradata_full = pd.read_excel('gaia-dr3-sso-spectra.xlsx')
taxonomy_full = pd.read_excel('EAR-simplified-taxonomy.xlsx')

In [None]:
"""
Preprocess the spectra by taking only asteroids that can be found in photometric dataset as well,
and dropping all of the bad quality spectra.
"""
spectradata_full['inboth'] = spectradata_full['ast_num'].isin(asteroid_list).astype(int)
spectradata = spectradata_full[spectradata_full.inboth != 0]
spectradata.replace(-99.9, np.nan, inplace=True)
spectradata = spectradata.dropna()
spectradata_temp = spectradata[spectradata.no_of_1_flags == 0]
spectradata_temp = spectradata[spectradata.no_of_2_flags == 0]

In [None]:
"""
Parameters that define which input variable group to run.

'allspectra' can be either 0 - treat as one spectra, or 1 - divide into red and blue passbands
'spec_slope_param' can be either 0 - run only spectra, 1 - run spectra + slopes, or 2 - run only slopes
'onlyCXI' can be either 0 - run CXI and EI slopes, or 1 - run only CXI slopes
'pca_param' can be either 0 - run nn in original data dimensions, or 1 - run nn in pca dimensions
"""

allspectra = 0 # if 0 treat as one spectra, if 1 treat as 2 blocks
specslopparam = 1 # if 0 just spectra, if 1 both spectra and slope, if 2 only slope
onlyCXI = 0 # remove EI slopes
pcaparam = 0 # if 0 nn on spectra, if 1 nn on pca

In [None]:
"""
Performing normalization and standardization of the spectra. For two-block spectra the red block is normalized to 1 at 682 nm.
"""

if allspectra == 0:
    spectradata_asone = spectradata_temp[['ast_num', 'spectra 418', 'spectra 462', 'spectra 506', 'spectra 550', 'spectra 594', 'spectra 638', 'spectra 682', 'spectra 726', 'spectra 770']]
    spectradata_asone.set_index('ast_num', drop = True, inplace=True)
    print('Computing as one spectra')
    
    mean_asone = spectradata_asone.values.mean()
    spectradata_asone_mean = spectradata_asone - mean_asone
    std_asone = spectradata_asone.values.std(ddof=1)
    spectra_all = spectradata_asone_mean / std_asone
    
    
if allspectra == 1:

    spectradata_blue = spectradata_temp[['ast_num', 'spectra 418', 'spectra 462', 'spectra 506', 'spectra 550', 'spectra 594']]
    spectradata_blue.set_index('ast_num', drop = True, inplace=True)
    spectradata_red = spectradata_temp[['ast_num', 'spectra 638', 'spectra 682', 'spectra 726', 'spectra 770']]
    spectradata_red.set_index('ast_num', drop = True, inplace=True)
    print('Computing as red and blue blocks')

    spectradata_red_norm = spectradata_red.copy()
    for index, row in spectradata_red.iterrows():
        spectradata_red_norm.loc[index] = spectradata_red.loc[index] / (row['spectra 682'])

    mean_blue = spectradata_blue.values.mean()
    mean_red = spectradata_red.values.mean()

    spectradata_blue_mean = spectradata_blue - mean_blue
    spectradata_red_mean = spectradata_red - mean_red

    std_blue = spectradata_blue.values.std(ddof=1)
    std_red = spectradata_red.values.std(ddof=1)

    spectradata_blue_stanrd = spectradata_blue_mean / std_blue
    spectradata_red_stanrd = spectradata_red_mean / std_red

    spectra_all = pd.concat([spectradata_blue_stanrd, spectradata_red_stanrd], axis = 1)

In [None]:
"""
Reading in the ellipsoid inversion photometric data.
"""

with open('/home/euvarova/Work/University/PSR/2023/EI_mcst1_new.txt', 'r') as f:
    slopefile = f.readlines()

slopes_list = []
slopes_err_list = []
slope_ast_list = []
period_list = []

    
for row in slopefile:
    slope_ast_list.append(int(row.strip().split()[0]))
    slopes_list.append(float(row.strip().split()[14]))
    slopes_err_list.append(float(row.strip().split()[15]))

    
slopes_org = pd.DataFrame()
slopes_org['ast_num'] = slope_ast_list
slopes_org['slope_ei'] = slopes_list
slopes_org['err_ei'] = slopes_err_list
slopes_org.set_index('ast_num', drop = True, inplace=True)

In [None]:
"""
Reading in the convex inversion photometric data and join it with the ellipsoid inversion data.
"""

with open('/home/euvarova/Work/University/PSR/2023/CXI_mcst1_rms_forLiisa.txt', 'r') as f:
    slopefile = f.readlines()

slopes_list = []
slopes_err_list = []
slope_ast_list = []
rms_list = []
    
for row in slopefile:
    slope_ast_list.append(int(row.strip().split()[0]))
    rms_list.append(float(row.strip().split()[1]))
    slopes_list.append(float(row.strip().split()[12]))
    slopes_err_list.append(float(row.strip().split()[13]))
    

slopes_temp = pd.DataFrame()
slopes_temp['ast_num'] = slope_ast_list 
slopes_temp['rms_cxi'] = rms_list
slopes_temp['slope_cxi'] = slopes_list
slopes_temp['err_cxi'] = slopes_err_list
slopes_temp.set_index('ast_num', drop = True, inplace=True)

slopes_org = slopes_org.join(slopes_temp, how = 'inner')
slopes_org = slopes_org[slopes_org.rms_cxi >= 0]
slopes = slopes_org.copy()

In [None]:
"""
Removing all photometric slopes MCMC sd over 0.3 and rms over 0.06
"""

slopes = slopes[slopes['err_ei'] < 0.3]
slopes = slopes[slopes['err_cxi'] < 0.3]

slopes = slopes[slopes['rms_cxi'] < 0.06]

slopes = slopes.drop(['err_ei'], axis=1)
slopes = slopes.drop(['err_cxi'], axis=1)
slopes = slopes.drop(['rms_cxi'], axis=1)

In [None]:
"""
Performing normalization and standardization for the photometric slopes.
"""

mean_slopes_ei = slopes['slope_ei'].values.mean()
mean_slopes_cxi = slopes['slope_cxi'].values.mean()

slopes_mean_ei = slopes['slope_ei'] - mean_slopes_ei
slopes_mean_cxi = slopes['slope_cxi'] - mean_slopes_cxi

std_slopes_ei = slopes['slope_ei'].values.std(ddof=1)
std_slopes_cxi = slopes['slope_cxi'].values.std(ddof=1)

slopes_stanrd_ei = slopes_mean_ei / std_slopes_ei
slopes_stanrd_cxi = slopes_mean_cxi / std_slopes_cxi

slopes_stanrd = pd.concat([slopes_stanrd_ei,slopes_stanrd_cxi], axis = 1)

In [None]:
"""
Combining the standardized spectra and the standardized slopes.
"""

spectra_slopes = spectra_all.join(slopes_stanrd, how = 'inner')

if specslopparam == 2:
    if onlyCXI == 0:
        spectra_slopes = slopes_stanrd.copy()
    elif onlyCXI == 1:
        spectra_slopes = slopes_stanrd['slope_cxi'].copy()

In [None]:
"""
Creating a taxonomy dataframe with only asteroid numbers and classes.
"""

taxonomy_temp = pd.DataFrame()
taxonomy_temp['Unified class'] = taxonomy_full['Unified class'].copy()
taxonomy_temp.index = taxonomy_full['Asteroid number']
taxonomy_temp.drop(0, inplace = True)

In [None]:
"""
Classifying those asteroids that had Tholen E, M, P classes in their 'Unified class' -column as their
respective B-dM or Bus classes. Since there are multiple B-dM and Bus taxonomy colunmns, we check all
of them. If the asteroid has not been classified by Bus or B-dM, we classify the asteroid as X. 
"""

lst = []
temp=[]

taxonomy_full_ind = taxonomy_full[['Asteroid number', 'BUS CLASS', 'S3OS2 CLASS BB', 'BUS DEMEO CLASS']].copy()
taxonomy_full_ind.set_index('Asteroid number', drop = True, inplace=True)

for cls in ['E', 'M', 'P']:
    lst.append(taxonomy_temp.loc[taxonomy_temp['Unified class'] == cls].index.to_list())
    
lst_flat = [item for sublist in lst for item in sublist]

for i in taxonomy_temp.index:
    if i in lst_flat:
        if not pd.isna(taxonomy_full_ind.loc[i]['BUS DEMEO CLASS']):
            temp.append(taxonomy_full_ind.loc[i]['BUS DEMEO CLASS'])
        elif not pd.isna(taxonomy_full_ind.loc[i]['S3OS2 CLASS BB']):
            temp.append(taxonomy_full_ind.loc[i]['S3OS2 CLASS BB'])
        elif not pd.isna(taxonomy_full_ind.loc[i]['BUS CLASS']):
            temp.append(taxonomy_full_ind.loc[i]['BUS CLASS'])    
        else:
            temp.append('X')
    else:
        temp.append(taxonomy_temp.loc[i]['Unified class'])
        
taxonomy_temp['Unified class'] = temp

In [None]:
"""
Combining subgroups of S, C and X spectra under one name to simplify the labeling
"""

temp = []
for cl in taxonomy_temp['Unified class']:
    if cl == 'Sk' or cl == 'Sr' or cl == 'Sq' or cl == 'Sa' or cl == 'Sv' or cl == 'Sl':
        temp.append('S')
    elif cl == 'Xe' or cl == 'Xc' or cl == 'Xk':
        temp.append('X')
    elif cl == 'Ch' or cl == 'Cg' or cl == 'Cb' or cl == 'Cgh' or cl == 'G' or cl == 'F' or cl == 'B':
        temp.append('C')
    else:
        temp.append(cl)
        
taxonomy_temp['Main class'] = temp

In [None]:
"""
Performing PCA. The PCA is not performed for groups with only slopes, due to their low dimensionality.
"""
if specslopparam == 0:

    model = PCA(n_components = 4)
    fit = model.fit(spectra_all)
    pc = model.fit_transform(spectra_all)

    principal_spectra_all = pd.DataFrame(data = pc, columns = ['principal component 1', 'principal component 2', 'principal component 3', 'principal component 4'], index = spectradata_temp['ast_num'])

elif specslopparam == 1:

    model = PCA(n_components = 4)
    fit = model.fit(spectra_slopes)
    pc = model.fit_transform(spectra_slopes)

    principal_spectra_slopes = pd.DataFrame(data = pc, columns = ['principal component 1', 'principal component 2', 'principal component 3', 'principal component 4'], index = spectra_slopes.index)

In [None]:
"""
Checking the explained variation and eigendata.
"""

if specslopparam == 0:
    
    print('Explained variation per principal component: {}'.format(model.explained_variance_ratio_))
    eigenvectors = model.components_
    eigendata = pd.DataFrame(eigenvectors, columns = spectra_all.columns)
    print(eigendata)

if specslopparam == 1:
    
    print('Explained variation per principal component: {}'.format(model.explained_variance_ratio_))
    eigenvectors = model.components_
    eigendata = pd.DataFrame(eigenvectors, columns = spectra_slopes.columns)
    print(eigendata)

if specslopparam == 2:
    
    if onlyCXI == 0:
        eigendata = pd.DataFrame(eigenvectors, columns = spectra_slopes.columns)
        print(eigendata)

In [None]:
"""
Plotting the photometric slope histograms to see how the taxonomic classes distribute.
"""

SCXDastlist = []
SCXDclasslist = []

for index, row in taxonomy_temp.iterrows():
    if row['Main class'] == 'S' or row['Main class'] == 'C' or row['Main class'] == 'X' or row['Main class'] == 'D':
        SCXDastlist.append(index)
        SCXDclasslist.append(row['Main class'])
        
SCXDdf = pd.DataFrame()
SCXDdf['ast'] = SCXDastlist
SCXDdf['class'] = SCXDclasslist
SCXDdf.set_index('ast', drop = True, inplace=True)

SCXDdf_slopes = SCXDdf.join(slopes_org, how = 'inner')

plt.rcParams["figure.dpi"] = 300
slopehist = SCXDdf_slopes.hist(column='slope_cxi', by = 'class', bins = 20, edgecolor='black', sharex = True)
for ax in slopehist.flatten():
    ax.set_xlabel("CXI slope value")
    ax.set_ylabel("Frequency")
plt.tight_layout()

In [None]:
"""
Join the taxonomy classes with the spectra, slopes and spectra + slopes dataframes in their original and
PCA dimensionalities.
"""
if specslopparam == 0:
    taxonomy_spectra_pca = taxonomy_temp.join(principal_spectra_all, how = 'inner')
    taxonomy_spectra = taxonomy_temp.join(spectra_all, how = 'inner')

elif specslopparam == 1:
    taxonomy_spectra_slopes_pca = taxonomy_temp.join(principal_spectra_slopes, how = 'inner')
    taxonomy_spectra_slopes = taxonomy_temp.join(spectra_slopes, how = 'inner')
    
elif specslopparam == 2:
    taxonomy_slopes = taxonomy_temp.join(spectra_slopes, how = 'inner')

In [None]:
if specslopparam == 1:

    plt.rcParams.update({'font.size': 13})

    size = 24

    #---------------------------
    #1VS2
    fig, ax=plt.subplots(dpi = 200)
    sns.scatterplot(data = taxonomy_spectra_slopes_pca, x = 'principal component 1', y = 'principal component 2', hue = 'Main class', s=size, ax=ax)
    ax.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()
    #---------------------------
    #1VS3
    fig, ax=plt.subplots(dpi = 200)
    sns.scatterplot(data = taxonomy_spectra_slopes_pca, x = 'principal component 1', y = 'principal component 3', hue = 'Main class', s=size, ax=ax)
    ax.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()
    #---------------------------
    #1VS4
    fig, ax=plt.subplots(dpi = 200)
    sns.scatterplot(data = taxonomy_spectra_slopes_pca, x = 'principal component 1', y = 'principal component 4', hue = 'Main class', s=size,ax=ax)
    ax.legend(bbox_to_anchor=(1.1, 1.05))
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    ax.set_ylim(-2,2)
    plt.show()
    #---------------------------
    #2VS3
    fig, ax=plt.subplots(dpi = 200)
    sns.scatterplot(data = taxonomy_spectra_slopes_pca, x = 'principal component 2', y = 'principal component 3', hue = 'Main class', s=size, ax=ax)
    ax.legend(bbox_to_anchor=(1.1, 1.05))
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    ax.set_ylim(-2,2)
    plt.show()
    #---------------------------
    #2VS4
    fig, ax=plt.subplots(dpi = 200)
    sns.scatterplot(data = taxonomy_spectra_slopes_pca, x = 'principal component 2', y = 'principal component 4', hue = 'Main class', s=size, ax=ax)
    ax.legend(bbox_to_anchor=(1.1, 1.05))
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    ax.set_ylim(-2,2)
    plt.show()
    #---------------------------
    #3VS4
    fig, ax=plt.subplots(dpi = 200)
    sns.scatterplot(data = taxonomy_spectra_slopes_pca, x = 'principal component 3', y = 'principal component 4', hue = 'Main class', s=size, ax=ax)
    ax.legend(bbox_to_anchor=(1.1, 1.05))
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    ax.set_ylim(-2,2)
    plt.show()
    #---------------------------

if specslopparam == 2:
    if onlyCXI == 0:

        plt.rcParams.update({'font.size': 13})

        size = 24

        fig, ax=plt.subplots(dpi = 200)
        sns.scatterplot(data = taxonomy_slopes_pca, x = 'principal component 1', y = 'principal component 2', hue = 'Main class', s=size, ax=ax)
        ax.legend(bbox_to_anchor=(1.1, 1.05))
        plt.show()

In [None]:
"""
Encode taxonomy classes as numerical data for the neural network.
"""
label_encoder = preprocessing.LabelEncoder()

if pcaparam == 0:
    if specslopparam == 0:
        taxonomy_spectra['num_class']= label_encoder.fit_transform(taxonomy_spectra['Main class'])

    elif specslopparam == 1:
        taxonomy_spectra_slopes['num_class']= label_encoder.fit_transform(taxonomy_spectra_slopes['Main class'])

    elif specslopparam == 2:
        taxonomy_slopes['num_class']= label_encoder.fit_transform(taxonomy_slopes['Main class'])
        
if pcaparam == 1:    
    if specslopparam == 0:
        taxonomy_spectra_pca['num_class']= label_encoder.fit_transform(taxonomy_spectra_pca['Main class'])

    elif specslopparam == 1:
        taxonomy_spectra_slopes_pca['num_class']= label_encoder.fit_transform(taxonomy_spectra_slopes_pca['Main class'])

In [None]:
"""
Choosing only taxonomic groups C, S, X, and class D to look at.
"""

if pcaparam == 0:
    if specslopparam == 0:
        taxonomy_spectra_SCX = taxonomy_spectra[(taxonomy_spectra['Main class'] == 'S') | (taxonomy_spectra['Main class'] == 'C') | (taxonomy_spectra['Main class'] == 'D') | (taxonomy_spectra['Main class'] == 'X')]

    elif specslopparam == 1:
        taxonomy_spectra_slopes_SCX = taxonomy_spectra_slopes[(taxonomy_spectra_slopes['Main class'] == 'S') | (taxonomy_spectra_slopes['Main class'] == 'C') | (taxonomy_spectra_slopes['Main class'] == 'D') | (taxonomy_spectra_slopes['Main class'] == 'X')]
        
    elif specslopparam == 2:
        taxonomy_slopes_SCX = taxonomy_slopes[(taxonomy_slopes['Main class'] == 'S') | (taxonomy_slopes['Main class'] == 'C') | (taxonomy_slopes['Main class'] == 'D') | (taxonomy_slopes['Main class'] == 'X')]
        
if pcaparam == 1:
    if specslopparam == 0:
        taxonomy_spectra_SCX_pca = taxonomy_spectra_pca[(taxonomy_spectra_pca['Main class'] == 'S') | (taxonomy_spectra_pca['Main class'] == 'C') | (taxonomy_spectra_pca['Main class'] == 'D') | (taxonomy_spectra_pca['Main class'] == 'X')]

    elif specslopparam == 1:
        taxonomy_spectra_slopes_pca_SCX = taxonomy_spectra_slopes_pca[(taxonomy_spectra_slopes_pca['Main class'] == 'S') | (taxonomy_spectra_slopes_pca['Main class'] == 'C') | (taxonomy_spectra_slopes_pca['Main class'] == 'D') | (taxonomy_spectra_slopes_pca['Main class'] == 'X')]
        
    elif specslopparam == 2:
        if onlyCXI == 0:
            taxonomy_slopes_pca_SCX = taxonomy_slopes_pca[(taxonomy_slopes_pca['Main class'] == 'S') | (taxonomy_slopes_pca['Main class'] == 'C') | (taxonomy_slopes_pca['Main class'] == 'D') | (taxonomy_slopes_pca['Main class'] == 'X')]

In [None]:
"""
Defining data (x), labels (y) and asteroid numbers (z) for the neural network.
"""

if pcaparam == 0:
    if specslopparam == 0:
        x = taxonomy_spectra_SCX.drop(['Unified class', 'Main class', 'num_class'], axis=1)
        y = taxonomy_spectra_SCX['num_class']
        z = taxonomy_spectra_SCX.index

    elif specslopparam == 1:
        x = taxonomy_spectra_slopes_SCX.drop(['Unified class', 'Main class', 'num_class'], axis=1)
        y = taxonomy_spectra_slopes_SCX['num_class']
        z = taxonomy_spectra_slopes_SCX.index

    elif specslopparam == 2:
        x = taxonomy_slopes_SCX.drop(['Unified class', 'Main class', 'num_class'], axis=1)
        y = taxonomy_slopes_SCX['num_class']
        z = taxonomy_slopes_SCX.index
        
if pcaparam == 1:
    if specslopparam == 0:
        x = taxonomy_spectra_SCX_pca.drop(['Unified class', 'Main class', 'num_class'], axis=1)
        y = taxonomy_spectra_SCX_pca['num_class']
        z = taxonomy_spectra_SCX_pca.index

    elif specslopparam == 1:
        x = taxonomy_spectra_slopes_pca_SCX.drop(['Unified class', 'Main class', 'num_class'], axis=1)      
        y = taxonomy_spectra_slopes_pca_SCX['num_class']
        z = taxonomy_spectra_slopes_pca_SCX.index

x = x.to_numpy()
y = y.to_numpy()
z = z.to_numpy()

In [None]:
"""
Defining number of classes for the network depending on the data group we're using.
"""

if pcaparam == 0:
    if specslopparam == 0:
        num_classes = len(taxonomy_spectra['Main class'].unique())
    elif specslopparam == 1:
        num_classes = len(taxonomy_spectra_slopes['Main class'].unique())
    elif specslopparam == 2:
        num_classes = len(taxonomy_slopes['Main class'].unique())
if pcaparam == 1:
    if specslopparam == 0:
        num_classes = len(taxonomy_spectra_pca['Main class'].unique())
    elif specslopparam == 1:
        num_classes = len(taxonomy_spectra_slopes_pca['Main class'].unique())

In [None]:
"""
Using class spectradataset to tie together data and labels for shuffling, and to allow data extraction
during the NN training.
"""

class spectradataset(Dataset):
    def __init__(self, x, y, z):
        self.x = torch.tensor(x, dtype = torch.float32)
        self.y = torch.tensor(y, dtype = torch.int64)
        self.z = torch.tensor(z, dtype = torch.int64)
        self.length = self.x.shape[0]
        
    def __getitem__(self, idx):
        item = dict()
        item["spectra"] = self.x[idx]
        item["labels"] = self.y[idx]
        item["astnum"] = self.z[idx] 
        return item
    
    def __len__(self):
        return self.length

In [None]:
"""
Defining the training, validation and test datasets and turning them into dataloaders.
"""

train_dataset = spectradataset(x, y, z)
train_len = int(len(train_dataset) * 0.8)

train_set, test_set = random_split(train_dataset, [train_len, len(train_dataset)-train_len])

val_len = int(len(train_set)*0.25)
val_set, train_set = random_split(train_set, [val_len, len(train_set)-val_len])

train_loader = DataLoader(dataset = train_set, shuffle = True, batch_size = batch_size)
val_loader = DataLoader(dataset = val_set, shuffle = False, batch_size = len(val_set))
test_loader = DataLoader(dataset = test_set, shuffle = False, batch_size = len(test_set))


In [None]:
"""
Defining a simple feed-forward neural network.
"""

class FFNN(nn.Module):
    
    def __init__(self, input_size, num_classes):
        super(FFNN, self).__init__()
        self.layer1 = nn.Linear(input_size, 32)
        self.layer2 = nn.Linear(32, num_classes)
        self.activation = nn.ReLU()

    def forward(self, x):
        y = self.layer1(x)
        y = self.activation(y)
        y = self.layer2(y)        
        return y

In [None]:
"""
Function to train and validate the NN model. After training the fuction runs the model on a test set
to get the final accuracy.
"""

def runtrainNN(train_loader, val_loader, test_loader, x, num_classes):

    input_size = x.shape[1]
    num_epochs = 250
    batch_size = 10
    learning_rate = 0.001 
    n_total_steps = len(train_loader)
    
    #--- set up ---
    model = FFNN(input_size, num_classes)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) 

    acc_temp = []
    epoch_temp = []
    old_val_acc = 0

    for epoch in range(num_epochs):
        total_acc = 0

        for i, items in enumerate(train_loader):
            n_samples = 0
            n_correct = 0
            spectra = items['spectra']
            labels = items['labels']

            #--- forward pass training ---
            outputs = model(spectra)
            loss = criterion(outputs, labels)
            _, predictions = torch.max(outputs,1)
            n_samples = labels.shape[0]
            n_correct = (predictions == labels).sum().item()
            total_acc += n_correct

            #--- backward and optimize ---
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        #--- validation ---
        n_correct = 0
        n_samples = 0

        for items in val_loader:
            spectra = items['spectra']
            labels = items['labels']
            outputs = model(spectra)
            _, predictions = torch.max(outputs,1)
            n_samples += labels.shape[0]
            n_correct += (predictions == labels).sum().item()

        acc_val = 100 * (n_correct / n_samples)

        #--- forced break ---
        if acc_val < old_val_acc:
            break
        old_val_acc = acc_val
        acc_temp.append(total_acc/x.shape[0])
        epoch_temp.append(epoch)
        
    #--- test ---        
    with torch.no_grad():
        n_correct = 0
        n_samples = 0

        for items in test_loader:
            astnum = items['astnum']
            spectra = items['spectra'][astnum<20000]
            labels = items['labels'][astnum<20000]
            outputs = model(spectra)
            _, predictions = torch.max(outputs,1)
            n_samples += labels.shape[0]
            n_correct += (predictions == labels).sum().item()

        acc = 100 * (n_correct / n_samples)
        
        return acc, model, predictions

In [None]:
"""
Running the model either 50 or 100 times and printing the statistics.
"""

results = []
for i in range(100):
    result, model, predictions = runtrainNN(train_loader, val_loader, test_loader, x, num_classes)
    results.append(result) 
    
print(np.mean(results))
print(np.std(results))

In [None]:
"""
Creating the confusion matrix to appraise the model. 
"""

for i, item in enumerate(test_loader):
    X_test = item['spectra'][astnum<20000].numpy()
    y_test = item['labels'][astnum<20000].numpy()

cm = confusion_matrix(y_test, predictions)
print(classification_report(y_test, predictions))

plt.rcParams["figure.dpi"] = 300
plt.rcParams.update({'font.size': 13})

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['C', 'D', 'S', 'X'])
disp.plot(cmap = 'BuPu')

if specslopparam == 2:
    if pcaparam == 0:
        if onlyCXI == 1:
            plt.title('CXI Slopes (Group XIII)')
        else:
            plt.title('CXI and EI Slopes (Group XIV)')
    if pcaparam == 1:
        plt.title('CXI and EI Slopes, PCA')

else:
    if allspectra == 0:
        if pcaparam == 0:
            if specslopparam == 0:
                plt.title('Spectra as one block (Group I)')
            if specslopparam == 1:
                if onlyCXI == 0:
                    plt.title('CXI and EI Slopes + Spectra as one block (Group IX)')
                if onlyCXI == 1:
                    plt.title('CXI Slopes + Spectra as one block (Group V)')
        if pcaparam == 1:
            if specslopparam == 0:
                plt.title('Spectra as one block, PCA (Group II)')
            if specslopparam == 1:
                if onlyCXI == 0:
                    plt.title('CXI and EI Slopes + Spectra as one block, PCA (Group X)')
                if onlyCXI == 1:
                    plt.title('CXI Slopes + Spectra as one block, PCA (Group VI)')

    if allspectra == 1:
        if pcaparam == 0:
            if specslopparam == 0:
                plt.title('Spectra as two blocks (Group III)')
            if specslopparam == 1:
                if onlyCXI == 0:
                    plt.title('CXI and EI Slopes + Spectra as two blocks (Group XI)')
                if onlyCXI == 1:
                    plt.title('CXI Slopes + Spectra as two blocks (Group VII)')
                    
        if pcaparam == 1:
            if specslopparam == 0:
                plt.title('Spectra as two blocks, PCA (Group IV)')
            if specslopparam == 1:
                if onlyCXI == 0:
                    plt.title('CXI and EI Slopes + Spectra as two blocks, PCA (Group XII)')
                if onlyCXI == 1:
                    plt.title('CXI Slopes + Spectra as two blocks, PCA (Group VIII)')

In [None]:
"""
Defining the dataset of asteroids previously without labels to apply the classification model on them.
"""

rest_batch_size = 139

if pcaparam == 0:
    if specslopparam == 0:
        rest_x = spectra_all.drop(taxonomy_spectra.index)
        rest_astnum = rest_x.index
        X_rest = rest_x.to_numpy()

    if specslopparam == 1:
        rest_x = spectra_slopes.drop(taxonomy_spectra_slopes.index)
        rest_astnum = rest_x.index
        X_rest = rest_x.to_numpy()
        
    if specslopparam == 2:
        rest_x = spectra_slopes.drop(taxonomy_slopes.index)
        rest_astnum = rest_x.index
        X_rest = rest_x.to_numpy()
        
if pcaparam == 1:
    if specslopparam == 0:
        rest_x = principal_spectra_all.drop(taxonomy_spectra_pca.index)
        rest_astnum = rest_x.index
        X_rest = rest_x.to_numpy()

    if specslopparam == 1:
        rest_x = principal_spectra_slopes.drop(taxonomy_spectra_slopes_pca.index)
        rest_astnum = rest_x.index
        X_rest = rest_x.to_numpy()
        
def load_restset(data:float, rest_astnum, dataset_size: int = len(X_rest)):
    labels = torch.zeros((dataset_size, 1))
    spectradat = torch.tensor(data, dtype = torch.float32)
    astnum = torch.tensor([[a] for a in rest_astnum], dtype = torch.int64)
    dataset = spectradataset(spectradat, labels, astnum)
    return dataset

rest_dataset = load_restset(X_rest, rest_astnum)

rest_dataloader = DataLoader(dataset = rest_dataset, shuffle = False, batch_size = rest_batch_size)

In [None]:
"""
Predicting the taxonomic classes for asteroid with no previous labels.
"""

out = []

with torch.no_grad():
    for batch_num, items in enumerate(rest_dataloader):
        
        spectra = items['spectra']
        labels = items['labels']
        outputs = model(spectra)
        _, predictions = torch.max(outputs,1)
        
        out.append(predictions.numpy())

In [None]:
"""
Agregating the new predicted labels with existing PCA results and plotting new predicted asteroids into
PCA plots for comparison with known-label asteroids.
"""

flat_out = [item for sublist in out for item in sublist]
flat_out_class = label_encoder.inverse_transform(flat_out)

if specslopparam == 0:
    rest_x_pca = principal_spectra_all.drop(taxonomy_spectra.index)
    rest_x_pca['predicted'] = flat_out_class
    
if specslopparam == 1:
    rest_x_pca = principal_spectra_slopes.drop(taxonomy_spectra_slopes.index)
    rest_x_pca['predicted'] = flat_out_class
    
plot_pred = rest_x_pca.copy()

if specslopparam == 0:
    sns.scatterplot(data = plot_pred, x = 'principal component 1', y = 'principal component 2', hue = 'predicted', hue_order=taxonomy_spectra_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

    sns.scatterplot(data = plot_pred, x = 'principal component 1', y = 'principal component 3', hue = 'predicted', hue_order=taxonomy_spectra_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

    sns.scatterplot(data = plot_pred, x = 'principal component 2', y = 'principal component 3', hue = 'predicted', hue_order=taxonomy_spectra_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

if specslopparam == 1:
    sns.scatterplot(data = plot_pred, x = 'principal component 1', y = 'principal component 2', hue = 'predicted', hue_order=taxonomy_spectra_slopes_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

    sns.scatterplot(data = plot_pred, x = 'principal component 1', y = 'principal component 3', hue = 'predicted', hue_order=taxonomy_spectra_slopes_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

    sns.scatterplot(data = plot_pred, x = 'principal component 1', y = 'principal component 4', hue = 'predicted', hue_order=taxonomy_spectra_slopes_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

    sns.scatterplot(data = plot_pred, x = 'principal component 2', y = 'principal component 3', hue = 'predicted', hue_order=taxonomy_spectra_slopes_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

    sns.scatterplot(data = plot_pred, x = 'principal component 2', y = 'principal component 4', hue = 'predicted', hue_order=taxonomy_spectra_slopes_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

    sns.scatterplot(data = plot_pred, x = 'principal component 3', y = 'principal component 4', hue = 'predicted', hue_order=taxonomy_spectra_slopes_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()
    
if specslopparam == 2:

    sns.scatterplot(data = plot_pred, x = 'principal component 1', y = 'principal component 2', hue = 'predicted', hue_order=taxonomy_spectra_slopes_pca['Main class'].unique())
    plt.legend(bbox_to_anchor=(1.1, 1.05))
    plt.show()

In [None]:
"""
Computing statistics of predicted classes to compare with statistics of existing labels.
"""

labstats = taxonomy_spectra_slopes['Main class'].value_counts(normalize=True).mul(100)
labstats.rename('All classified', inplace = True)

labCSXstats = taxonomy_spectra_slopes_pca_SCX['Main class'].value_counts(normalize=True).mul(100)
labCSXstats.rename('NN input', inplace = True)

predstats = plot_pred['predicted'].value_counts(normalize=True).mul(100)
predstats.rename('All predicted', inplace = True)

plotstats = pd.DataFrame(labstats)
plotstats = plotstats.join(labCSXstats, how = 'outer')
plotstats = plotstats.join(predstats, how = 'outer')
plotstats=plotstats.reindex(taxonomy_spectra_slopes_pca['Main class'].unique())

In [None]:
"""
Plotting the comparison statistics.
"""

plt.rcParams["figure.dpi"] = 300
plt.rcParams.update({'font.size': 13})
plotstats.T.plot(kind='bar', stacked=True, color=sns.color_palette())
plt.ylabel('%', loc = 'top', rotation = 'horizontal')
plt.xticks(rotation=0)
plt.legend(bbox_to_anchor = (1, 1))
plt.show()