In [1]:
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import torch
from torch import nn
import torch.nn.functional as F
import os
from torch.utils.data import Dataset
import cv2
from tqdm import tqdm
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
import random
import torchvision

In [3]:
import tensorflow as tf
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [4]:
class MyDataset(Dataset):

    def __init__(self, len, home_directory, noise=2, mode="Train",augment=False,other_dataset=False):
      self.augment = augment
      if self.augment:
        self.len = 3*len
      else:
        self.len = len
      self.examples = []
      self.iter_index = 0
      self.other=other_dataset
      self.X = torch.empty((self.len, 128,128))
      self.Y = torch.empty((self.len,128,128), dtype=torch.long)
      self.input_directory = os.path.join(home_directory, mode, 'input')
      self.mask_directory = os.path.join(home_directory, mode, 'mask')
      self.augmentation = augment
      print("dataset input path {}".format(self.input_directory))
      print("dataset mask path {}".format(self.mask_directory))
      print(self.len)
      # mask_names = os.listdir(self.mask_directory)
      # mask_names.sort()
      
      self.set_dataset()
      # self.set_dataset(self.mask_directory, mask_names, False)

    def set_dataset(self):
      input_names = os.listdir(self.input_directory)
      input_names.sort()
      index = 0
      for name in input_names:
        img_path = self.input_directory+'/'+name
        if not self.other :
          mask_path =self.mask_directory+'/'+'mask_'+name
        else:
          mask_path =self.mask_directory+'/'+name
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = img/255
        resize_img = cv2.resize(img, (128,128))

        mask = cv2.imread(mask_path)
        # print(mask.shape)
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
        mask = mask/255
        resize_mask = cv2.resize(mask, (128,128))

        self.X[self.iter_index] = torch.tensor(resize_img)
        resized_mask = torch.from_numpy(resize_mask).float()
        self.Y[self.iter_index] = resized_mask
        self.iter_index += 1
        if self.augment:
          self.data_augmentation(resize_img, resize_mask)
      
    def data_augmentation(self, image, mask):
      # image = torch.tensor(image).float()
      choices = [1,2,4]
      first = random.choice(choices)
      choices.remove(first)
      second = random.choice(choices)
      option = [first, second]

      if 1 in option:
        newimg, newmask = self.randomHorizontalFlip(image, mask)
        self.X[self.iter_index] = newimg
        self.Y[self.iter_index] = newmask
        self.iter_index += 1
      if 2 in option:
        newimg, newmask = self.randomVerticalFlip(image, mask)
        self.X[self.iter_index] = newimg
        self.Y[self.iter_index] = newmask
        self.iter_index += 1
      if 3 in option: 
        newimg, newmask = self.randZoom(image, mask)
        self.X[self.iter_index] = newimg
        self.Y[self.iter_index] = newmask
        self.iter_index += 1
      if 4 in option:
        newimg, newmask = self.randGaussianBlur(image, mask)
        self.X[self.iter_index] = newimg
        self.Y[self.iter_index] = newmask
        self.iter_index += 1
    
    def randomHorizontalFlip(self, image, mask):
      for i in range(image.shape[0]):
        flip_index = torch.arange(len(image[i])-1, -1, -1)
        image[i] = image[i][flip_index]
        mask_flip_index = torch.arange(len(mask[i])-1, -1, -1)
        mask[i] = mask[i][mask_flip_index]
      return torch.tensor(image), torch.from_numpy(mask).float()
    def randomVerticalFlip(self, img, mask):
      flip_index_v = torch.arange(len(img)-1,-1,-1)
      img = img[flip_index_v]
      mask_flip_index_v = torch.arange(len(mask)-1,-1,-1)
      mask = mask[mask_flip_index_v]
      return torch.tensor(img), torch.from_numpy(mask).float()
    
    def randZoom(self, image, mask):
      resize_coefficient = random.uniform(1,2)
      new_size = int(128*resize_coefficient)
      new_image = cv2.resize(image, (new_size, new_size))
      new_mask = cv2.resize(mask, (new_size, new_size))
      startx = random.randint(0, new_size-128)
      starty = random.randint(0, new_size-128)
      image = new_image[startx:startx+128, starty:starty+128]
      mask = new_mask[startx:startx+128, starty:starty+128]
      return torch.tensor(image), torch.from_numpy(mask).float()
    
    def randGaussianBlur(self, image, mask):
      image = cv2.GaussianBlur(image,(5,5),0)
      return torch.tensor(image), torch.from_numpy(mask).float()

        

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        return (self.X[idx], self.Y[idx])
      


In [5]:
class UnetModel(nn.Module):
  def conv(self, in_channels, out_channels):
    block = nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=(1,1)),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(),
        nn.Conv2d(out_channels, out_channels, kernel_size=3,padding=(1,1)),
        nn.BatchNorm2d(out_channels),
        nn.ReLU()
    )
    return block
  
  def up_conv(self, in_channels, out_channels):
    block = nn.Sequential(
        nn.Upsample(scale_factor=2),
        nn.Conv2d(in_channels, out_channels, kernel_size=3,padding=(1,1)),
        nn.BatchNorm2d(out_channels),
        nn.ReLU()
    )
    return  block
  
  def __init__(self, in_channel, out_channel):
    super(UnetModel, self).__init__()
    
    self.conv1 = self.conv(in_channel,64)
    self.conv1_maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
    self.conv2 = self.conv(64, 128)
    self.conv2_maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
    self.conv3 = self.conv(128, 256)
    self.conv3_maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
    self.conv4 = self.conv(256, 512)
    self.conv4_maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
    self.conv5 = self.conv(512, 1024)
    
    self.up_conv4 = self.up_conv(1024, 512)
    self.up4_conv =self.conv(1024,512)
    self.up_conv3 = self.up_conv(512, 256)
    self.up3_conv = self.conv(512,256)
    self.up_conv2 = self.up_conv(256,128)
    self.up2_conv = self.conv(256,128)
    self.up_conv1 = self.up_conv(128,64)
    self.up1_conv = self.conv(128,64)
    
    self.conv_1x1 = nn.Conv2d(64,out_channel,kernel_size=1)
    self.sigmoid = nn.Sigmoid()
  def forward(self, x):
    out1 = self.conv1(x)
    
    out2 = self.conv1_maxpool(out1)
    out2 = self.conv2(out2)
    
    out3 = self.conv2_maxpool(out2)
    out3 = self.conv3(out3)
    
    out4 = self.conv3_maxpool(out3)
    out4 = self.conv4(out4)
    
    out5 = self.conv4_maxpool(out4)
    out5 = self.conv5(out5)
    
    exp5 = self.up_conv4(out5)
    exp5 = torch.cat((out4, exp5), dim=1)
    exp5 = self.up4_conv(exp5)
    
    exp4 = self.up_conv3(exp5)
    exp4 = torch.cat((out3, exp4), dim=1)
    exp4 = self.up3_conv(exp4)
    
    exp3 = self.up_conv2(exp4)
    exp3 = torch.cat((out2, exp3), dim=1)
    exp3 = self.up2_conv(exp3)
    
    exp2 = self.up_conv1(exp3)
    exp2 = torch.cat((out1, exp2), dim=1)
    exp2 = self.up1_conv(exp2)
    
    exp1 = self.conv_1x1(exp2)
    exp1 = self.sigmoid(exp1)
    return exp1
    
    


In [None]:
dataset_train = MyDataset(301,'/content/drive/My Drive/A3/person', augment=False, other_dataset=True)
trainloader = torch.utils.data.DataLoader(dataset_train, batch_size=20, shuffle=False)

# dataset_test = MyDataset(20,'/content/drive/My Drive/A3/cat_data/cat_data', 'Test')

model = UnetModel(1, 1)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.7)

epochs = 30
model.train()
a = True
for e in range(epochs):
  running_loss = 0
  for images, labels in tqdm(trainloader):
    optimizer.zero_grad()
    images = images.unsqueeze(1)
    labels = labels.unsqueeze(1)
    labels = labels.float()
    log_ps = model(images)
    loss = criterion(log_ps, labels)
    loss.backward()
    optimizer.step()
    
    running_loss += loss.item()
  else:
    print(f"Traning loss: {running_loss/len(trainloader)}")

dataset input path /content/drive/My Drive/A3/person/Train/input
dataset mask path /content/drive/My Drive/A3/person/Train/mask
301


100%|██████████| 16/16 [08:55<00:00, 33.50s/it]


Traning loss: 0.2721545221284032


100%|██████████| 16/16 [08:47<00:00, 32.98s/it]


Traning loss: 0.16573250200599432


100%|██████████| 16/16 [08:58<00:00, 33.69s/it]


Traning loss: 0.1466201520524919


100%|██████████| 16/16 [08:53<00:00, 33.37s/it]


Traning loss: 0.13419086951762438


100%|██████████| 16/16 [08:47<00:00, 32.95s/it]


Traning loss: 0.13100110366940498


100%|██████████| 16/16 [09:00<00:00, 33.78s/it]


Traning loss: 0.12037454638630152


100%|██████████| 16/16 [08:58<00:00, 33.64s/it]


Traning loss: 0.11638010898604989


100%|██████████| 16/16 [08:51<00:00, 33.23s/it]


Traning loss: 0.104615218937397


100%|██████████| 16/16 [08:48<00:00, 33.04s/it]


Traning loss: 0.10234654089435935


100%|██████████| 16/16 [09:00<00:00, 33.81s/it]


Traning loss: 0.09725007927045226


100%|██████████| 16/16 [09:04<00:00, 34.02s/it]


Traning loss: 0.08965142676606774


100%|██████████| 16/16 [09:00<00:00, 33.80s/it]


Traning loss: 0.08134133229032159


 44%|████▍     | 7/16 [04:10<05:22, 35.80s/it]

In [None]:
dataset_train_origin = MyDataset(60,'/content/drive/My Drive/A3/cat_data/cat_data', augment=True)
trainloader_origin = torch.utils.data.DataLoader(dataset_train_origin, batch_size=20, shuffle=True)

# dataset_test = MyDataset(20,'/content/drive/My Drive/A3/cat_data/cat_data', 'Test')

# model = UnetModel(1, 1)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.7)

epochs = 20
# model.train()
a = True
for e in range(epochs):
  running_loss = 0
  for images, labels in tqdm(trainloader_origin):
    optimizer.zero_grad()
    images = images.unsqueeze(1)
    labels = labels.unsqueeze(1)
    labels = labels.float()
    log_ps = model(images)
    loss = criterion(log_ps, labels)
    loss.backward()
    optimizer.step()
    
    running_loss += loss.item()
  else:
    print(f"Traning loss: {running_loss/len(trainloader)}")

In [None]:
dataset_test = MyDataset(21,'/content/drive/My Drive/A3/cat_data/cat_data', mode='Test', augment=False)
testloader = torch.utils.data.DataLoader(dataset_test, batch_size=1, shuffle=True)
running_loss=0
with torch.no_grad():
  for images, labels in tqdm(testloader):
      optimizer.zero_grad()
      input = images.unsqueeze(1)
      labels = labels.unsqueeze(1)
      log_ps = model(input)
      i = log_ps.squeeze()
      l = labels.squeeze()
      print(i.shape)
      print(l.shape)
      final = images.squeeze()
      final = i*final
      
      plt.subplot(1, 3, 1)
      plt.imshow(i,cmap='gray')
      plt.subplot(1, 3, 2)
      plt.imshow(final,cmap='gray')
      plt.subplot(1, 3, 3)
      plt.imshow(l,cmap='gray')
      plt.show()

      loss = criterion(log_ps, labels)
      running_loss += loss.item()
print(f"Test loss: {running_loss/len(testloader)}")