In [None]:
# Mount Google Drive into the Colab file system; the notebook reads its CSV,
# frames and checkpoint from below this mount point.
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [None]:
import os
import csv
import random
import numpy as np
from PIL import Image as PIL_Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as data
from torch.optim.lr_scheduler import StepLR
from torch.autograd import Variable

from torchvision import transforms
from torchvision.models import resnet50, resnet152

# 랜덤 시드 고정

In [None]:
# Pin every random number generator the notebook relies on so that a fresh
# "Restart & Run All" draws the same numbers.
seed = 365

# NOTE: PYTHONHASHSEED is read at interpreter start-up, so this assignment only
# reaches processes that are launched after this cell has run.
os.environ['PYTHONHASHSEED'] = str(seed)

# Python, NumPy, PyTorch (CPU) and PyTorch (current CUDA device), in that order.
for seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    seed_rng(seed)

# Ask cuDNN to restrict itself to deterministic algorithms.
torch.backends.cudnn.deterministic = True

In [None]:
# Root folder inside the mounted Google Drive; CSV, image and checkpoint paths
# used by this notebook are built by appending to it.
drivepath = "/content/gdrive/MyDrive"

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device_type = "cuda"
else:
    device_type = "cpu"

device = torch.device(device_type)
print(device)

cpu


# 모델과 그에 맞는 데이터셋 설정

Convolution layer의 출력을 FC layer에 통과시킨 뒤, 매 타임스텝마다 이미지 특징과 이전 키 입력을 함께 LSTM에 넣고 마지막 타임스텝의 출력으로 다음 키 입력을 예측하는 다대일(many-to-one) LSTM 모델을 구성합니다.

In [None]:
class KartModel5(nn.Module):
  """CNN + LSTM network that predicts one key-press vector from a frame clip.

  Each frame is encoded by a ResNet-50 whose classifier head is replaced by a
  Linear + ReLU projection. The frame encoding is concatenated with the key
  vector supplied for the same time step and fed to a stacked LSTM one step at
  a time. Only the LSTM output of the last time step is passed through the
  two-layer head, so the model is many-to-one.

  Args:
    num_class: size of the key vector, used both as extra LSTM input per step
      and as the number of outputs.
    cnn_to_lstm: width of the per-frame feature vector produced by the CNN.
    lstm_hidden: hidden size of the LSTM.
    num_layers: number of stacked LSTM layers.
  """

  def __init__(self, num_class = 6, cnn_to_lstm = 1000, lstm_hidden = 100, num_layers = 5):
    super(KartModel5, self).__init__()
    self.num_class = num_class
    self.num_layers = num_layers
    self.hidden_size = lstm_hidden

    # Backbone without pretrained weights; the original 2048 -> 1000 classifier
    # is swapped for a projection to the LSTM feature width.
    self.resnet = resnet50(pretrained=False)
    self.resnet.fc = nn.Sequential(
      nn.Linear(in_features=2048, out_features=cnn_to_lstm, bias=True),
      nn.ReLU(),
    )
    # Every step sees the frame features plus the key vector for that step.
    self.lstm = nn.LSTM(
        input_size = cnn_to_lstm + num_class,
        hidden_size = lstm_hidden,
        num_layers = num_layers,
        batch_first = True,
    )
    self.fc_1 = nn.Linear(lstm_hidden, 512)
    self.relu = nn.ReLU()
    self.fc_2 = nn.Linear(512, num_class)

  def forward(self, x_3d, key_inputs):
    """Run the clip through the network and return logits for the last step.

    Args:
      x_3d: 5-D tensor indexed as (batch, seq, channel, height, width).
      key_inputs: 3-D tensor indexed as (batch, seq, num_class); step ``t`` is
        concatenated with the features of frame ``t``.

    Returns:
      Tensor of shape (batch, num_class) holding raw logits (no sigmoid is
      applied here).

    Raises:
      ValueError: if the sequence dimension of ``x_3d`` is empty.
    """
    seq_len = x_3d.size(1)
    if seq_len == 0:
      # Without this guard the loop never runs and `out` below is unbound.
      raise ValueError("KartModel5.forward needs at least one frame per sequence")

    hidden = None
    for t in range(seq_len):
      frame_features = self.resnet(x_3d[:, t, :, :, :])
      key = key_inputs[:, t, :]
      # (batch, 1, cnn_to_lstm + num_class): a length-one sequence for this step.
      step_input = torch.cat([frame_features.unsqueeze(1), key.unsqueeze(1)], dim=2)
      # Carrying `hidden` across iterations makes this equivalent to one pass
      # over the whole sequence.
      out, hidden = self.lstm(step_input, hidden)

    # `out` is (batch, 1, hidden_size) from the final step; keep that step only.
    out = self.fc_1(out[:, -1, :])
    out = self.relu(out)
    out = self.fc_2(out)

    return out

In [None]:
# Number of consecutive frames in one training clip; the dataset class reads
# this global when it builds its sliding windows.
seq_size = 20

In [None]:
class KartDataSet5(data.Dataset):
  """Sliding-window dataset of frame clips with their key-press labels.

  The CSV file has a header row; in every following row the first column is an
  image path relative to ``root`` and the second column is a string of digits,
  one digit per key.

  Sample ``k`` covers rows ``k+1 .. k+seq_len`` as frames and rows
  ``k .. k+seq_len`` as labels. The first ``seq_len`` labels are returned as
  the per-frame key inputs (each one belongs to the row just before its frame)
  and the final label is the prediction target.

  Args:
    csv_file: path of the CSV file described above.
    seq_len: frames per clip; defaults to the notebook-level ``seq_size``.
    root: folder prepended to every image path; defaults to the notebook-level
      ``drivepath``.

  Raises:
    ValueError: if ``seq_len`` is smaller than 1.
  """

  def __init__(self, csv_file, seq_len=None, root=None):
    # Fall back to the notebook globals so existing one-argument calls keep working.
    self.seq_len = seq_size if seq_len is None else seq_len
    self.root = drivepath if root is None else root
    if self.seq_len < 1:
      raise ValueError("seq_len must be a positive integer")

    images = []
    labels = []

    # newline='' lets the csv module handle line endings itself.
    with open(csv_file, newline='') as csvfile:
      csv_reader = csv.reader(csvfile)
      next(csv_reader, None)        # skip the header row

      for row in csv_reader:
        if not row:
          # Blank lines (e.g. a trailing newline) carry no sample.
          continue
        images.append(self.root + '/' + row[0])
        labels.append([int(digit) for digit in row[1]])

    self.image_seqs = []
    self.label_seqs = []

    # `end` is the index of the last frame of a clip; one extra leading label
    # is kept so that every frame has the label of the row before it.
    for end in range(self.seq_len, len(images)):
      self.image_seqs.append(images[end - self.seq_len + 1:end + 1])
      self.label_seqs.append(labels[end - self.seq_len:end + 1])

    # Built on first use in __getitem__ and then reused for every sample.
    self._preprocess = None

  def __getitem__(self, index):
    """Return ``(frames, key_inputs, target)`` for clip ``index``.

    ``frames`` stacks the preprocessed images of the clip, ``key_inputs`` holds
    all labels of the window except the last, and ``target`` is the last label.
    """
    if self._preprocess is None:
      self._preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),     # normalize to -1 ~ 1
      ])

    frames = []
    for image_path in self.image_seqs[index]:
      # The context manager closes the file handle as soon as the frame is
      # decoded; convert('RGB') guarantees the three channels Normalize expects.
      with PIL_Image.open(image_path) as img:
        frames.append(self._preprocess(img.convert('RGB')))

    label = self.label_seqs[index]
    return torch.stack(frames), torch.Tensor(label[:-1]), torch.Tensor(label[-1])

  def __len__(self):
    """Number of clips that fit into the CSV rows."""
    return len(self.image_seqs)

In [None]:
dataset = KartDataSet5(drivepath + "/csv/kart_test.mp4.csv")

# Fetch the first sample once: every dataset[0] access re-reads and
# re-transforms a whole clip from disk.
sample_frames, sample_keys, sample_target = dataset[0]
print(sample_frames.shape)   # (seq, channel, height, width)
print(sample_keys.shape)     # (seq, num_keys)
print(sample_target.shape)   # (num_keys,)

torch.Size([20, 3, 224, 224])
torch.Size([20, 6])
torch.Size([6])


# 하이퍼 파라미터 설정

In [None]:
num_epochs = 1      # number of passes over the loader in the training loop
lr = 1e-4           # learning rate handed to the Adam optimizer
batch_size = 4      # clips per DataLoader batch
log_interval = 10   # report running training metrics every this many batches

In [None]:
# Serve shuffled batches of clips from the dataset, loaded by four worker processes.
loader = data.DataLoader(
    dataset,
    batch_size=batch_size,
    num_workers=4,
    shuffle=True
)

  cpuset_checked))


# 모델 저장 설정 및 학습 가능한 레이어 확인

In [None]:
# Checkpoint file that is both restored here and overwritten by the training loop.
save_path = drivepath + "/test_model5.pt"

model = KartModel5()
model = model.to(device)
# map_location remaps tensors that were saved from a CUDA run onto the current
# device; without it a GPU checkpoint cannot be restored on a CPU-only runtime.
model.load_state_dict(torch.load(save_path, map_location=device))

# List every parameter together with whether it will receive gradients.
for name, param in model.named_parameters():
    print(f"param {name:20} required gradient? -> {param.requires_grad}")

RuntimeError: ignored

# Loss, Optimizer, Scheduler 설정

In [None]:
# One positive-class weight per output key.
# NOTE(review): the ratios look like negative/positive frequency estimates per
# key - confirm against the label statistics.
pos_weight = torch.tensor(
    [5/95, 70/30, 65/35, 95/5, 85/15, 1.0],
    device=device,
)

# reduction='none' keeps one loss value per (sample, key); it has to be reduced
# to a scalar before calling backward().
criterion = nn.BCEWithLogitsLoss(reduction='none', pos_weight=pos_weight)

optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)

# Halve the learning rate after every 5 scheduler steps.
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)

In [None]:
# Keep an explicit iterator so the next cell can pull a single batch for inspection.
it = iter(loader)

In [None]:
# Draw one batch from the iterator and report the shape of every tensor in it.
inputs, key_inputs, labels = next(it)
for batch_tensor in (inputs, key_inputs, labels):
    print(batch_tensor.shape)
# inputs:     (batch, seq, channel, height, width)
# key_inputs: (batch, seq, num_keys)
# labels:     (batch, num_keys)

# 데이터로더, 모델 인풋, 아웃풋 점검

In [None]:
from torchvision.transforms import ToPILImage
from IPython.display import Image
to_img = ToPILImage()
sigmoid = nn.Sigmoid()

# The dataset normalizes frames to [-1, 1] (mean 0.5, std 0.5). Undo that
# before display, because ToPILImage expects float tensors in [0, 1].
for frame in (inputs[0][0], inputs[0][1], inputs[0][-1]):
    display(to_img((frame.cpu() * 0.5 + 0.5).clamp(0, 1)))

inputs = inputs.to(device)
key_inputs = key_inputs.to(device)
labels = labels.to(device)

# Inspection only: evaluation mode and no autograd graph. The training loop
# switches back with model.train() at the start of every epoch.
model.eval()
with torch.no_grad():
    outs = model(inputs, key_inputs)
    loss = criterion(outs, labels)

# The model returns logits, so the 0.5 threshold is applied to the sigmoid
# probabilities for the accuracy checks as well as for the printed predictions.
preds = sigmoid(outs) > 0.5
keys_matched = (preds == labels).sum(axis=1)

print(outs.shape)
print(preds)
print(labels)
print(keys_matched)                              # correctly predicted keys per sample
print((keys_matched == labels.size(1)).sum())    # samples with every key correct
print(labels.shape)
print(loss.cpu())

NameError: ignored

# 학습 진행상황 확인을 위한 wandb 설정

In [None]:
!pip install wandb



In [None]:
# wandb is installed by the preceding cell, which is why it is imported here
# instead of in the import cell at the top.
import wandb

# Authenticate this runtime with Weights & Biases before a run is started.
wandb.login()

[34m[1mwandb[0m: Currently logged in as: [33mjunhyeokk[0m (use `wandb login --relogin` to force relogin)


True

In [None]:
# Hyperparameters stored with the run. The key was previously misspelled as
# 'batch_sze'. The learning rate is not stored here because the training loop
# logs it with every report.
config = {
    'n_epochs': num_epochs,
    'batch_size': batch_size,
}

wandb.init(project="boomhill24_4", config=config)

# 학습 진행 및 저장

In [None]:
# Train for num_epochs passes over the loader, report running metrics every
# log_interval batches and write a checkpoint at the end of every epoch.
for epoch in range(num_epochs):
  model.train()
  loss_value = 0.0
  matches = 0
  samples_seen = 0

  for idx, train_batch in enumerate(loader):
    inputs, key_inputs, labels = train_batch
    inputs = inputs.to(device)
    key_inputs = key_inputs.to(device)
    labels = labels.to(device)

    # Start from clean gradients even if an earlier cell left some behind.
    optimizer.zero_grad()

    outs = model(inputs, key_inputs)
    # The criterion may return one value per (sample, key); backward() and
    # .item() need a scalar. .mean() is a no-op if it is already reduced.
    loss = criterion(outs, labels).mean()

    loss.backward()
    optimizer.step()

    # The model returns logits: logit > 0 is the same as sigmoid(logit) > 0.5.
    preds = outs.detach() > 0

    loss_value += loss.item()
    # A sample counts as a match only when every key is predicted correctly.
    matches += (preds == labels).all(dim=1).sum().item()
    samples_seen += labels.size(0)

    if (idx + 1) % log_interval == 0:
      train_loss = loss_value / log_interval
      # Divide by the samples actually seen; the last batch can be smaller.
      train_acc = matches / samples_seen
      # get_last_lr() returns one value per parameter group; there is one group.
      current_lr = scheduler.get_last_lr()[0]

      wandb.log({"epoch" : epoch, "training_loss" : train_loss, "training_acc" : train_acc, "learning_rate" : current_lr})
      print(
          f"Epoch[{epoch + 1}/{num_epochs}]({idx + 1}/{len(loader)}) || "
          f"training loss {train_loss:4.4} || training accuracy {train_acc:4.2%} || lr {current_lr}"
      )

      loss_value = 0.0
      matches = 0
      samples_seen = 0

  # StepLR counts epochs. Stepping it once per batch would halve the learning
  # rate every five batches.
  scheduler.step()

  torch.save(model.state_dict(), save_path)

  cpuset_checked))


RuntimeError: ignored

In [None]:
# Number of batches per epoch, followed by the number of clips in the dataset.
print(len(loader))
print(len(dataset))

In [None]:
# Re-inspect the batch left over from the last training step.
# The dataset normalizes frames to [-1, 1] (mean 0.5, std 0.5). Undo that
# before display, because ToPILImage expects float tensors in [0, 1].
for frame in (inputs[0][0], inputs[0][1], inputs[0][-1]):
    display(to_img((frame.cpu() * 0.5 + 0.5).clamp(0, 1)))

inputs = inputs.to(device)
key_inputs = key_inputs.to(device)
labels = labels.to(device)
print(inputs.shape)

# Inspection only: evaluation mode and no autograd graph.
model.eval()
with torch.no_grad():
    outs = model(inputs, key_inputs)
    loss = criterion(outs, labels)

# The model returns logits, so threshold the sigmoid probabilities at 0.5.
preds = torch.sigmoid(outs) > 0.5

print(outs.shape)
print(preds)
# Number of samples in the batch for which every key is predicted correctly.
print(((preds == labels).sum(axis=1) == labels.size(1)).sum())
print(labels.shape)
print(loss.cpu())

In [None]:
# Pull one fresh batch from the loader and inspect the model on it.
inputs, key_inputs, labels = next(iter(loader))
print(inputs.shape)
print(key_inputs.shape)
print(labels.shape)

# The dataset normalizes frames to [-1, 1] (mean 0.5, std 0.5). Undo that
# before display, because ToPILImage expects float tensors in [0, 1].
for frame in (inputs[0][1], inputs[0][-1]):
    display(to_img((frame.cpu() * 0.5 + 0.5).clamp(0, 1)))

inputs = inputs.to(device)
key_inputs = key_inputs.to(device)
labels = labels.to(device)
print(inputs.shape)

# Inspection only: evaluation mode and no autograd graph.
model.eval()
with torch.no_grad():
    outs = model(inputs, key_inputs)
    loss = criterion(outs, labels)

# The model returns logits, so threshold the sigmoid probabilities at 0.5.
preds = torch.sigmoid(outs) > 0.5

print(outs)
print(preds)
# Number of samples in the batch for which every key is predicted correctly.
print(((preds == labels).sum(axis=1) == labels.size(1)).sum())
print(labels.shape)
print(loss.cpu())

In [None]:
# Hand cached, currently unused GPU memory held by PyTorch back to the driver.
torch.cuda.empty_cache()