In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
#%matplotlib widget

from sklearn.metrics import confusion_matrix
import seaborn as sns

import json

from iterativenn.nn_modules.MaskedLinear import MaskedLinear

In [None]:
# Read the mass-spec CSV (first row is the header) and discard the 'chem_name' column.
df = pd.read_csv(
    '/home/kjmetzler/iterativenn/notebooks/START_HERE/4-data/mass_spec_data(in).csv',
    header=0,
).drop(columns='chem_name')

# Partition the columns: the trailing 50 go to `labels`, everything before them to `chems`.
labels = df.iloc[:, -50:]
chems = df.iloc[:, :df.shape[1] - 50]

In [None]:
#create noisy labels
def add_noise(dataframe, n_labels=50, rng=None):
    """Return a frame whose trailing label columns are replaced by random noise.

    The last ``n_labels`` columns of ``dataframe`` are swapped for uniform random
    values rescaled so that every row sums to 1. All preceding columns are carried
    over unchanged.

    Args:
        dataframe: Input frame; its last ``n_labels`` columns are treated as labels.
        n_labels: Number of trailing label columns to replace (default 50).
        rng: Optional ``numpy.random.Generator`` for reproducible noise. When
            omitted, NumPy's global random state is used, as before.

    Returns:
        A new DataFrame with the same rows (and row index) as ``dataframe`` and
        columns relabelled ``0..n-1``.
    """
    lab = dataframe.iloc[:, -n_labels:]  # trailing label columns
    chemicals = dataframe.iloc[:, 0:dataframe.shape[1] - n_labels]  # all other columns

    # Uniform noise in [0, 1), normalised so each row sums to 1.
    noise_shape = (lab.shape[0], lab.shape[1])
    if rng is None:
        noise_labels = np.random.rand(*noise_shape)
    else:
        noise_labels = rng.random(noise_shape)
    noise_labels = noise_labels / noise_labels.sum(axis=1, keepdims=True)

    # concat(axis=1) aligns rows by index label. Building the noise frame on the
    # input's own index keeps the rows paired one-to-one; a default RangeIndex here
    # would produce extra rows filled with NaN whenever the input index is not 0..n-1.
    noise_frame = pd.DataFrame(noise_labels, index=chemicals.index)

    new_start = pd.concat([chemicals, noise_frame], axis=1, ignore_index=True)

    return new_start

In [None]:
# Width of each input vector: the number of columns in 'df'.
x_size = df.shape[1]
# Output width; set to the same column count as the input.
y_size = df.shape[1]
# Number of data points (rows in 'df').
data_size = df.shape[0]

# Define the size of blocks within the mask for each row group.
# The first group has size chems.shape[1]; the second takes the remaining
# x_size - chems.shape[1] entries, so the two sizes sum to x_size.
# This is related to the 'Experimental Models' sheet in Kevin's Drive.
row_sizes = [chems.shape[1],x_size-chems.shape[1]] 

# Define the size of blocks within the mask for each column group
# (a single group spanning all x_size columns).
col_sizes = [x_size]

# Specify the types of blocks to be used in the masked linear layer. This configuration controls the structure of the weight matrix.
# 'D' indicates a diagonal block,
# 'W' indicates a dense block,
# 'S' indicates a sparse block, with n trainable entries distributed randomly, 'S=n',
#  0  indicates a zero block.
# One inner list per row group, one entry per column group.
block_types = [['S=800'],['S=400']]

# Define initialization types for the blocks.
# This dictates how the weight matrices within each block are initially set up.
# 'G' indicates a Gaussian distribution, 1 indicates the identity, 0 indicates a zero block.
initialization_types = [[1],['G']]

# Specify which blocks are trainable.
# A value of 0 indicates the block is not trainable, while a value of 1 indicates it is trainable.
trainable = [[1],[1]]

# Initialize a MaskedLinear layer with the specified configuration.
# NOTE(review): 'from_description' is called on the instance 'chem_ml'; whether it
# reconfigures that instance or builds a separate layer (ignoring the sizes/bias
# passed here) is not visible in this notebook — confirm against MaskedLinear.
chem_ml = MaskedLinear(x_size, x_size, bias=True)
chem_MaskLin = chem_ml.from_description(row_sizes=row_sizes,
                                          col_sizes=col_sizes,
                                          block_types=block_types,
                                          initialization_types=initialization_types,
                                          trainable=trainable)

# Construct the neural network using sequential layers.
# The same 'chem_MaskLin' object is placed at all three positions, interleaved with
# LeakyReLU activations, so a single set of layer parameters is applied three times.
chem_INN = torch.nn.Sequential(chem_MaskLin, 
                               nn.LeakyReLU(),
                               chem_MaskLin,
                               nn.LeakyReLU(),
                               chem_MaskLin)

# Number of LeakyReLU activation functions used in the network (matches the two above).
# This is used for determining the inverse function.
num_relus = 2


In [None]:
# Convert a pandas DataFrame into a float32 PyTorch tensor.
def df_to_tensor(df):
    """Copy the values of ``df`` into a new ``torch.float32`` tensor of the same shape."""
    values = df.to_numpy()
    return torch.tensor(values, dtype=torch.float32)

# Build the paired training tensors: the target is the data as loaded, the start is
# the same data passed through add_noise.
x_target_tensor = df_to_tensor(df)
x_start_tensor = df_to_tensor(add_noise(df))

# A dataset that pairs each start row with the target row at the same position.
class Data(torch.utils.data.Dataset):
    """Map-style dataset yielding ``(x_start[idx], x_target[idx])`` pairs.

    Args:
        x_start: Indexable, sized collection of model inputs.
        x_target: Indexable, sized collection of targets, aligned row-for-row
            with ``x_start``.

    Raises:
        ValueError: If ``x_start`` and ``x_target`` differ in length.
    """

    def __init__(self, x_start, x_target):
        # Fail fast: a mismatch would otherwise surface as an IndexError in the
        # middle of an epoch, or silently ignore the surplus targets.
        if len(x_start) != len(x_target):
            raise ValueError(
                f"x_start and x_target must have the same length, "
                f"got {len(x_start)} and {len(x_target)}"
            )
        self.x_start = x_start
        self.x_target = x_target

    def __len__(self):
        """Number of (start, target) pairs."""
        return len(self.x_start)

    def __getitem__(self, idx):
        """Return the start/target pair stored at position ``idx``."""
        return self.x_start[idx], self.x_target[idx]
    
# Wrap the paired tensors in a dataset and serve them in shuffled mini-batches of 100.
train_data = Data(x_start=x_start_tensor, x_target=x_target_tensor)
train_loader = torch.utils.data.DataLoader(
    dataset=train_data,
    batch_size=100,
    shuffle=True,
)

In [None]:
# Loss functions and optimiser for training chem_INN.
criterion = nn.BCEWithLogitsLoss()  # sigmoid + binary cross-entropy in one op
criterion2 = nn.MSELoss()           # mean squared error
optimizer = optim.Adam(chem_INN.parameters(), lr=1e-4)

In [None]:
# Set the maximum number of epochs for training.
max_epochs = 3000
# Loss history, one entry per epoch (the loss of that epoch's final batch).
loss_graph = []

# Begin the training loop.
for epoch in range(max_epochs):
    # Iterate over batches of data in the training loader.
    for batch_idx, (start, target) in enumerate(train_loader):
        # Reset the gradients of all model parameters to zero.
        optimizer.zero_grad()

        # Forward pass. The output is split like the data: the trailing 50 columns
        # are the label scores, everything before them is the mass-spec part.
        output = chem_INN(start)
        label_logits = output[:, -50:]
        mass_spec = output[:, :output.shape[1] - 50]

        # BCEWithLogitsLoss applies a sigmoid internally, so it must be given the raw
        # scores. Feeding it softmax probabilities squashes the predictions twice and
        # keeps the loss from ever approaching zero.
        loss = 10 * criterion(label_logits, target[:, -50:])
        # Reconstruction loss on the mass-spec columns of the target.
        loss = loss + criterion2(mass_spec, target[:, :target.shape[1] - 50])

        # Compute the gradients based on this loss.
        loss.backward()
        # Update the parameters of the model according to the optimization strategy.
        optimizer.step()

    # Keep a handle on the model's current state after each epoch; there is no need to
    # rebuild this dict after every batch.
    state = chem_INN.state_dict()

    # After each epoch, record the loss of the last batch for plotting.
    loss_graph.append(loss.item())
    # Print the training progress every 100 epochs.
    if epoch % 100 == 0:
        print(f'Epoch {epoch}, Batch {batch_idx}, Loss {loss.item()}')

# After training, plot the recorded loss values.
plt.figure()
plt.plot(np.arange(0, len(loss_graph)), loss_graph)
plt.semilogy()  # Use a logarithmic scale for the y-axis to better visualize changes.
plt.xlabel('Epoch')
plt.ylabel('Loss (last batch of epoch)')
plt.title('Training loss')
plt.show()


In [None]:
# Confusion Matrix
# Inference only: disable autograd so no graph is built for the full-dataset pass.
with torch.no_grad():
    pred_output = chem_INN(x_start_tensor)
pred_last_50 = pred_output[:, -50:]  # Last 50 elements (label scores)

target_last_50 = x_target_tensor[:, -50:]

# The class of a row is the position of its largest label entry.
pred_classes = torch.argmax(pred_last_50, dim=1).cpu().numpy()
target_classes = torch.argmax(target_last_50, dim=1).cpu().numpy()

# Pin the label set so row/column i always means class i. Without `labels`, classes
# that never occur are dropped and the axes no longer line up with the class indices.
class_ids = np.arange(pred_last_50.shape[1])
conf_matrix = confusion_matrix(target_classes, pred_classes, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='g', cmap='Blues', cbar=False)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')

# Display the plot
plt.show()