In [None]:
import torch; torch.manual_seed(0)
import torch.nn as nn
import torch.nn.functional as F
import torch.utils
import torch.distributions
import numpy as np
import matplotlib.pyplot as plt; plt.rcParams['figure.dpi'] = 200
import sqlite3
import pandas as pd
import json
from sklearn.model_selection import train_test_split
# Name of the compute device: 'cuda' when a CUDA device is available, else 'cpu'.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
# Load the data
# First 50 % of the data is from NaI and the remaining 50 % is from PS.

# Initializing hyperparameters
batch_size = 1000
number_of_batches = 100
label_array_size = 6
data_array_size = 1024

# Rows with id 1..number_of_spectra are read. Deriving the count from the batch
# hyperparameters keeps it consistent with the later reshape into
# (number_of_batches, batch_size, ...).
number_of_spectra = batch_size * number_of_batches  # 100000

db_name = "./Combined_spectrum_database_12_Sep_2023_RN_Ar-41_Co-60_Cs-137_I-131_K-40_Tl-208.db"

x = []  # one spectrum per row (JSON array stored in column 2)
y = []  # one label vector per row (values of the JSON object in column 1, divided by 100)

conn = sqlite3.connect(db_name)
try:
    cursor = conn.cursor()
    for row_id in range(1, number_of_spectra + 1):
        # Bind the id as a parameter instead of formatting it into the SQL text.
        cursor.execute("SELECT * FROM spec_data WHERE id = ?", (row_id,))
        row = cursor.fetchone()
        if row is None:
            raise IndexError(f"spec_data has no row with id = {row_id}")
        x.append(np.array(json.loads(row[2])))  # Add spectrum
        y.append(np.array(list(json.loads(row[1]).values())) / 100)  # Radionuclide percentage -> fraction
finally:
    # Always release the database handle, even if a row is missing or malformed.
    conn.close()

In [None]:
# Function for normalization 
def normlize_array(arr):
    """Scale ``arr`` so that its entries sum to 1.

    Args:
        arr: array object supporting ``.sum()``, ``/`` and ``*``
            (for example a numpy array).

    Returns:
        ``arr / arr.sum()``. When the sum is exactly zero, ``arr * 0.0`` is
        returned instead, so an all-zero input yields zeros rather than the
        NaNs that 0/0 would produce.
    """
    total = arr.sum()
    if total == 0:
        # Dividing by a zero total would fill the result with NaN, which then
        # propagates through every downstream computation.
        return arr * 0.0
    return arr / total


# Normalize every loaded spectrum, keeping the original order.
x_norm = [normlize_array(spectrum) for spectrum in x]

In [None]:
# Reshape the data to [number_of_batches, batch_size, 1, data_array_size]
# (i.e. [100, 1000, 1, 1024] for 100 batches of 1000 spectra with 1024 values)
# and split the batches into train and test data.

# Stack the per-spectrum arrays into a single ndarray first: building a tensor
# directly from a Python list of ndarrays is very slow.
data = torch.tensor(np.asarray(x_norm), dtype=torch.float32).view(
    number_of_batches, batch_size, 1, data_array_size
)
label = torch.tensor(np.asarray(y), dtype=torch.float32).view(
    number_of_batches, batch_size, 1, label_array_size
)

# The split is along the first (batch) axis. shuffle=False keeps the batch
# order, so the leading 80 % of the batches train and the trailing 20 % test.
# NOTE(review): the data-loading comment says the first half of the rows is NaI
# and the second half PS; if so, the unshuffled test split holds only rows from
# the second half -- confirm this is intended.
train_data, test_data, train_label, test_label = train_test_split(
    data, label, test_size=0.2, shuffle=False
)

In [None]:
class VariationalEncoder(nn.Module):
    """Encoder of the VAE: maps a 1024-value input to a sampled latent vector.

    The input is flattened from dim 1 onward, passed through two linear layers
    (1024 -> 512 -> 256) and a ReLU, and then through two heads that produce
    the mean and the log standard deviation of a diagonal Gaussian.

    Attributes:
        N: standard normal distribution used to draw the sampling noise.
        kl: KL divergence of the most recent forward pass, summed over the
            batch and the latent dimensions (0 before the first forward pass).
    """

    def __init__(self, latent_dims):
        """Build the layers; ``latent_dims`` is the size of the latent vector."""
        super(VariationalEncoder, self).__init__()
        # Dimensionality reduction from 1024 to latent_dims
        self.linear1 = nn.Linear(1024, 512)
        self.linear_1 = nn.Linear(512, 256)
        self.linear2 = nn.Linear(256, latent_dims)  # head for mu
        self.linear3 = nn.Linear(256, latent_dims)  # head for log(sigma)
        self.N = torch.distributions.Normal(0, 1)
        self.kl = 0

    def forward(self, x):
        """Encode ``x`` and return a latent sample ``z = mu + sigma * eps``.

        Side effect: stores the KL divergence of this pass in ``self.kl``.
        """
        x = torch.flatten(x, start_dim=1)
        # NOTE(review): there is no activation between linear1 and linear_1, so
        # the two layers compose into a single affine map -- confirm intended.
        x = self.linear1(x)
        x = self.linear_1(x)
        x = F.relu(x)
        # Extracting the probability distribution
        mu = self.linear2(x)
        log_sigma = self.linear3(x)
        sigma = torch.exp(log_sigma)
        # The distribution samples on the CPU; move the noise next to mu so the
        # module also works after being moved to another device.
        eps = self.N.sample(mu.shape).to(device=mu.device, dtype=mu.dtype)
        # z = latent vector
        z = mu + sigma * eps
        # Closed-form KL( N(mu, sigma^2) || N(0, 1) ), which is zero at
        # mu = 0, sigma = 1. log_sigma is used directly instead of
        # log(exp(log_sigma)).
        self.kl = (0.5 * (sigma**2 + mu**2 - 1) - log_sigma).sum()
        return z

In [None]:
class Decoder(nn.Module):
    """Decoder of the VAE with two output heads on a shared hidden layer.

    A latent vector is expanded to 512 hidden units (ReLU); from there one
    head produces 1024 sigmoid-activated values and the other produces a
    softmax over 6 outputs.
    """

    def __init__(self, latent_dims):
        """Build the layers; ``latent_dims`` is the size of the latent vector."""
        super(Decoder, self).__init__()
        self.linear1 = nn.Linear(latent_dims, 512)
        self.linear2 = nn.Linear(512, 1024)  # reconstruction head
        self.linear3 = nn.Linear(512, 6)     # label head

    def forward(self, z):
        """Decode ``z`` (batch, latent_dims) into a reconstruction and a label.

        Returns:
            A tuple ``(decoded_spec, label)`` reshaped to ``(-1, 1, 1, 1024)``
            and ``(-1, 1, 1, 6)`` respectively.
        """
        z_1 = F.relu(self.linear1(z))
        decoded_spec = torch.sigmoid(self.linear2(z_1))
        # Softmax over dim 1 makes the 6 label outputs of each sample sum to 1.
        label = torch.softmax(self.linear3(z_1), dim=1)
        return decoded_spec.reshape((-1, 1, 1, 1024)), label.reshape((-1, 1, 1, 6))


In [None]:
class VariationalAutoencoder(nn.Module):
    """Full model: a VariationalEncoder followed by a Decoder."""

    def __init__(self, latent_dims):
        """Create both halves with the same latent size ``latent_dims``."""
        super().__init__()
        self.encoder = VariationalEncoder(latent_dims)
        self.decoder = Decoder(latent_dims)

    def forward(self, x):
        """Encode ``x`` to a latent sample and return the two decoder outputs."""
        reconstruction, predicted_label = self.decoder(self.encoder(x))
        return reconstruction, predicted_label

In [None]:
number_of_epochs = 200


def train(autoencoder, data, label, epochs=number_of_epochs):
    """Train ``autoencoder`` with Adam on reconstruction loss plus KL divergence.

    Args:
        autoencoder: module whose forward pass returns ``(x_hat, y_hat)`` and
            whose ``encoder.kl`` attribute holds the KL term of that pass.
        data: iterable of input batches; each batch is flattened from dim 1
            onward before being fed to the model.
        label: iterable of label batches, iterated in step with ``data``.
            NOTE(review): the labels are not used in the loss, so the
            predicted ``y_hat`` is never compared to them -- confirm intended.
        epochs: number of passes over ``data``.

    Returns:
        ``(autoencoder, x_hat, y_hat, lm1)`` where ``x_hat`` and ``y_hat`` are
        the outputs for the last processed batch (``None`` if no batch was
        processed) and ``lm1`` holds, per epoch, the pair
        ``[mean reconstruction loss, mean KL loss]`` as 0-dim tensors.
    """
    opt = torch.optim.Adam(autoencoder.parameters())
    # Feed batches on whatever device the model's parameters live on.
    model_device = next(autoencoder.parameters()).device
    lm1 = []  # For getting loss value plots
    x_hat = y_hat = None  # stay None when there is nothing to train on
    for epoch in range(epochs):
        print('epochs : ', epoch+1)
        l1 = []
        l2 = []

        for x, y in zip(data, label):
            x = torch.flatten(x, start_dim=1).to(model_device)
            opt.zero_grad()
            x_hat, y_hat = autoencoder(x)
            x_hat = torch.flatten(x_hat, start_dim=1)
            loss_m = ((x - x_hat)**2).sum()    # Reconstruction loss
            loss_kl = autoencoder.encoder.kl   # KL divergence
            # Total loss = reconstruction loss + KL divergence
            loss = loss_m + loss_kl
            # Detach before storing so the history does not keep every batch's
            # autograd graph alive.
            l1.append(loss_m.detach())
            l2.append(loss_kl.detach())
            # Backpropagation
            loss.backward()
            opt.step()

        # One history entry per epoch; skipped when the epoch saw no batch
        # (torch.stack cannot stack an empty list).
        if l1:
            lm1.append([torch.mean(torch.stack(l1)), torch.mean(torch.stack(l2))])
    return autoencoder, x_hat, y_hat, lm1

In [None]:
latent_dims = 32
# NOTE: the model is created on the default device; it is not moved to `device` here.
vae = VariationalAutoencoder(latent_dims)
# train() returns four values; the last one is the per-epoch loss history.
vae, x_hat, y_hat, loss_history = train(vae, train_data, train_label)