In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import os
import matplotlib.pyplot as plt
import time

In [None]:
mb_size = 64
p_miss = 0.2
p_hint = 0.9
alpha = 400
train_rate = 0.8
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
dataset_file = 'RLD.csv' 
Data = np.loadtxt(dataset_file, delimiter=",", skiprows=1)
No, Dim = Data.shape
Min_Val = np.min(Data, axis=0)
Max_Val = np.max(Data, axis=0)
Data = (Data - Min_Val) / (Max_Val + 1e-6)


In [None]:
p_miss_vec = p_miss * np.ones((Dim, 1))
Missing = np.zeros((No, Dim))

for i in range(Dim):
    A = np.random.uniform(0., 1., size=No)
    B = A > p_miss_vec[i]
    Missing[:, i] = 1.0 * B

In [None]:
idx = np.random.permutation(No)
Train_No = int(No * train_rate)
Test_No = No - Train_No
trainX = Data[idx[:Train_No], :]
testX = Data[idx[Train_No:], :]
trainM = Missing[idx[:Train_No], :]
testM = Missing[idx[Train_No:], :]


In [None]:
class DataImputationDataset(Dataset):
    def __init__(self, data, missing, p_hint=0.9):
        self.data = torch.tensor(data, dtype=torch.float32)
        self.missing = torch.tensor(missing, dtype=torch.float32)
        self.p_hint = p_hint
        self.No, self.Dim = self.data.shape
    
    def __len__(self):
        return self.No
    
    def __getitem__(self, idx):
       
        X = self.data[idx]            
        M = self.missing[idx]         
        New_X = X * M                 

     
        H = self.sample_Hint(M)       # [1]

       
        New_X = New_X.unsqueeze(1)    
        M = M.unsqueeze(1)            
        X = X.unsqueeze(1)            
        H = H.view(1)                  

        return New_X, M, X, H
    
    def sample_Hint(self, M):
       
        A = torch.rand(1)
        B = A > self.p_hint
        C = B.float()
        H = C.view(1) 
        return H


train_dataset = DataImputationDataset(trainX, trainM, p_hint=p_hint)
test_dataset = DataImputationDataset(testX, testM, p_hint=p_hint)


train_loader = DataLoader(train_dataset, batch_size=mb_size, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=mb_size, shuffle=False, drop_last=False)
New_X, M, X, H = next(iter(train_loader))


In [None]:
class Generator(nn.Module):
    def __init__(self, input_dim, mask_dim, hidden_dim=128, num_heads=4, seq_len=134):
        super(Generator, self).__init__()
        self.hidden_dim = hidden_dim
        self.seq_len = seq_len

        self.bilstm1 = nn.LSTM(input_size=input_dim + mask_dim, hidden_size=hidden_dim, 
                               batch_first=True, bidirectional=True)
        self.layernorm1 = nn.LayerNorm(hidden_dim * 2)
        self.bilstm2 = nn.LSTM(input_size=hidden_dim * 2, hidden_size=hidden_dim, 
                               batch_first=True, bidirectional=True)
        self.layernorm2 = nn.LayerNorm(hidden_dim * 2)
        self.multihead_attn = nn.MultiheadAttention(embed_dim=hidden_dim * 2, num_heads=num_heads, batch_first=True)
        self.layernorm3 = nn.LayerNorm(hidden_dim * 2)
        self.dropout = nn.Dropout(0.3)

        self.fc1 = nn.Linear(hidden_dim * 2 * seq_len, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, input_dim)  
    def forward(self, new_x, m):
        new_x = new_x.float()
        m = m.float()
        inputs = torch.cat([new_x, m], dim=2)  
        bilstm_out1, _ = self.bilstm1(inputs) 
        bilstm_out1 = self.layernorm1(bilstm_out1)
        bilstm_out2, _ = self.bilstm2(bilstm_out1)  
        bilstm_out2 = self.layernorm2(bilstm_out2)
        attn_out, _ = self.multihead_attn(bilstm_out2, bilstm_out2, bilstm_out2)  
        attn_out = self.layernorm3(attn_out)
        dropout_out = self.dropout(attn_out)
        batch_size, seq_len, hidden_dim_2 = dropout_out.size()
        flattened = dropout_out.reshape(batch_size, -1)  
        G_h1 = F.relu(self.fc1(flattened)) 
        G_h2 = F.relu(self.fc2(G_h1))       
        G_h3 = F.relu(self.fc3(G_h2))      
        G_out = self.fc4(G_h3)             
        G_prob = G_out.unsqueeze(1).repeat(1, seq_len, 1) 
        return G_prob


class Discriminator(nn.Module):
    def __init__(self, input_dim, hint_dim, hidden_dim=128, seq_len=134):
        super(Discriminator, self).__init__()
        self.hidden_dim = hidden_dim
        self.seq_len = seq_len
        self.conv1 = nn.Conv1d(in_channels=input_dim * 2, out_channels=32, kernel_size=3, padding=1)
        self.layernorm1 = nn.LayerNorm([32, seq_len])
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64, 128)
        self.fc2 = nn.Linear(128, 64)
        self.dropout1 = nn.Dropout(0.3)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, new_x, h):
        new_x = new_x.float()
        h = h.float()
        batch_size, seq_len, dim = new_x.size()
        h_expanded = h.unsqueeze(1).expand(-1, seq_len, -1) 
        inputs = torch.cat([new_x, h_expanded], dim=2)   
        inputs_reshaped = inputs.permute(0, 2, 1)            
        conv_out1 = self.conv1(inputs_reshaped)              
        layernorm1 = self.layernorm1(conv_out1)             
        conv_out2 = self.conv2(layernorm1)                 
        gap = torch.mean(conv_out2, dim=2)                  
        D_h1 = F.relu(self.fc1(gap))                        
        norm_layer2 = nn.LayerNorm(D_h1.size()[1:]).to(device)(D_h1)  
        D_h2 = F.relu(self.fc2(norm_layer2))             
    dropout1 = self.dropout1(D_h2)                      
        D_logit = self.fc3(dropout1)                        
        D_prob = torch.sigmoid(D_logit)                 
        return D_prob


In [None]:
def discriminator_loss(generator, discriminator, M, New_X, H):
    G_sample = generator(New_X, M) 
    Hat_New_X = New_X * M + G_sample * (1 - M)  
    D_prob = discriminator(Hat_New_X, H) 
    criterion = nn.BCELoss()
    real_labels = torch.ones(D_prob.size()).to(device)
    fake_labels = torch.zeros(D_prob.size()).to(device)
    D_real = discriminator(New_X, H) 
    D_fake = discriminator(G_sample, H)  
    loss_real = criterion(D_real, real_labels)
    loss_fake = criterion(D_fake, fake_labels)
    D_loss = loss_real + loss_fake
    return D_loss


def generator_loss(generator, discriminator, X, M, New_X, H, alpha=10.0):
    G_sample = generator(New_X, M)  
    D_prob = discriminator(G_sample, H) 
    criterion = nn.BCELoss()
 
    real_labels = torch.ones(D_prob.size()).to(device)
    G_loss1 = criterion(D_prob, real_labels)

    MSE_train_loss = torch.mean(((1 - M) * X - (1 - M) * G_sample)**2) / torch.mean(1 - M)

  
    G_loss = G_loss1 + alpha * MSE_train_loss 

    MSE_test_loss = torch.mean(((1 - M) * X - (1 - M) * G_sample)**2) / torch.mean(1 - M)
    return G_loss, MSE_train_loss, MSE_test_loss


def test_loss(generator, X, M, New_X):
    G_sample = generator(New_X, M)  
    MSE_test_loss = torch.mean(((1 - M) * X - (1 - M) * G_sample)**2) / torch.mean(1 - M)

    return MSE_test_loss, G_sample


In [None]:

input_dim = 1         
mask_dim = 1          
hint_dim = 0.9          
seq_len = Dim          


generator = Generator(input_dim=input_dim, mask_dim=mask_dim, hidden_dim=128, num_heads=4, seq_len=seq_len).to(device)
discriminator = Discriminator(input_dim=input_dim, hint_dim=hint_dim, hidden_dim=128, seq_len=seq_len).to(device)
optimizer_G = optim.Adam(generator.parameters(), lr=0.1)
optimizer_D = optim.Adam(discriminator.parameters(), lr=0.1)

In [None]:

start_time = time.time()
num_epochs =1000  

for epoch in range(num_epochs):
    generator.train()
    discriminator.train()
    
    D_loss_epoch = 0.0
    G_loss_epoch = 0.0
    MSE_train_epoch = 0.0
  
    batch_count = 0
    
    for batch_idx, (New_X, M, X, H) in enumerate(train_loader):

        New_X = New_X.to(device) 
        M = M.to(device)          
        X = X.to(device)          
        H = H.to(device)         
     
        optimizer_D.zero_grad()
        D_loss = discriminator_loss(generator, discriminator, M, New_X, H)
        D_loss.backward()
        optimizer_D.step()
        
       
        optimizer_G.zero_grad()
        G_loss, MSE_train_loss, _ = generator_loss(generator, discriminator, X, M, New_X, H, alpha)  
        G_loss.backward()
        optimizer_G.step()
        
 
        D_loss_epoch += D_loss.item()
        G_loss_epoch += G_loss.item()
        MSE_train_epoch += MSE_train_loss.item()

        batch_count += 1
    

    D_loss_avg = D_loss_epoch / batch_count
    G_loss_avg = G_loss_epoch / batch_count
    MSE_train_avg = MSE_train_epoch / batch_count
  torch.save(generator, 'generator_trained_complete.pth')
torch.save(discriminator, 'discriminator_trained_complete.pth')
    
    print(f"Epoch [{epoch+1}/{num_epochs}] - D_loss: {D_loss_avg:.4f}, G_loss: {G_loss_avg:.4f}, MSE_train_loss: {MSE_train_avg:.4f}")
    end_time = time.time()  
elapsed_time = end_time - start_time  

In [None]:
generator = torch.load('generator_trained_complete.pth')
discriminator = torch.load('discriminator_trained_complete.pth')

generator.eval()
discriminator.eval()

total_MSE_test = 0.0
total_MAE_test = 0.0
total_RMSE_test = 0.0
total_samples = 0

with torch.no_grad():
    for New_X, M, X, H in test_loader:
        New_X = New_X.to(device)
        M = M.to(device)
        X = X.to(device)
        H = H.to(device)
        

        mse_test, predicted = test_loss(generator, X, M, New_X)
        total_MSE_test += mse_test.item() * New_X.size(0)  
        

        mae_test = torch.mean(torch.abs((1 - M) * (X - predicted))).item() 
        total_MAE_test += mae_test * New_X.size(0)  

        rmse_test = torch.sqrt(F.mse_loss(predicted, New_X, reduction='sum') / New_X.size(0)).item()
        total_RMSE_test += rmse_test * New_X.size(0) 
        
        total_samples += New_X.size(0)


avg_MSE_test = total_MSE_test / total_samples
avg_MAE_test = total_MAE_test / total_samples
rmse = np.sqrt(avg_MSE_test)

