In [1]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torchvision.datasets as dset
import torchvision.transforms as transforms
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.models as models
import cv2
import sys
import math
import random
import splitfolders
import torchsummary
from tqdm.auto import tqdm
from ResNet_18 import resnet
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score
from sklearn.metrics import f1_score

In [2]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cuda')

In [3]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':50,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':32,
    'SEED':42
}

In [4]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED'])

In [5]:
train_df = pd.read_csv('./new_train_data.csv')
test_df = pd.read_csv('./new_test_data.csv')

In [6]:
le = LabelEncoder()
le = le.fit(train_df['action'])
train_df['action'] = le.transform(train_df['action'])
test_df['action'] = le.transform(test_df['action'])

In [7]:
train, val, _, _ = train_test_split(train_df, train_df['action'], test_size=0.1, random_state=CFG['SEED'])

In [8]:
tfms = A.Compose([
    A.Resize(width=CFG['IMG_SIZE'], height=CFG['IMG_SIZE']),
    A.Normalize()
], p=1)

In [9]:
class RPDataset(Dataset):
    def __init__(self, df, rp_path_list, label_list, tfms=None):
        super().__init__()
        self.df = df
        self.rp_path_list = rp_path_list
        self.label_list = label_list
        self.tfms = tfms
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        img = cv2.imread(self.rp_path_list[idx])
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        image = self.tfms(image=img)['image']
        image = torch.tensor(np.array(image)).permute(2, 0, 1)
        
        if self.label_list is not None:
            label = self.label_list[idx]
            return image, label
        else:
            return image

In [10]:
train_dataset = RPDataset(df=train, rp_path_list=train['img_path'].values, label_list=train['action'].values, tfms=tfms)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_dataset = RPDataset(df=val,rp_path_list=val['img_path'].values, label_list=val['action'].values, tfms=tfms)
val_loader = DataLoader(val_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [11]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    
    best_val_score = 0
    best_model = None
    
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for images, labels in tqdm(iter(train_loader)):
            images = images.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            
            output = model(images)
            loss = criterion(output, labels)
            
            loss.backward()
            optimizer.step()
            
            train_loss.append(loss.item())
                    
        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val F1 : [{_val_score:.5f}]')
        
        if scheduler is not None:
            scheduler.step(_val_score)
            
        if best_val_score < _val_score:
            best_val_score = _val_score
            best_model = model
            
            torch.save(model, 'save_model/' + '0430_res_{}_att.pth'.format(epoch))

    
    return best_model

In [12]:
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss = []
    preds, trues = [], []
    
    with torch.no_grad():
        for images, labels in tqdm(iter(val_loader)):
            images = images.to(device)
            labels = labels.to(device)
            
            logit = model(images)
            
            loss = criterion(logit, labels)
            
            val_loss.append(loss.item())
            
            preds += logit.argmax(1).detach().cpu().numpy().tolist()
            trues += labels.detach().cpu().numpy().tolist()
        
        _val_loss = np.mean(val_loss)
    
    _val_score = f1_score(trues, preds, average='micro')
    return _val_loss, _val_score

In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F

in_planes = 64
num_classes = 4

def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        bias=False,
        padding = 1,
        padding_mode='zeros'
    )


class Residual(nn.Module):
    def __init__(self, numIn, numOut, stride = 1):
        super(Residual, self).__init__()
        self.numIn = numIn
        self.numOut = numOut
        self.cab = CAB(numOut)
        self.stride = stride
        self.conv1 = nn.Conv2d(self.numIn, self.numOut, bias = False, kernel_size = 3,stride = self.stride,padding = 1)
        self.bn1 = nn.BatchNorm2d(self.numOut)
        self.relu = nn.ReLU(inplace = True)
        self.conv2 = nn.Conv2d(self.numOut, self.numOut, bias = False, kernel_size = 3, stride = self.stride, padding = 1)
        self.bn2 = nn.BatchNorm2d(self.numOut)
        
        if self.numIn != self.numOut:
            self.conv4 = nn.Conv2d(self.numIn, self.numOut, bias = True, kernel_size = 1)
            
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        
        if self.numIn != self.numOut:
            residual = self.conv4(x)
        
        return out + residual

    

class ChannelAttention(nn.Module):
    def __init__(self, channel):
        super(ChannelAttention, self).__init__()
        self.attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channel, channel, 1, padding=0, bias=True),
            nn.ReLU(inplace=True),
            nn.Conv2d(channel, channel, 1, padding=0, bias=True),
            nn.Sigmoid())
    def forward(self, x):
        y = self.attention(x)
        return x * y
    
    
class CAB(nn.Module):
    def __init__(self, channel):
        super(CAB, self).__init__()
        self.cab = nn.Sequential(
            nn.Conv2d(channel, channel, kernel_size=3, padding=1, stride=1),
            nn.GELU(),
            nn.Conv2d(channel, channel, kernel_size=3, padding=1, stride=1),
            ChannelAttention(channel)
            )
    def forward(self, x):
        return self.cab(x)
    
    
class ResNet(nn.Module):
    def __init__(self, in_planes, num_classes):
        super(ResNet, self).__init__()
        self.in_planes = in_planes
        self.conv = nn.Conv2d(3, self.in_planes, kernel_size = 3, stride = 1, padding = 3, padding_mode='zeros', bias=False)
        self.conv_g = nn.Conv2d(2, self.in_planes, kernel_size = 3, stride = 1, padding = 1, padding_mode='zeros', bias=False)
        self.cab = CAB(in_planes)
        self.bn = nn.BatchNorm2d(self.in_planes)
        self.relu = nn.ReLU(inplace = True)
        self.res1 = Residual(in_planes,in_planes)
        self.res2 = Residual(in_planes,in_planes)
        self.res3 = Residual(in_planes,in_planes)
        self.res4 = Residual(in_planes,in_planes)
        self.res5 = Residual(in_planes,in_planes)
        self.res6 = Residual(in_planes,in_planes)
        self.res7 = Residual(in_planes,in_planes)
        self.res8 = Residual(in_planes,in_planes)
        self.res7 = Residual(in_planes,in_planes)
        self.res8 = Residual(in_planes,in_planes)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.avgpool = nn.AvgPool2d(2, stride=2)
        self.gap = nn.AdaptiveAvgPool2d((1, 1))
#         self.linear  = nn.Linear(in_planes * 2, num_classes)
        self.linear  = nn.Linear(in_planes, num_classes)
    
    def forward(self, x):
        out = self.conv(x)
        out = self.cab(out)
        out = self.bn(out)
        out = self.relu(out)
        out = self.maxpool(out)
        out = self.res1(out)
        out = self.res2(out)
        out = self.maxpool(out)
        out = self.res3(out)
        out = self.res4(out)
        out = self.maxpool(out)
        out = self.res5(out)
        out = self.res6(out)
        out = self.maxpool(out)
        out = self.gap(out)
        
#         g = g.unsqueeze(-1).unsqueeze(-1)
#         g = g.expand(-1, -1, 16, 16).float()
#         g = self.conv_g(g)
#         g = self.cab(g)
#         g = self.gap(g)
        
#         out = torch.cat((out, g), dim=1)
        out = torch.flatten(out, 1)
        out = self.linear(out)
        return out

In [None]:
model = ResNet(64, 4)
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2,threshold_mode='abs',min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/39 [00:00<?, ?it/s]

Epoch [1], Train Loss : [1.00488] Val Loss : [0.89725] Val F1 : [0.63192]


  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/39 [00:00<?, ?it/s]

Epoch [2], Train Loss : [0.84741] Val Loss : [0.90049] Val F1 : [0.63031]


  0%|          | 0/351 [00:00<?, ?it/s]

In [None]:
test_dataset = RPDataset(test_df, test_df['img_path'].values, None, tfms=tfms)
test_loader = DataLoader(test_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    model.to(device)
    model.eval()
    preds = []
    with torch.no_grad():
        for videos in tqdm(iter(test_loader)):
            videos = videos.to(device)
            logit = model(videos)
            preds += logit.argmax(1).detach().cpu().numpy().tolist()
    return preds

In [None]:
preds = inference(model, test_loader, device)

In [None]:
from sklearn.metrics import confusion_matrix

confusion_matrix = confusion_matrix(test_df['action'], preds, labels=[x for x in range(0, 4)])
confusion_matrix

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize = (5, 5))
plt.title('Confusion Matrix')

sns.heatmap(confusion_matrix, annot=True)

In [None]:
from sklearn.metrics import classification_report
y_true = test_df['action']
y_pred = preds
target_names = ['0', '1', '2', '3']
print(classification_report(y_true, y_pred, digits=4, target_names=target_names))