Library imports

In [18]:
import os
import numpy as np
import pandas as pd
import scipy.io as spio
from scipy.io import savemat
from pathlib import Path
import json
from utils import loadmat_dicts

from Toolbox_GCSR.Preprocessing import Processor
from Toolbox_GCSR.utility import eval_band, eval_band_cv
from Toolbox_GCSR.EGCSR_BS_Ranking import EGCSR_BS_Ranking

from Toolbox_bombs.runner import main
from Toolbox_bombs.utils import Arguments

#To use matlab engine. see: https://www.mathworks.com/help/matlab/matlab_external/install-the-matlab-engine-for-python.html
import matlab.engine
eng = matlab.engine.start_matlab()

eng.addpath(eng.genpath("Toolbox_SVD"));
eng.addpath(eng.genpath("Toolbox_OCF"));

Data Importation and adequation

In [19]:
#Spectral dataset must be in .mat files, for signatires matrix should be (samples, wavelengths)
#Load from .mat

#root for datasets
pwdpath=os.getcwd()
root = f"{pwdpath}\HSI_Files\\"

datasetName = "Corn20220623"

im_, gt_ = f'{datasetName}Data', f'{datasetName}Labels'
#results dir
result = f'{pwdpath}\Masters_Results\{datasetName}\\'


hsi_path = {'img_path': f'{root}{im_}.mat', 'gt_path': f'{root}{gt_}.mat'}


In [20]:
#Band selection is preformed for each iter*factor number of bands. Example: iter = 2 and factor 16, best 16 and 32 best discriminant bands will be selected. 

#The number of iterations and number of selected bands
iter = 8
factor = 4

#Two experiments were presented, most plant stress datasets have 4 clases of 25% differences in nutrition. Experiment 1 takes all, Experiment 2 takes 50% and 100% nutrition. 
#Flag for reduced dataset. 0 full dataset, 1 only classes 1 and 4.
reduced = 0

# Band selection

SVD Algorithm

In [21]:
def svd(data_m, band_num):

    #data_m = eng.double(eng.importdata(hsi_path['img_path']))

    select_svd = eng.svd_function(data_m, band_num);

    bands = np.array(select_svd)

    bands = bands.reshape(1,band_num)

    bands = bands[0]

    bands = bands - 1
    
    return bands

OCF Algorithm

In [22]:

def ocf(data_m, band_num):

    #data_m = eng.double(eng.importdata(hsi_path['img_path']))

    select_ocf = eng.ocf_trc_fdpc(data_m, band_num);

    bands = np.array(select_ocf)

    bands = bands.reshape(1,band_num)

    bands = bands[0]

    bands = bands - 1

    return bands

BOMBS Algorithm

In [None]:
RESULTS_DIR = os.path.join(
    os.getenv("Bombs_results", os.path.join("..", "..", "results")),
    "bombs_band_selection"
)

def bombs(hsi_path, bands_num):

    arguments = Arguments(
            bands_per_antibody=bands_num,
            data_path = hsi_path['img_path'],
            ref_map_path = hsi_path['gt_path'],
            dest_path=RESULTS_DIR,
            Gmax=6,
            Na=10,
            Nd=25,
            Nc=25,
            TD_size=30,
            P_init_size=100,
        )
    return main(args=arguments)

GCSR Algorithm

In [None]:
#
def EGCSR(hsi_path, bands_num):

    p = Processor()
    hsi_data, _ = p.prepare_data(hsi_path['img_path'], hsi_path['gt_path'])

    alg_Ranking = EGCSR_BS_Ranking(bands_num, regu_coef=1e2, n_neighbors=3, ro=0.8)
    alg_Ranking.predict(hsi_data)
    return alg_Ranking.band_indx


### Algorithms to be used in selection

In [None]:
#set of band selection methods
BS_functions = []
#EGCSR not used due to poor results in later classification
#BS_functions.append(EGCSR)
BS_functions.append(bombs)
BS_functions.append(svd)
BS_functions.append(ocf)

#dictionary for subsets selected from all algorithms
bands_selected = {}

### band selection process

In [None]:
#Matlab is currently creating a memory leak each time data is passed. Created here 1 time to avoid this.  
data_m = eng.double(eng.importdata(hsi_path['img_path']))

#iterates band selection algorithms
for alg in BS_functions:

    #Dictionary entry for bands selection. one numpy array will save al results for this method. 
    bands_selected[alg.__name__] = np.zeros((iter,factor*iter)).astype(int)

    print(alg.__name__ + ' START')
    for i in range(iter):
        
        band_count = (i+1)*factor

        print(f"Selecting {band_count} bands...")

        #If algorithm is imported from matlab... use data_m
        if alg.__name__ == "svd" or alg.__name__ == "ocf":
            
            alg_bands = alg(data_m, band_count)

        #To select bands for other algothms, gives data directory
        else:

            alg_bands = alg(hsi_path, band_count)

        print(f"Best {band_count} bands for {datasetName} are: {alg_bands}")

        #saves selected bands in numpy array
        bands_selected[alg.__name__][i,:len(alg_bands)] = alg_bands.astype(int)

    print(alg.__name__ + ' ENDS')

In [None]:
#WILL DELETE PREVIOUS RESULTS. CAUTION IS ADVISED

#only use if experiment changes. else, load selected bands.
#saves selected bands 
savemat(result + datasetName + "Bands.mat", bands_selected)

In [23]:
#Load saved selected bands
saved_bands = spio.loadmat(result + datasetName + "Bands.mat")
bands_selected = {}
for key in saved_bands.keys():
    if "__" not in key:
        bands_selected[key] = np.array(saved_bands[key])
        print(key, bands_selected[key].shape)


bombs (8, 32)
svd (8, 32)
ocf (8, 32)


# Classification

In [24]:
# classification and visualizarion imports
import seaborn as sn
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn import metrics

from sklearn.svm import SVC

### Classification algorithms definition

In [25]:
#Classification algorithms to use

#parameters found by grid search

#Creates RandomForest Clasiffier
RF = RandomForestClassifier(max_depth = 60, min_samples_leaf = 3, min_samples_split = 6, n_estimators = 400,random_state=0)

#Creates Svm Clasiffier
SVM = SVC(C = 2000, kernel = 'poly', degree = 5, gamma = 1, random_state=0)

#Creates Multi-Leyer perceptron classifier
MLP = MLPClassifier(hidden_layer_sizes=(500,300,100,50), activation='relu', alpha= 0.000001, solver='adam', random_state=0)

#dictionary to save classifier algorithms objects, must have .fit and a .predict methods
Classifiers = {}
Classifiers["RF"] = RF
Classifiers["SVM"] = SVM
Classifiers["MLP"] = MLP

#creates dictionaries to save results for every selected bandset
Accuracy_values = {}

for c in Classifiers:
    Accuracy_values[c] = {}

    for alg in bands_selected:
        Accuracy_values[c][alg] = []

### Data loading

In [26]:
#Data, labels and wavelength loading

Wavelength =  pd.read_csv(root + datasetName + "wavelength.txt", sep = '\s+', header= None)

Labels = spio.loadmat(hsi_path['gt_path'])
for key in Labels.keys():
    if "__" not in key:
        Labels = Labels[key]
        break
Labels = np.array(Labels)

if Labels.shape[0] < Labels.shape[1]:
    Labels = Labels.T

Data = spio.loadmat(hsi_path['img_path'])
for key in Data.keys():
    if "__" not in key:
        Data = Data[key]
        break
Data = np.array(Data)

if Data.shape[0] != Labels.shape[0]:
    Data = Data.T


In [27]:
#This section takes clases 50% and 100% of nutrition of experiment 2 flag is 1
if reduced == 1:
    datasetName += "redu50"
    Data_filtered = []
    Labels_filtered = []

    for i in range(Data.shape[0]):
        
        if key == "EGCSR":
            continue

        #ignoring middle classes to classifie with two
        if reduced == 1:

            if Labels[i] == 2 or Labels[i] == 4:
                
                Data_filtered.append(Data[i,:])
                Labels_filtered.append(Labels[i])

        else: 

            Data_filtered.append(Data[i,:])
            Labels_filtered.append(Labels[i])

    Data = np.array(Data_filtered)
    Labels = np.array(Labels_filtered)

In [28]:
print(Wavelength.shape)
print(Data.shape)
print(Labels.shape)

(1485, 1)
(2033, 1485)
(2033, 1)


### Classification over all selected band sets. 

In [29]:
#Iterates over bands sets 
for alg in bands_selected:      

    #iterate 
    for i in range(iter):

        #number of bands in current iteration
        b_count = (i+1)*factor

        #takes selected bands from dataset
        DataSelected = Data[:,bands_selected[alg][i,0:b_count]]

        print(DataSelected.shape)

        #Prints bands to process
        print("Processing: ", alg, b_count)

        # Split dataset into training set and test set
        X_train, X_test, y_train, y_test = train_test_split(DataSelected, Labels,
                                                            test_size=0.15, random_state=11, stratify = Labels)

        for C in Classifiers:
        
            Classifiers[C].fit(X_train, y_train.ravel())

            #Prediction of test set using trained model
            test_prediction = Classifiers[C].predict(X_test)

            accuracy = metrics.accuracy_score(y_test,test_prediction)

            print(accuracy, C)
            Accuracy_values[C][alg].append(accuracy)  

(2033, 4)
Processing:  bombs 4
0.46885245901639344 RF
0.4918032786885246 SVM
0.4885245901639344 MLP
(2033, 8)
Processing:  bombs 8
0.4918032786885246 RF
0.5540983606557377 SVM
0.4163934426229508 MLP
(2033, 12)
Processing:  bombs 12
0.5344262295081967 RF
0.5967213114754099 SVM
0.46557377049180326 MLP
(2033, 16)
Processing:  bombs 16
0.5606557377049181 RF
0.6786885245901639 SVM
0.5049180327868853 MLP
(2033, 20)
Processing:  bombs 20
0.580327868852459 RF
0.6721311475409836 SVM
0.5540983606557377 MLP
(2033, 24)
Processing:  bombs 24
0.5442622950819672 RF
0.6918032786885245 SVM
0.4163934426229508 MLP
(2033, 28)
Processing:  bombs 28
0.5901639344262295 RF


In [None]:
#WILL DELETE PREVIOUS RESULTS. CAUTION IS ADVISED

#only use if experiment changes. else, load accuracy.

#save accuracy values
savemat(result + datasetName + "Accuracy.mat", Accuracy_values)

In [None]:

#Load accuracy values
Accuracy = loadmat_dicts(result + datasetName + "Accuracy.mat")
Accuracy_values = {}
for key in Accuracy.keys():
    if "__" not in key:
        Accuracy_values[key] = np.array(Accuracy[key])
        print(key, Accuracy_values[key].shape)
print(Accuracy_values)

# Results Visualization

### Latex tables

In [None]:
#Gets each selection methon accuracy resutls to dictionaries

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.colheader_justify', 'center')
pd.set_option('display.precision', 3)

Dataframes_dict = {}

Dataframes_dict["Random Forest"] = pd.DataFrame( Accuracy_values['RF'].item())

print("Random Forest")
print(Dataframes_dict['Random Forest'])

Dataframes_dict["Support Vector Machine"] = pd.DataFrame( Accuracy_values['SVM'].item())

print("Support Vector Machine")
print(Dataframes_dict['Support Vector Machine'])

Dataframes_dict["Multi Layer Perceptron"] = pd.DataFrame( Accuracy_values['MLP'].item())

print("Multi Layer Perceptron")
print(Dataframes_dict['Multi Layer Perceptron'])

In [None]:
#generates latex tables of accuracies
row = []
for i in range(iter):
    row.append(str((i+1)*factor))
for key, value in Dataframes_dict.items():
    value.index = row
    print("With classifier ", key , "classification results are: \n", value.round(3).to_latex(bold_rows=True))

In [None]:
#finds maximum accuracy classification values
row = []
MaxValues = {}
MaxAccuracy = {}
MaxAccuracy["acc"] = 0
for i in range(iter):
    row.append(str((i+1)*factor))
    
for key, value in Dataframes_dict.items():
    value.index = row
    print("For Classifier "+ key)
    
    MaxValues[key] = {"algorithm" : value.columns.tolist(),"Max accuracy": value.max().tolist(),"Max bands": value.idxmax(axis=0).tolist()}

    m = max(MaxValues[key]["Max accuracy"])

    if m > MaxAccuracy["acc"]:
        MaxAccuracy["acc"] = m
        MaxIndex = MaxValues[key]["Max accuracy"].index(m)
        MaxAccuracy["algorithm"] = MaxValues[key]["algorithm"][MaxIndex]
        MaxAccuracy["band num"] = int(MaxValues[key]["Max bands"][MaxIndex])
        
#adds bands to max accuracy
MaxAccuracy["bands"] = np.sort(bands_selected[MaxAccuracy["algorithm"]][int((MaxAccuracy["band num"] / factor) - 1), 0:MaxAccuracy["band num"]])

print(MaxAccuracy)

#saves best selected bands
savemat(result + datasetName + "MaxAccuracy.mat", MaxAccuracy)

In [None]:
#Load maximum accuracy results
savedMax = loadmat_dicts(result + datasetName + "MaxAccuracy.mat")
MaxAccuracy = {}
for key in savedMax.keys():
    if "__" not in key:
        MaxAccuracy[key] = np.array(savedMax[key])
        print(key, MaxAccuracy[key])

In [None]:
#Generates latex table for maximum accuracy bands

bandsDataframe = pd.DataFrame({'Band Number': MaxAccuracy["bands"], 'Wavelength': np.reshape(Wavelength.iloc[MaxAccuracy["bands"]].values, len(MaxAccuracy["bands"])) })

#Generate latex table

bandsDataframe.to_csv(result + datasetName + "bandsAndWavelenghts.csv",index=None, sep=' ', mode='w')

print(bandsDataframe.round(3).to_latex(index=False))


### Graphics

In [None]:
#Displays selected bands over clasess average
titleFig = "Promedio clases"
#Per class samples
_, ClassCount = np.unique(Labels, return_counts=True)
print(ClassCount)

CornAlone = pd.DataFrame(Data.T)

CornMeanPerNitrogen = pd.DataFrame({"25":CornAlone.iloc[:,:ClassCount[0]].mean(axis = 1),   
                                    "50":CornAlone.iloc[:,ClassCount[0]:ClassCount.cumsum()[1]].mean(axis = 1),    
                                    "75":CornAlone.iloc[:,ClassCount.cumsum()[1]:ClassCount.cumsum()[2]].mean(axis = 1),   
                                    "100":CornAlone.iloc[:,ClassCount.cumsum()[2]:].mean(axis = 1)}) 

plt.grid(False)
plt.ylabel('Reflectancia')
plt.xlabel('Ancho de Banda (nm)')

plt.plot(Wavelength, CornMeanPerNitrogen["25"], label='25N')  
plt.plot(Wavelength, CornMeanPerNitrogen["50"], label='50N') 
plt.plot(Wavelength, CornMeanPerNitrogen["75"], label='75N') 
plt.plot(Wavelength, CornMeanPerNitrogen["100"], label='100N') 
plt.legend()
plt.title(titleFig)

#for a in range(0,20):
#    plt.axvline(x=CornData.iloc[bands_selected['bombs'][4][a]-1]['wavelengths'], color='k', linestyle='--')

plt.savefig(result + datasetName + titleFig + '.eps', format='eps')

In [None]:
#Displays selected bands over calsses average

alg = "svd"
n_bands = 20

titleFig = f"{n_bands} bandas {alg}"

#Per class samples
_, ClassCount = np.unique(Labels, return_counts=True)
print(ClassCount)

CornAlone = pd.DataFrame(Data.T)

CornMeanPerNitrogen = pd.DataFrame({"25":CornAlone.iloc[:,:ClassCount[0]].mean(axis = 1),   
                                    "50":CornAlone.iloc[:,ClassCount[0]:ClassCount.cumsum()[1]].mean(axis = 1),    
                                    "75":CornAlone.iloc[:,ClassCount.cumsum()[1]:ClassCount.cumsum()[2]].mean(axis = 1),   
                                    "100":CornAlone.iloc[:,ClassCount.cumsum()[2]:].mean(axis = 1)}) 

plt.grid(False)
plt.ylabel('Reflectancia')
plt.xlabel('Ancho de Banda (nm)')

plt.plot(Wavelength, CornMeanPerNitrogen["25"], label='25N')  
plt.plot(Wavelength, CornMeanPerNitrogen["50"], label='50N') 
plt.plot(Wavelength, CornMeanPerNitrogen["75"], label='75N') 
plt.plot(Wavelength, CornMeanPerNitrogen["100"], label='100N') 
plt.title(titleFig)

for a in range(0,n_bands):
    plt.axvline(x=Wavelength[0][bands_selected[alg][int(n_bands/factor)-1][a]], color='dimgray', linestyle='--')

plt.savefig(result + datasetName + titleFig + '.eps', format='eps')

### Confusion matrixes

In [None]:
#Confusion Matrix Experiment 1 all clases

#this section retrains specific classifier and selected bands to create a confusion matrix

#takes selected bands from dataset

n_bands = 20
alg = "svd"

DataSelected = Data[:,bands_selected[alg][int((n_bands/factor) -1),0:n_bands]]

# Split dataset into training set and test set
X_train, X_test, y_train, y_test = train_test_split(DataSelected, Labels,
                                                    test_size=0.15, random_state=11, stratify = Labels)


#training classification algorithm

experiment1 = SVM.fit(X_train, y_train.ravel())

#Prediction of test data

predictEX1 = experiment1.predict(X_test)

#Creating confusion matrix

names = ["25N", "50N", "75N" , "100N"]

matrix = metrics.confusion_matrix(y_test, predictEX1)

group_counts = ["{0:0.0f}".format(value) for value in matrix.flatten()]
group_percentages = ["{0:.2%}".format(value) for value in matrix.flatten()/np.sum(matrix)]
anotattion = [f"{v1}\n{v2}" for v1, v2 in
          zip(group_counts,group_percentages)]
anotattion = np.asarray(anotattion).reshape(4,4)

plt.figure()
immat = sn.heatmap(matrix,cmap="Blues", cbar=False, annot=anotattion, fmt='', xticklabels = names, yticklabels = names)

titleFig = "Matríz de confusión Ex1"


plt.title(titleFig)

plt.savefig(result + datasetName + titleFig + '.eps', format='eps')

print(metrics.classification_report(y_test, predictEX1, target_names = names))

In [None]:
#Confusion Matrix Experiment 2 Two clases

#this section retrains specific classifier and selected bands to create a confusion matrix

#cuts data to take only two clases
condition = (Labels == 2) | (Labels == 4)
Datadf = pd.DataFrame(Data)
Datacut = np.array(Datadf.loc[condition])
Labelsdf = pd.DataFrame(Labels)
Labelscut = np.array(Labelsdf.loc[condition])

#takes selected bands from dataset
n_bands = 28
alg = "bombs"

DataSelected = Datacut[:,bands_selected[alg][int((n_bands/factor) -1),0:n_bands]]

# Split dataset into training set and test set

X_train, X_test, y_train, y_test = train_test_split(DataSelected, Labelscut,
                                                    test_size=0.15, random_state=11, stratify = Labelscut)

#training classification algorithm

experiment2 = SVM.fit(X_train, y_train.ravel())

#Prediction of test data

predictEX2 = experiment2.predict(X_test)

#Creating confusion matrix

names = ["50N", "100N"]

matrix = metrics.confusion_matrix(y_test, predictEX2)

group_counts = ["{0:0.0f}".format(value) for value in matrix.flatten()]
group_percentages = ["{0:.2%}".format(value) for value in matrix.flatten()/np.sum(matrix)]
anotattion = [f"{v1}\n{v2}" for v1, v2 in
          zip(group_counts,group_percentages)]
anotattion = np.asarray(anotattion).reshape(2,2)

plt.figure()
immat = sn.heatmap(matrix,cmap="Blues", cbar=False, annot=anotattion, fmt='', xticklabels = names, yticklabels = names)

titleFig = "Matríz de confusión Ex2"

plt.title(titleFig)

plt.savefig(result + datasetName + titleFig + '.eps', format='eps')

print(metrics.classification_report(y_test, predictEX2, target_names = names))