# Oxidation and Classification Model for Hyperspectral Fitting

##### Use this code block to generate training, validation, and test datasets of synthetic Differential Reflectance Spectra (DRS). The equations for the Lorentzian-shaped DRS curves were obtained by performing a non-linear least-squares Lorentzian fit of the original dataset from a Hyperspectral Optical Microscope (HOM) in the range of 550–700 nm, which lies around the A-exciton peak of WS2. This was done for each ground-truth oxidation time step from 0 to 15 minutes. The code block also adds simulated noise and slight variations in the peak parameters, corresponding to the confidence of the original peak fit.

##### The Convolutional Neural Network architecture that was used has two branches, one for classification and one for regression. We also used a custom "Masked MAE" loss function so that the regression loss is calculated only on pixels labeled as material, since we do not expect the substrate signal to change with increased oxidation time.

# Training the Model

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os

In [None]:
# Spectral axis: n_wavelengths evenly spaced points spanning 550-700 nm.
n_wavelengths = 150
wavelengths = np.linspace(550, 700, num=n_wavelengths)

# Total dataset size; 70% of the samples will be material spectra.
num_samples = 50000
material_samples = int(0.7 * num_samples)

# One oxidation time per material sample, drawn uniformly from [0, 15).
oxidation_times = np.random.uniform(low=0, high=15, size=material_samples)

def lorentzian(wavelengths, amplitude, width, position):
    """
    Function for evaluating a Lorentzian line shape

    Args:
        wavelengths: Point(s) at which the curve is evaluated
        amplitude: Value of the curve at its peak (wavelengths == position)
        width: Width parameter; the curve drops to half of its peak value
            at position +/- width
        position: Location of the peak centre
    Returns:
        amplitude * width**2 / ((wavelengths - position)**2 + width**2)
    """
    width_sq = width**2
    detuning_sq = (wavelengths - position)**2
    return amplitude * (width_sq / (detuning_sq + width_sq))

# Generate synthetic dataset
def _material_spectrum(time):
    """Return one noisy Lorentzian spectrum for the given oxidation time."""
    # Peak parameters follow power laws in the oxidation time; each one is
    # jittered multiplicatively (+/-5% for amplitude and width, +/-0.5% for
    # position). The random draws happen in this order: amplitude, width,
    # position, then the noise vector.
    amplitude = (0.435 - 0.15 * time**(0.26)) * np.random.uniform(0.95, 1.05)
    width = (9.452 + 3.89 * time**0.26) * np.random.uniform(0.95, 1.05)
    position = (621.06 - 16.16 * time**0.08) * np.random.uniform(0.995, 1.005)

    clean = lorentzian(wavelengths, amplitude, width, position)
    # Additive Gaussian noise (sigma = 0.005)
    return clean + np.random.normal(0, 0.005, size=clean.shape)


def _substrate_spectrum():
    """Return one noisy, featureless substrate spectrum (offset + slope)."""
    base = np.random.uniform(0, 0.02)
    slope = np.random.uniform(-0.0001, 0.0001)
    baseline = base + slope * wavelengths
    # Additive Gaussian noise (sigma = 0.002)
    return baseline + np.random.normal(0, 0.002, size=wavelengths.shape)


# Whatever is not material (30%) is substrate
substrate_samples = num_samples - material_samples

# Material spectra come first, followed by the substrate spectra
synthetic_spectra = [_material_spectrum(t) for t in oxidation_times]
synthetic_spectra.extend(_substrate_spectrum() for _ in range(substrate_samples))
synthetic_spectra = np.array(synthetic_spectra)

# Regression target: the oxidation time for material, -1 as a placeholder for
# substrate (masked_mae ignores entries equal to -1).
oxidation_labels = np.array([*oxidation_times, *([-1] * substrate_samples)])

# Classification target: 1 = material, 0 = substrate
classification_labels = np.array([1] * len(oxidation_times) + [0] * substrate_samples)

# Split the data: hold out 20% for testing, then 20% of the remainder for
# validation (64% / 16% / 20% train / validation / test overall).
(X_train_val, X_test,
 y_class_train_val, y_class_test,
 y_ox_train_val, y_ox_test) = train_test_split(
    synthetic_spectra, classification_labels, oxidation_labels,
    test_size=0.2,
)

(X_train, X_val,
 y_class_train, y_class_val,
 y_ox_train, y_ox_val) = train_test_split(
    X_train_val, y_class_train_val, y_ox_train_val,
    test_size=0.2,
)

# Reshape for CNN: append a trailing channel axis to every feature array
X_train, X_val, X_test = (
    np.expand_dims(split, axis=-1) for split in (X_train, X_val, X_test)
)

# Define model
def _conv_bn_lrelu(x, filters):
    """Apply Conv1D (kernel size 5, 'same' padding) -> BatchNorm -> LeakyReLU."""
    x = layers.Conv1D(filters, kernel_size=5, padding='same')(x)
    x = layers.BatchNormalization()(x)
    return layers.LeakyReLU()(x)


def _dense_bn_act(x, units, activation, dropout_rate):
    """Apply Dense -> BatchNorm -> activation() layer -> Dropout."""
    x = layers.Dense(units)(x)
    x = layers.BatchNormalization()(x)
    x = activation()(x)
    return layers.Dropout(dropout_rate)(x)


def create_model(input_shape):
    """
    Function for generating the Convolutional Neural Network model

    The network has a shared 1D-convolutional feature extractor followed by
    two heads: 'classification' (single sigmoid unit) and 'regression'
    (single linear unit).

    Args:
        input_shape: A tuple representing the shape of the input data
    Returns:
        model: A tf.keras.Model object with outputs
            [classification, regression]
    """
    inputs = tf.keras.Input(shape=input_shape)

    # Feature extraction blocks: the first two are followed by max pooling,
    # the last one by global average pooling.
    x = _conv_bn_lrelu(inputs, 64)
    x = layers.MaxPooling1D(2)(x)

    x = _conv_bn_lrelu(x, 128)
    x = layers.MaxPooling1D(2)(x)

    x = _conv_bn_lrelu(x, 256)
    x = layers.GlobalAveragePooling1D()(x)

    # Shared dense layers
    x = _dense_bn_act(x, 256, layers.LeakyReLU, 0.5)

    # Classification branch
    class_x = _dense_bn_act(x, 128, layers.LeakyReLU, 0.3)
    classification_output = layers.Dense(1, activation='sigmoid', name='classification')(class_x)

    # Regression branch (uses ReLU rather than LeakyReLU)
    reg_x = _dense_bn_act(x, 128, layers.ReLU, 0.3)
    regression_output = layers.Dense(1, name='regression')(reg_x)

    return models.Model(inputs, [classification_output, regression_output])

# Model Compilation
# Each input sample has n_wavelengths points and a single channel.
model = create_model(input_shape=(n_wavelengths, 1))

# Custom loss function for regression
def masked_mae(y_true, y_pred):
    """
    Function to calculate the Masked Mean Absolute Error (Masked MAE)

    Entries whose ground-truth label equals -1 are excluded from the average.
    The mask is derived from the labels, not from the classification output.

    Args:
        y_true: Ground truth labels; -1 marks entries to ignore
        y_pred: Predicted labels (same number of elements as y_true)
    Returns:
        masked_mae: Scalar MAE over the entries whose label is not -1
            (approximately 0 when every entry is masked)
    """
    # Align dtype and shape explicitly so that a (N,) label vector and a
    # (N, 1) prediction cannot silently broadcast to an (N, N) difference.
    y_pred = tf.convert_to_tensor(y_pred)
    y_true = tf.cast(y_true, y_pred.dtype)
    y_true = tf.reshape(y_true, [-1])
    y_pred = tf.reshape(y_pred, [-1])

    mask = tf.cast(tf.not_equal(y_true, -1), y_pred.dtype)
    # The small epsilon keeps the division finite when the mask is all zeros.
    return tf.reduce_sum(tf.abs(y_true - y_pred) * mask) / (tf.reduce_sum(mask) + 1e-7)

# The classification head is trained with binary cross-entropy and the
# regression head with the masked MAE, which is also reported as its metric.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss=dict(classification='binary_crossentropy', regression=masked_mae),
    metrics=dict(classification='accuracy', regression=masked_mae),
)

# Training
# Early stopping watches the total validation loss (classification +
# regression). Monitoring only the classification accuracy is fragile here:
# accuracy is bounded at 1.0 and a tie does not count as an improvement, so
# once it plateaus training stops after `patience` epochs and
# restore_best_weights rolls back to the first epoch that reached the best
# accuracy, discarding later progress on the regression head.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=10,
    restore_best_weights=True,
)

history = model.fit(
    X_train,
    {'classification': y_class_train, 'regression': y_ox_train},
    validation_data=(X_val, {'classification': y_class_val, 'regression': y_ox_val}),
    epochs=50,
    batch_size=32,
    callbacks=[early_stopping],
)

# Evaluate
model.evaluate(X_test, {'classification': y_class_test, 'regression': y_ox_test})

# Save model to 'Oxidation_Model' folder, create one if there is not one present already.
# The folder is created up front instead of inside a bare `except`, so a save
# failure unrelated to a missing folder is reported as-is rather than being
# masked by a second error from os.mkdir.
if not os.path.isdir('Oxidation_Model'):
    print("Creating Oxidation_Model folder.")
    os.makedirs('Oxidation_Model', exist_ok=True)

model.save('Oxidation_Model/Ox_Class_Model.h5')
print("Model saved successfully.")

# Plot a few example spectra for sanity check: 40 distinct training samples
# picked at random and overlaid on a single figure.
plt.figure(figsize=(6, 4))
example_rows = np.random.choice(range(X_train.shape[0]), size=40, replace=False)
for row in example_rows:
    plt.plot(wavelengths, X_train[row, :], alpha=0.7)
plt.title('Example Synthetic Spectra')
plt.xlabel('Wavelength (nm)')
plt.ylabel('Intensity')
plt.grid(True)
plt.show()