In [None]:
import seaborn as sns
import numpy as np
import scipy as sp
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import sklearn
import imblearn
import pickle

from sklearn.cluster import DBSCAN, AgglomerativeClustering
from sklearn.metrics import confusion_matrix
from scipy.cluster.hierarchy import dendrogram
from sklearn.preprocessing import StandardScaler
from mpl_toolkits.mplot3d import Axes3D

import pacmap

import pychemauth

# Reload modules automatically if updated
%load_ext autoreload
%autoreload 2
    
# This will list all the packages you have imported and their versions for reproducibility
%load_ext watermark
%watermark -t -m -h -v --iversions

In [None]:
new_figures = False # Whether or not to override existing figures on disk
reprocess = False # Re-process data from disk or use saved results?

import matplotlib as mpl
mpl.rcParams['font.family'] = ['serif']
mpl.rcParams['font.serif'] = ['Times New Roman']
mpl.rcParams['font.size'] = 12

# Miscellaneous Functions

In [None]:
%matplotlib inline 

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.feature_selection import VarianceThreshold
from mpl_toolkits.mplot3d import Axes3D 
import pacmap

def relabel(name, head='../data/raw/training/'):
    """This is the way we are choosing to collect these materials into different categories."""
    raw = ' '.join(name.split(head)[1].split('_'))
    
    if '1085' in raw:
        return 'Lubricating Oil'
    elif 'steel' in raw:
        return 'Steel'
    elif 'carbon' in raw:
        return 'Carbon Powder'
    elif 'concrete' in raw:
        return 'Concrete'
    elif 'dolomitic' in raw:
        return 'Dolomitic Limestone'
    elif 'fuel' in raw:
        return 'Fuel Oil'
    elif 'glass' in raw:
        return 'Forensic Glass'
    elif 'graphite' in raw:
        return 'Graphite/Urea Mixture'
    elif '1632' in raw or '2685' in raw or '2692' in raw or '2693' in raw or '2775' in raw:
        return 'Coal and Coke'
    elif '2790' in raw or '2791' in raw or 'leaves' in raw:
        return 'Biomass'
    elif 'phosphate' in raw:
        return 'Phosphate Rock'
    elif 'titanium' in raw:
        return 'Titanium Alloy'
    elif 'zircaloy':
        return 'Zircaloy'
    else:
        raise Exception('Unrecognized: '+raw)

from matplotlib.patches import Ellipse
import matplotlib.transforms as transforms

def pca_project(X, y, ax, title=None, threshold=0.0, ellipses=False, min_d=0.0, scale=1):
    # Linear approach
    pca = PCA(n_components=2)
    vt = VarianceThreshold(threshold=threshold)
    ss = StandardScaler()
    X_proj = pca.fit_transform(ss.fit_transform(vt.fit_transform(X)))
    cycle_colors = plt.cm.turbo(np.linspace(0.01, 0.99, len(np.unique(y))))
    
    for i,class_ in enumerate(np.unique(y)):
        mask = y == class_
        ax.plot(X_proj[mask,0], X_proj[mask,1], '.', label=class_, color=cycle_colors[i], alpha=0.3)
        if ellipses:
            def eigsorted(cov):
                vals, vecs = np.linalg.eigh(cov)
                order = vals.argsort()[::-1]
                return vals[order], vecs[:,order]

            xdata = X_proj[mask, 0]
            ydata = X_proj[mask, 1]

            # Get values to build the ellipse
            cov = np.cov(xdata, ydata)
            vals, vecs = eigsorted(cov)
            theta = np.degrees(np.arctan2(*vecs[:,0][::-1]))
            w, h = 1 * 2 * np.sqrt(vals) * scale
            
            w = np.max([min_d, w]) # For visualization
            h = np.max([min_d, h]) # For visualization

            # Create the ellipse
            ell = Ellipse(xy=(np.mean(xdata), np.mean(ydata)),
                  width=w, height=h,
                  angle=theta, color='black', alpha=0.2)

            ell.set_facecolor(cycle_colors[i]) # Reference the colour for each factor (defined by lab)
            ax.add_artist(ell)
        
    ax.set_xlabel('PC 1')
    ax.set_ylabel('PC 2')
    ax.set_title(title)
    
    return pca, vt

def pacmap_project(X, y, ax, title=None, threshold=0.0, ellipses=False, min_d=0.0, scale=1):
    # Non-linear approach
    embedding = pacmap.PaCMAP(n_components=2, n_neighbors=5,
                          MN_ratio=0.5, FP_ratio=2.0,
                          distance='euclidean',
                          apply_pca=False, random_state=42)
    
    vt = VarianceThreshold(threshold=threshold)
    ss = StandardScaler()
    X_proj = embedding.fit_transform(ss.fit_transform(vt.fit_transform(X)))
    cycle_colors = plt.cm.turbo(np.linspace(0.01, 0.99, len(np.unique(y))))
    
    for i,class_ in enumerate(np.unique(y)):
        mask = y == class_
        ax.plot(X_proj[mask,0], X_proj[mask,1], 'o', label=class_, color=cycle_colors[i])
        if ellipses:
            def eigsorted(cov):
                vals, vecs = np.linalg.eigh(cov)
                order = vals.argsort()[::-1]
                return vals[order], vecs[:,order]

            xdata = X_proj[mask, 0]
            ydata = X_proj[mask, 1]

            # Get values to build the ellipse
            cov = np.cov(xdata, ydata)
            vals, vecs = eigsorted(cov)
            theta = np.degrees(np.arctan2(*vecs[:,0][::-1]))
            w, h = 2 * 2 * np.sqrt(vals) * scale
            
            w = np.max([min_d, w]) # For visualization
            h = np.max([min_d, h]) # For visualization

            # Create the ellipse
            ell = Ellipse(xy=(np.mean(xdata), np.mean(ydata)),
                  width=w, height=h,
                  angle=theta, color='black', alpha=0.2)

            ell.set_facecolor(cycle_colors[i]) # Reference the colour for each factor (defined by lab)
            ax.add_artist(ell)
            
    ax.set_xlabel('Dim 1')
    ax.set_ylabel('Dim 2')
    ax.set_title(title)
    
    return embedding, vt

def create_dendrogram(model, **kwargs):
    # Create linkage matrix and then generate info for the dendrogram

    # Create the counts of samples under each node
    counts = np.zeros(model.children_.shape[0])
    n_samples = len(model.labels_)
    for i, merge in enumerate(model.children_):
        current_count = 0
        for child_idx in merge:
            if child_idx < n_samples:
                current_count += 1  # Leaf node
            else:
                current_count += counts[child_idx - n_samples]
        counts[i] = current_count

    linkage_matrix = np.column_stack(
        [model.children_, model.distances_, counts]
    ).astype(float)
    
    # Plot the corresponding dendrogram
    dend = dendrogram(linkage_matrix, **kwargs)
    
    return dend

def smoothsegment(seg, Nsmooth=100):
    return np.concatenate([[seg[0]], np.linspace(seg[1], seg[2], Nsmooth), [seg[3]]])

def plot_dendrogram(dend, full_labels, y, gap=0.2):
    fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})

    icoord = np.array(dend['icoord']) # x coordinates of dendrogram lines
    dcoord = np.array(dend['dcoord']) # y coordinates of dendrogram lines

    # For angular, project x coordinates into [0, 2*pi]
    ang_coord = ((icoord - icoord.min())/(icoord.max() - icoord.min()))*((1-gap) + gap/2)*2*np.pi

    # For radial, project y coordinates into [0.1, 1.0] and take -log
    r_coord = -np.log(0.9*((dcoord - dcoord.min()) / (dcoord.max() - dcoord.min())) + 0.1)

    for xs, ys in zip(ang_coord, r_coord):
        xs = smoothsegment(xs)
        ys = smoothsegment(ys)
        ax.plot(xs, ys, color="silver")

    ax.spines['polar'].set_visible(False)
    ax.spines['inner'].set_visible(False)
    ax.set_xticks([])
    ax.set_yticks([])

    leaf_x = []
    exclude_x = []
    for xs, ys in zip(icoord, dcoord):
        if (ys[0] == 0.0):
            leaf_x.append(xs[0])
        if (ys[-1] == 0.0):
            leaf_x.append(xs[-1])
        if np.all(ys == 0.0):
            exclude_x.append(np.average([xs[0], xs[-1]]))
    
    exclude_bool = [l not in exclude_x for l in leaf_x] # Only False where in exclude_bool
    leaf_x = np.array(leaf_x)[exclude_bool]
    leaf_ang = ((leaf_x - icoord.min())/(icoord.max() - icoord.min()))*((1-gap) + gap/2)*2*np.pi
    leaf_sort_inds = np.argsort(leaf_ang)
    leaf_ang = leaf_ang[leaf_sort_inds]

    # If each point is a node, gives ordering of points in left-to-right traversal of leaves
    dend_labels = np.array(full_labels)[dend['leaves']]

    # Loop over classes and plot all corresponding points in same color
    unique_y = np.unique(y)
    class_colors = plt.cm.turbo(np.linspace(0.0, 1.0, len(unique_y)))
    for i, class_ in enumerate(unique_y):
        this_x = leaf_ang[[(class_ in dl) for dl in dend_labels]]
        ax.plot(this_x, r_coord.max()*np.ones_like(this_x),
                '.', label=class_, color=class_colors[i], markersize=5.0)

    ax.legend(loc='upper left', bbox_to_anchor=(1.0, 1.0))
    fig.tight_layout()

def make_dend(X, y, files):
    agg_model = AgglomerativeClustering(n_clusters=None, distance_threshold=0.0, affinity='euclidean', linkage='single')
    agg_clust = agg_model.fit(X)
    full_labels = [y[i]+'/'+files[i] for i in range(len(y))]
    dend = create_dendrogram(agg_model,
                         no_plot=True,
                         color_threshold=0,
                         truncate_mode=None,
                         labels=full_labels)
    plot_dendrogram(dend, full_labels, y, gap=0.2)
    
def plot_importance(gs, aligned_centers, dim, return_max=1, ax=None):
    scalings = np.abs(gs.best_estimator_.named_steps['dim_red'].components_[dim])
    if 'variance_threshold' in gs.best_estimator_.named_steps:
        peaks = gs.best_estimator_.named_steps['variance_threshold'].inverse_transform(scalings.reshape(1,-1))[0]
    else:
        peaks = scalings
    if ax is None:
        fig = plt.figure()
        ax = fig.gca()

    ax.plot(
        aligned_centers,
        peaks,
    )
    
    return peaks[np.argsort(peaks)[::-1]][:return_max], aligned_centers[np.argsort(peaks)[::-1]][:return_max]

def visualize_spectra(df, aligned_centers, chosen_cutoff=None):
    for label in df['label'].unique():
        plt.figure(figsize=(16,4))
        y_ = df[df['label'] == label].drop(['label', 'file_names'], axis=1).values
        _ = [plt.plot(aligned_centers, a, '.', alpha=0.4) 
             for a in y_]
        plt.yscale('log')
        plt.xlabel('Energy (keV)')
        plt.ylabel('Spectra')
        plt.title('Original label: '+label+' ; New label: '+relabel(label))
        plt.axvline(chosen_cutoff, color='r')
        
def plot_hard_decision_boundaries_2d(X_proj, y, clf, resolution=0.02, bounds=5):
    enc = LabelEncoder()
    enc.fit(y)
    
    cmap = plt.cm.tab20
    
    x1_min, x1_max = X_proj[:,0].min()-bounds*resolution, X_proj[:,0].max()+bounds*resolution
    x2_min, x2_max = X_proj[:,1].min()-bounds*resolution, X_proj[:,1].max()+bounds*resolution
    
    xg1, xg2 = np.meshgrid(
        np.arange(x1_min, x1_max, resolution),
        np.arange(x2_min, x2_max, resolution),
    )
    
    Z = enc.transform(clf.predict(np.array([xg1.ravel(), xg2.ravel()]).T))
    Z = Z.reshape(xg1.shape)
    plt.contourf(xg1, xg2, Z, alpha=0.4, levels=len(np.unique(y))+1, cmap='gray')#, cmap=plt.cm.tab20)
    
    for i,class_ in enumerate(np.unique(y)):
        mask = y == class_
        color = cmap(i)
        plt.plot(X_proj[mask,0], X_proj[mask,1], '.', label=class_, color=color)

# Load Data

In [None]:
head = '../../data/raw/training/'

## Either Load from Disk

In [None]:
if not reprocess:
    import pickle
    X_use = pickle.load(open('../../data/raw/X_use.pkl', 'rb'))
    y_use = pickle.load(open('../../data/raw/y_use.pkl', 'rb'))
    files_use = pickle.load(open('../../data/raw/files_use.pkl', 'rb'))
    aligned_centers = pickle.load(open('../../data/raw/aligned_centers.pkl', 'rb'))

    X_challenge = pickle.load(open('../../data/raw/X_challenge.pkl', 'rb'))
    y_challenge = pickle.load(open('../../data/raw/y_challenge.pkl', 'rb'))
    files_challenge = pickle.load(open('../../data/raw/files_challenge.pkl', 'rb'))
    
    import matplotlib.pyplot as plt
    plt.figure(figsize=(4,3))
    for i, class_ in zip([0, 19, 0], ['Titanium Alloy', 'Coal and Coke', 'Concrete']):
        plt.plot(aligned_centers, X_use[y_use == class_][i], alpha=0.65, label=class_) 
    plt.yscale('log')
    plt.xlabel('Energy (keV)')
    plt.ylabel('Normalized Counts')
    plt.legend(loc='best')
    plt.axvline(7650, color='r', alpha=0.4)

    if new_figures:
        plt.savefig('example_spectra.png', bbox_inches='tight', dpi=300)

## Or Re-process Raw Data

In [None]:
import sys
sys.path.append('../../code/')
import load_data, os

In [None]:
if reprocess:
    # 1. Read raw data
    spectra = []
    energies = []
    y = []
    file_names = []

    for root, subdirectories, files in os.walk(head):
        for file in files:
            if 'SPE' not in file:
                continue
            raw, bins = load_data.read_spe(os.path.join(root, file), 
                                           coarsen=1, # Coarsen after alignment
                                           convert=True, # Convert from bin # to energy (keV)
                                           annihilation=False, # Leave annihilation peak
                                           normalize=None, # No normalization for now
                                          )
            spectra.append(raw)
            energies.append(bins)
            assert(len(subdirectories) == 0)
            y.append(root.split(head+'/')[-1])
            file_names.append(file)
    spectra = np.array(spectra)
    energies = np.array(energies)
    y = np.array(y)
    file_names = np.array(file_names)

    # 2. Align energy bins - coarsen and drop the first 40 bins
    # Alignment also uses first/last values in spectra to extrapolate instead of to 0
    aligned_spectra, aligned_centers = load_data.align_spectra(spectra, energies, coarsen=4, drop=40)

    # 3. Create DataFrame
    df = pd.DataFrame(data=aligned_spectra)
    df['label'] = y
    df['file_names'] = file_names
    
    # Visualize - observe that not all data spans the same range of energies
    visualize_spectra(df, aligned_centers, chosen_cutoff=7650)

In [None]:
if reprocess:
    # Now let's empirically normalize the spectra after coarsening and dropping
    df = pd.DataFrame(
        data=(aligned_spectra.T / np.sum(aligned_spectra, axis=1)).T
    )
    df['label'] = y
    df['file_names'] = file_names
    
    visualize_spectra(df, aligned_centers, chosen_cutoff=7650)

In [None]:
if reprocess:
    # Give categories our chosen names
    df['pretty label'] = df.label.apply(lambda x:relabel(x))

    plt.figure(figsize=(4,3))
    sns.histplot(df.sort_values(by='pretty label'), x='pretty label')
    _ = plt.xticks(rotation=90)
    plt.xlabel('')
    plt.ylabel('Number of Samples')

    # Only train on categories with at least 10 observations - rest will be in challenge set
    min_obs = 10
    plt.axhline(min_obs, color='red')

    if new_figures:
        plt.savefig('counts.png', dpi=300, bbox_inches='tight')

In [None]:
if reprocess:
    cats, counts = np.unique(df['pretty label'], return_counts=True)
    training_cats = dict([ca,co] for ca,co in zip(cats, counts) if co > min_obs)
    training_cats

In [None]:
if reprocess:
    X = df.drop(['label', 'file_names', 'pretty label'], axis=1).values
    y = df['pretty label'].values
    columns = df.drop(['label', 'file_names', 'pretty label'], axis=1).columns
    files = df['file_names'].values
    df['pretty label'].value_counts()

In [None]:
if reprocess:
    mask = np.array([True if c in training_cats else False for c in y])

    X_use = X[mask,:]
    y_use = y[mask]
    files_use = files[mask]

    X_challenge = X[~mask,:]
    y_challenge = y[~mask]
    files_challenge = files[~mask]

In [None]:
# if reprocess:
#     pickle.dump(X_use, open('../../data/raw/X_use.pkl', 'wb'))
#     pickle.dump(y_use, open('../../data/raw/y_use.pkl', 'wb'))
#     pickle.dump(files_use, open('../../data/raw/files_use.pkl', 'wb'))
#     pickle.dump(aligned_centers, open('../../data/raw/aligned_centers.pkl', 'wb'))

#     pickle.dump(X[~mask,:], open('../../data/raw/X_challenge.pkl', 'wb'))
#     pickle.dump(y[~mask], open('../../data/raw/y_challenge.pkl', 'wb'))
#     pickle.dump(files[~mask], open('../../data/raw/files_challenge.pkl', 'wb'))

# Dimensionality Reduction by Variance Thresholding

In [None]:
plt.figure(figsize=(4,3))
plt.plot(aligned_centers, np.std(X_use, axis=0)**2)
plt.yscale('log')
plt.xlabel('Energy (keV)')
plt.ylabel(r'$\sigma^2$')

chosen_cutoff = 7650

plt.axvline(chosen_cutoff, color='r', alpha=0.25)

if new_figures:
    plt.savefig('variance.png', bbox_inches='tight', dpi=300)

In [None]:
bin_cutoff = np.argmin((aligned_centers - chosen_cutoff)**2)
aligned_centers[bin_cutoff]

In [None]:
# Consider PCA and PaCMAP when using thresholding to eliminate high energy, low variance bins ("automatically")
thresholds = [0.0, 1.0e-12, 1.0e-10, 1.0e-8, 1.0e-6, 1.0e-4]

fig, axes = plt.subplots(nrows=2, ncols=len(thresholds), figsize=(25,10))

pca_proj = []
pacmap_proj = []
for i,thresh in enumerate(thresholds):
    pca_, vt_ = pca_project(X_use, y_use, axes[0][i], threshold=thresh, title="")
    axes[0][i].set_title('PCA with T={} ({} bins)'.format(thresh, np.sum(vt_.get_support())))
    pca_proj.append([pca_, vt_])
    
    pmp_, vt1_ = pacmap_project(X_use, y_use, axes[1][i], threshold=thresh, title='PaCMAP with T={}'.format(thresh))
    axes[1][i].set_title('PaCMAP with T={} ({} bins)'.format(thresh, np.sum(vt1_.get_support())))
    pacmap_proj.append([pmp_, vt1_])
    
axes[0][-1].legend(loc='best', fontsize=8)
plt.tight_layout()

In [None]:
fig, axes = plt.subplots(nrows=6, ncols=3, figsize=(30,18))

for i, (ax, vt, pca) in enumerate(zip(axes, [x[1] for x in pca_proj], [x[0] for x in pca_proj])):
    ax[0].plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[0]).reshape(1,-1))[0],
        color='C0'
    )
    ax[0].set_xlabel('Energy (keV)', fontsize=20)
    ax[0].set_ylabel('|Coeff.| in PC1', fontsize=20)
    ax[0].tick_params(axis='both', which='major', labelsize=20)
    
    ax[1].plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[1]).reshape(1,-1))[0],
        color='C1'
    )
    ax[1].set_xlabel('Energy (keV)', fontsize=20)
    ax[1].set_ylabel('|Coeff.| in PC2', fontsize=20)
    ax[1].tick_params(axis='both', which='major', labelsize=20)
    
    ax[2].plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[0]).reshape(1,-1))[0],
        color='C0',
        label='PC 1',
        alpha=0.4
    )
    ax[2].plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[1]).reshape(1,-1))[0],
        color='C1',
        label='PC 2',
        alpha=0.4
    )
    ax[2].set_xlabel('Energy (keV)', fontsize=20)
    ax[2].set_ylabel('|Coeff.|', fontsize=20)
    ax[2].legend(loc=0, fontsize=20)
    ax[2].tick_params(axis='both', which='major', labelsize=20)
    
    ax[0].set_title('Threshold = {} ({} bins)'.format(thresholds[i], np.sum(vt.get_support())), fontsize=28)
    ax[1].set_title('Threshold = {} ({} bins)'.format(thresholds[i], np.sum(vt.get_support())), fontsize=28)
    ax[2].set_title('Threshold = {} ({} bins)'.format(thresholds[i], np.sum(vt.get_support())), fontsize=28)
    
    ax[2].axvline(chosen_cutoff, color='k')
plt.tight_layout()

if new_figures:
    plt.savefig('thresholding.png', dpi=900, bbox_inches='tight')

In [None]:
# Thresholding of about 1e-8 to 1.e-10 is necessary to "cutoff" the high energy parts of the spectra we may not trust
# since they are not present for all samples in the dataset.

In [None]:
mpl.rcParams['font.size'] = 14

def change_axis_color(ax, color):
    ax.spines['bottom'].set_color(color)
    ax.spines['top'].set_color(color) 
    ax.spines['right'].set_color(color)
    ax.spines['left'].set_color(color)
    
fig = plt.figure(constrained_layout=False, facecolor='white', figsize=(18,8))
gs = fig.add_gridspec(nrows=5, ncols=4, left=0.05, right=0.75,
                      hspace=1.2, wspace=0.3)

idx = 0
ax0 = fig.add_subplot(gs[0, :2])

vt, pca = pca_proj[idx][1], pca_proj[idx][0]
ax0.plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[0]).reshape(1,-1))[0],
        color='C0',
        label='PC 1',
        alpha=0.4
    )
ax0.plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[1]).reshape(1,-1))[0],
        color='C1',
        label='PC 2',
        alpha=0.4
    )
ax0.set_xlabel('Energy (keV)')
ax0.set_ylabel('|Coeff.|')
ax0.legend(loc=0, fontsize=12)
ax0.set_title('Threshold={} ({} bins)'.format(int(thresholds[idx]), np.sum(vt.get_support())))
ax0.axvline(chosen_cutoff, color='k')
change_axis_color(ax0, 'red')

idx = 3
ax0 = fig.add_subplot(gs[0, 2:])

vt, pca = pca_proj[idx][1], pca_proj[idx][0]
ax0.plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[0]).reshape(1,-1))[0],
        color='C0',
        label='PC 1',
        alpha=0.4
    )
ax0.plot(
        aligned_centers, 
        vt.inverse_transform(np.abs(pca.components_[1]).reshape(1,-1))[0],
        color='C1',
        label='PC 2',
        alpha=0.4
    )
ax0.set_xlabel('Energy (keV)')
ax0.set_ylabel('|Coeff.|')
ax0.legend(loc=0, fontsize=12)
ax0.set_title('Threshold={} ({} bins)'.format(thresholds[idx], np.sum(vt.get_support())))
ax0.axvline(chosen_cutoff, color='k')
change_axis_color(ax0, 'blue')

for j,(i,md_pca,md_pacmap) in enumerate(zip([0, 1, 3, 5], [15, 10, 5, 0.5], [5, 5, 5, 5])):
    ax_ = fig.add_subplot(gs[1:3, j])
    pca_, vt_ = pca_project(X_use, y_use, ax_, threshold=thresholds[i], title="", ellipses=True, min_d=md_pca, scale=2.5)
    
    
    ax_.set_title(r'$\it{T}$'+'={} '.format(
        np.format_float_scientific(thresholds[i]) if i>0 else int(thresholds[i])
    )+'({} bins)'.format(np.sum(vt_.get_support())))
    
    if j == 0:
        change_axis_color(ax_, 'red')
    elif j == 2:
        change_axis_color(ax_, 'blue')
    elif j == 3:
        ax_.legend(loc='upper right', fontsize=9)
        ax_.set_xlim((-5, 12))
        ax_.set_ylim((-4, 10))
    
    ax_ = fig.add_subplot(gs[3:, j])
    _, vt_ = pacmap_project(X_use, y_use, ax_, threshold=thresholds[i], title="", 
                            ellipses=True, min_d=md_pacmap, scale=1.25)

mpl.rcParams['font.size'] = 12
    
if new_figures:
    plt.savefig('dim_red.png', dpi=300, bbox_inches='tight')

# Clustering

In [None]:
# Agglomerative clustering to create dendrograms

# Use thresholding to remove high E bins
pca = PCA(n_components=2)
vt = VarianceThreshold(threshold=1.0e-8) # ~ the same as removing all high E bins
ss = StandardScaler()
X_proj = pca.fit_transform(ss.fit_transform(vt.fit_transform(X_use)))
make_dend(X_proj, [y_ for y_ in y_use], files_use)

# Manually trim bins at a chosen cutoff
pca = PCA(n_components=2)
ss = StandardScaler()
X_proj = pca.fit_transform(ss.fit_transform(X_use[:,:bin_cutoff]))
make_dend(X_proj, [y_ for y_ in y_use], files_use)

In [None]:
embedding = pacmap.PaCMAP(n_components=2, n_neighbors=5,
                          MN_ratio=0.5, FP_ratio=2.0,
                          distance='euclidean',
                          apply_pca=False, random_state=42)
vt = VarianceThreshold(threshold=1.0e-8) # ~ the same as removing all high E bins
ss = StandardScaler()
X_proj = embedding.fit_transform(ss.fit_transform(vt.fit_transform(X_use)))
make_dend(X_proj, [y_ for y_ in y_use], files_use)

# Manually trim instead of threshold
embedding = pacmap.PaCMAP(n_components=2, n_neighbors=5,
                          MN_ratio=0.5, FP_ratio=2.0,
                          distance='euclidean',
                          apply_pca=False, random_state=42)
X_proj = embedding.fit_transform(X_use[:,:bin_cutoff])
make_dend(X_proj, [y_ for y_ in y_use], files_use)

# Without thresholding 
embedding = pacmap.PaCMAP(n_components=2, n_neighbors=5,
                          MN_ratio=0.5, FP_ratio=2.0,
                          distance='euclidean',
                          apply_pca=False, random_state=42)
X_proj = embedding.fit_transform(X_use)
make_dend(X_proj, [y_ for y_ in y_use], files_use)

# Examine Discriminative Models

In [None]:
import sys
sys.path.append('model_performances')
from pipe_preprocessing import final_fit, preprocessing_factory

In [None]:
from sklearn.model_selection import train_test_split

# Select a stratified 80:20 split for train:test
X_tr, X_te, y_tr, y_te = train_test_split(X_use, y_use, test_size=0.2, random_state=42, stratify=y_use)

# Will do 5-fold CV to optimize hyperparameters
k_fold = 5

## Overall Performances from Nested CV

In [None]:
overall = []
methods = ['logreg', 'lda', 'qda', 'rf', 'dtree', 'hard_plsda']
pipes = ["_thresh", "_thresh_pca", "_thresh_standard", "_thresh_standard_pca", "_thresh_robust", "_thresh_robust_pca"]
for method in methods:
    results = pickle.load(open('./model_performances/'+method+'_summary.pkl', 'rb'))
    column = []
    for pipe in pipes:
        perf = results[method+pipe]
        column += [np.mean(perf), np.std(perf)]
    overall.append(column)

In [None]:
def name_methods(m):
    if m == 'logreg':
        return 'Logistic Regression'
    elif m == 'lda':
        return 'Linear Discriminant Analysis'
    elif m == 'qda':
        return 'Quadratic Discriminant Analysis'
    elif m == 'rf':
        return 'Random Forest'
    elif m == 'dtree':
        return 'Decision Tree'
    elif m == 'hard_plsda':
        return 'Hard PLS-DA'
    else:
        raise ValueError('unknown name')

In [None]:
pd.set_option("display.precision", 3)
df = pd.DataFrame(data=overall, index=[name_methods(m) for m in methods], columns=np.array([["<A> ({})".format(p), "+/-"] for p in pipes]).ravel())

df['mean'] = df[['<A> (_thresh)', '<A> (_thresh_pca)', '<A> (_thresh_standard)', '<A> (_thresh_standard_pca)', 
    '<A> (_thresh_robust)', '<A> (_thresh_robust_pca)'
   ]].mean(axis=1)

sorted_df = df.sort_values(by='mean', ascending=False).drop(['mean'], axis=1)

sorted_df

In [None]:
df['mean']

In [None]:
sorted_df.describe()

In [None]:
# RF the best, QDA the worst - but overall very similar

# Since the methods are all similar, let's look at the "simplest" pipelines that produce interpretable results.
# 1. LogReg (_thresh) as a baseline
# 2. LogReg (_thresh_standard_pca) for comparison to see "simplification" - may be able to visualize if dims is low enough
# 3. DTree (_thresh) usually very good and similar to RF but easier to intepret

## 1. LogReg (_thresh)

In [None]:
from sklearn.linear_model import LogisticRegression
import imblearn

steps, param_grid = preprocessing_factory(
    balance=False, # Use class balancing internally
    savgol=False,
    threshold=True,
    scaling=None,
    dim_red=None
)

steps += [('model',
           LogisticRegression(
               random_state=42,
               solver='lbfgs', # For speed
               max_iter=1000,
               multi_class='multinomial',
               class_weight='balanced', 
               fit_intercept=True)
          )]

param_grid.update({
    'model__penalty':['none', 'l2'],
    'model__C':np.logspace(-3, 2, 6),
})

lr1_pipe = imblearn.pipeline.Pipeline(steps=steps)

In [None]:
gs = final_fit(X_tr, y_tr, lr1_pipe, param_grid, k_fold)

In [None]:
gs.best_params_ # Minimal thresholding used

In [None]:
fig, axes = plt.subplots(nrows=3, ncols=5, figsize=(20,8))

# See the documentation for interpretation of the sign of these values:
# https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html
# Positive values correspond to "True" (belongs to class).
max_peaks = {}
for i, ax in enumerate(axes.flatten()):
    if i < len(gs.best_estimator_.named_steps['model'].classes_):
        # Non-dimensionalize coefficients
        coefs = gs.best_estimator_.named_steps['model'].coef_[i] * gs.best_estimator_.named_steps['variance_threshold'].transform(np.std(X_use, axis=0).reshape(1,-1))
        ax.plot(
            aligned_centers,
            gs.best_estimator_.named_steps['variance_threshold'].inverse_transform(coefs.reshape(1,-1))[0]
        )
        ax.set_xlabel('Energy (keV)', fontsize=20)
        ax.set_ylabel(r'$a_k~\times~s$', fontsize=20)
        max_peaks[gs.best_estimator_.named_steps['model'].classes_[i]] = np.argsort(np.abs(gs.best_estimator_.named_steps['variance_threshold'].inverse_transform(coefs.reshape(1,-1))[0]))[::-1]
        ax.set_title('{}'.format(gs.best_estimator_.named_steps['model'].classes_[i]), fontsize=22)
        ax.tick_params(axis='both', which='major', labelsize=18)
    else:
        ax.set_visible(False)
plt.tight_layout()

if new_figures:
    plt.savefig('lr_coef.png', dpi=300, bbox_inches='tight')

In [None]:
# The largest peak is at 2224 keV for most
# Qualitatively similar conclusion even if we do not account for the standard deviation or if we used mean for the scale instead of std. dev.
for k in max_peaks:
    print(k, aligned_centers[max_peaks[k][0]], aligned_centers[max_peaks[k][1]], aligned_centers[max_peaks[k][2]])

In [None]:
mpl.rcParams['font.size'] = 14

fig, axes = plt.subplots(nrows=3, ncols=1)

for i, ax in zip([5, 0, 9], axes.flatten()):
    if i < len(gs.best_estimator_.named_steps['model'].classes_):
        coefs = gs.best_estimator_.named_steps['model'].coef_[i] * gs.best_estimator_.named_steps['variance_threshold'].transform(np.std(X_use, axis=0).reshape(1,-1))
        ax.plot(
            aligned_centers,
            gs.best_estimator_.named_steps['variance_threshold'].inverse_transform(coefs.reshape(1,-1))[0],
            color=plt.cm.tab20(i)
        )
        ax.set_xlabel('Energy (keV)', fontsize=14)
        ax.set_ylabel(r'$a_k~\times~s$', fontsize=14)
        
        ax.set_title('{}'.format(
            gs.best_estimator_.named_steps['model'].classes_[i]),
                    fontsize=16)
        ax.tick_params(axis='both', which='major', labelsize=12)
plt.tight_layout()

mpl.rcParams['font.size'] = 12

if new_figures:
    plt.savefig('lr_importances.png', dpi=300, bbox_inches='tight')

## 2. LogReg (_thresh_standard_pca)

In [None]:
from sklearn.linear_model import LogisticRegression
import imblearn

steps, param_grid = preprocessing_factory(
    balance=False, # Use class balancing internally
    savgol=False,
    threshold=True,
    scaling='standard',
    dim_red='pca'
)

steps += [('model',
           LogisticRegression(
               random_state=42,
               solver='lbfgs', # For speed
               max_iter=1000,
               multi_class='multinomial',
               class_weight='balanced', 
               fit_intercept=True)
          )]

param_grid.update({
    'model__penalty':['none', 'l2'],
    'model__C':np.logspace(-3, 2, 6),
    'dim_red__n_components':[2], # Force 2D for easy visualization
})

lr2_pipe = imblearn.pipeline.Pipeline(steps=steps)

In [None]:
gs = final_fit(X_tr, y_tr, lr2_pipe, param_grid, k_fold)

In [None]:
gs.best_params_ 
# Also, we are using Pareto scaling which leaves behind some "scale" information

In [None]:
gs.score(X_tr, y_tr)

In [None]:
gs.score(X_te, y_te)

In [None]:
preprocess_steps = gs.best_estimator_.steps[:-1]
preprocess_pipe = imblearn.pipeline.Pipeline(steps=preprocess_steps)
X_proj = preprocess_pipe.transform(X_tr)

In [None]:
plot_hard_decision_boundaries_2d(
    preprocess_pipe.transform(X_tr), 
    y_tr, 
    gs.best_estimator_.named_steps['model'],
    resolution=0.0005,
    bounds=200
)
plt.legend(loc='best', fontsize=12)
plt.xlabel('PC 1', fontsize=16)
plt.ylabel('PC 2', fontsize=16)
plt.gca().tick_params(axis='both', which='major', labelsize=14)

if new_figures:
    plt.savefig('pca_lr.png', bbox_inches='tight', dpi=300)

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(8,2))
ax.set_xlabel('Energy (keV)', fontsize=16)
ax.set_ylabel('|Coefficient|', fontsize=16)
ax.set_title('PC 1', fontsize=18)
ax.tick_params(axis='both', which='major', labelsize=14)
print(plot_importance(gs, aligned_centers, dim=0, return_max=1, ax=ax))

if new_figures:
    plt.savefig('lr_pc1.png', dpi=300, bbox_inches='tight')

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(8,2))
ax.set_xlabel('Energy (keV)', fontsize=16)
ax.set_ylabel('|Coefficient|', fontsize=16)
ax.set_title('PC 2', fontsize=18)
ax.tick_params(axis='both', which='major', labelsize=14)
print(plot_importance(gs, aligned_centers, dim=1, return_max=4, ax=ax))

if new_figures:
    plt.savefig('lr_pc2.png', dpi=300, bbox_inches='tight')

In [None]:
# These peaks from PCA are the same as from LR!

# Soft PLS-DA (Intermediate Model)

In [None]:
res = pickle.load(open('model_performances/soft_plsda_summary.pkl', 'rb'))

for k,v in res.items():
    print(k, '%.3f'%np.mean(v), '%.3f'%np.std(v))

In [None]:
def rename(name):
    parts = name.split('_')
    model_ = parts[0].capitalize()+' PLS-DA'
    pre_ = ''
    for i in range(2, len(parts)):
        add_ = parts[i]
        if add_ == 'standard':
            add_ = 'Standard Scaling'
        elif add_ == 'robust':
            add_ = 'Robust Scaling'
        elif add_ == 'thresh':
            add_ = 'Thresholding'
        elif add_ == 'pca':
            add_ = 'PCA'
        pre_ += add_ + ' : '
    pre_ += model_
    return pre_

In [None]:
renamed_res = {}
for k in res.keys():
    renamed_res[rename(k)] = res[k]

In [None]:
# These differences are not significant
from pychemauth import analysis
from pychemauth.analysis import compare
ax_ = compare.Compare.visualize(renamed_res, n_repeats=5, alpha=0.05)
ax_.set_title(r'TEFF +/- $\sigma$', fontsize=16)
ax_.legend(bbox_to_anchor=(1,-0.1), fontsize=12)
ax_.tick_params(axis='both', which='major', labelsize=14)

In [None]:
# No statistical difference so lets select some simple ones to examine
# 1. _thresh (no preprocessing)
# 2. _thresh_pca (best performer)

## 1. PLS-DA (_thresh)

In [None]:
import imblearn
from pychemauth.classifier.plsda import PLSDA

steps, param_grid = preprocessing_factory(
    balance=True,
    savgol=False,
    threshold=True,
    scaling=None,
    dim_red=None
)

steps += [('model', 
           PLSDA(
               gamma=0.01, 
               not_assigned='UNKNOWN', 
               style='soft', 
               score_metric='TEFF'))
         ]

param_grid.update({
    'model__alpha':[0.01, 0.05],
    'model__scale_x':[True, False],
    'model__n_components':[1, 2, 3, 4, 5, 10, 20, 50],
})

soft_plsda_pipe = imblearn.pipeline.Pipeline(steps=steps)

In [None]:
gs = final_fit(X_tr, 
               y_tr, 
               soft_plsda_pipe, 
               param_grid, 
               k_fold)

In [None]:
gs.best_params_ 

In [None]:
gs.score(X_tr, y_tr) # TEFF

In [None]:
gs.score(X_te, y_te) # TEFF

In [None]:
df, I, CSNS, CSPS, CEFF, TSNS, TSPS, TEFF = gs.best_estimator_.named_steps['model'].figures_of_merit(
    gs.predict(X_te),
    y_te
)

In [None]:
TSPS

In [None]:
TSNS

In [None]:
df

In [None]:
perf = pd.concat([CSPS, CSNS, CEFF], axis=1)
perf.columns = ['Test CSPS', 'Test CSNS', 'Test CEFF']
perf['Training Counts'] = [dict(zip(*np.unique(y_tr, return_counts=True)))[i] for i in perf.index]
perf['Test Counts'] = [dict(zip(*np.unique(y_te, return_counts=True)))[i] for i in perf.index]

In [None]:
pd.set_option("display.precision", 3)
perf.sort_values(by='Training Counts', ascending=False)

## PLS-DA (_thresh_pca)

In [None]:
# Very similar results obtained with _thresh_standard_pca

In [None]:
import imblearn
from pychemauth.classifier.plsda import PLSDA

steps, param_grid = preprocessing_factory(
    balance=True,
    savgol=False,
    threshold=True,
    scaling=None, # 'standard',
    dim_red='pca' # PCA automatically centers the data but does not do additional scaling
)

steps += [('model', 
           PLSDA(
               gamma=0.01, 
               not_assigned='UNKNOWN', 
               style='soft', 
               score_metric='TEFF'))
         ]

param_grid.update({
    'model__alpha':[0.01, 0.05],
    'model__scale_x':[True, False],
    'model__n_components':[1, 2, 3, 4, 5, 10, 20, 50],

})

soft_plsda_pipe = imblearn.pipeline.Pipeline(steps=steps)

In [None]:
gs = final_fit(X_tr, y_tr, soft_plsda_pipe, param_grid, k_fold)

In [None]:
gs.best_params_ 

In [None]:
gs

In [None]:
gs.score(X_tr, y_tr) # TEFF

In [None]:
gs.score(X_te, y_te) # TEFF

In [None]:
df, I, CSNS, CSPS, CEFF, TSNS, TSPS, TEFF = gs.best_estimator_.named_steps['model'].figures_of_merit(
    gs.predict(X_te),
    y_te
)

In [None]:
df

In [None]:
perf3 = pd.concat([CSPS, CSNS, CEFF], axis=1)
perf3.columns = ['Test CSPS', 'Test CSNS', 'Test CEFF']
perf3['Training Counts'] = [dict(zip(*np.unique(y_tr, return_counts=True)))[i] for i in perf3.index]

In [None]:
perf3.sort_values(by='Training Counts', ascending=False)

# SIMCA (Class Models)

In [None]:
def plot_simca(simca_models, target_class, X_tr, y_tr, ax=None, no_legend=True):
    preprocess_pipe = imblearn.pipeline.Pipeline(steps=simca_models[target_class].best_estimator_.steps[:-2]) # ScaledSMOTEENN should pass through anyway
    
    ax = simca_models[target_class].best_estimator_.named_steps['model'].model.visualize(
        preprocess_pipe.transform(X_tr), 
        y_tr,
        ax=ax
    )
    
    ax.set_title('DD-SIMCA Model for \n{}'.format(target_class))
    if no_legend:
        ax.get_legend().remove()

## Compliant, Non-robust Models

In [None]:
res = pickle.load(open('model_performances/simca_compliant_summary.pkl', 'rb'))

for k,v in res.items():
    print(k, '%.3f'%np.mean(v), '%.3f'%np.std(v))

In [None]:
from pychemauth.classifier.simca import SIMCA_Classifier
import imblearn
import tqdm 

simca_models_nr = {}
for i, target_class in tqdm.tqdm(enumerate(sorted(np.unique(y_use)))):
    steps, param_grid = preprocessing_factory(
        balance=True,
        savgol=False,
        threshold=True,
        scaling=None,
        dim_red=None
    )

    steps += [('model', 
               SIMCA_Classifier(
                   target_class=target_class, 
                   style="dd-simca", 
                   use="compliant",
                   robust=None,
                   sft=False)
              )]
    
    param_grid.update({
        'model__scale_x':[True, False],
        'model__n_components':np.arange(1, 10+1),
        'model__alpha':[0.01, 0.05],
        'variance_threshold__threshold':[1.0e-8, 1.0e-6, 1.0e-4] # Force this to be high enough to essentially ignore high E bins
    })
    
    simca_pipe = imblearn.pipeline.Pipeline(steps=steps)
    simca_models_nr[target_class] = final_fit(X_tr, y_tr, simca_pipe, param_grid, k_fold)

In [None]:
# These performance metrics do not change, even when lower thresholds are allowed!
simca_metrics_nr = {}
display = []
for target_class in simca_models_nr.keys():
    preprocess_pipe = imblearn.pipeline.Pipeline(steps=simca_models_nr[target_class].best_estimator_.steps[:-2]) # ScaledSMOTEENN should pass through anyway
    
    simca_metrics_nr[target_class] = simca_models_nr[target_class].best_estimator_.named_steps['model'].metrics(
        preprocess_pipe.transform(X_te), 
        y_te
    )
    
    print(target_class, simca_models_nr[target_class].best_params_['model__alpha'])
    
    display.append([
        target_class, 
        simca_metrics_nr[target_class]['tsns'], 
        simca_metrics_nr[target_class]['tsps'], 
        simca_metrics_nr[target_class]['teff'], 
        simca_models_nr[target_class].best_params_['model__n_components'], 
        simca_models_nr[target_class].best_params_['variance_threshold__threshold'], 
        simca_models_nr[target_class].best_params_['model__alpha']
    ])
simca_perf_nr = pd.DataFrame(data=display, columns=['Target class', 'Test TSNS', 'Test TSPS', 'Test TEFF', 'n_components', 'threshold', 'alpha'])

In [None]:
pd.set_option("display.precision", 3)
simca_perf_nr['Target class'] = [x_ for x_ in simca_perf_nr['Target class']]
simca_perf_nr

In [None]:
fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(15,10))
for ax, target_class in zip(axes.flatten(), simca_models_nr.keys()):
    plot_simca(simca_models_nr, target_class, X_tr, y_tr, ax=ax)
    ax.tick_params(axis='both', which='major', labelsize=16)
    ax.set_title(ax.get_title(), fontsize=18)
    ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=16)
    ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=16)

axes.flatten()[-1].set_visible(False)
axes.flatten()[-2].set_visible(False)
plt.tight_layout()

if new_figures:
    plt.savefig('simca_results_nr.png', dpi=300, bbox_inches='tight')

## Rigorous, Non-robust Models

In [None]:
res = pickle.load(open('model_performances/simca_rigorous_summary.pkl', 'rb'))

for k,v in res.items():
    print(k, '%.3f'%np.mean(v), '%.3f'%np.std(v))

In [None]:
# Rigorous models seek to get TSNS ~ 1-alpha, and the score is reported as -(TSNS - (1-alpha))^2
# These are much "worse", but remember that alpha was fixed to 0.05 whereas the others optimized to get to 0.01 instead.

res = pickle.load(open('model_performances/simca_rigorous_summary.pkl', 'rb'))

for k,v in res.items():
    print(k, '%.3f'%np.mean(v), '%.3f'%np.std(v), '; <deviation> from 0.95 = {}'.format('%.3f'%np.sqrt(-np.mean(v))))

In [None]:
from pychemauth.classifier.simca import SIMCA_Classifier
import imblearn
import tqdm 

simca_models_rig = {}
for i, target_class in tqdm.tqdm(enumerate(sorted(np.unique(y_use)))):
    steps, param_grid = preprocessing_factory(
        balance=True,
        savgol=False,
        threshold=True,
        scaling=None,
        dim_red=None
    )

    steps += [('model', 
               SIMCA_Classifier(
                   target_class=target_class, 
                   style="dd-simca", 
                   use="rigorous",
                   robust=None, # It was better to use classical estimators instead of robust ones
                   alpha=0.01, # 0.01 was universally selected by the compliant models above, so use this value for fairer comparison
                   sft=False)
              )]
    
    param_grid.update({
        'model__scale_x':[True, False],
        'model__n_components':np.arange(1, 10+1),
        'variance_threshold__threshold':[1.0e-8, 1.0e-6, 1.0e-4] # Force this to be high enough to essentially ignore high E bins - also same as compliant models above
    })
    
    simca_pipe = imblearn.pipeline.Pipeline(steps=steps)
    try:
        simca_models_rig[target_class] = final_fit(
            X_tr,  
            y_tr, 
            simca_pipe, param_grid, k_fold)
    except:
        print('1.0e-4 bad for : '+target_class)
        param_grid.update({
            'variance_threshold__threshold':[1.0e-8, 1.0e-6] # Force this to be high enough to essentially ignore high E bins - also same as compliant models above
        })
        
        simca_models_rig[target_class] = final_fit(
            X_tr,  
            y_tr, 
            simca_pipe, param_grid, k_fold)
    

In [None]:
simca_metrics_rig = {}
display = []
for target_class in simca_models_rig.keys():
    preprocess_pipe = imblearn.pipeline.Pipeline(steps=simca_models_rig[target_class].best_estimator_.steps[:-2]) # ScaledSMOTEENN should pass through anyway
    
    simca_metrics_rig[target_class] = simca_models_rig[target_class].best_estimator_.named_steps['model'].metrics(
        preprocess_pipe.transform(X_te), 
        y_te
    )
    
    display.append([target_class, 
                    simca_metrics_rig[target_class]['tsns'], 
                    simca_metrics_rig[target_class]['tsps'], 
                    simca_metrics_rig[target_class]['teff'], 
                    simca_models_rig[target_class].best_params_['model__n_components'], 
                    simca_models_rig[target_class].best_params_['variance_threshold__threshold']
                   ])
                    
simca_df = pd.DataFrame(data=display, columns=['Target class', 'Test TSNS', 'Test TSPS', 'Test TEFF', 'n_components', 
                                    'threshold'
                                   ])
simca_df['Training Counts'] = [dict(zip(*np.unique(y_tr, return_counts=True)))[i] for i in simca_df['Target class']]

simca_df

In [None]:
fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(15,10))
for ax, target_class in zip(axes.flatten(), simca_models_rig.keys()):
    plot_simca(simca_models_rig, target_class, X_tr, y_tr, ax=ax)
    ax.tick_params(axis='both', which='major', labelsize=16)
    ax.set_title(ax.get_title(), fontsize=18)
    ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=16)
    ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=16)
axes.flatten()[-1].set_visible(False)
axes.flatten()[-2].set_visible(False)

plt.tight_layout()

if new_figures:
    plt.savefig('simca_results_rig.png', dpi=300, bbox_inches='tight')

In [None]:
# Let's look at PC 1 for these models
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(10,10))

for ax, target_class in zip(axes.flatten(), simca_models_rig.keys()):
    ax.set_title(target_class)

    scale = simca_models_rig[target_class].best_estimator_.named_steps['model'].model._DDSIMCA_Model__ss_._CorrectedScaler__std_
    loadings = simca_models_rig[target_class].best_estimator_.named_steps['model'].model._DDSIMCA_Model__pca_.components_[0]
    if simca_models_rig[target_class].best_params_['model__scale_x']:
        loadings *= scale # If X is divided by std, multiply back to make a fair comparison with other models
        add=r'$~\times~\sigma$'
    else:
        add=''
    y_ = simca_models_rig[target_class].best_estimator_.named_steps['variance_threshold'].inverse_transform(loadings.reshape(1,-1))[0]
    
    ax.plot(aligned_centers, y_)
    ax.ticklabel_format(useOffset=False, useMathText=True, scilimits=(0,0), axis='y')
    
    ax.set_xlabel('Energy (keV)')

plt.tight_layout()

In [None]:
fig = plt.figure(constrained_layout=False, facecolor='white', figsize=(18,8))
gs = fig.add_gridspec(nrows=4, ncols=4, left=0.05, right=0.75,
                      hspace=0.9, wspace=0.3)

for i,target_class in enumerate(
    ['Fuel Oil', 'Concrete', 'Forensic Glass', 'Titanium Alloy']
):
    ax = fig.add_subplot(gs[i, :2])
    
    ax.set_title('PC 1 for '+target_class, fontsize=18)

    scale = simca_models_rig[target_class].best_estimator_.named_steps['model'].model._DDSIMCA_Model__ss_._CorrectedScaler__std_
    scalings = np.abs(simca_models_rig[target_class].best_estimator_.named_steps['model'].model._DDSIMCA_Model__pca_.components_[0])
    if simca_models_rig[target_class].best_params_['model__scale_x']:
        scalings *= scale # If X is divided by std, multiply back to make a fair comparison with other models
        add=r'$~\times~s$'
    else:
        add=''
    y_ = simca_models_rig[target_class].best_estimator_.named_steps['variance_threshold'].inverse_transform(scalings.reshape(1,-1))[0]
    
    print(target_class, aligned_centers[np.argsort(y_)[::-1][0]], aligned_centers[np.argsort(y_)[::-1][1]])
    
    ax.plot(aligned_centers, y_)
    ax.ticklabel_format(useOffset=False, useMathText=True, scilimits=(0,0), axis='y')
    
    ax.set_xlabel('Energy (keV)', fontsize=14)
    ax.set_ylabel('|Coefficient|'+add, fontsize=14)
    ax.tick_params(axis='both', which='major', labelsize=14)

ax = fig.add_subplot(gs[:2, 2])
plot_simca(simca_models_rig, 'Fuel Oil', X_tr, y_tr, ax=ax)
ax.set_title('Rigorous DD-SIMCA \nModel for Fuel Oil', fontsize=18)
ax.set_xlabel('')
ax.tick_params(axis='both', which='major', labelsize=14)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=15)

ax = fig.add_subplot(gs[:2, 3])
plot_simca(simca_models_rig, 'Concrete', X_tr, y_tr, ax=ax)
ax.set_title('Rigorous DD-SIMCA \nModel for Concrete', fontsize=18)
ax.set_xlabel('')
ax.tick_params(axis='both', which='major', labelsize=14)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=15)

ax = fig.add_subplot(gs[2:, 2])
plot_simca(simca_models_rig, 'Forensic Glass', X_tr, y_tr, ax=ax)
ax.set_title('Rigorous DD-SIMCA \nModel for Forensic Glass', fontsize=18)
ax.tick_params(axis='both', which='major', labelsize=14)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=15)
ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=15)

ax = fig.add_subplot(gs[2:, 3])
plot_simca(simca_models_rig, 'Titanium Alloy', X_tr, y_tr, ax=ax)
ax.set_title('Rigorous DD-SIMCA \nModel for Titanium Alloy', fontsize=18)
ax.tick_params(axis='both', which='major', labelsize=14)
ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=15)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=15)

if new_figures:
    plt.savefig('ddsimca.png', dpi=300, bbox_inches='tight')

In [None]:
# Compare Coal and Coke Rigorous vs. Compliant
# Training data shown to illustrate how the models are constructed.
target_class = 'Coal and Coke'    

fig = plt.figure(constrained_layout=False, facecolor='white', figsize=(12,4))
gs = fig.add_gridspec(nrows=1, ncols=3, left=0.05, right=0.95,
                      hspace=0.9, wspace=0.3)

ax = fig.add_subplot(gs[0, 0])
preprocess_pipe = imblearn.pipeline.Pipeline(steps=simca_models_nr[target_class].best_estimator_.steps[:-2]) 
simca_models_nr[target_class].best_estimator_.named_steps['model'].model.visualize(
    preprocess_pipe.transform(X_tr), 
    y_tr,
    ax=ax
)
_ = ax.set_title('Compliant DD-SIMCA \nModel for '+target_class, fontsize=18)
ax.legend(bbox_to_anchor=(1,1.03), fontsize=12)    
ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=16)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=16)
ax.tick_params(axis='both', which='major', labelsize=14)

ax = fig.add_subplot(gs[0, 2])
preprocess_pipe = imblearn.pipeline.Pipeline(steps=simca_models_rig[target_class].best_estimator_.steps[:-2]) 
simca_models_rig[target_class].best_estimator_.named_steps['model'].model.visualize(
    preprocess_pipe.transform(X_tr), 
    y_tr,
    ax=ax
)
_ = ax.set_title('Rigorous DD-SIMCA \nModel for '+target_class, fontsize=18)
ax.legend(bbox_to_anchor=(1,1.03), fontsize=12)
ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=16)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=16)
ax.tick_params(axis='both', which='major', labelsize=14)

if new_figures:
    plt.savefig('rig_vs_comp_ddsimca.png', dpi=300, bbox_inches='tight')

# Testing Authentication Models against Other Classes

In [None]:
np.unique(y_challenge)

In [None]:
use_simca = simca_models_rig
# simca_models_rig (rigorous, non-robust)
# simca_models_nr (compliant, non-robust)

In [None]:
simca_challenge_metrics = {}
display = []
for target_class in use_simca.keys():
    preprocess_pipe = imblearn.pipeline.Pipeline(steps=use_simca[target_class].best_estimator_.steps[:-2]) # ScaledSMOTEENN should pass through anyway
    
    simca_challenge_metrics[target_class] = use_simca[target_class].best_estimator_.named_steps['model'].metrics(
        preprocess_pipe.transform(X_challenge), 
        y_challenge
    )
    
    display.append([target_class, 
                    simca_challenge_metrics[target_class]['tsns'], 
                    simca_challenge_metrics[target_class]['tsps'], 
                    simca_challenge_metrics[target_class]['teff'], 
                    use_simca[target_class].best_params_['model__n_components'], 
                    use_simca[target_class].best_params_['variance_threshold__threshold'], 
                   ])
df_chall = pd.DataFrame(data=display, columns=['Target class', 'Test TSNS', 'Test TSPS', 'Test TEFF', 'n_components', 'threshold'])

In [None]:
df_chall

In [None]:
# Perfect performance from both!

In [None]:
for target_class in use_simca.keys():
    preprocess_pipe = imblearn.pipeline.Pipeline(steps=use_simca[target_class].best_estimator_.steps[:-2]) 
    
    use_simca[target_class].best_estimator_.named_steps['model'].model.visualize(
        preprocess_pipe.transform(X_challenge), 
        y_challenge
    )
    plt.gca().set_title('DD-SIMCA Model for: {}'.format(target_class))

In [None]:
fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(18,9))

plot_simca(use_simca, 'Fuel Oil', X_challenge, y_challenge, ax=axes[0][0], no_legend=True)
plot_simca(use_simca, 'Concrete', X_challenge, y_challenge, ax=axes[0][1], no_legend=True)
plot_simca(use_simca, 'Forensic Glass', X_challenge, y_challenge, ax=axes[0][2], no_legend=True)
plot_simca(use_simca, 'Titanium Alloy', X_challenge, y_challenge, ax=axes[1][0], no_legend=True)
plot_simca(use_simca, 'Biomass', X_challenge, y_challenge, ax=axes[1][1], no_legend=True)
plot_simca(use_simca, 'Coal and Coke', X_challenge, y_challenge, ax=axes[1][2], no_legend=False)


for i in [0, 1]:
    for j in [0, 1, 2]:
        axes[i][j].xaxis.label.set_size(14)
        axes[i][j].yaxis.label.set_size(14)
        axes[i][j].title.set_size(14)
        axes[i][j].tick_params(axis='x', labelsize=12)
        axes[i][j].tick_params(axis='y', labelsize=12)
    
axes[0][0].legend(loc='upper left', fontsize=10)
axes[0][1].legend(loc='upper left', fontsize=10)
axes[0][2].legend(loc='upper right', fontsize=10)
axes[1][0].legend(loc='upper right', fontsize=10)
axes[1][1].legend(loc='lower right', fontsize=10)
axes[1][2].legend(loc='upper left', fontsize=10)

plt.subplots_adjust(left=0.1,
                    bottom=0.1,
                    right=0.9,
                    top=0.9,
                    wspace=0.15,
                    hspace=0.3)

In [None]:
target_class = 'Coal and Coke'    

fig = plt.figure(constrained_layout=False, facecolor='white', figsize=(12,4))
gs = fig.add_gridspec(nrows=1, ncols=3, left=0.05, right=0.95,
                      hspace=0.9, wspace=0.3)

ax = fig.add_subplot(gs[0, 0])
preprocess_pipe = imblearn.pipeline.Pipeline(steps=simca_models_nr[target_class].best_estimator_.steps[:-2]) 
simca_models_nr[target_class].best_estimator_.named_steps['model'].model.visualize(
    preprocess_pipe.transform(X_challenge), 
    y_challenge,
    ax=ax
)
_ = ax.set_title('Compliant DD-SIMCA \nModel for '+target_class, fontsize=18)
ax.legend(bbox_to_anchor=(1,1.03), fontsize=14)
ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=16)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=16)
ax.tick_params(axis='both', which='major', labelsize=14)
    
ax = fig.add_subplot(gs[0, 2])
preprocess_pipe = imblearn.pipeline.Pipeline(steps=simca_models_rig[target_class].best_estimator_.steps[:-2]) 
simca_models_rig[target_class].best_estimator_.named_steps['model'].model.visualize(
    preprocess_pipe.transform(X_challenge), 
    y_challenge,
    ax=ax
)
_ = ax.set_title('Rigorous DD-SIMCA \nModel for '+target_class, fontsize=18)
ax.legend(bbox_to_anchor=(1,1.03), fontsize=14)
ax.set_xlabel('ln'+'(1 + h/h'+r'$_0$'+')', fontsize=16)
ax.set_ylabel('ln'+'(1 + q/q'+r'$_0$'+')', fontsize=16)
ax.tick_params(axis='both', which='major', labelsize=14)

if new_figures:
    plt.savefig('auth_ddsimca.png', dpi=300, bbox_inches='tight')