In [None]:
import os
import random
import torch
import torch.nn as nn
from torch.nn import utils
import torch.optim as optim
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
import pathlib
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler, LabelEncoder



# Set random seed for reproducibility
manualSeed = 999
debug = True
print("Random Seed: ", manualSeed)
# Seed every global generator the notebook imports: Python's `random`,
# NumPy's legacy global RNG (previously left unseeded) and PyTorch.
random.seed(manualSeed)
np.random.seed(manualSeed)
torch.manual_seed(manualSeed)
# Ask PyTorch to use deterministic implementations only.
torch.use_deterministic_algorithms(True)
# Prefer Apple's MPS backend when it is available, otherwise run on the CPU.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

In [None]:
def weights_init(m):
    """Re-initialise a single module in place; intended for ``Module.apply``.

    Modules whose class name contains ``Conv`` get weights drawn from a normal
    distribution with mean 0.0 and std 0.02. Modules whose class name contains
    ``BatchNorm`` get weights drawn with mean 1.0 and std 0.02 and a bias of
    zero. Every other module is left untouched.
    """
    layer_kind = type(m).__name__
    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        return
    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, val=0)

In [None]:
# Both workbooks are resolved relative to the notebook's working directory.
cwd = pathlib.Path.cwd()
file_path_spectra = cwd / '..' / '..' / 'datasets' / 'RiverD' / 'RamanData_pc.xlsx'
file_path_classes = cwd / '..' / '..' / 'datasets' / 'RiverD' / 'RamanMetaData.xlsx'
try:
    df_spectra = pd.read_excel(file_path_spectra)
    # The "class" sheet is read without a header row.
    df_classes = pd.read_excel(file_path_classes, sheet_name="class", header=None)
except FileNotFoundError:
    print("File could not be found")
    # Re-raise: later cells need df_spectra / df_classes, and swallowing the
    # error here would only resurface as a NameError further down.
    raise
except Exception as e:
    print(f"An error ocurred reading file, Exception: {e}")
    raise



In [None]:
class RamanSpectraGANDataset(Dataset):
    """Dataset of min-max scaled spectra paired with integer-encoded labels.

    Args:
        dataframe_X: DataFrame holding one sample per row. It is scaled with
            ``MinMaxScaler.fit_transform`` and stored as ``float32``.
        dataframe_y: Labels, one per row of ``dataframe_X``. They are encoded
            with a ``LabelEncoder`` and stored as ``int32``.
        transforms: Optional callable applied to the spectrum tensor inside
            ``__getitem__``. With ``None`` (the default) the tensor is
            returned unchanged.
    """

    def __init__(self, dataframe_X, dataframe_y, transforms=None):
        # Work on a copy: the column labels are rewritten below and the
        # caller's frame must not change as a side effect.
        self.spectra = dataframe_X.copy()
        self.targets = dataframe_y
        self.transforms = transforms

        # The column labels are cast to strings before the frame is scaled.
        self.spectra.columns = self.spectra.columns.astype(str)
        self.spectrum_scaler = MinMaxScaler()
        self.normalized_spectra = self.spectrum_scaler.fit_transform(self.spectra).astype(np.float32)

        # Keep the fitted encoder so integer labels can be mapped back to the
        # original class values; `label_encoders` is kept as a list for
        # backward compatibility with the previous attribute.
        self.label_encoder = LabelEncoder()
        self.label_encoders = [self.label_encoder]
        self.encoded_targets = self.label_encoder.fit_transform(self.targets).astype(np.int32)

    def __len__(self):
        """Return the number of samples (rows of the scaled matrix)."""
        return len(self.normalized_spectra)

    def __getitem__(self, idx):
        """Return ``(spectrum, label)`` tensors for row ``idx``."""
        spectrum = torch.tensor(self.normalized_spectra[idx])
        label = torch.tensor(self.encoded_targets[idx])
        if self.transforms is not None:
            spectrum = self.transforms(spectrum)
        return spectrum, label

In [None]:
def plot_spectrum(spectrum_series: pd.Series):
    """Draw one spectrum as a line plot of its values against their position.

    Args:
        spectrum_series: The values to plot; the x-axis is simply
            ``0 .. len(spectrum_series) - 1``.
    """
    intensities = spectrum_series
    positions = range(len(intensities))

    _, ax = plt.subplots(figsize=(12, 6))
    ax.plot(positions, intensities)

    ax.set_title("Raman Spectrum")
    ax.set_xlabel("Feature Index (Pixel)")
    ax.set_ylabel("Intensity (Normalized)")
    ax.grid(True)
    plt.show()

In [None]:
def plot_samples(epoch, dim, features, generator, real_samples, real_labels, num_samples=50):
    """Plot real spectra above freshly generated ones for visual comparison.

    Args:
        epoch: Zero-based epoch index; shown as ``epoch + 1`` in the title.
        dim: Length of the noise vector fed to ``generator``.
        features: The x-axis spans ``features - 1`` points, so this must be
            one more than the length of each plotted spectrum.
        generator: Module called as ``generator(noise, labels)``.
        real_samples: Tensor of real spectra, one sample per row.
        real_labels: Label tensor matching ``real_samples``; it also
            conditions the generator, and the noise is created on its device.
        num_samples: Maximum number of curves drawn per panel.
    """
    # Never request more curves than there are samples / labels available.
    count = min(num_samples, len(real_samples), len(real_labels))
    labels = real_labels[:count]
    # flatten(start_dim=1) removes the channel axis but always keeps the
    # sample axis, unlike squeeze(), which collapses it for a single sample.
    real_to_plot = real_samples[:count].flatten(start_dim=1).cpu().numpy()

    # Generate in eval mode, then restore whichever mode the caller had set.
    was_training = generator.training
    generator.eval()
    try:
        with torch.no_grad():
            # Noise and labels are fed to the generator together, so the noise
            # is moved to the labels' device rather than a notebook global.
            noise = torch.randn(count, dim).to(labels.device)
            fake_to_plot = generator(noise, labels)[:count].flatten(start_dim=1).cpu().numpy()
    finally:
        generator.train(was_training)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    fig.suptitle(f'Epoch {epoch+1} - Real vs. Generated Samples', fontsize=16)
    x_axis = np.arange(features-1)

    ax1.set_title("Real Samples")
    for i in range(count):
        ax1.plot(x_axis, real_to_plot[i], alpha=0.7, label=f'Real {i+1}')
    ax1.legend()

    ax2.set_title("Generated Samples")
    for i in range(count):
        ax2.plot(x_axis, fake_to_plot[i], alpha=0.7, label=f'Fake {i+1}')
    ax2.legend()

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()
    # This function is called repeatedly during training; release the figure
    # so figures do not accumulate in memory.
    plt.close(fig)

In [None]:
def export_generated_samples(epoch, output_folder, dim, generator, labels, num_samples=50, device='cpu'):
    """Generate labelled samples and write them to a per-epoch CSV file.

    The file is named ``epoch_<NNN>_generated.csv`` (``NNN`` is ``epoch + 1``
    zero-padded to three digits). Its first column is ``label`` and the
    remaining columns are ``feat_0 .. feat_k``.

    Args:
        epoch: Zero-based epoch index.
        output_folder: Destination directory; created if it does not exist.
        dim: Length of the noise vector fed to ``generator``.
        generator: Module called as ``generator(noise, labels)``.
        labels: Conditioning labels; at most the first ``num_samples`` are used.
        num_samples: Maximum number of samples to generate.
        device: Device the noise tensor is moved to before generation.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_folder, exist_ok=True)

    current_labels = labels[:num_samples]
    # `labels` may hold fewer than num_samples entries; generate exactly one
    # sample per available label so rows and labels always line up.
    n_rows = len(current_labels)

    # Generate in eval mode, then restore whichever mode the caller had set.
    was_training = generator.training
    generator.eval()
    try:
        with torch.no_grad():
            noise = torch.randn(n_rows, dim).to(device)
            # flatten(start_dim=1) keeps the sample axis even for one sample,
            # where squeeze() would collapse the result to 1-D.
            generated_data = generator(noise, current_labels).flatten(start_dim=1).cpu().numpy()
    finally:
        generator.train(was_training)

    if isinstance(current_labels, torch.Tensor):
        current_labels_np = current_labels.cpu().numpy()
        # Multi-column label rows are reduced to the index of their maximum.
        if current_labels_np.ndim > 1 and current_labels_np.shape[1] > 1:
            current_labels_np = np.argmax(current_labels_np, axis=1)
    else:
        current_labels_np = current_labels

    df = pd.DataFrame(generated_data)
    df.columns = [f'feat_{i}' for i in range(df.shape[1])]
    df.insert(0, 'label', current_labels_np)

    filename = f"epoch_{epoch+1:03d}_generated.csv"
    file_path = os.path.join(output_folder, filename)

    df.to_csv(file_path, index=False)
    # Report the number of rows actually written, not the requested maximum.
    print(f"Epoch {epoch+1}: Saved {n_rows} generated samples to {file_path}")

In [None]:
# Mini-batch size handed to the DataLoader below.
batch_size = 128

# Give the class table the same column labels as the spectra so that, after
# both frames are transposed, their row indexes line up for the concat.
df_classes.columns = df_spectra.columns
df_spectra, df_classes = df_spectra.T, df_classes.T
df_combined = pd.concat([df_spectra, df_classes], axis=1, ignore_index=True)

# The final column (contributed by df_classes) is the target; every column
# before it is treated as a feature.
dataset_X, dataset_y = df_combined.iloc[:, :-1], df_combined.iloc[:, -1]
dataset = RamanSpectraGANDataset(dataset_X, dataset_y)
dataloader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,  # incomplete final batches are discarded
    num_workers=0,
)

In [None]:
class Discriminator(nn.Module):
    """Label-conditioned 1-D convolutional discriminator.

    The label is embedded, projected to ``seq_len`` values by a
    spectrally-normalised linear layer and placed next to the input as an
    extra channel. Four strided convolutions plus a max-pool encode the
    result, and a small MLP ending in a sigmoid produces one score per sample.

    Args:
        input_channels: Channels of the combined (input + label) tensor.
        seq_len: Length of each input sequence.
        embed_dim: Size of the label embedding.
        n_classes: Number of distinct labels the embedding can index.

    Raises:
        ValueError: If ``seq_len`` is too short to survive the encoder.
    """

    def __init__(self, input_channels=2, seq_len=700, embed_dim = 50, n_classes=1):
        super().__init__()
        self.input_channels = input_channels
        self.seq_len = seq_len
        self.embed_dim = embed_dim
        self.embed = nn.Embedding(n_classes, embed_dim)
        self.embed_fc = utils.spectral_norm(nn.Linear(embed_dim, seq_len))

        self.main = nn.Sequential(
            nn.Conv1d(input_channels, 32, kernel_size=6, stride=2, padding=2),
            nn.BatchNorm1d(32),
            nn.LeakyReLU(0.1),
            nn.Dropout(0.25),
            nn.Conv1d(32, 64, kernel_size=6, stride=2, padding=2, bias=False),
            nn.BatchNorm1d(64),
            nn.LeakyReLU(0.1),
            nn.Dropout(0.25),
            nn.Conv1d(64, 128, kernel_size=6, stride=2, padding=2, bias=False),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(0.1),
            nn.Dropout(0.25),
            nn.Conv1d(128, 256, kernel_size=6, stride=2, padding=2, bias=False),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.1),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Flatten()
        )

        # Derived from seq_len instead of being hard-coded; for the default
        # seq_len=700 this yields 256 * 21 = 5376, the previous constant.
        encoded_len = self._encoded_length(seq_len)
        if encoded_len < 1:
            raise ValueError(f"seq_len={seq_len} is too short for the encoder")
        encoder_output_dim = 256 * encoded_len

        self.final = nn.Sequential(
            nn.Linear(encoder_output_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid(),
            )

    @staticmethod
    def _encoded_length(seq_len):
        """Return the sequence length left after ``self.main``'s conv/pool stack."""
        length = seq_len
        # Four Conv1d layers with kernel_size=6, stride=2, padding=2.
        for _ in range(4):
            length = (length + 2 * 2 - 6) // 2 + 1
        # MaxPool1d(kernel_size=2, stride=2) halves the length, rounding down.
        return length // 2

    def forward(self, x, y):
        """Score each sample in ``x`` given its label ``y``.

        Args:
            x: Input batch with ``seq_len`` values per sample.
            y: Integer label tensor with one label per sample.

        Returns:
            Tensor of shape ``(batch, 1)`` with sigmoid outputs.
        """
        batch = x.size(0)
        # One embedding vector per sample, whether y is (batch,) or (batch, 1).
        y_embed = self.embed(y).reshape(batch, self.embed_dim)
        y_embed = self.embed_fc(y_embed)
        # Concatenate along the feature axis, then fold into channels: the
        # input values come first and the label projection last.
        combined_input = torch.cat([x.reshape(batch, -1), y_embed], dim=1)
        combined_input = combined_input.view(batch, self.input_channels, self.seq_len)
        return self.final(self.main(combined_input))

In [None]:
class Generator(nn.Module):
    """Label-conditioned 1-D transposed-convolution generator.

    A noise vector and a label embedding are concatenated, projected to a
    ``(256, seq_len // 16)`` feature map and upsampled by four stride-2
    transposed convolutions to exactly ``seq_len`` values, followed by a
    sigmoid.

    Args:
        input_channels: Number of channels in the generated output.
        seq_len: Length of each generated sequence (at least 16).
        z_dim: Length of the input noise vector.
        embed_dim: Size of the label embedding.
        n_classes: Number of distinct labels the embedding can index.

    Raises:
        ValueError: If ``seq_len`` is smaller than 16.
    """

    def __init__(self, input_channels=1, seq_len=700, z_dim=123, embed_dim = 5, n_classes=1):
        super().__init__()
        if seq_len < 16:
            raise ValueError(f"seq_len must be at least 16, got {seq_len}")
        self.seq_len = seq_len
        self.init_seq_len = seq_len // 16
        self.embed = nn.Embedding(n_classes, embed_dim)
        self.fc = nn.Linear(z_dim + embed_dim, 256 * self.init_seq_len)

        # Every stage below maps length L to 2 * L + output_padding. Halving
        # seq_len repeatedly gives the length each stage must produce, and the
        # remainder of each halving is the padding that stage needs. For the
        # default seq_len=700 this yields (1, 1, 0, 0), the previously
        # hard-coded values.
        len_after_3 = seq_len // 2
        len_after_2 = len_after_3 // 2
        len_after_1 = len_after_2 // 2
        pad_1, pad_2, pad_3, pad_4 = (
            len_after_1 % 2,
            len_after_2 % 2,
            len_after_3 % 2,
            seq_len % 2,
        )

        self.main = nn.Sequential(
            nn.Unflatten(1, (256, self.init_seq_len)),
            nn.ConvTranspose1d(256, 128, kernel_size=6, stride=2, padding=2, output_padding=pad_1),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.ConvTranspose1d(128, 64, kernel_size=6, stride=2, padding=2, output_padding=pad_2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.ConvTranspose1d(64, 32, kernel_size=6, stride=2, padding=2, output_padding=pad_3),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.ConvTranspose1d(32, input_channels, kernel_size=2, stride=2, padding=0, output_padding=pad_4),
            nn.Sigmoid()
        )

    def forward(self, x, y):
        """Generate one sequence per noise row.

        Args:
            x: Noise tensor of shape ``(batch, z_dim)``.
            y: Integer label tensor with one label per sample.

        Returns:
            Tensor of shape ``(batch, input_channels, seq_len)`` with sigmoid
            outputs.
        """
        # One embedding vector per sample, whether y is (batch,) or (batch, 1).
        y_embed = self.embed(y).reshape(x.size(0), self.embed.embedding_dim)
        combined_input = torch.cat([x, y_embed], dim=1)
        return self.main(self.fc(combined_input))

In [None]:
# Size both label embeddings from the data. The constructors default to
# n_classes=1, which only admits label 0; any larger encoded label would be
# out of range for nn.Embedding. The encoded labels are produced by a
# LabelEncoder, so the number of classes is the largest label plus one.
n_classes = int(dataset.encoded_targets.max()) + 1

netD = Discriminator(n_classes=n_classes).to(device)
netD.apply(weights_init)
netG = Generator(n_classes=n_classes).to(device)
netG.apply(weights_init)

In [None]:

# Numeric values associated with real and generated samples.
real_label, fake_label = 1, 0

# Adam learning rates for the generator and the discriminator.
g_lr = 9e-5
d_lr = 5e-5

# Length of the noise vector sampled for the generator.
noise_size = 123

# Reporting cadence constants.
LOG_INTERVAL = 50
PLOT_INTERVAL = 10

# A fixed noise batch and one fixed batch of real data, both on `device`.
fixed_noise = torch.randn(64, noise_size, device=device)
fixed_real_samples, fixed_real_labels = (
    tensor.to(device) for tensor in next(iter(dataloader))
)

# Binary cross-entropy on probabilities.
criterion = nn.BCELoss()

# One Adam optimiser per network, sharing the same betas.
optimizerD = optim.Adam(netD.parameters(), lr=d_lr, betas=(0.5, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=g_lr, betas=(0.5, 0.999))

In [None]:
# Training Loop

img_list = []
G_losses = []
D_losses = []
D_accuracies = []
iters = 0
num_epochs =  2000 #to test


print("Starting Training Loop...")

for epoch in range(num_epochs):
    netD.train()
    for i, (spectra, labels) in enumerate(dataloader):
        spectra = spectra.to(device)
        labels = labels.to(device)
        # Use a loop-local name: assigning to `batch_size` here would
        # overwrite the notebook-level value used to build the DataLoader.
        n_batch = spectra.size(0)
        # Targets are created directly on the device (no CPU round trip).
        real_labels = torch.ones(n_batch, 1, device=device)
        fake_labels = torch.zeros(n_batch, 1, device=device)

        # ---- Discriminator update: real batch, then detached fake batch ----
        optimizerD.zero_grad()

        real_output = netD(spectra, labels)

        d_loss_real = criterion(real_output, real_labels)
        # Fraction of real samples scored above 0.5.
        real_acc = (real_output > 0.5).float().mean()

        noise = torch.randn(n_batch, noise_size, device=device)
        # squeeze(1) removes only the channel axis; a bare squeeze() would
        # also drop the batch axis whenever the batch holds a single sample.
        fake_spectra = netG(noise, labels).squeeze(1)

        # detach() keeps the discriminator loss from back-propagating into
        # the generator.
        fake_output = netD(fake_spectra.detach(), labels)
        d_loss_fake = criterion(fake_output, fake_labels)

        # Fraction of generated samples scored below 0.5.
        fake_acc = (fake_output < 0.5).float().mean()

        d_loss = d_loss_real + d_loss_fake
        d_loss.backward()

        optimizerD.step()

        d_accuracy = (real_acc + fake_acc) / 2

        # ---- Generator update: make the discriminator output "real" ----
        optimizerG.zero_grad()
        fake_spectra_for_g = netG(noise, labels).squeeze(1)
        output = netD(fake_spectra_for_g, labels)

        g_loss = criterion(output, real_labels)

        g_loss.backward()
        optimizerG.step()

        # Metrics are printed and recorded only on every LOG_INTERVAL-th batch.
        if (i + 1) % LOG_INTERVAL == 0:
            print(
                f"[Epoch {epoch+1}/{num_epochs}] [Batch {i+1}/{len(dataloader)}] "
                f"[D loss: {d_loss.item():.4f}] [G loss: {g_loss.item():.4f}] "
                f"[D Acc: {d_accuracy.item():.2%}]"
            )
            D_losses.append(d_loss.item())
            G_losses.append(g_loss.item())
            D_accuracies.append(d_accuracy.item())

    if (epoch + 1) % PLOT_INTERVAL == 0:
        print(f"--- Generating plot for epoch {epoch+1} ---")
        plot_samples(epoch=epoch, generator=netG, real_samples=fixed_real_samples, real_labels=fixed_real_labels ,dim=noise_size, features=701)


print("--- Training Finished ---")

