In [None]:
import glob
import os
import re
from typing import Optional, Tuple, List, Callable, Any

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from torch import Tensor
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader

class path_1(Dataset):
    """In-memory image dataset whose labels are encoded in the file names.

    Every file directly under ``root_path`` whose name (without extension)
    ends in ``'T'`` gets label 1 and every file ending in ``'F'`` gets label 0.
    Files that match neither rule are skipped and recorded in ``mSkipped`` so
    that images and labels always stay aligned.

    Attributes:
        mDataX: float64 tensor of shape (N, 3, H, W) with values scaled to [0, 1].
        mDataY: integer tensor of shape (N, 1) holding the 0/1 labels.
        mSkipped: list of paths that were ignored because they carry no label.
    """

    def __init__(self, root_path):
        """Load and label every image found directly under ``root_path``.

        Raises:
            ValueError: if no labelled image is found.
        """
        images = []
        labels = []
        self.mSkipped = []

        # sorted() makes the sample order independent of the file system.
        for img_path in sorted(glob.glob(root_path + r'/*')):
            label = self._label_from_path(img_path)
            if label is None:
                # Check the label *before* storing the image; otherwise X and Y
                # end up with different lengths and every later pair is shifted.
                self.mSkipped.append(img_path)
                continue

            # The context manager releases the file handle right away.
            with Image.open(img_path) as img:
                # Force three channels so the (0, 3, 1, 2) permute below holds
                # for grayscale / RGBA files as well.
                img_data = np.array(img.convert('RGB'), dtype = float) / 255

            images.append(img_data)
            labels.append([label])

        if not images:
            raise ValueError(
                "no images with a name ending in 'T' or 'F' found in %r" % (root_path,)
            )

        # Stack once in NumPy; building a tensor from a list of arrays is slow.
        # All images must share one (H, W) size, as before.
        self.mDataX = torch.from_numpy(np.stack(images)).permute(0, 3, 1, 2)
        self.mDataY = torch.tensor(labels)

    @staticmethod
    def _label_from_path(img_path):
        """Return 1 / 0 for a stem ending in 'T' / 'F', or None when unlabelled."""
        # Accept both '/' and '\\' so Windows-style paths work too.
        file_name = re.split(r'[/\\]', img_path)[-1]
        stem = file_name.rsplit('.', 1)[0] if '.' in file_name else file_name
        marker = stem[-1:]  # '' for an empty stem instead of an IndexError
        if marker == 'T':
            return 1
        if marker == 'F':
            return 0
        return None

    def __getitem__(self, data_index):
        """Return a detached copy of the ``(image, label)`` pair at ``data_index``."""
        # clone().detach() is the supported way to copy an existing tensor;
        # torch.tensor(tensor) emits a UserWarning on every call.
        input_tensor = self.mDataX[data_index].clone().detach()
        output_tensor = self.mDataY[data_index].clone().detach()
        return input_tensor, output_tensor

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.mDataX)

# Training data: every labelled image under 'pro_set', served one sample at a
# time in a freshly shuffled order each epoch.
train_set = path_1(r'pro_set')
train_loader = DataLoader(dataset = train_set, batch_size = 1, shuffle = True)

$$output\_shape = \left\lfloor \frac{input\_shape + 2{\times}padding - dilation{\times}(kernel\_size - 1) - 1}{stride} + 1 \right\rfloor$$

In [None]:
# Evaluation data: every labelled image under 'test_set', served one sample at
# a time in a fixed (unshuffled) order.
test_set = path_1(r'test_set')
test_loader = DataLoader(dataset = test_set, batch_size = 1, shuffle = False)

In [8]:
class BasicConv2d(torch.nn.Module):
    """Bias-free 2-D convolution followed by batch normalisation and ReLU.

    Args:
        in_channels: number of channels of the input feature map.
        out_channels: number of channels produced by the convolution.
        **kwargs: forwarded unchanged to ``torch.nn.Conv2d`` (kernel_size,
            stride, padding, ...).
    """

    def __init__(self, in_channels: int, out_channels: int, **kwargs: Any) -> None:
        super().__init__()
        # The convolution carries no bias; BatchNorm2d follows it directly.
        self.conv = torch.nn.Conv2d(in_channels, out_channels, bias=False, **kwargs)
        self.bn = torch.nn.BatchNorm2d(out_channels, eps=0.001)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply conv -> batch norm -> in-place ReLU and return the result."""
        normalised = self.bn(self.conv(x))
        return F.relu(normalised, inplace=True)

class Res_inception(torch.nn.Module):
    """Inception block combined with a convolutional shortcut.

    Four parallel branches are concatenated along the channel axis, scaled by
    ``scale`` and added to a 3x3 convolution of the input (``self.res``) that
    has the same number of output channels; the sum goes through a ReLU.

    Args:
        in_channels: channels of the input feature map.
        ch1x1: output channels of the 1x1 branch.
        ch3x3red: channels of the 1x1 reduction in front of the second branch.
        ch3x3: output channels of the second branch.
        ch5x5red: channels of the 1x1 reduction in front of the third branch.
        ch5x5: output channels of the third branch.
        pool_proj: output channels of the 1x1 projection after max pooling.
        scale: factor applied to the concatenated branches before the addition.
        conv_block: factory for the conv units; defaults to ``BasicConv2d``.

    The block outputs ``ch1x1 + ch3x3 + ch5x5 + pool_proj`` channels and keeps
    the spatial size of its input.
    """

    def __init__(
        self, 
        in_channels: int,
        ch1x1: int,
        ch3x3red: int,
        ch3x3: int,
        ch5x5red: int,
        ch5x5: int,
        pool_proj: int,
        scale: float,
        conv_block: Optional[Callable[..., torch.nn.Module]] = None,
    ) -> None:
        super().__init__()
        if conv_block is None:
            conv_block = BasicConv2d

        # Channel count of the concatenated branches; the shortcut must match it.
        cn_overall = ch1x1 + ch3x3 + ch5x5 + pool_proj

        self.scale = scale

        self.branch1 = conv_block(in_channels, ch1x1, kernel_size = 1)

        self.branch2 = nn.Sequential(
            conv_block(in_channels, ch3x3red, kernel_size=1), 
            conv_block(ch3x3red, ch3x3, kernel_size=3, padding=1)
        )

        # NOTE(review): the parameters are named ch5x5*, but the kernel is 3x3.
        # Left unchanged so previously saved checkpoints keep loading -
        # confirm whether a 5x5 kernel was intended.
        self.branch3 = nn.Sequential(
            conv_block(in_channels, ch5x5red, kernel_size=1),
            conv_block(ch5x5red, ch5x5, kernel_size=3, padding=1)
        )

        self.branch4 = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1, ceil_mode=True),
            conv_block(in_channels, pool_proj, kernel_size=1)
        )

        self.res = conv_block(in_channels, cn_overall, kernel_size = 3, padding=1)

    def _forward(self, x: Tensor) -> List[Tensor]:
        """Run the four inception branches on ``x`` and return their outputs."""
        # The shortcut is deliberately NOT evaluated here: forward() owns it.
        # Evaluating it here as well doubled its cost and updated its
        # BatchNorm running statistics twice per step.
        return [
            self.branch1(x),
            self.branch2(x),
            self.branch3(x),
            self.branch4(x),
        ]

    def forward(self, x: Tensor) -> Tensor:
        """Return ``relu(res(x) + scale * cat(branches(x)))``."""
        outputs_incep = torch.cat(self._forward(x), 1)
        shortcut = self.res(x)
        return F.relu(shortcut + self.scale * outputs_incep)


class googleNet(torch.nn.Module):
    """GoogLeNet-style classifier assembled from ``Res_inception`` blocks.

    The network takes a 3-channel image batch, runs it through a convolutional
    stem, nine inception blocks separated by max pooling, global average
    pooling, dropout and three fully connected layers, and finishes with a
    sigmoid.

    Args:
        dropout: drop probability applied to the pooled 1024-d feature vector.
        num_classes: width of the final linear layer (1 for binary output).
    """

    def __init__(self,
                dropout = 0.2,
                num_classes = 1):
        super().__init__()

        # --- stem -----------------------------------------------------------
        self.conv1 = BasicConv2d(3, 64, kernel_size=5, stride=2, padding=3)
        self.maxpool1 = nn.MaxPool2d(3, stride=2, ceil_mode=True)
        self.conv2 = BasicConv2d(64, 64, kernel_size=1)
        self.conv3 = BasicConv2d(64, 192, kernel_size=3, padding=1)
        self.maxpool2 = nn.MaxPool2d(3, stride=2, ceil_mode=True)

        # --- stage 3: 192 -> 256 -> 480 channels ------------------------------
        self.inception3a = Res_inception(192, 64, 96, 128, 16, 32, 32, 1.0)
        self.inception3b = Res_inception(256, 128, 128, 192, 32, 96, 64, 1.0)
        self.maxpool3 = nn.MaxPool2d(3, stride=2, ceil_mode=True)

        # --- stage 4: 480 -> 512 -> 512 -> 512 -> 528 -> 832 channels ---------
        self.inception4a = Res_inception(480, 192, 96, 208, 16, 48, 64, 1.0)
        self.inception4b = Res_inception(512, 160, 112, 224, 24, 64, 64, 1.0)
        self.inception4c = Res_inception(512, 128, 128, 256, 24, 64, 64, 1.0)
        self.inception4d = Res_inception(512, 112, 144, 288, 32, 64, 64, 1.0)
        self.inception4e = Res_inception(528, 256, 160, 320, 32, 128, 128, 1.0)
        self.maxpool4 = nn.MaxPool2d(2, stride=2, ceil_mode=True)

        # --- stage 5: 832 -> 832 -> 1024 channels -----------------------------
        self.inception5a = Res_inception(832, 256, 160, 320, 32, 128, 128, 1.0)
        self.inception5b = Res_inception(832, 384, 192, 384, 48, 128, 128, 1.0)

        # --- classifier head --------------------------------------------------
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.dropout = nn.Dropout(p = dropout)

        self.fc1 = torch.nn.Linear(1024, 256)
        self.fc2 = torch.nn.Linear(256, 32)
        self.fc3 = torch.nn.Linear(32, num_classes)

    def forward(self, x):
        """Return sigmoid scores of shape ``(batch, num_classes)`` for ``x``."""
        # Feature extractor, applied strictly in this order.
        feature_layers = (
            self.conv1, self.maxpool1,
            self.conv2, self.conv3, self.maxpool2,
            self.inception3a, self.inception3b, self.maxpool3,
            self.inception4a, self.inception4b, self.inception4c,
            self.inception4d, self.inception4e, self.maxpool4,
            self.inception5a, self.inception5b,
            self.avgpool,
        )
        features = x
        for layer in feature_layers:
            features = layer(features)

        # Flatten (batch, 1024, 1, 1) -> (batch, 1024) for the linear layers.
        flat = features.view(x.size(0), -1)
        flat = self.dropout(flat)

        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return torch.sigmoid(self.fc3(hidden))

def train(epoch, log_every = 100, final_epoch = 999):
    """Run one training epoch over ``train_loader`` and return its mean batch loss.

    Uses the notebook-level ``model``, ``optimizer``, ``criterion``, ``device``
    and ``train_loader``.

    Args:
        epoch: zero-based epoch index; only used to decide when to log.
        log_every: print the summed epoch loss whenever ``epoch`` is a multiple
            of this value (0 disables the periodic log).
        final_epoch: epoch index that is always logged.

    Returns:
        The mean of the per-batch losses of this epoch, or NaN when the loader
        yields no batch.
    """
    # Make sure dropout / batch norm are in training mode, even if an earlier
    # cell switched the model to eval().
    model.train()

    batch_losses = []
    running_loss = 0.
    # Use the number of batches (not samples) so the check also holds when
    # batch_size > 1.
    last_batch_idx = len(train_loader) - 1
    should_log = (bool(log_every) and epoch % log_every == 0) or epoch == final_epoch

    for batch_idx, (inputs, labels) in enumerate(train_loader, 0):
        inputs = inputs.float().to(device)
        labels = labels.float().to(device)

        optimizer.zero_grad()  # reset gradients accumulated by the previous step

        # forward + backward + parameter update
        outputs = model(x = inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        # Store the loss of THIS batch; storing the running sum made the
        # returned value a mean of prefix sums rather than a mean loss.
        batch_losses.append(batch_loss)

        if should_log and batch_idx == last_batch_idx:
            print('[%d, %5d] loss: %.3f' % (epoch+1, batch_idx+1, running_loss))
            running_loss = 0.

    if not batch_losses:
        return float('nan')
    return np.mean(batch_losses)

def test():
    correct, total = 0, 0
    with torch.no_grad():
        for data in test_loader:
            images, labels = data
            images = images.float()
            labels = labels.float()
            images, labels = images.to(device), labels.to(device)
            outputs = model(x = images)
            predicted, _ = torch.max(outputs.data, dim = 1)
            #total += torch.squeeze(labels)
            predicted = torch.squeeze(predicted)
            correct += (torch.round(predicted) == labels).sum().item()
            print(f'predict:{torch.round(predicted)}\nTrue label:{labels}')
    print('Accuracy on test set: %d %%' % (100 * correct / len(test_set)))    

In [None]:
#Training process
NUM_EPOCHS = 1000
MODEL_PATH = 'models_cache/googlenet_decay2.pth'

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = googleNet().to(device)
# `size_average` is deprecated; reduction='mean' is its documented replacement
# and averages the loss over the batch in the same way.
criterion = torch.nn.BCELoss(reduction = 'mean')

optimizer = optim.SGD(model.parameters(), lr = 0.0075, momentum = 0.9)
# Multiply the learning rate by 0.96 every 30 epochs.
scheduler = StepLR(optimizer, step_size=30, gamma=0.96)


if __name__ == '__main__':
    loss_curv = []
    for epoch in range(NUM_EPOCHS):
        loss_val = train(epoch)
        loss_curv.append(loss_val)
        scheduler.step()
    plt.plot(loss_curv)
    plt.xlabel('epoch')
    plt.ylabel('loss')

    # Create the checkpoint directory first, otherwise torch.save fails only
    # after the whole training run has finished. Saving stays inside the guard
    # so that importing this code never overwrites the checkpoint with
    # untrained weights.
    os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
    torch.save(model.state_dict(), MODEL_PATH)

In [None]:
#testing process
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
PATH = 'models_cache/googlenet_decay2.pth'
model = googleNet().to(device)
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only
# machine (and vice versa); without it torch.load tries to restore the tensors
# onto the device they were saved from.
model.load_state_dict(torch.load(PATH, map_location=device))
model.eval()
test()

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)