In [None]:
import random
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from IPython.display import clear_output
from sklearn.metrics import r2_score

import matplotlib as mpl
import matplotlib.pyplot as plt

import data_parser
%matplotlib inline

In [None]:
# CUDA
if torch.cuda.is_available():
    dev = "cuda:0"
    print("CUDA avaiable")
else:  
    dev = "cpu"
    print("CUDA not avaiable")
device = torch.device(dev)

In [None]:
def live_plot_X_y(X_train_label, y_train, train_pred=None, X_val_label=None, y_val=None, \
                  val_pred=None, epoch=None, loss=None, mer=None, output_col=None, title=None, savefig=False, figname=None, unwrap=False):
    
    clear_output(wait=True)
    F = np.arange(0, 100.2, 0.2)
    
    if X_val_label is not None:
        fig, ax = plt.subplots(2, figsize=(20, 10))
        
        ax[0].plot(F, y_train.detach().numpy())
        if train_pred is not None:
            ax[0].plot(F, train_pred.detach().numpy())
        ax[0].set_xlabel(X_train_label+' Frequency (GHz)')
        if output_col is not None:
            ax[0].set_ylabel(output_col)
        
        ax[1].plot(F, y_val.detach().numpy())
        if val_pred is not None:
            text_kwargs = dict(fontsize=18, )
            ax[1].plot(F, val_pred.detach().numpy())
            plt.text(0, 0, 'Batch %d Loss: %.4f Max Error Rate : %.4f' % (epoch, loss, mer), **text_kwargs)
    
        ax[1].set_xlabel(X_val_label+' Frequency (GHz)')
        if output_col is not None:
            ax[1].set_ylabel(output_col)
        
        if title is not None:
            ax[1].set_title(title)
        
    else:
        fig, ax = plt.subplots(1, figsize=(20, 10))
        ax.plot(F, y_train.detach().numpy())
        if train_pred is not None:
            text_kwargs = dict(fontsize=18, )
            ax.plot(F, pred.detach().numpy())
            plt.text(0, 0, 'Batch %d Loss: %.4f Max Error Rate : %.4f' % (epoch, loss, mer), **text_kwargs)  
        ax.set_xlabel(X_train_label+' Frequency (GHz)')
        if output_col is not None:
            ax.set_ylabel(output_col)
            
        if title is not None:
            ax[1].set_title(title)
    if unwrap == True:
        figname += "_unwrap"
    if savefig == True:
        plt.savefig('../Figures/pdf/'+figname+'.pdf')
        plt.savefig('../Figures/png/'+figname+'.png')
    plt.show();

In [None]:
class data_processor():
    def __init__(self):
        self.dict = {}
    
    def initial_normalize(self, X, y):
        self.dict['X_std'] = X.std(dim=0)
        self.dict['X_mean'] = X.mean(dim=0)
        self.dict['y_std'] = y.std(dim=0)
        self.dict['y_mean'] = y.mean(dim=0)
        # new_X = (X - self.dict['X_mean']) / self.dict['X_std']
        # new_y = (y - self.dict['y_mean']) / self.dict['y_std']
        # return new_X, new_y
        return
    
    def normalize(self, X=None, y=None):
        new_X, new_y = None, None
        if X is not None:
            new_X = (X - self.dict['X_mean']) / self.dict['X_std']
        if y is not None:
            new_y = (y - self.dict['y_mean']) / self.dict['y_std']
        return new_X, new_y
        
    def denormalize(self, X=None, y=None):
        original_X, original_y = None, None
        if X is not None:
            original_X = X * self.dict['X_std'] + self.dict['X_mean']
        if y is not None:    
            original_y = y * self.dict['y_std'] + self.dict['y_mean']
        return original_X, original_y

In [None]:
class P13_fitting_MLP_6(nn.Module):
    def __init__(self):
        super(P13_fitting_MLP_6, self).__init__()
        # Define the layers in the model
        self.fc1 = nn.Linear(4, 800)
        self.fc2 = nn.Linear(800, 800)
        self.fc3 = nn.Linear(800, 800)
        self.fc4 = nn.Linear(800, 800)
        self.fc5 = nn.Linear(800, 1)

    def forward(self, x):
        # Forward propagation
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.relu(self.fc4(x))
        x = self.fc5(x)
        return x
    
def initialize_weights(self):
    for m in self.modules():
        if isinstance(m, nn.Linear):
            torch.nn.init.kaiming_normal_(m.weight.data, nonlinearity='relu')
            if m.bias is not None:
                torch.nn.init.zeros_(m.bias.data)



In [None]:
def train(df, input_cols, output_col, unwrap, result_config):
    

    
    # Define Model name, figure title and figure name
    modelpath = '../Models/'+ output_col + '.pt'
    figtitle = output_col
    figname = output_col

    # Randomly shuffle the indices
    index_list = list(dict.fromkeys(df.index.get_level_values(0)))
    random.seed(42)
    np.random.shuffle(index_list)

    # Split the indices into 80% training set, 10% testing set and 10% validation set
    train_idx = index_list[:int(len(index_list) * 0.8)]
    test_idx = index_list[int(len(index_list) * 0.8):int(len(index_list) * 0.9)]
    val_idx = index_list[int(len(index_list) * 0.9):]
    X_train, y_train = torch.Tensor(df.loc[train_idx][input_cols].to_numpy()).to(device), torch.Tensor(df.loc[train_idx][output_col].to_numpy().reshape(-1, 1)).to(device)
    X_test, y_test = torch.Tensor(df.loc[test_idx][input_cols].to_numpy()).to(device), torch.Tensor(df.loc[test_idx][output_col].to_numpy().reshape(-1, 1)).to(device)
    X_val, y_val = torch.Tensor(df.loc[val_idx][input_cols].to_numpy()).to(device), torch.Tensor(df.loc[val_idx][output_col].to_numpy().reshape(-1, 1)).to(device)

    # Define an example training, validating index for live plot
    train_idx_example = train_idx[0]
    val_idx_example = val_idx[0]
    X_train_example, y_train_example = torch.Tensor(df.loc[train_idx_example][input_cols].to_numpy()).to(device), torch.Tensor(df.loc[train_idx_example][output_col].to_numpy().reshape(-1, 1)).to(device)
    X_val_example, y_val_example = torch.Tensor(df.loc[val_idx_example][input_cols].to_numpy()).to(device), torch.Tensor(df.loc[val_idx_example][output_col].to_numpy().reshape(-1, 1)).to(device)
    
    # Preprocess the data
    dp = data_processor()
    X = df[input_cols].to_numpy()
    y = df[output_col].to_numpy().reshape(-1, 1)
    dp.initial_normalize(torch.Tensor(X).to(device), torch.Tensor(y).to(device))
    X_train_norm, y_train_norm = dp.normalize(X_train, y_train)
    X_test_norm, y_test_norm = dp.normalize(X_test, y_test)
    X_val_norm, y_val_norm = dp.normalize(X_val, y_val)
    
    # Normalize example data
    X_train_example_norm, _ = dp.normalize(X=X_train_example)
    X_val_example_norm, _ = dp.normalize(X=X_val_example)
    
    model = P13_fitting_MLP_6()
    learning_rate = 0.01
    optimizer = optim.Adamax(model.parameters(), lr=learning_rate)
    model = model.to(device)

    epoch = 0
    num_epochs = 30000
    
    # Define the loss function
    criterion = nn.MSELoss()
    
    #for epoch in range(num_epochs)
    train_loss = 1.0
    best_val_mse = 100.0
    best_val_mer = -100.0
    
    train_size = X_train.shape[0]
    batch_size = 501 * 1
    batch_start = 0
    
    mini_epoch = 100
    for epoch in range(num_epochs):
        print(epoch)
        batch_end = batch_start + batch_size
        if batch_end >= train_size:
            X_train_batch = torch.cat((X_train_norm[batch_start:train_size], X_train_norm[0:(batch_end-train_size)]))
            y_train_batch = torch.cat((y_train_norm[batch_start:train_size], y_train_norm[0:(batch_end-train_size)]))
            batch_start = batch_end - train_size
        else:
            X_train_batch = X_train_norm[batch_start:batch_end]
            y_train_batch = y_train_norm[batch_start:batch_end]
            batch_start = batch_end
        
        for i in range(mini_epoch):
            # Clear the gradients
            optimizer.zero_grad()
            
            # Forward propagation
            outputs = model(X_train_batch)

            # Calculate the loss function
            loss = criterion(outputs, y_train_batch)

            # Backward propagation
            loss.backward()

            # Update parameters
            optimizer.step()

        # Calculate original values
        _, orignial_outputs = dp.denormalize(y=outputs)
        _, original_y_train = dp.denormalize(y=y_train_batch)
        
        # Validation
        if epoch % 100 == 99: 

            with torch.no_grad():

                # Make predictions on the test data
                predictions = model(X_val_norm)

                # Calculate the loss function
                val_loss = criterion(predictions, y_val_norm)

                # Track the loss value 
                val_loss = val_loss.item()

                _, orignial_predictions = dp.denormalize(y=predictions)
                _, original_y_val = dp.denormalize(y=y_val_norm)
                
                # Compute the max error rate
                mer = torch.max(torch.abs(orignial_predictions - original_y_val) / original_y_val)
                
                # Save best model
                if val_loss < best_val_mse:
                    best_val_mse = val_loss
                    best_val_mer = mer
                    torch.save(model.state_dict(), modelpath) 
                    
                # Example data
                predictions = model(X_train_example_norm)
                _, orignial_train_predictions = dp.denormalize(y=predictions)
                
                predictions = model(X_val_example_norm)
                _, original_val_predictions = dp.denormalize(y=predictions)
                

                live_plot_X_y(train_idx_example, y_train_example.cpu(), orignial_train_predictions.cpu(), val_idx_example, \
                              y_val_example.cpu(), original_val_predictions.cpu(), epoch + 1, val_loss, \
                              mer, output_col=output_col, title=figtitle, savefig=((epoch + 1) == num_epochs), figname=figname, unwrap)
    # Testing
    
    model.load_state_dict(torch.load(modelpath))
    with torch.no_grad():

        # Make predictions on the test data
        predictions = model(X_test_norm)

        # Calculate the loss function
        test_loss = criterion(predictions, y_test_norm)

        # Track the loss value 
        test_loss = test_loss.item()  
        
        _, orignial_predictions = dp.denormalize(y=predictions)
        _, original_y_test = dp.denormalize(y=y_test_norm)
        
        # Compute the max error rate
        mer = torch.max(torch.abs(orignial_predictions - original_y_test) / original_y_test)

    # Save results
    result_lst = [[output_col, best_val_mse, best_val_mer, test_loss, mer]]
    result_config['df'] = pd.concat([result_config['df'], pd.DataFrame(result_lst, columns=result_config['columns'])], ignore_index=True)
    return result_config

In [7]:
df = data_parser.calculate_amp_phase(data_parser.data_parse(), unwrap=False)
df_unwrap = data_parser.calculate_amp_phase(data_parser.data_parse()) # unwrap the phase

In [8]:
df.index.to_numpy()
df_unwrap.index.to_numpy()
input_cols = ["F", "W", "Trap", "Length"]
output_col = 'P(1,3)'

columns = ['S_parameter', 'best_val_mse', 'best_val_mer', 'test_mse', 'test_mer']
result_config = {}
result_config['df'] = pd.DataFrame(columns=columns)
result_config['columns'] = columns

In [None]:
result_config = train(df, input_cols, output_col, result_config)

0
