In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
import os
import cv2
from google.colab.patches import cv2_imshow

import torch
from torch import nn, optim
from torch.utils.data import Dataset
from torchvision import transforms, datasets

from pathlib import Path
import numpy as np
import glob
from tqdm import tqdm

BATCH_SIZE=64

---

**공개된 학습 데이셋 구조**

OpenDataset

　┖ Sequence01

　　┖ 00000.jpg ~ 00900.jpg

　┖ Sequence02

　　┖ 00000.jpg ~ 00900.jpg

　┖ Sequence03

　　┖ 00000.jpg ~ 00900.jpg

---

---

**공개된 학습용 데이터셋에 대한 설명을 위한 코드**

변수: ***dataset_path*** - 공개된 데이터셋 경로 

시퀀스 이름, 시퀀스 내 이미지 개수를 출력한다.

In [7]:
dataset_path = '/content/drive/MyDrive/OpenDataset'
for seq_name in os.listdir(dataset_path):
  seq_path = os.path.join(dataset_path, seq_name)
  print('시퀀스 폴더:', seq_name, ', 시퀀스 이미지 개수:', len(os.listdir(seq_path)))

  # for img_name in os.listdir(seq_path):
  #   img_path = os.path.join(seq_path, img_name)
  #   img = cv2.imread(img_path)
  #   cv2_imshow(img)

시퀀스 폴더: Sequence01 , 시퀀스 이미지 개수: 901
시퀀스 폴더: Sequence02 , 시퀀스 이미지 개수: 901
시퀀스 폴더: Sequence03 , 시퀀스 이미지 개수: 443


---

---
**테스트 데이터셋 구조**

TestSequence

　┖ 00000.jpg ~ 00900.jpg

---


---

**테스트를 위한 경로 입력**

변수: ***test_seq_path*** - 테스트에 사용될 시퀀스의 경로

(제출된 코드에서 다음 경로만 변경하여 채점할 예정)

In [151]:
test_seq_path = '/content/drive/MyDrive/OpenDataset/Sequence01'

---

---

**테스트 시퀀스의 이미지를 불러오는 class 정의**

테스트 시퀀스 안의 이미지들을 Tensor로 변환하여 내보내주는 Dataset class를 정의

**(각자 방법에 따라 class 내부를 수정해서 사용 )**

변수: ***imgPathes*** - 테스트 시퀀스 내 이미지들의 경로를 저장하는 list

함수: 

*__init__* - 시퀀스 내 이미지들의 경로를 imgPathes에 저장

*__getitem__* - 저장된 이미지들의 경로를 통해 이미지를 불러오고 텐서로 변환하여 출력한다. (PyTorch의 DataLoader, for 반복문 등에 의해 호출되는 함수)



In [152]:
class testSequenceDataset_section1(Dataset):
  def __init__(self, _test_seq_path):
    self.imgPathes = []

    # 시퀀스 내 이미지 경로들을 list에 저장(이름순 정렬)
    for _img_name in sorted(os.listdir(_test_seq_path)):
      img_path = os.path.join(_test_seq_path, _img_name)
      self.imgPathes.append(img_path)
  
  def __len__(self):
    return len(self.imgPathes)

  def __getitem__(self, idx):
    _img = cv2.imread(self.imgPathes[idx], cv2.IMREAD_GRAYSCALE)

    if _img is None:
      print('unable to open', f)
    else:
      pts1 = np.float32([[90, 90],
                        [170, 90],
                        [170, 145],
                        [90, 145]])

      pts2 = np.float32([[0, 0],
                        [240, 0],
                        [240, 320],
                        [0, 320]])

      h = 240
      w = 320

      mtrx = cv2.getPerspectiveTransform(pts1, pts2)
      dst = cv2.warpPerspective(_img, mtrx, (w, h))

    _img_tensor = torch.from_numpy(dst)
    return _img_tensor

In [153]:
class testSequenceDataset_section2(Dataset):
  def __init__(self, _test_seq_path):
    self.imgPathes = []

    # 시퀀스 내 이미지 경로들을 list에 저장(이름순 정렬)
    for _img_name in sorted(os.listdir(_test_seq_path)):
      img_path = os.path.join(_test_seq_path, _img_name)
      self.imgPathes.append(img_path)
  
  def __len__(self):
    return len(self.imgPathes)

  def __getitem__(self, idx):
    _img = cv2.imread(self.imgPathes[idx], cv2.IMREAD_GRAYSCALE)

    if _img is None:
      print('unable to open', f)
    else:
      pts1 = np.float32([[60, 140],
                        [130, 140],
                        [130, 240],
                        [60, 240]])

      pts2 = np.float32([[0, 0],
                        [240, 0],
                        [240, 320],
                        [0, 320]])
      
      h = 240
      w = 320

      mtrx = cv2.getPerspectiveTransform(pts1, pts2)
      dst = cv2.warpPerspective(_img, mtrx, (w, h))

    _img_tensor = torch.from_numpy(dst)
    return _img_tensor

In [154]:
class testSequenceDataset_section3(Dataset):
  def __init__(self, _test_seq_path):
    self.imgPathes = []

    # 시퀀스 내 이미지 경로들을 list에 저장(이름순 정렬)
    for _img_name in sorted(os.listdir(_test_seq_path)):
      img_path = os.path.join(_test_seq_path, _img_name)
      self.imgPathes.append(img_path)
  
  def __len__(self):
    return len(self.imgPathes)

  def __getitem__(self, idx):
    _img = cv2.imread(self.imgPathes[idx], cv2.IMREAD_GRAYSCALE)

    if _img is None:
      print('unable to open', f)
    else:
      pts1 = np.float32([[250, 130],
                        [320, 130],
                        [320, 240],
                        [250, 240]])

      pts2 = np.float32([[0, 0],
                        [240, 0],
                        [240, 320],
                        [0, 320]])
      
      h = 240
      w = 320

      mtrx = cv2.getPerspectiveTransform(pts1, pts2)
      dst = cv2.warpPerspective(_img, mtrx, (w, h))

    _img_tensor = torch.from_numpy(dst)
    return _img_tensor

In [155]:
class testSequenceDataset_section4(Dataset):
  def __init__(self, _test_seq_path):
    self.imgPathes = []

    # 시퀀스 내 이미지 경로들을 list에 저장(이름순 정렬)
    for _img_name in sorted(os.listdir(_test_seq_path)):
      img_path = os.path.join(_test_seq_path, _img_name)
      self.imgPathes.append(img_path)
  
  def __len__(self):
    return len(self.imgPathes)

  def __getitem__(self, idx):
    _img = cv2.imread(self.imgPathes[idx], cv2.IMREAD_GRAYSCALE)

    if _img is None:
      print('unable to open', f)
    else:
      pts1 = np.float32([[220, 100],
                        [300, 100],
                        [300, 117],
                        [220, 117]])

      pts2 = np.float32([[0, 0],
                        [319, 0],
                        [319, 239],
                        [0, 239]])

      h = 240
      w = 320

      mtrx = cv2.getPerspectiveTransform(pts1, pts2)
      dst = cv2.warpPerspective(_img, mtrx, (w, h))

    _img_tensor = torch.from_numpy(dst)
    return _img_tensor

In [156]:
class testSequenceDataset_direction(Dataset):
  def __init__(self, _test_seq_path):
    self.imgPathes = []

    # 시퀀스 내 이미지 경로들을 list에 저장(이름순 정렬)
    for _img_name in sorted(os.listdir(_test_seq_path)):
      img_path = os.path.join(_test_seq_path, _img_name)
      self.imgPathes.append(img_path)
  
  def __len__(self):
    return len(self.imgPathes)

  def __getitem__(self, idx):
    _img = cv2.imread(self.imgPathes[idx], cv2.IMREAD_GRAYSCALE)

    if _img is None:
      print('unable to open', f)
    else:
      cut_img = cv2.imread('CUT.png', cv2.IMREAD_GRAYSCALE)
      dst = cv2.bitwise_and(_img, cut_img)

    _img_tensor = torch.from_numpy(dst)
    return _img_tensor



테스트 데이터셋 생성

In [157]:
testDataset_section1 = testSequenceDataset_section1(test_seq_path)
print('1구역 테스트 데이터셋의 이미지 개수:', len(testDataset_section1.imgPathes))
print(testDataset_section1.imgPathes)

testDataset_section2 = testSequenceDataset_section2(test_seq_path)
print('2구역 테스트 데이터셋의 이미지 개수:', len(testDataset_section2.imgPathes))
print(testDataset_section2.imgPathes)

testDataset_section3 = testSequenceDataset_section3(test_seq_path)
print('3구역 테스트 데이터셋의 이미지 개수:', len(testDataset_section3.imgPathes))
print(testDataset_section3.imgPathes)

testDataset_section4 = testSequenceDataset_section4(test_seq_path)
print('4구역 테스트 데이터셋의 이미지 개수:', len(testDataset_section4.imgPathes))
print(testDataset_section4.imgPathes)

testDataset_direction = testSequenceDataset_direction(test_seq_path)
print('방향 테스트 데이터셋의 이미지 개수:', len(testDataset_direction.imgPathes))
print(testDataset_direction.imgPathes)

1구역 테스트 데이터셋의 이미지 개수: 901
['/content/drive/MyDrive/OpenDataset/Sequence01/00000.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00001.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00002.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00003.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00004.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00005.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00006.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00007.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00008.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00009.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00010.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00011.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00012.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00013.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00014.jpg', '/content/drive/MyDrive/OpenDataset/Sequence01/00015.jpg', '/content/drive/MyDrive/OpenD

---

---

**문제풀이를 위한 카운트 변수 선언**

문제를 푸는 코드에서 해당 변수를 적절한 시점에 카운트한다.

In [158]:
# 문제 변수
num_sec1 = 0       # section1을 통과한 차량 대수
num_sec2 = 0       # section2을 통과한 차량 대수
num_sec3 = 0       # section3을 통과한 차량 대수
num_sec4 = 0       # section4을 통과한 차량 대수
num_straight = 0   # 직진 차량 대수
num_left_turn = 0  # 좌회전 차량 대수
num_right_turn = 0 # 우회전 차량 대수

---

---

**
***문제풀이 코드 입력***
**

***dataloader***

In [159]:
testDataLoader_section1 = torch.utils.data.DataLoader(testDataset_section1,
                                                      batch_size=BATCH_SIZE,
                                                      shuffle=True,
                                                      num_workers=8,
                                                      drop_last=True)
testDataLoader_section2 = torch.utils.data.DataLoader(testDataset_section2,
                                                      batch_size=BATCH_SIZE,
                                                      shuffle=True,
                                                      num_workers=8,
                                                      drop_last=True)
testDataLoader_section3 = torch.utils.data.DataLoader(testDataset_section3,
                                                      batch_size=BATCH_SIZE,
                                                      shuffle=True,
                                                      num_workers=8,
                                                      drop_last=True)
testDataLoader_section4 = torch.utils.data.DataLoader(testDataset_section4,
                                                      batch_size=BATCH_SIZE,
                                                      shuffle=True,
                                                      num_workers=8,
                                                      drop_last=True)

testDataLoader_direction = torch.utils.data.DataLoader(testDataset_direction,
                                                      batch_size=BATCH_SIZE,
                                                      shuffle=True,
                                                      num_workers=8,
                                                      drop_last=True)

In [160]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [161]:
class SampleModel2(nn.Module):

  def __init__(self):
    super().__init__()

    self.layer1 = nn.Sequential(
        nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(32),
        nn.ReLU(),
        nn.MaxPool2d(2)
    )

    self.fc1 = nn.Linear(in_features=8*240*320, out_features=64)
    self.drop = nn.Dropout2d(0.25)
    self.fc2 = nn.Linear(in_features=64, out_features=2)

  def forward(self, x):
    out = self.layer1(x)
    out = out.view(out.size(0), -1)
    out = self.fc1(out)
    out = self.drop(out)
    out = self.fc2(out)
    return out

In [162]:
class SampleModel4(nn.Module):

  def __init__(self):
    super().__init__()

    self.layer1 = nn.Sequential(
        nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(32),
        nn.ReLU(),
        nn.MaxPool2d(2)
    )

    self.fc1 = nn.Linear(in_features=8*240*320, out_features=64)
    self.drop = nn.Dropout2d(0.25)
    self.fc2 = nn.Linear(in_features=64, out_features=4)

  def forward(self, x):
    out = self.layer1(x)
    out = out.view(out.size(0), -1)
    out = self.fc1(out)
    out = self.drop(out)
    out = self.fc2(out)
    return out

In [163]:
model_section1 = SampleModel2()
model_section2 = SampleModel2()
model_section3 = SampleModel2()
model_section4 = SampleModel2()

model_directionSR = SampleModel4()
model_directionL = SampleModel2()

In [164]:
model_section1.load_state_dict(torch.load('Section1CNN.pth'))
model_section1.to(device)
model_section2.load_state_dict(torch.load('Section2CNN.pth'))
model_section2.to(device)
model_section3.load_state_dict(torch.load('Section3CNN.pth'))
model_section3.to(device)
model_section4.load_state_dict(torch.load('Section4CNN.pth'))
model_section4.to(device)

model_directionSR.load_state_dict(torch.load('directionSRCNN.pth'))
model_directionSR.to(device)
model_directionL.load_state_dict(torch.load('directionLCNN.pth'))
model_directionL.to(device)

SampleModel2(
  (layer1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Linear(in_features=614400, out_features=64, bias=True)
  (drop): Dropout2d(p=0.25, inplace=False)
  (fc2): Linear(in_features=64, out_features=2, bias=True)
)

In [165]:
# 1구역 통과
model_section1 = model_section1.eval()

with torch.no_grad():
  for image in testDataLoader_section1:
    x = image.to(device)

    output = model_section1.forward(x.unsqueeze(1).float())
    _, output_index = torch.max(output, 1)

    num_sec1 += (output_index == 1).sum()

In [166]:
# 2구역 통과
model_section2 = model_section2.eval()

with torch.no_grad():
  for image in testDataLoader_section2:
    x = image.to(device)

    output = model_section2.forward(x.unsqueeze(1).float())
    _, output_index = torch.max(output, 1)
    
    num_sec2 += (output_index == 0).sum()

In [167]:
# 3구역 통과
model_section3 = model_section3.eval()

with torch.no_grad():
  for image in testDataLoader_section3:
    x = image.to(device)

    output = model_section3.forward(x.unsqueeze(1).float())
    _, output_index = torch.max(output, 1)

    num_sec3 += (output_index == 1).sum()

In [168]:
# 4구역 통과
model_section4 = model_section4.eval()

with torch.no_grad():
  for image in testDataLoader_section4:
    x = image.to(device)

    output = model_section4.forward(x.unsqueeze(1).float())
    _, output_index = torch.max(output, 1)
    
    num_sec4 += (output_index == 0).sum()

In [169]:
# 직진, 우회전
model_directionSR = model_directionSR.eval()

with torch.no_grad():
  for image in testDataLoader_direction:
    x = image.to(device)

    output = model_directionSR.forward(x.unsqueeze(1).float())
    _, output_index = torch.max(output, 1)
    
    num_straight += (output_index == 2).sum()
    num_right_turn += (output_index == 3).sum()

In [170]:
# 좌회전
model_directionL = model_directionL.eval()

with torch.no_grad():
  for image in testDataLoader_direction:
    x = image.to(device)

    output = model_directionL.forward(x.unsqueeze(1).float())
    _, output_index = torch.max(output, 1)

    num_left_turn += (output_index == 0).sum()

---

---

**문제 정답 출력 부분**

In [171]:
print('문제 정답')
print(f'Section 1: {num_sec1}대')
print(f'Section 2: {num_sec2}대')
print(f'Section 3: {num_sec3}대')
print(f'Section 4: {num_sec4}대')
print(f'직진: {num_straight}대')
print(f'좌회전: {num_left_turn}대')
print(f'우회전: {num_right_turn}대')
print("50 26 11 10 11 10 24")
# print("58 41 19 5 15 7 38")
# print("66 40 19 7 19 10 36")

문제 정답
Section 1: 50대
Section 2: 27대
Section 3: 10대
Section 4: 13대
직진: 9대
좌회전: 13대
우회전: 26대
50 26 11 10 11 10 24


---