# [DACON] 월간 데이콘 TV 손동작 제어 인식 AI 경진대회
[참고 | [엉망진창주의] resnet 3d를 만들어보자! (전처리 없이 public score > 0.95)!](https://dacon.io/competitions/official/236050/codeshare/7443?page=1&dtype=recent)

In [1]:
import os
import random
import pandas as pd
import numpy as np
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchvision.models as models

from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings("ignore")

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


## Hyperparameter Setting

In [2]:
# Experiment-wide settings, looked up by key from the cells below.
CFG = dict(
    FPS=30,              # number of frames decoded from every video
    IMG_SIZE=128,        # frames are resized to IMG_SIZE x IMG_SIZE
    EPOCHS=200,
    LEARNING_RATE=3e-4,
    BATCH_SIZE=4,        # batch size of the train/validation loaders
    SEED=42,             # RNG seed, also used for the train/val split
)

def seed_everything(seed):
    """Seed every random number generator used here and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to Python's ``random``, NumPy and PyTorch.
    """
    random.seed(seed)
    # The variable is spelled PYTHONHASHSEED. Setting it here only affects
    # interpreters started afterwards (e.g. worker subprocesses); the hash
    # seed of the current process is fixed at start-up.
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    # torch.manual_seed seeds the CPU generator and all CUDA devices.
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different kernels from run
    # to run, which defeats the deterministic setting above.
    torch.backends.cudnn.benchmark = False
    
# Seed all RNGs once, up front, with the seed configured in CFG.
seed_everything(CFG["SEED"])

## Data Load

In [6]:
# Local folder that contains train.csv and the video files.
data_root_path = r"D:\AI_Data\TV_HMCR"


def _under_data_root(csv_path):
    """Re-root a CSV path entry under ``data_root_path``.

    The entry is split on "/" and only its 2nd and 3rd components are kept,
    so it must contain at least three "/"-separated parts.
    """
    parts = csv_path.split("/")
    return os.path.join(data_root_path, parts[1], parts[2])


df = pd.read_csv(os.path.join(data_root_path, "train.csv"))
df["path"] = df["path"].apply(_under_data_root)
df.shape

(610, 3)

## Train / Validation Split

In [7]:
# Hold out 20% of the rows for validation. The label splits returned by
# train_test_split are discarded because the label column stays inside
# the `train` / `val` frames.
train, val, _, _ = train_test_split(df, df["label"], test_size=0.2, random_state=CFG["SEED"])

# Notebook display of the resulting sizes (was `val.shapeb`, an AttributeError).
train.shape, val.shape

((488, 3), (122, 3))

## Custom Dataset

In [9]:
class CustomDataset(Dataset):
    """Dataset that decodes a fixed number of frames from each video file.

    Args:
        video_path_list: Indexable collection of video file paths.
        label_list: Indexable collection of labels aligned with
            ``video_path_list``, or ``None`` for unlabeled (inference) data.
    """

    def __init__(self, video_path_list, label_list):
        self.video_path_list = video_path_list
        self.label_list = label_list

    def __getitem__(self, index):
        """Return ``(frames, label)``, or only ``frames`` when there are no labels."""
        frames = self.get_video(self.video_path_list[index])
        if self.label_list is not None:
            label = self.label_list[index]
            return frames, label
        return frames

    def __len__(self):
        return len(self.video_path_list)

    def get_video(self, path):
        """Read the first ``CFG["FPS"]`` frames of the video at ``path``.

        Every frame is resized to ``IMG_SIZE x IMG_SIZE`` and scaled to
        [0, 1]. Frames keep the channel order OpenCV delivers; no colour
        conversion is applied.

        Returns:
            A float tensor laid out as (channels, frames, height, width).

        Raises:
            RuntimeError: If the video cannot be opened or yields fewer than
                ``CFG["FPS"]`` frames.
        """
        target_size = (CFG["IMG_SIZE"], CFG["IMG_SIZE"])
        frames = []
        cap = cv2.VideoCapture(path)
        try:
            for frame_idx in range(CFG["FPS"]):
                ok, img = cap.read()
                if not ok:
                    # Without this check `img` is None and cv2.resize fails
                    # with an error that does not name the offending file.
                    raise RuntimeError(
                        f"Could not read frame {frame_idx} of {CFG['FPS']} from video: {path}"
                    )
                img = cv2.resize(img, target_size)
                frames.append(img / 255.)
        finally:
            # Always free the decoder handle, even when decoding fails.
            cap.release()
        # (frames, H, W, C) -> (C, frames, H, W), the layout Conv3d expects.
        return torch.FloatTensor(np.array(frames)).permute(3, 0, 1, 2)
    
# Wrap the two splits as datasets of (video path, label) pairs.
train_dataset = CustomDataset(
    video_path_list=train["path"].values,
    label_list=train["label"].values,
)
val_dataset = CustomDataset(
    video_path_list=val["path"].values,
    label_list=val["label"].values,
)

# Only the training loader shuffles; both load in the main process.
train_loader = DataLoader(
    train_dataset,
    batch_size=CFG["BATCH_SIZE"],
    shuffle=True,
    num_workers=0,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=CFG["BATCH_SIZE"],
    shuffle=False,
    num_workers=0,
)

## Model

In [10]:
from functools import partial

def get_inplanes():
    """Return the channel-width list ``[64, 128, 256, 512]`` (doubling from 64)."""
    return [64 * 2 ** stage for stage in range(4)]

def conv1x1x1(in_planes, out_planes, stride=1):
    """Build a bias-free 3D convolution with a 1x1x1 kernel."""
    layer = nn.Conv3d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return layer

def conv3x3x3(in_planes, out_planes, stride=1):
    """Build a bias-free 3D convolution with a 3x3x3 kernel and padding 1."""
    layer = nn.Conv3d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )
    return layer

class BasicBlock(nn.Module):
    """Residual block made of two 3x3x3 convolutions plus a shortcut.

    Args:
        in_planes: Channels of the incoming tensor.
        out_planes: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional callable applied to the input to build the
            shortcut; when ``None`` the input itself is the shortcut.
    """

    # Output channels are out_planes * expansion.
    expansion = 1

    def __init__(self, in_planes, out_planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3x3(in_planes, out_planes, stride)
        self.bn1 = nn.BatchNorm3d(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3x3(out_planes, out_planes)
        self.bn2 = nn.BatchNorm3d(out_planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)
    
class Bottleneck(nn.Module):
    """Bottleneck residual block: 1x1x1 reduce, 3x3x3, 1x1x1 expand, plus shortcut.

    Args:
        in_planes: Channels of the incoming tensor.
        out_planes: Channels inside the bottleneck; the block outputs
            ``out_planes * expansion`` channels.
        stride: Stride of the 3x3x3 convolution.
        downsample: Optional callable applied to the input to build the
            shortcut; when ``None`` the input itself is the shortcut.
    """

    # Output channels are out_planes * expansion.
    expansion = 4

    def __init__(self, in_planes, out_planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv1x1x1(in_planes, out_planes)
        self.bn1 = nn.BatchNorm3d(out_planes)
        # conv2 consumes conv1's output, so its input width is out_planes.
        # Using in_planes here fails whenever in_planes != out_planes.
        self.conv2 = conv3x3x3(out_planes, out_planes, stride)
        self.bn2 = nn.BatchNorm3d(out_planes)
        self.conv3 = conv1x1x1(out_planes, out_planes * self.expansion)
        self.bn3 = nn.BatchNorm3d(out_planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply the three conv/BN stages, add the shortcut, then ReLU."""
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out
        
class ResNet3D(nn.Module):
    """3D ResNet: stem convolution, four residual stages, global pooling, linear head.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and accept ``(in_planes, out_planes, stride, downsample)``.
        layers: Number of blocks in each of the four stages.
        block_inplanes: Base channel width of each of the four stages.
        n_input_channels: Channels of the input clip.
        conv1_t_size: Temporal kernel size of the stem convolution.
        conv1_t_stride: Temporal stride of the stem convolution.
        no_max_pool: Skip the max-pool after the stem when True.
        shortcut_type: ``"A"`` for parameter-free shortcuts (strided pooling
            plus zero-padded channels); anything else uses a 1x1x1
            convolution followed by batch norm.
        widen_factor: Multiplier applied to every entry of ``block_inplanes``.
        n_classes: Size of the output layer.
    """

    def __init__(
        self,
        block,
        layers,
        block_inplanes,
        n_input_channels=3,
        conv1_t_size=7,
        conv1_t_stride=1,
        no_max_pool=False,
        shortcut_type="B",
        widen_factor=1.0,
        n_classes=5,
    ):
        super().__init__()
        block_inplanes = [int(x * widen_factor) for x in block_inplanes]
        self.in_planes = block_inplanes[0]
        self.no_max_pool = no_max_pool

        self.conv1 = nn.Conv3d(
            n_input_channels,
            self.in_planes,
            kernel_size=(conv1_t_size, 7, 7),
            stride=(conv1_t_stride, 2, 2),
            padding=(conv1_t_size // 2, 3, 3),
            bias=False,
        )
        self.bn1 = nn.BatchNorm3d(self.in_planes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, block_inplanes[0], layers[0], shortcut_type)
        self.layer2 = self._make_layer(block, block_inplanes[1], layers[1], shortcut_type, stride=2)
        self.layer3 = self._make_layer(block, block_inplanes[2], layers[2], shortcut_type, stride=2)
        self.layer4 = self._make_layer(block, block_inplanes[3], layers[3], shortcut_type, stride=2)
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(block_inplanes[3] * block.expansion, n_classes)

        # Weight initialization. The keyword is `mode`; `model=` raises TypeError.
        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm3d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _downsample_basic_block(self, x, planes, stride):
        """Type-A shortcut: subsample with ``stride`` and zero-pad channels to ``planes``."""
        out = F.avg_pool3d(x, kernel_size=1, stride=stride)
        # Allocate the padding with the same dtype/device as the activations
        # so this works on CPU, on GPU and under mixed precision.
        zero_pads = torch.zeros(
            out.size(0),
            planes - out.size(1),
            out.size(2),
            out.size(3),
            out.size(4),
            dtype=out.dtype,
            device=out.device,
        )
        # Concatenate `out` itself (not `out.data`) so gradients still flow
        # through the shortcut, and return the result to the caller.
        return torch.cat([out, zero_pads], dim=1)

    def _make_layer(self, block, planes, blocks, shortcut_type, stride=1):
        """Build one residual stage of ``blocks`` blocks as an ``nn.Sequential``."""
        downsample = None
        # A projection shortcut is needed whenever the block changes the
        # resolution or the channel count.
        if stride != 1 or self.in_planes != planes * block.expansion:
            if shortcut_type == 'A':
                downsample = partial(
                    self._downsample_basic_block,
                    planes=planes * block.expansion,
                    stride=stride,
                )
            else:
                downsample = nn.Sequential(
                    conv1x1x1(self.in_planes, planes * block.expansion, stride),
                    nn.BatchNorm3d(planes * block.expansion),
                )

        # Positional arguments: the block classes call their second
        # parameter `out_planes`, so `planes=` as a keyword would fail.
        layers = [block(self.in_planes, planes, stride, downsample)]
        self.in_planes = planes * block.expansion
        layers.extend(block(self.in_planes, planes) for _ in range(1, blocks))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a clip batch to class logits of shape (batch, n_classes)."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        if not self.no_max_pool:
            out = self.maxpool(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avgpool(out)

        out = out.view(out.size(0), -1)
        out = self.fc(out)

        return out

In [11]:
class BaseModel(nn.Module):
    """Small 3D-CNN baseline: four Conv3d/ReLU/BatchNorm3d/MaxPool3d stages and a linear head.

    Args:
        num_classes: Size of the output layer.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        # One row per stage: (in channels, out channels, conv kernel, pool kernel).
        stage_specs = [
            (3, 8, (3, 3, 3), 2),
            (8, 32, (2, 2, 2), 2),
            (32, 64, (2, 2, 2), 2),
            (64, 128, (2, 2, 2), (1, 7, 7)),
        ]
        stages = []
        for c_in, c_out, conv_kernel, pool_kernel in stage_specs:
            stages += [
                nn.Conv3d(c_in, c_out, conv_kernel),
                nn.ReLU(),
                nn.BatchNorm3d(c_out),
                nn.MaxPool3d(pool_kernel),
            ]
        self.feature_extract = nn.Sequential(*stages)
        # The head expects 512 flattened features, which is what a
        # 3 x 30 x 128 x 128 clip produces (128 channels x 1 x 2 x 2).
        self.classifier = nn.Linear(512, num_classes)

    def forward(self, x):
        """Extract features, flatten them per sample and return class logits."""
        features = self.feature_extract(x)
        flat = features.view(x.size(0), -1)
        return self.classifier(flat)

## Train

In [None]:
from datetime import datetime, timedelta, timezone

model_depth = 34

# Unique run id: the current time in KST (UTC+9), formatted YYYYMMDD_HHMMSS.
kst = timezone(timedelta(hours=9))
train_serial = datetime.now(tz=kst).strftime("%Y%m%d_%H%M%S")

# Output folder for this run: results/<model_depth>/<train_serial>.
# It is created right away; an existing folder is reused without error.
RECORDER_DIR = os.path.join("results", str(model_depth), train_serial)
os.makedirs(RECORDER_DIR, exist_ok=True)