# Import Package

In [1]:
import torch.nn as nn
import torch
import numpy as np
import torch.nn.functional as F

# Model Class

In [2]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        
        # For encoder
        self.encode = nn.Sequential(
            nn.Linear(12,32),
            nn.LeakyReLU(),
            nn.Linear(32,64),
            nn.LeakyReLU(),
            nn.Linear(64,32),
            nn.LeakyReLU()
        )
        
        self.l_mu = nn.Linear(in_features=32, out_features=10)
        self.l_var = nn.Linear(in_features=32,out_features=10)
    
    def encoder(self, x):
        x = x.view(-1,12)
        h = self.encode(x)
        mu = self.l_mu(h)
        log_var = self.l_var(h)
        
        return mu, log_var  # mu, log_var
    
    def sampling(self, mu, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu)  # return z sample
    
    def auxilary(self,z):
        z_p = Variable(z.data.new(z.size()).normal_())
        return z_p
    
    def forward(self, x):
        mu, log_var = self.encoder(x)
        z = self.sampling(mu, log_var)
        z_p = self.auxilary(z)
        return mu, log_var, z,z_p

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        
        self.decode = nn.Sequential(
            nn.Linear(10,28),
            nn.LeakyReLU(),
            nn.Linear(28,40),
            nn.LeakyReLU(),
            nn.Linear(40, 18),
            nn.LeakyReLU()
        )
        self.output = nn.Linear(in_features=18, out_features=12)
    
    def forward(self, z):
        h = self.decode(z)
        h = self.output(h)
        return h
    
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        self.discriminator = nn.Sequential(
            nn.Linear(12,32),
            nn.LeakyReLU(),
            nn.Linear(32, 24),
            nn.LeakyReLU(),
            nn.Linear(24, 1),
        )
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        x = x.view(-1,12)
        validity = self.discriminator(x)
        return self.sigmoid(validity)
    
    def similarity(self, x):
        x = x.view(-1,12)
        h = self.discriminator(x)
        return h

In [3]:
def discriminator_loss(recon_data, sample_data, real_data, REAL_LABEL, FAKE_LABEL):
    recon_loss = F.binary_cross_entropy(recon_data, FAKE_LABEL)
    sample_loss = F.binary_cross_entropy(sample_data, FAKE_LABEL)
    real_loss = F.binary_cross_entropy(real_data, REAL_LABEL)
    return recon_loss + sample_loss + real_loss

## Plot loss(JPG)

In [4]:
def plot_loss(train_dec_loss_li,train_dis_loss_li,train_enc_loss_li,valid_dec_loss_li,valid_dis_loss_li,valid_enc_loss_li,test_dec_loss_li,test_dis_loss_li,test_enc_loss_li):
    plt.plot(train_dec_loss_li, color='b', label='train_dec_loss')
    plt.plot(train_dis_loss_li, color='g', label='train_dis_loss')
    plt.plot(train_enc_loss_li, color='r', label='train_enc_loss')
    plt.plot(valid_dec_loss_li, color='cadetblue', label='valid_dec_loss')
    plt.plot(valid_dis_loss_li, color='coral', label='valid_dis_loss')
    plt.plot(valid_enc_loss_li, color='cyan', label='valid_enc_loss')
    plt.plot(test_dec_loss_li, color='fuchsia', label='valid_dec_loss')
    plt.plot(test_dis_loss_li, color='hotpink', label='valid_dis_loss')
    plt.plot(test_enc_loss_li, color='lavenderblush', label='valid_enc_loss')
    plt.legend()
    plt.savefig('loss_png/loss_all-{}.jpg'.format(str('')))
    plt.close()

    plt.plot(train_dec_loss_li, color='b', label='train_dec_loss')
    plt.plot(train_dis_loss_li, color='g', label='train_dis_loss')
    plt.plot(train_enc_loss_li, color='r', label='train_enc_loss')
    plt.legend()
    plt.savefig('loss_png/train_loss-{}.jpg'.format(str('')))
    plt.close()

    plt.plot(valid_dec_loss_li, color='cadetblue', label='valid_dec_loss')
    plt.plot(valid_dis_loss_li, color='coral', label='valid_dis_loss')
    plt.plot(valid_enc_loss_li, color='cyan', label='valid_enc_loss')
    plt.legend()
    plt.savefig('loss_png/valid_loss-{}.jpg'.format(str('')))
    plt.close()
    
    plt.plot(test_dec_loss_li, color='fuchsia', label='test_dec_loss')
    plt.plot(test_dis_loss_li, color='hotpink', label='test_dis_loss')
    plt.plot(test_enc_loss_li, color='lavenderblush', label='test_enc_loss')
    plt.legend()
    plt.savefig('loss_png/test_loss-{}.jpg'.format(str('')))
    plt.close()

## L1_loss Calculation

In [5]:
def l1loss_cal(data_loader):
    l1_loss = []
    for i, (input_data,ground_truth) in tqdm(enumerate(data_loader)):
        input_data = Variable(input_data.type(Tensor)).type(torch.cuda.FloatTensor)
        ground_truth = Variable(ground_truth.type(Tensor)).type(torch.cuda.FloatTensor)

        mu, log_var, z,z_p  = encoder(input_data)
        predicts = decoder(z)
        l1_loss.append(abs(float(torch.mean(ground_truth[0,:,1]-predicts.reshape(-1,4,3)[0,:,1]))))
    return sum(l1_loss)/len(l1_loss)

In [6]:
def plot_l1_loss(train_l1,valid_l1,test_l1):
    plt.plot(train_l1, color='darkred', label='Train_l1_loss')
    plt.plot(valid_l1, color='deeppink', label='Valid_l1_loss')
    plt.plot(test_l1, color='gold', label='Test_l1_loss')
    plt.legend()
    plt.savefig('loss_png_l1/all_l1_loss-{}.jpg'.format(str('')))
    plt.close()
    
    plt.plot(train_l1, color='darkred', label='Train_l1_loss')
    plt.legend()
    plt.savefig('loss_png_l1/train_l1_loss-{}.jpg'.format(str('')))
    plt.close()
    
    plt.plot(valid_l1, color='deeppink', label='Valid_l1_loss')
    plt.legend()
    plt.savefig('loss_png_l1/valid_l1_loss-{}.jpg'.format(str('')))
    plt.close()
    
    plt.plot(test_l1, color='gold', label='Valid_l1_loss')
    plt.legend()
    plt.savefig('loss_png_l1/test_l1_loss-{}.jpg'.format(str('')))
    plt.close()

# Load Data

In [7]:
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader, Subset
from tqdm import tqdm 

## DataLoader Class

In [8]:
class SignalDataset(Dataset):
    def __init__(self, input_data,ground_truth):
        self.input_data =input_data
        self.ground_truth = ground_truth

    def __getitem__(self, index):
        input_data = np.array(self.input_data[index], dtype=np.float32)
        ground_truth = np.array(self.ground_truth[index], dtype=np.float32)
        return input_data,ground_truth

    def __len__(self):
        return len(self.input_data)

In [9]:
train_input = np.load('train_input.npy')
train_gt = np.load('train_gt.npy')
valid_input = np.load('valid_input.npy')
valid_gt = np.load('valid_gt.npy')
test_input = np.load('test_input.npy')
test_gt = np.load('test_gt.npy')

In [10]:
train_input = train_input.transpose(0,2,1)
train_gt = train_gt.transpose(0,2,1)
valid_input = valid_input.transpose(0,2,1)
valid_gt = valid_gt.transpose(0,2,1)
test_input = test_input.transpose(0,1,3,2)
test_gt = test_gt.transpose(0,1,3,2)

In [11]:
test_input = test_input[:54,:1200,:,:]
test_input = test_input.reshape(-1,4,3)[:5]
test_gt = test_gt[:54,:1200,:,:]
test_gt = test_gt.reshape(-1,4,3)[:5]

In [12]:
train_dataset = SignalDataset(train_input,train_gt)
valid_dataset = SignalDataset(valid_input,valid_gt)
test_dataset = SignalDataset(test_input,test_gt)

train_loader = DataLoader(dataset=train_dataset, batch_size=100, shuffle=True, num_workers=0)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=100, shuffle=True, num_workers=0)
test_loader = DataLoader(dataset=test_dataset, batch_size=100, shuffle=True, num_workers=0)

# Train Model

In [13]:
from torch.autograd import Variable
import matplotlib.pyplot as plt

In [14]:
if torch.cuda.is_available():
    Tensor = torch.cuda.FloatTensor
    encoder = Encoder().cuda()
    decoder = Decoder().cuda()
    discriminator = Discriminator().cuda()
else:
    Tensor = torch.FloatTensor
    encoder = Encoder()
    decoder = Decoder()
    discriminator = Discriminator()

In [15]:
train_dec_loss_li,train_dis_loss_li,train_enc_loss_li,valid_dec_loss_li,valid_dis_loss_li,valid_enc_loss_li = [],[],[],[],[],[]
test_dec_loss_li,test_dis_loss_li,test_enc_loss_li = [],[],[]
train_l1,valid_l1,test_l1 = [],[],[]

In [16]:
optimizer_E = torch.optim.Adam(encoder.parameters(), lr=0.0001)
optimizer_D = torch.optim.Adam(decoder.parameters(), lr=0.0001)
optimizer_Dis = torch.optim.Adam(discriminator.parameters(), lr=0.0001)
# lr_scheduler_G = torch.optim.lr_scheduler.StepLR(optimizer_G, step_size = 1, gamma=0.9)

## Train model process detail function

In [17]:
def model_process(mode,dataloader,epoch):
    flag = 0
    for i, (input_data,ground_truth) in tqdm(enumerate(dataloader)):
        valid = Variable(Tensor(len(input_data), 1).fill_(1.0), requires_grad=False).type(torch.cuda.FloatTensor)
        fake = Variable(Tensor(len(input_data), 1).fill_(0.0), requires_grad=False).type(torch.cuda.FloatTensor)
        input_data = Variable(input_data.type(Tensor)).type(torch.cuda.FloatTensor)
        ground_truth = Variable(ground_truth.type(Tensor)).type(torch.cuda.FloatTensor)

        mu, log_var, z,z_p  = encoder(input_data)
        predicts = decoder(z)
        predicts_p = decoder(z_p)

        X = discriminator(predicts)
        X_p = discriminator(predicts_p)
        X_real = discriminator(ground_truth)
        X_sim = discriminator.similarity(predicts)
        X_data = discriminator.similarity(ground_truth)
        dis_loss = discriminator_loss(X, X_p, X_real, valid, fake)
        if mode =='train':
            if epoch % 2 == 0 and flag==0:
                if epoch == 0 and i>10:
                    flag = 1
                optimizer_Dis.zero_grad()
                dis_loss.backward(retain_graph=True)
                optimizer_Dis.step()
        
        X = discriminator(predicts)
        X_p = discriminator(predicts_p)
        X_real = discriminator(ground_truth)
        X_sim = discriminator.similarity(predicts)
        X_data = discriminator.similarity(ground_truth)
        dis_loss = discriminator_loss(X, X_p, X_real, valid, fake)
        rec_loss = ((X_sim - X_data) ** 2).mean()
        decoder_loss = 25 * rec_loss - dis_loss
        if mode =='train':
            optimizer_D.zero_grad()
            decoder_loss.backward(retain_graph=True)
            optimizer_D.step()

        mu, log_var, z,z_p  = encoder(input_data)
        predicts = decoder(z)
        predicts_p = decoder(z_p)
        X_sim = discriminator.similarity(predicts)
        X_data = discriminator.similarity(ground_truth)
        rec_loss = ((X_sim - X_data) ** 2).mean()
        mu, log_var, z,z_p  = encoder(input_data)
        KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
        KLD = KLD / len(input_data) * 12
        encoder_loss = KLD + 5 * rec_loss
        if mode =='train':
            optimizer_E.zero_grad()
            encoder_loss.backward()
            optimizer_E.step()
    return decoder_loss,dis_loss,encoder_loss

## Start Train Epoch

In [None]:
for epoch in range(0,100):
    train_dec_loss,train_dis_loss,train_enc_loss,valid_dec_loss,valid_dis_loss,valid_enc_loss = 0,0,0,0,0,0
    
    #Training
    print('Training')
    decoder_loss,dis_loss,encoder_loss = model_process('train',train_loader,epoch)
    train_dec_loss = train_dec_loss + decoder_loss.item()
    train_dis_loss = train_dis_loss + dis_loss.item()
    train_enc_loss = train_enc_loss + encoder_loss.item()
    print('====> Epoch: {} Average Decoder loss: {:.4f} Average Discriminator loss: {:.4f} Average Ecoder loss: {:.4f}'.format(
          epoch+1,train_dec_loss / len(train_loader.dataset),train_dis_loss / len(train_loader.dataset),train_enc_loss / len(train_loader.dataset)))
    train_dec_loss_li.append(train_dec_loss/len(train_loader.dataset))
    train_dis_loss_li.append(train_dis_loss/len(train_loader.dataset))
    train_enc_loss_li.append(train_enc_loss/len(train_loader.dataset))
    
    #Validing
    print('Validing')   
    decoder_loss,dis_loss,encoder_loss = model_process('valid',valid_loader,epoch)
    valid_dec_loss = valid_dec_loss + decoder_loss.item()
    valid_dis_loss = valid_dis_loss + dis_loss.item()
    valid_enc_loss = valid_enc_loss + encoder_loss.item()
    print('Valid Epoch:{} ===> Average Decoder loss: {:.4f} Average Discriminator loss: {:.4f} Average Ecoder loss: {:.4f}'.format(epoch+1,
          valid_dec_loss / len(valid_loader.dataset),valid_dis_loss / len(valid_loader.dataset),valid_enc_loss / len(valid_loader.dataset)))
    valid_dec_loss_li.append(valid_dec_loss/len(valid_loader.dataset))
    valid_dis_loss_li.append(valid_dis_loss/len(valid_loader.dataset))
    valid_enc_loss_li.append(valid_enc_loss/len(valid_loader.dataset))
    
    #Testing
    print('Testing')   
    decoder_loss,dis_loss,encoder_loss = model_process('test',test_loader,epoch)
    test_dec_loss = valid_dec_loss + decoder_loss.item()
    test_dis_loss = valid_dis_loss + dis_loss.item()
    test_enc_loss = valid_enc_loss + encoder_loss.item()
    print('Test Epoch:{} ===> Average Decoder loss: {:.4f} Average Discriminator loss: {:.4f} Average Ecoder loss: {:.4f}'.format(epoch+1,
          test_dec_loss / len(valid_loader.dataset),test_dis_loss / len(valid_loader.dataset),test_enc_loss / len(valid_loader.dataset)))
    test_dec_loss_li.append(valid_dec_loss/len(valid_loader.dataset))
    test_dis_loss_li.append(valid_dis_loss/len(valid_loader.dataset))
    test_enc_loss_li.append(valid_enc_loss/len(valid_loader.dataset))
    
    #Plot loss
    plot_loss(train_dec_loss_li,train_dis_loss_li,train_enc_loss_li,valid_dec_loss_li,valid_dis_loss_li,valid_enc_loss_li,test_dec_loss_li,test_dis_loss_li,test_enc_loss_li)
    
    #L1 loss Calculation and Plot L1 loss
    print('L1 loss Calculation')
    train_l1.append(l1loss_cal(train_loader))
    valid_l1.append(l1loss_cal(valid_loader))
    test_l1.append(l1loss_cal(test_loader))
    plot_l1_loss(train_l1,valid_l1,test_l1)
    
    if epoch%5 ==0:
        torch.save(encoder,'model/encoder.pt')
        torch.save(decoder,'model/decoder.pt')
        torch.save(discriminator,'model/discriminator.pt')

0it [00:00, ?it/s]

Training


196it [00:02, 81.58it/s]

In [None]:
input_data[0,:,1]

In [None]:
predicts.reshape(-1,4,3)[0,:,1]

In [None]:
ground_truth[0,:,1]

In [None]:
ground_truth[0,:,1]-predicts.reshape(-1,4,3)[0,:,1]

In [None]:
print(input_data[0],predicts.reshape(-1,4,3)[0],ground_truth[0])