In [7]:
import h5py
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt
import torch
from PIL import Image
from torch.utils.data import TensorDataset, DataLoader

path_train_normal = '/content/drive/MyDrive/Wile_C/Dataset_normal_vertical0.51/X_train_normal.h5'
path_train_failure = '/content/drive/MyDrive/Wile_C/Dataset_normal_vertical0.51/X_train_failure.h5'
X_train_normal = h5py.File(path_train_normal, 'r+')
X_train_failure = h5py.File(path_train_failure, 'r+')
X_train_normal = np.array(X_train_normal['X_train'])
X_train_failure = np.array(X_train_failure['X_train'])
print(X_train_normal.shape, X_train_failure.shape)

path_test_normal = '/content/drive/MyDrive/Wile_C/Dataset_normal_vertical0.51/X_test_normal.h5'
path_test_failure = '/content/drive/MyDrive/Wile_C/Dataset_normal_vertical0.51/X_test_failure.h5'
X_test_normal = h5py.File(path_test_normal, 'r+')
X_test_failure = h5py.File(path_test_failure, 'r+')
X_test_normal = np.array(X_test_normal['X_test'])
X_test_failure = np.array(X_test_failure['X_test'])
print(X_test_normal.shape, X_test_failure.shape)

path_val_normal = '/content/drive/MyDrive/Wile_C/Dataset_normal_vertical0.51/X_val_normal.h5'
path_val_failure = '/content/drive/MyDrive/Wile_C/Dataset_normal_vertical0.51/X_val_failure.h5'
X_val_normal = h5py.File(path_val_normal, 'r+')
X_val_failure = h5py.File(path_val_failure, 'r+')
X_val_normal = np.array(X_val_normal['X_val'])
X_val_failure = np.array(X_val_failure['X_val'])
print(X_val_normal.shape, X_val_failure.shape)

X_train = np.reshape(X_train_normal, (34,1,249999,8))
X_val = np.reshape(X_val_normal, (8,1,249999,8))
X_test = np.reshape(X_test_normal, (7,1,249999,8))
X_train = np.concatenate((X_train, X_val, X_test), axis=0)
print(X_train.shape)
y_train = np.ones(len(X_train))

data = TensorDataset(torch.as_tensor(X_train).float(), torch.as_tensor(y_train))
data = DataLoader(data, batch_size=8, shuffle=True, drop_last=True)
print(data)

(34, 249999, 8, 1) (35, 249999, 8, 1)
(7, 249999, 8, 1) (8, 249999, 8, 1)
(8, 249999, 8, 1) (8, 249999, 8, 1)
(49, 1, 249999, 8)
<torch.utils.data.dataloader.DataLoader object at 0x7fc8fdd6e410>


In [4]:
def forward(self, x):
    base_out = self.base_model(x)
        
    self.mu = self.lin_mu(base_out)
    self.log_var = self.lin_var(base_out)
    std = torch.exp(self.log_var/2)
                
    eps = torch.randn_like(self.mu)
    z = self.mu + eps * std
    return z

In [5]:
class AutoEncoder(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.enc = encoder
        self.dec = decoder
        
    def forward(self, x):
        # when encoder met decoder
        enc_out = self.enc(x)
        return self.dec(enc_out)

In [6]:
def kl_div(mu, std):
    kl_div = -0.5*(1 + np.log(std**2) - mu**2 - std**2)
    return kl_div

In [8]:
def set_seed(self, seed=42):
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.manual_seed(seed)
    np.random.seed(seed)

class EncoderVar(nn.Module):
    def __init__(self, input_shape, z_size, base_model):
        super().__init__()
        self.z_size = z_size
        self.input_shape = input_shape
        self.base_model = base_model
        output_size = self.get_output_size()
        self.lin_mu = nn.Linear(output_size, z_size)
        self.lin_var = nn.Linear(output_size, z_size)
        
    def get_output_size(self):
        device = next(self.base_model.parameters()).device.type
        size = self.base_model(torch.zeros(1, *self.input_shape, device=device)).size(1)
        return size
    
    def kl_loss(self):
        kl_loss = -0.5*(1 + self.log_var - self.mu**2 - torch.exp(self.log_var))
        return kl_loss
        
    def forward(self, x):
        # the base model, same as the traditional AE
        base_out = self.base_model(x)
        
        # now the encoder produces means (mu) using the lin_mu output layer
        # and log variances (log_var) using the lin_var output layer
        # we compute the standard deviation (std) from the log variance
        self.mu = self.lin_mu(base_out)
        self.log_var = self.lin_var(base_out)
        std = torch.exp(self.log_var/2)
                
        # that's the internal random input (epsilon)
        eps = torch.randn_like(self.mu)
        # and that's the z vector
        z = self.mu + eps * std
        
        return z

In [9]:
set_seed(13)

z_size = 1
n_filters = 16
in_channels = 1
img_size_H = 249999
img_size_W = 8
input_shape = (in_channels, img_size_H, img_size_W)

base_model = nn.Sequential(
    # in_channels@28x28 -> n_filters@28x28
    nn.Conv2d(in_channels, n_filters, kernel_size=3, stride=1, padding=1),
    nn.LeakyReLU(),
            
    # n_filters@28x28 -> (n_filters*2)@14x14
    nn.Conv2d(n_filters, n_filters*2, kernel_size=3, stride=2, padding=1),
    nn.LeakyReLU(),
    
    # (n_filters*2)@14x14 -> (n_filters*2)@7x7
    nn.Conv2d(n_filters*2, n_filters*2, kernel_size=3, stride=2, padding=1),
    nn.LeakyReLU(),
    
    # (n_filters*2)@7x7 -> (n_filters*2)@7x7
    nn.Conv2d(n_filters*2, n_filters*2, kernel_size=3, stride=1, padding=1),
    nn.LeakyReLU(),
    
    # (n_filters*2)@7x7 -> (n_filters*2)*7*7
    nn.Flatten(),
)

encoder_var_cnn = EncoderVar(input_shape, z_size, base_model)

decoder_cnn = nn.Sequential(
    # z_size -> (n_filters*2)*7*7
    nn.Linear(z_size, (n_filters*2)*int(img_size_H/4)*int(img_size_W/4)),
    
    # (n_filters*2)*7*7 -> (n_filters*2)@7x7
    nn.Unflatten(1, (n_filters*2, int(img_size_H/4), int(img_size_W/4))),
    
    # (n_filters*2)@7x7 -> (n_filters*2)@7x7
    nn.ConvTranspose2d(n_filters*2, n_filters*2, kernel_size=3, stride=1, padding=1, output_padding=0),
    nn.LeakyReLU(),

    # (n_filters*2)@7x7 -> (n_filters*2)@14x14
    nn.ConvTranspose2d(n_filters*2, n_filters*2, kernel_size=(4,3), stride=2, padding=1, output_padding=1),
    nn.LeakyReLU(),
    
    # (n_filters*2)@15x15 -> n_filters@28x28
    nn.ConvTranspose2d(n_filters*2, n_filters, kernel_size=(4,3), stride=2, padding=1, output_padding=1),
    nn.LeakyReLU(),
    
    # n_filters@28x28 -> in_channels@28x28
    nn.ConvTranspose2d(n_filters, in_channels, kernel_size=3, stride=1, padding=1, output_padding=0),
    nn.Sigmoid(),
)

In [10]:
encoder_var_cnn

EncoderVar(
  (base_model): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.01)
    (2): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01)
    (4): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (5): LeakyReLU(negative_slope=0.01)
    (6): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): LeakyReLU(negative_slope=0.01)
    (8): Flatten(start_dim=1, end_dim=-1)
  )
  (lin_mu): Linear(in_features=4000000, out_features=1, bias=True)
  (lin_var): Linear(in_features=4000000, out_features=1, bias=True)
)

In [11]:
decoder_cnn

Sequential(
  (0): Linear(in_features=1, out_features=3999936, bias=True)
  (1): Unflatten(dim=1, unflattened_size=(32, 62499, 2))
  (2): ConvTranspose2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (3): LeakyReLU(negative_slope=0.01)
  (4): ConvTranspose2d(32, 32, kernel_size=(4, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
  (5): LeakyReLU(negative_slope=0.01)
  (6): ConvTranspose2d(32, 16, kernel_size=(4, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
  (7): LeakyReLU(negative_slope=0.01)
  (8): ConvTranspose2d(16, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (9): Sigmoid()
)

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model_vae_cnn = AutoEncoder(encoder_var_cnn, decoder_cnn)
model_vae_cnn.to(device)
loss_fn = nn.MSELoss(reduction='none')
optim = torch.optim.Adam(model_vae_cnn.parameters(), 0.0003)

num_epochs = 200

train_losses = []

reconstruction_loss_factor = 1

for epoch in range(1, num_epochs+1):
    batch_losses = []
    for i, (x, _) in enumerate(data):
        model_vae_cnn.train()
        x = x.to(device)
        #print(x.shape)

        # Step 1 - Computes our model's predicted output - forward pass
        yhat = model_vae_cnn(x)
        #print(yhat.shape)

        # Step 2 - Computes the loss
        loss = loss_fn(yhat, x).sum(dim=[1, 2, 3]).sum(dim=0)
        kl_loss = model_vae_cnn.enc.kl_loss().sum(dim=1).sum(dim=0)
        total_loss = reconstruction_loss_factor * loss + kl_loss

        # Step 3 - Computes gradients
        total_loss.backward()

        # Step 4 - Updates parameters using gradients and the learning rate
        optim.step()
        optim.zero_grad()
        
        batch_losses.append(np.array([total_loss.data.item(), loss.data.item(), kl_loss.data.item()]))

    # Average over batches
    train_losses.append(np.array(batch_losses).mean(axis=0))

    print(f'Epoch {epoch:03d} | Loss >> {train_losses[-1][0]:.4f}/{train_losses[-1][1]:.4f}/{train_losses[-1][2]:.4f}')