<a href="https://colab.research.google.com/github/samuel0711/image-anomaly-detection/blob/main/con_tcc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [128]:
import io
import sys
import os
import gc
import shutil
import numpy as np
import matplotlib.pyplot as plt
from skimage import io as skio
import torch
from torch import nn #neural networks
import torch.nn.functional as F

import time

#Carregamento de Dados
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision import transforms



if 'google.colab' in sys.modules:
    from google.colab import drive
    drive.mount('/content/drive')
    os.chdir("/content/drive/My Drive/Colab Notebooks")
    sys.path.append('/content/drive/My Drive/Colab Notebooks')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [129]:
args = {
    'epoch':50,          #Numero de épocas
    'lr':1e-3,            #Taxa de aprendizado
    'weight_decay':5e-3,  #Penalidade L2 (Regularização)
    'batch_size': 50,     #Tamanho do batch
    'num_workers': 4
}

if torch.cuda.is_available():
  args['device'] = torch.device('cuda')
else:
  args['device'] = torch.device('cpu')

print(args['device'])


train_set = np.load('train_dataset.npy', allow_pickle = True)
test_set = np.load('test_dataset.npy', allow_pickle = True)

cuda


In [130]:
train_set[-1].dtype
#train = torch.from_numpy(train_set)
#test = torch.from_numpy(test_set)

dtype('int8')

In [131]:
train_loader = DataLoader(train_set, 
                          batch_size=args['batch_size'], 
                          shuffle=True, 
                          num_workers=args['num_workers'])

test_loader = DataLoader(test_set, 
                         batch_size=args['batch_size'], 
                         shuffle=True, 
                         num_workers=args['num_workers']) 

  cpuset_checked))


In [155]:
#Define the Convolutional Autoencoder
class ConvAutoencoder(nn.Module):
    def __init__(self):
        super(ConvAutoencoder, self).__init__()
       
        #Encoder
        self.conv1 = nn.Conv2d(1, 4, 3, padding=1)  #in_channel, out_channel, kernel_size
        self.conv2 = nn.Conv2d(4, 16, 3, padding=1)
        self.conv3 = nn.Conv2d(16, 4, 3, padding=1) 
        self.pool = nn.MaxPool2d(2, 2) #kernel_size, stride
       
        #Decoder
        self.t_conv1 = nn.ConvTranspose2d(4, 16, 2, stride=2)
        self.t_conv2 = nn.ConvTranspose2d(16, 4, 2, stride=2)
        self.t_conv3 = nn.ConvTranspose2d(4, 1, 2, stride=2)


    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = F.relu(self.conv3(x))
        x = self.pool(x)
        x = F.relu(self.t_conv1(x))
        x = F.relu(self.t_conv2(x))
        x = torch.sigmoid(self.t_conv3(x))
              
        return x


#Instantiate the model
model = ConvAutoencoder()


# **Treinando**

In [156]:
from torch import optim

criterio = nn.MSELoss().to(args['device'])
optimizer = optim.Adam(model.parameters(), lr=args['lr'], weight_decay=args['weight_decay'])
model.to(args['device'])

ConvAutoencoder(
  (conv1): Conv2d(1, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(4, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(16, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (t_conv1): ConvTranspose2d(4, 16, kernel_size=(2, 2), stride=(2, 2))
  (t_conv2): ConvTranspose2d(16, 4, kernel_size=(2, 2), stride=(2, 2))
  (t_conv3): ConvTranspose2d(4, 1, kernel_size=(2, 2), stride=(2, 2))
)

## Treino ( Somente com imagens não-anomalas)

In [160]:
def train(train_loader, model, epoch):
  ###################
  # treinando o modelo #
  ###################
  model.train()
  
  start = time.time()

  epoch_loss_train = []
  train_loss = 0.0
  for data in train_loader:
    imagem = data.unsqueeze(1).float()
    imagem /= 255.0
    imagem = imagem.to(args['device'])

    # clear the gradients of all optimized variables
    optimizer.zero_grad()

    # forward pass
    output = model(imagem)

    # calculando loss
    loss = criterio(output, imagem)
    epoch_loss_train.append(loss.detach().item())

    # backpropagation
    loss.backward()

    # próximo passo
    optimizer.step()

    # update running training loss
    train_loss += loss.item()*imagem.size(0)

  train_loss = train_loss/len(train_loader)
  epoch_loss_train = np.array(epoch_loss_train)

  end = time.time()

  print('########## Train ##########')
  print('Epoch: %d, Loss: %.4f +/- %.4f, Time: %.2f\n'% (epoch, epoch_loss_train.mean(), epoch_loss_train.std(), end-start))

  print("Epoca: {}\t Train Loss: {:.3f}\n".format(epoch, train_loss))
  #print("Medía do Loss:", epoch_loss_train.mean())

  return epoch_loss_train.mean()

## Teste ( Somente com imagens anômalas )

In [161]:
def validate(test_loader, model, epoch):
  ###################
  # validando o modelo #
  ###################
  model.eval()

  start = time.time()
  
  epoch_loss_test = []
  test_loss = 0.0
  with torch.no_grad():
    for data in test_loader:
      imagem = data.unsqueeze(1).float()
      imagem /= 255.0
      imagem = imagem.to(args['device'])

      # forward pass
      output = model(imagem)

      # calculando loss
      loss = criterio(output, imagem)
      epoch_loss_test.append(loss.detach().item())

      # update running test loss
      test_loss += loss.item()*imagem.size(0)

    test_loss = test_loss/len(test_loader)
    epoch_loss_test = np.array(epoch_loss_test)
    end = time.time()

    print('########## Test ##########')
    print('Epoch: %d, Loss: %.4f +/- %.4f, Time: %.2f\n'% (epoch, epoch_loss_test.mean(), epoch_loss_test.std(), end-start))
    
    print("Epoca: {}\t Test Loss: {:.3f}\n".format(epoch, test_loss))
    #print("Medía do Loss:", epoch_loss_test.mean())

    return epoch_loss_test.mean()

## Rodando Teste e Treino

In [162]:
train_losses = []

for epoch in range(args['epoch']):
  #Train
  train_losses.append(train(train_loader, model, epoch))

  cpuset_checked))


########## Train ##########
Epoch: 0, Loss: 0.2189 +/- 0.1080, Time: 9.99

Epoca: 0	 Train Loss: 10.943

########## Train ##########
Epoch: 1, Loss: 0.0726 +/- 0.0011, Time: 9.82

Epoca: 1	 Train Loss: 3.632

########## Train ##########
Epoch: 2, Loss: 0.0715 +/- 0.0007, Time: 9.92

Epoca: 2	 Train Loss: 3.577

########## Train ##########
Epoch: 3, Loss: 0.0712 +/- 0.0009, Time: 9.89

Epoca: 3	 Train Loss: 3.562

########## Train ##########
Epoch: 4, Loss: 0.0711 +/- 0.0008, Time: 9.89

Epoca: 4	 Train Loss: 3.554

########## Train ##########
Epoch: 5, Loss: 0.0710 +/- 0.0008, Time: 9.97

Epoca: 5	 Train Loss: 3.550

########## Train ##########
Epoch: 6, Loss: 0.0710 +/- 0.0008, Time: 9.99

Epoca: 6	 Train Loss: 3.548

########## Train ##########
Epoch: 7, Loss: 0.0709 +/- 0.0009, Time: 9.94

Epoca: 7	 Train Loss: 3.547

########## Train ##########
Epoch: 8, Loss: 0.0709 +/- 0.0009, Time: 9.95

Epoca: 8	 Train Loss: 3.546

########## Train ##########
Epoch: 9, Loss: 0.0709 +/- 0.0009, 

In [None]:
test_losses = []

for epoch in range(args['epoch']):

  #Validate
  test_losses.append(validate(test_loader, model, epoch))

## Plot Treino x Teste

In [None]:
 plt.plot(train_losses)
 plt.plot(test_losses)
 plt.ylabel('Loss')
 plt.xlabel('Epochs')

## Plot da Imagem Original(normalizada) x Imagem Reconstruída via AutoEncoder Convolucional

In [None]:
# obtain one batch of test images
dataiter = iter(test_loader)
images = dataiter.next()

imagem = images.unsqueeze(1).float()
imagem /= 255.0
imagem = imagem.to(args['device'])

# get sample outputs
output = model(imagem)
# prep images for display
images = images.numpy()


# output is resized into a batch of iages
output = output.view(args['batch_size'], 1, 160, 240)
# use detach when it's an output that requires_grad
output = output.detach()#.numpy()



# plot the first ten input images and then reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=10, sharex=True, sharey=True, figsize=(24,4))
for idx in np.arange(20):
    ax = fig.add_subplot(2, 20/2, idx+1, xticks=[], yticks=[])
    plt.imshow(output[idx].squeeze().cpu(), cmap='gray')
    
# plot the first ten input images and then reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=10, sharex=True, sharey=True, figsize=(24,4))
for idx in np.arange(20):
    ax = fig.add_subplot(2, 20/2, idx+1, xticks=[], yticks=[])
    plt.imshow(images[idx].squeeze(), cmap='gray')
