In [1]:
import random
import pandas as pd
import numpy as np
import os
import cv2
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

import torchvision.models as models
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [3]:
CFG = {
    'FPS' : 30,
    'IMG_SIZE' : 128,
    'LEARNING_RATE' : 3e-4,
    'BATCH_SIZE' : 4,
    'SEED' : 41,
    'EPOCHS' : 30
}

In [4]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # seed 고정

In [26]:
df = pd.read_csv('dataset/train.csv')

# 8:2 로 Train / Val 분할
train_data, val_data, _, _ = train_test_split(df, df['label'], test_size = 0.2, random_state=CFG['SEED'])

In [None]:
cv2.Video

In [36]:
class CustomDataset(Dataset):
    def __init__(self, video_path_list, label_list, transform = None):
        self.video_path_list = video_path_list
        self.label_lst = label_list
        self.tf = transform
        self.totensor = transforms.ToTensor()

    def __getitem__(self, idx):
        frames = self.get_video(self.video_path_list[idx])

        if self.label_list is not None:
            label = self.label_list[idx]
            return frames, label
        else:
            return frames

    def __len__(self):
        return len(self.video_path_list)

    def get_video(self, path): # 30 frame 비디오 >> 30장 이미지 얻는 코드
        frames = []
        cap = cv2.VideoCapture(path)
        for _ in range(CFG['FPS']):
            print(_)
            _, img = cap.read()
            print(img)
            img = cv2.resize(img, (CFG['IMG_SIZE'], CFG['IMG_SIZE']))
            img = img / 255.  # 이부분에 원래 cv2 resize 있었는데 계속 에러나서 그냥 Transform으로 바꿈

            frames.append(img)
        return torch.FloatTensor(np.array(frames)).permute(3,0,1,2)

In [37]:
train_dataset = CustomDataset(train_data['path'].values, train_data['label'].values)
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_dataset = CustomDataset(val_data['path'].values, val_data['label'].values)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [38]:
train_dataset.__getitem__(1)

0
None


error: OpenCV(4.7.0) /io/opencv/modules/imgproc/src/resize.cpp:4062: error: (-215:Assertion failed) !ssize.empty() in function 'resize'


## Model Define

In [8]:
class BaseModel(nn.Module):
    def __init__(self, num_classes = 5):
        super(BaseModel, self).__init__()
        self.feature_extractor = nn.Sequential(
            nn.Conv3d(3,8,(3,3,3)), # 커널 참 신기하게 생겼네
            nn.ReLU(),
            nn.BatchNorm3d(8),
            nn.MaxPool3d(2),
            nn.Conv3d(8,32,(2,2,2)),
            nn.ReLU(),
            nn.BatchNorm3d(64),
            nn.MaxPool3d(2),
            nn.Conv3d(64, 128, (2,2,2)),
            nn.ReLU(),
            nn.BatchNorm3d(128),
            nn.MaxPool3d((1,7,7))
        )
        self.classifier = nn.Linear(512, num_classes)
    
    def forward(self, x):
        batch_size = x.size(0)
        x = self.feature_extractor(x)
        x = x.view(batch_size, -1)
        x - self.classifier(x)
        return x

In [9]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    best_val_score = 0
    best_model = None

    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for videos, label in tqdm(train_loader):
            videos = videos.to(device)
            label = label.to(device)

            optimizer.zero_grad()

            output = model(videos)
            loss = criterion(output, label)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'EPOCH [{epoch}] | TRAIN LOSS [{_train_loss:.5f}] | VAL LOSS [{_val_loss:.5f}] | VAL F1 [{_val_score}]')

        if scheduler is not None:
            scheduler.step(_val_score)

        if best_val_score < _val_score:
            best_val_score = _val_score
            best_model = model
    return best_model


In [10]:
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss = []
    preds, trues = [], []

    with torch.no_grad():
        for videos, label in tqdm(iter(val_loader)):
            videos = videos.to(device)
            label = label.to(device)

            logit = model(videos)
            loss = criterion(logit, label)

            val_loss.append(loss.item())
            
            preds += logit.argmax(1).detach().cpu().numpy().tolist() # argmax(1) : axis 1 로 최대 인덱스 반환
            trues += label.detach().cpu().numpy().tolist()
        
        _val_loss = np.mean(val_loss)

    _val_score = f1_score(trues, preds, average='macro')
    return _val_loss, _val_score

In [17]:
model = BaseModel()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG['LEARNING_RATE'])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2,threshold_mode='abs',min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

  0%|          | 0/122 [00:00<?, ?it/s]


TypeError: unsupported operand type(s) for /: 'NoneType' and 'float'