## import modules

In [3]:
import torch
import torchvision
import torch.nn as nn  # used to build network

import torch.optim as optim # optimizer
from torch.utils.data import DataLoader,Dataset # build dataset and dataloader

import torchvision.transforms.functional as func
import torchvision.transforms as transforms # transform to do data augmentation
from PIL import Image # PIL to read image

from tqdm import tqdm # progress bar
import os # read img from file system
import time

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

## set random seed

In [None]:
# one shared seed for every random number generator used in this notebook
seed = 7
torch.manual_seed(seed)           # torch generator
torch.cuda.manual_seed(seed)      # current cuda device
torch.cuda.manual_seed_all(seed)  # every cuda device
np.random.seed(seed)              # numpy global generator

# ask cudnn for deterministic kernels and switch off its auto-tuner
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

<img src="./../src/img/unet.png" width=720>

In [17]:
class DoubleConv(nn.Module):
    """
    basic unit of unet: (conv3x3 -> batch norm -> relu) applied twice.
    both convs use stride 1 and no padding, so each one trims 2 pixels
    from the height and the width of its input.
    """
    def __init__(self, in_channel, out_channel):
        super().__init__()
        layers = []
        # first conv maps in_channel -> out_channel, second keeps out_channel
        for src_channel in (in_channel, out_channel):
            layers += [
                nn.Conv2d(src_channel, out_channel, kernel_size=3, stride=1, padding=0, bias=False),
                nn.BatchNorm2d(out_channel),
                nn.ReLU(inplace=True),
            ]
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        out = self.block(x)
        return out


class DownBlock(nn.Module):
    """
    encoder stage of unet: a 2x2 max pool with stride 2 halves the spatial
    size, then a DoubleConv maps in_channel -> out_channel.
    """
    def __init__(self, in_channel, out_channel):
        super().__init__()
        pool = nn.MaxPool2d(kernel_size=2, stride=2)
        convs = DoubleConv(in_channel, out_channel)
        self.block = nn.Sequential(pool, convs)

    def forward(self, x):
        out = self.block(x)
        return out

class UpBlock(nn.Module):
    """
    decoder stage of unet: DoubleConv (in_channel -> inter_channel) followed
    by a 2x2 transposed conv with stride 2 (inter_channel -> out_channel).
    forward first concatenates x with the encoder feature map along the
    channel axis, so in_channel must cover the channels of both tensors.
    """
    def __init__(self, in_channel, inter_channel, out_channel):
        super().__init__()
        convs = DoubleConv(in_channel, inter_channel)
        upsample = nn.ConvTranspose2d(inter_channel, out_channel, kernel_size=2, stride=2)
        self.block = nn.Sequential(convs, upsample)

    def forward(self, x, down_part_data):
        # both tensors need matching batch and spatial sizes for the concat
        merged = torch.cat([x, down_part_data], dim=1)
        return self.block(merged)

In [20]:
class UNet(nn.Module):
    """
    model to do segmentation.

    img_channel: number of channels of the input image.
    out_dim: number of channels of the output map (one logit map per
        channel, produced by the final 1x1 conv).

    every conv in the DoubleConv blocks is unpadded, so the output is
    spatially smaller than the input: a 572x572 input gives a 388x388 output.
    encoder feature maps are larger than the decoder tensors they are
    concatenated with, so each one is resized to the decoder tensor's
    spatial size first.
    """
    def __init__(self, img_channel, out_dim):
        super().__init__()
        self.img_channel = img_channel
        self.out_dim = out_dim

        # encoder: the first block takes the image, so it must follow img_channel
        self.init = DoubleConv(img_channel, 64)
        self.down1 = DownBlock(64, 128)
        self.down2 = DownBlock(128, 256)
        self.down3 = DownBlock(256, 512)
        self.down4 = DownBlock(512, 1024)

        # first up-sampling step, applied to the deepest feature map
        self.bottle_neck = nn.ConvTranspose2d(1024, 512, 2, 2)

        # decoder: in_channel of each stage = decoder channels + skip channels
        self.up1 = UpBlock(1024, 512, 256)
        self.up2 = UpBlock(512, 256, 128)
        self.up3 = UpBlock(256, 128, 64)
        self.up4 = DoubleConv(128, 64)

        # final conv's is a conv1x1
        self.final_conv = nn.Conv2d(64, out_dim, 1)

    @staticmethod
    def _match_size(skip, ref):
        """resize the encoder feature map `skip` to the spatial size of `ref`."""
        # derive the target from the decoder tensor instead of hard-coding it,
        # so the network is not tied to one input resolution
        return func.resize(skip, list(ref.shape[-2:]))

    def forward(self, x):
        d1 = self.init(x)
        d2 = self.down1(d1)
        d3 = self.down2(d2)
        d4 = self.down3(d3)
        d5 = self.down4(d4)
        bottle = self.bottle_neck(d5)

        u1 = self.up1(bottle, self._match_size(d4, bottle))
        u2 = self.up2(u1, self._match_size(d3, u1))
        u3 = self.up3(u2, self._match_size(d2, u2))
        # last stage has no up conv, so the concat is done here instead of in UpBlock
        u4_input = torch.cat((u3, self._match_size(d1, u3)), dim=1)
        u4 = self.up4(u4_input)
        return self.final_conv(u4)

In [21]:
def test_model():
    """smoke test: run one random 572x572 rgb batch through a 2-channel unet and print the output shape."""
    net = UNet(3, 2)
    dummy_batch = torch.randn(8, 3, 572, 572)
    out = net(dummy_batch)
    print(out.shape)
test_model()

torch.Size([8, 512, 56, 56]) torch.Size([8, 512, 56, 56])
torch.Size([8, 256, 104, 104]) torch.Size([8, 256, 104, 104])
torch.Size([8, 128, 200, 200]) torch.Size([8, 128, 200, 200])
torch.Size([8, 2, 388, 388])


## define params

In [None]:
LEARNING_RATE = 1e-3  # step size handed to the optimizer
IMAGE_CHANNEL = 3     # images are loaded as RGB
# the masks are loaded as single-channel images and the loss is
# BCEWithLogitsLoss, which requires the prediction and the target to have the
# same shape -> the network must emit exactly one logit map
OUT_DIM = 1
NUM_EPOCH = 5         # full passes over the training set
BATCH_SIZE = 64       # samples per batch
# use the gpu when one is visible, otherwise fall back to the cpu
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# build the network, then move its parameters to the selected device
model = UNet(IMAGE_CHANNEL, OUT_DIM)
model = model.to(DEVICE)
# Adam over every model parameter
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)
# binary cross entropy computed on raw logits
criterion = nn.BCEWithLogitsLoss()
# checkpoint container for model saving; both slots start out empty
model_checkpoint = dict.fromkeys(('model', 'optimizer'))

## prepare data

In [67]:
class CarDataset(Dataset):
    """
    image / mask pairs for segmentation.

    expects `<data_path>/train/<name>.jpg` images and, for each of them, a
    mask at `<data_path>/train_masks/<name>_mask.gif`.

    transform: dict with the callables "x_transform" (applied to the image)
        and "y_transform" (applied to the mask), or a falsy value to get the
        raw PIL images back.
    data_path: dataset root; works with or without a trailing separator.
    """
    IMG_EXT = ".jpg"
    MASK_SUFFIX = "_mask.gif"

    def __init__(self, transform, data_path="./../dataset/segmentation/"):
        self.data_path = data_path
        self.img_dir = os.path.join(data_path, "train")
        self.mask_dir = os.path.join(data_path, "train_masks")
        # os.listdir returns entries in arbitrary order: sort so that an index
        # always maps to the same sample, and skip anything that is not an
        # image (hidden files, sub-folders, ...)
        self.img_lst = sorted(
            name for name in os.listdir(self.img_dir) if name.endswith(self.IMG_EXT)
        )
        self.transform = transform

    def __getitem__(self, idx):
        img_name = self.img_lst[idx]
        # swap only the extension of the file name, never text inside the folder path
        mask_name = img_name[:-len(self.IMG_EXT)] + self.MASK_SUFFIX
        # convert() returns a new, fully loaded image, so the file handles can
        # be closed right away
        with Image.open(os.path.join(self.img_dir, img_name)) as img:
            x = img.convert("RGB")
        with Image.open(os.path.join(self.mask_dir, mask_name)) as mask:
            y = mask.convert("L")
        if self.transform:
            x = self.transform["x_transform"](x)
            y = self.transform["y_transform"](y)
        return x, y

    def __len__(self):
        return len(self.img_lst)

In [68]:
data_transform = {
    # image: resize to the 572x572 network input, then convert to a tensor
    "x_transform": transforms.Compose([
        transforms.Resize((572, 572)),
        transforms.ToTensor(),
    ]),
    # mask: resize to the 388x388 network output; nearest-neighbour keeps the
    # original label values instead of blending them along object borders
    "y_transform": transforms.Compose([
        transforms.Resize(
            (388, 388),
            interpolation=transforms.InterpolationMode.NEAREST,
        ),
        transforms.ToTensor(),
    ]),
}

In [69]:
# dataset read from the default data_path with the image / mask transforms
train_dataset = CarDataset(transform=data_transform)
# batches of BATCH_SIZE samples, reshuffled at every epoch
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

## training loop

In [None]:
for epoch in range(NUM_EPOCH):
    loop = tqdm(train_dataloader, leave=True)
    for batch_idx, (x, y) in enumerate(loop):
        # back to train mode: the periodic accuracy check below switches to eval
        model.train()
        ### define training loop here  ###
        # the model was moved to DEVICE, so every batch has to follow it,
        # otherwise the forward pass fails with a device mismatch on gpu
        x = x.to(DEVICE)
        y = y.to(DEVICE)

        pred = model(x)
        loss = criterion(pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # show the current batch loss in the progress bar
        loop.set_postfix(loss=loss.item())

        ##################################
        if batch_idx % 10 == 0:
            model.eval()
            with torch.no_grad():
                # check acc
                # NOTE(review): check_acc is not defined in the visible part of
                # the notebook - confirm it exists before running
                check_acc()

    # saving models
    # NOTE(review): save_checkpoint is not defined in the visible part of the
    # notebook either - confirm
    model_checkpoint['model'] = model.state_dict()
    model_checkpoint['optimizer'] = optimizer.state_dict()
    save_checkpoint(model_checkpoint)

In [77]:
import scipy.io
# read the MATLAB file with the human-pose joints; loadmat returns a dict
# keyed by the variable names stored in the file
mat = scipy.io.loadmat(file_name="./../dataset/human_pose/joints.mat")

In [82]:
# pull the "joints" variable out of the loaded MATLAB dict
data = mat["joints"]

In [86]:
# display the slice at index 0 of the last axis (a 14x3 array in the output below)
data[:, :, 0]

array([[  0.        , -26.10911452,   0.        ],
       [ 26.03094352,  86.41022512,   1.        ],
       [ 50.60790641,  75.70079791,   1.        ],
       [ 71.72971103,  87.56715594,   1.        ],
       [ 49.16955998,  89.00550237,   1.        ],
       [ 23.13861646,  94.79015649,   1.        ],
       [131.31164842,  13.80499887,   1.        ],
       [111.64382442,  34.62975369,   1.        ],
       [ 93.71139667,  49.09138898,   1.        ],
       [102.38837784,  68.46216316,   1.        ],
       [110.78394341,  88.42703696,   1.        ],
       [139.12874857,  89.58396778,   1.        ],
       [ 96.88513933,  56.61143933,   1.        ],
       [116.56859752,  53.42206246,   1.        ]])