<a href="https://colab.research.google.com/github/ksj1999/KN_Vision/blob/main/efficientnet%2Blstm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

In [2]:
!pip install efficientnet_pytorch

Collecting efficientnet_pytorch
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: efficientnet_pytorch
  Building wheel for efficientnet_pytorch (setup.py) ... [?25l[?25hdone
  Created wheel for efficientnet_pytorch: filename=efficientnet_pytorch-0.7.1-py3-none-any.whl size=16428 sha256=1796c09b6570fe79dc37e0606ac5d2bc8f05ce1bd702af83107a38403a3a4861
  Stored in directory: /root/.cache/pip/wheels/03/3f/e9/911b1bc46869644912bda90a56bcf7b960f20b5187feea3baf
Successfully built efficientnet_pytorch
Installing collected packages: efficientnet_pytorch
Successfully installed efficientnet_pytorch-0.7.1


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import random
import pandas as pd
import numpy as np
import os
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from efficientnet_pytorch import EfficientNet


import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore')

In [5]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [6]:
import os
import pandas as pd

dataset_path = os.listdir("/content/drive/MyDrive/data")
rooms = []
class_mapping = {"normal": 0}
class_counter = 1

for category in dataset_path:
    if category == 'normal':
        subfolders = os.listdir("/content/drive/MyDrive/data/" + category)
        for subfolder in subfolders:
            all_rooms = os.listdir("/content/drive/MyDrive/data/" + category + "/" + subfolder)
            for room in all_rooms:
                rooms.append((category, "/content/drive/MyDrive/data/" + category + "/" + subfolder + "/" + room))
    elif category == 'abnormal':
        subfolders = os.listdir("/content/drive/MyDrive/data/" + category)
        for subfolder in subfolders:
            class_mapping[subfolder] = class_counter
            class_counter += 1
            all_rooms = os.listdir("/content/drive/MyDrive/data/" + category + "/" + subfolder)
            for room in all_rooms:
                rooms.append((subfolder, "/content/drive/MyDrive/data/" + category + "/" + subfolder + "/" + room))

train_df = pd.DataFrame(data=rooms, columns=["tag", "video_name"])
data = train_df.loc[:, ["video_name", "tag"]]

# 인코딩
data["tag"] = data["tag"].map(class_mapping)

# 인코딩 확인
print("Class mapping:")
print(class_mapping)
print("After encoding:")
print(data.head())

# 원-핫 인코딩
one_hot_labels = pd.get_dummies(data['tag'], prefix='class')
data = pd.concat([data, one_hot_labels], axis=1)

# 인코딩 확인
print("After one-hot encoding:")
print(data.head())

train = data.sample(frac=0.8, random_state=42)
tmp = data.drop(train.index)
valid = tmp.sample(frac=0.5, random_state=42)
test = tmp.drop(valid.index)

# train, valid, test 세트의 크기 출력
print("Train set size:", len(train))
print("Validation set size:", len(valid))
print("Test set size:", len(test))

train.to_csv("/content/drive/MyDrive/data/train_test.csv")
valid.to_csv("/content/drive/MyDrive/data/valid_test.csv")
test.to_csv("/content/drive/MyDrive/data/test_test.csv")


Class mapping:
{'normal': 0, '파손': 1, '절도': 2, '폭행': 3}
After encoding:
                                          video_name  tag
0  /content/drive/MyDrive/data/normal/선택/C_2_...    0
1  /content/drive/MyDrive/data/normal/선택/C_2_...    0
2  /content/drive/MyDrive/data/normal/선택/C_2_...    0
3  /content/drive/MyDrive/data/normal/선택/C_2_...    0
4  /content/drive/MyDrive/data/normal/선택/C_2_...    0
After one-hot encoding:
                                          video_name  tag  class_0  class_1  \
0  /content/drive/MyDrive/data/normal/선택/C_2_...    0        1        0   
1  /content/drive/MyDrive/data/normal/선택/C_2_...    0        1        0   
2  /content/drive/MyDrive/data/normal/선택/C_2_...    0        1        0   
3  /content/drive/MyDrive/data/normal/선택/C_2_...    0        1        0   
4  /content/drive/MyDrive/data/normal/선택/C_2_...    0        1        0   

   class_2  class_3  
0        0        0  
1        0        0  
2    

In [7]:
print("NaN values after encoding:")
print(data[data['tag'].isna()])


NaN values after encoding:
Empty DataFrame
Columns: [video_name, tag, class_0, class_1, class_2, class_3]
Index: []


In [8]:
train = pd.read_csv("/content/drive/MyDrive/data/train_test.csv")
test = pd.read_csv("/content/drive/MyDrive/data/test_test.csv")
valid = pd.read_csv("/content/drive/MyDrive/data/valid_test.csv")

In [9]:
len(train)

4985

In [10]:
import gc
import torch

gc.collect()
torch.cuda.empty_cache()

In [11]:
class CustomDataset(Dataset):
  def __init__(self, video_path_list,label_list):
    self.video_path_list = video_path_list
    self.label_list = label_list

  def __getitem__(self, index):
    frames = self.get_video(self.video_path_list[index])
    if self.label_list is not None:
      label = self.label_list[index]
      return frames, label
    else:
      return frames

  def __len__(self):
    return len(self.video_path_list)

  def get_video(self, path):
    frames = []
    cap = cv2.VideoCapture(path)
    for _ in range(77):
      _, img = cap.read()

      if img is not None:
        img = cv2.resize(img, (128,128))
        img = img/255
        frames.append(img)

    cap.release()

    # return torch.FloatTensor(np.array(frames)).permute(3,0,1,2)

    return torch.FloatTensor(np.array(frames)).permute(0,3,1,2)

  def check_frame(self, frames):
    return frames.shape

In [12]:
train_dataset = CustomDataset(train["video_name"].values, train["tag"].values)
print(train_dataset[0][0].size()) #[데이터수, 채널, 프레임수, 크기,크기]
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)
val_dataset = CustomDataset(valid["video_name"].values, valid["tag"].values)
val_loader = DataLoader(val_dataset, batch_size =4, shuffle=False, num_workers=0)

torch.Size([77, 3, 128, 128])


In [13]:
from torchvision.models import resnet101

class CNNLSTM(nn.Module):
    def __init__(self, num_classes=2):
        super(CNNLSTM, self).__init__()
        self.resnet = resnet101(pretrained=True)
        self.resnet.fc = nn.Sequential(nn.Linear(self.resnet.fc.in_features, 300))
        self.lstm = nn.LSTM(input_size=300, hidden_size=256, num_layers=3)
        self.fc1 = nn.Linear(256, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x_3d):
        hidden = None

        # Iterate over each frame of a video in a video of batch * frames * channels * height * width
        for t in range(x_3d.size(1)):
            with torch.no_grad():
                x = self.resnet(x_3d[:, t])
            # Pass latent representation of frame through lstm and update hidden state
            out, hidden = self.lstm(x.unsqueeze(0), hidden)

        # Get the last hidden state (hidden is a tuple with both hidden and cell state in it)
        x = self.fc1(hidden[0][-1])
        x = F.relu(x)
        x = self.fc2(x)

        return x

class EfficientLstm(nn.Module):

  def  __init__(self, num_classes=4, in_channels=3):
    super(EfficientLstm, self).__init__()
    self.efficient = EfficientNet.from_pretrained('efficientnet-b0', in_channels=in_channels)
    self.lstm = nn.LSTM(input_size=1000, hidden_size=256, num_layers=3)
    self.fc1 = nn.Linear(256, num_classes)


  def forward(self, x_3d):
    hidden = None
    for t in range(x_3d.size(1)):
      with torch.no_grad():
        x = self.efficient(x_3d[:,t]) #4*1000

      out, hidden = self.lstm(x.unsqueeze(0), hidden) #1000*256

    x = F.relu(hidden[0][-1])
    x = self.fc1(x)

    return x

In [14]:
def validation(model, criterion, val_loader, device):
  model.eval()
  val_loss = []
  preds, trues = [],[]

  with torch.no_grad():
    for videos, labels in tqdm(iter(val_loader)):
      videos = videos.to(device)
      labels = labels.to(device)
      print(labels)

      logit = model(videos)

      loss = criterion(logit, labels)
      val_loss.append(loss.item())

      preds +=logit.argmax(1).detach().cpu().numpy().tolist()
      trues += labels.detach().cpu().numpy().tolist()

    _val_loss = np.mean(val_loss)

  _val_score = f1_score(trues, preds, average="macro")
  return _val_loss, _val_score

In [15]:
def train_(model, optimizer, train_loader, val_loader, schedule, device):
  model.to(device)
  criterion = nn.CrossEntropyLoss().to(device)

  best_val_score = 0
  best_model = None

  for epoch in range(1,3): #테스트 용이니까.
    model.train()
    train_loss=[]
    for videos, labels in tqdm(iter(train_loader)):
      videos = videos.to(device)
      labels = labels.to(device)

      optimizer.zero_grad()
      output = model(videos)

      loss = criterion(output, labels)

      loss.backward()
      optimizer.step()

      train_loss.append(loss.item())

    _val_loss, _val_score = validation(model, criterion, val_loader, device)
    _train_loss = np.mean(train_loss)
    print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val F1 : [{_val_score:.5f}]')

    if schedule is not None:
      schedule.step(_val_score)

    if best_val_score < _val_score:
      best_val_score = _val_score
      best_model = model

  return best_model

In [16]:
model = EfficientLstm()
# model = CRNN()

model.eval()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=2, threshold_mode="abs",
                                                      min_lr=1e-8, verbose=True)
infer_model = train_(model, optimizer, train_loader, val_loader, scheduler, device)

Downloading: "https://github.com/lukemelas/EfficientNet-PyTorch/releases/download/1.0/efficientnet-b0-355c32eb.pth" to /root/.cache/torch/hub/checkpoints/efficientnet-b0-355c32eb.pth
100%|██████████| 20.4M/20.4M [00:00<00:00, 39.1MB/s]


Loaded pretrained weights for efficientnet-b0


  0%|          | 0/1247 [00:00<?, ?it/s]

x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :

  0%|          | 0/156 [00:00<?, ?it/s]

tensor([0, 0, 3, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 3, 2, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 2, 1, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 2, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 0, 0, 2], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 2, 3, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([3, 0, 0, 1], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([3, 0, 0, 1], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 3, 3, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([1, 0, 0, 3], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([1, 0, 0, 0], device='cuda:

  0%|          | 0/1247 [00:00<?, ?it/s]

x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :  torch.Size([4, 256])
x test :

  0%|          | 0/156 [00:00<?, ?it/s]

tensor([0, 0, 3, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 3, 2, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 2, 1, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 2, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 0, 0, 2], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 2, 3, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([3, 0, 0, 1], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([3, 0, 0, 1], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([0, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 3, 3, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([1, 0, 0, 3], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([2, 0, 0, 0], device='cuda:0')
x test :  torch.Size([4, 256])
tensor([1, 0, 0, 0], device='cuda:

In [None]:
from google.colab import files
# 코랩에 저장
local_path = "/content"+"/model.pt"
torch.save(model.state_dict(), local_path)
# 로컬에 다운로드
files.download(local_path)

In [24]:
def __init__(self, video_path_list, label_list):
    self.video_path_list = video_path_list
    self.label_list = label_list


In [25]:
def __getitem__(self, index):
    video_path = self.video_path_list[index]
    frames = self.get_video(video_path)
    if self.label_list is not None:
        label = self.label_list[index]
        return frames, label, video_path
    else:
        return frames, video_path


In [28]:
def evaluate_and_find_errors(model, data_loader, device):
    model.eval()
    correct = 0
    total = 0
    wrong_indices = []

    with torch.no_grad():
        for idx, (videos, labels) in tqdm(enumerate(data_loader)):
            videos = videos.to(device)
            labels = labels.to(device)

            # Predict class labels
            outputs = model(videos)
            predicted = outputs.argmax(dim=1)

            # Check if predictions are correct
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Find and store wrong indices
            wrong_idx = (predicted != labels).nonzero(as_tuple=True)[0]
            wrong_batch_indices = [idx * data_loader.batch_size + i for i in wrong_idx]
            wrong_indices.extend(wrong_batch_indices)

    accuracy = 100 * correct / total
    return accuracy, wrong_indices


In [30]:
# Evaluate model on the test set
accuracy, wrong_indices = evaluate_and_find_errors(infer_model, test_loader, device)

# Retrieve paths of videos with wrong predictions
wrong_predictions = [test_loader.dataset.video_path_list[i] for i in wrong_indices]

# Print results
print(f"Accuracy on test set: {accuracy:.2f}%")
print("Videos with wrong predictions:")
for video_name in wrong_predictions:
    print(video_name)


0it [00:00, ?it/s]

ValueError: ignored