## 버킷 작업

In [None]:
# OS : Linux-5.10.0-26-cloud-amd64-x86_64-with-glibc2.31
# Python : 3.10.13
# Numpy : 1.25.2
# Pandas : 2.0.3
# Matplotlib : 3.7.3
# Seaborn : 0.12.2
# ipykernel : 6.26.0
# notebook : 6.5.6
# torch : 1.13.1+cu117
# Created: NOV. 20. 2023
# Author: D.W. SHIN
# 교통문제 해결을 위한 CCTV 교통 영상(고속도로)의 데이터 분석

In [None]:
# 버킷 리스트 확인
!gsutil list

In [None]:
# 버킷에서 파일 복사하기
!gsutil -m cp -r gs://cctv_storage_20231120/ cctv_datasets

In [None]:
# 현재 작업디렉토리 확인
import os
os.chdir('/home/jupyter')
HOME = os.getcwd()
print(HOME)

## zip 파일 복사

In [None]:
%cd {HOME}
zipPath1 = '/home/jupyter/cctv_datasets/cctv_storage_20231120/val/[라벨]1.수도권영동선.zip'
!unzip {zipPath1} -d /home/jupyter/cctv_datasets/cctv_storage_20231120/val/xml

In [None]:
%cd {HOME}
zipPath2 = '/home/jupyter/cctv_datasets/cctv_storage_20231120/val/[원천]1.수도권영동선.zip'
!unzip {zipPath2} -d /home/jupyter/cctv_datasets/cctv_storage_20231120/val/images

In [None]:
%cd {HOME}
zipPath3 = '/home/jupyter/mount_folder/Training/[라벨]1.수도권영동선.zip'
!unzip {zipPath3} -d /home/jupyter/cctv_datasets/cctv_storage_20231120/val/images

In [None]:
%cd {HOME}
zipPath4 = '/home/jupyter/cctv_datasets/cctv_storage_20231120/train/[원천]1-1.수도권영동선.zip'
!unzip {zipPath4} -d /home/jupyter/cctv_datasets/cctv_storage_20231120/train/images

In [None]:
# 이미지 경로 맞춰주기
#!mv * /home/jupyter/cctv_datasets/cctv_storage_20231120/val/images

In [None]:
# GPU 확인
!nvidia-smi

## YOLO 파일 설치

In [None]:
%cd {HOME}
!git clone https://github.com/ultralytics/ultralytics
%cd ultralytics

In [None]:
# Install YOLOv8
!pip install ultralytics==8.0.20

from IPython import display
display.clear_output()

import ultralytics
ultralytics.checks()

In [None]:
from ultralytics import YOLO

from IPython.display import display, Image

In [None]:
!pip install lxml

## XML -> TXT 파일 변환

In [None]:
# xml -> TXT로 변환
import numpy as np
import lxml
import os
import glob

from lxml import etree

CLASSES = ["car", "bus", "truck"]

def to_yolov8(y):
  """
  # change to yolo v8 format
  # [x_top_left, y_top_left, x_bottom_right, y_bottom_right] to
  # [x_center, y_center, width, height]
  """
  width = y[2] - y[0]
  height = y[3] - y[1]

  if width < 0 or height < 0:
      print("ERROR: negative width or height ", width, height, y)
      raise AssertionError("Negative width or height")
  return (y[0] + (width/2)), (y[1] + (height/2)), width, height


def load_xml_annotations(f):
  tree = etree.parse(f)
  anns = []
  for dim in tree.xpath("image"):
    image_filename = dim.attrib["name"]
    width = int(dim.attrib["width"])
    height = int(dim.attrib["height"])
    # print(image_filename)
    # print(len(dim.xpath("box")))
    boxes = []
    for box in dim.xpath("box"):
      label = CLASSES.index(box.attrib["label"])
      xtl, ytl = box.attrib["xtl"], box.attrib["ytl"]
      xbr, ybr = box.attrib["xbr"], box.attrib["ybr"]

      xc, yc, w, h = to_yolov8([float(xtl), float(ytl), float(xbr), float(ybr)])
      boxes.append([label, round(xc/width, 5), round(yc/height, 5), round(w/width, 5), round(h/height, 5)])

    anns.append([image_filename, width, height, boxes])

  return np.array(anns, dtype="object")

def write_yolov8_txt(folder, annotation):
  #print(annotation[0][:-3])
  out_filename = os.path.join(folder,str(annotation[0][:-3]))
  out_filename = os.path.splitext(out_filename)[0]
  out_filename = out_filename+'.txt'

  f = open(out_filename,"w+")
  for box in annotation[3]:
    f.write("{} {} {} {} {}\n".format(box[0], box[1], box[2], box[3], box[4]))

In [None]:
# load_xml_annotations 할때 버스 혹은 트럭만 가져오기

def load_xml_annotations_without_car(f):
  tree = etree.parse(f)
  anns = []
  
  truck_cnt = 0
  bus_cnt = 0
  car_cnt = 0
  
  
  for dim in tree.xpath("image"):
    image_filename = dim.attrib["name"]
    width = int(dim.attrib["width"])
    height = int(dim.attrib["height"])
    # print(image_filename)
    # print(len(dim.xpath("box")))
    boxes = []
    
    for box in dim.xpath("box"):
      cars = box.attrib["label"]
      
      if cars == 'car':
        car_cnt = car_cnt + 1
      
      if cars != 'car':
        label = CLASSES.index(box.attrib["label"])
        xtl, ytl = box.attrib["xtl"], box.attrib["ytl"]
        xbr, ybr = box.attrib["xbr"], box.attrib["ybr"]

        xc, yc, w, h = to_yolov8([float(xtl), float(ytl), float(xbr), float(ybr)])
        boxes.append([label, round(xc/width, 5), round(yc/height, 5), round(w/width, 5), round(h/height, 5)])
        
        if cars == 'truck':
          truck_cnt = truck_cnt + 1
        elif cars == 'bus':
          bus_cnt = bus_cnt + 1

    anns.append([image_filename, width, height, boxes])
  
  # print("truck_cnt : ", truck_cnt)
  # print("bus_cnt : ", bus_cnt)
  # print("car_cnt : ", car_cnt)
  
  # return np.array([["truck_cnt" , truck_cnt] , ["bus_cnt" , bus_cnt] , ["car_cnt" , car_cnt]])
  
  return np.array(anns, dtype="object")



In [None]:
# 파일명, 파일리스트, XML명, XML 리스트 반환
def get_file_n_xml_list(base_dir, file_list):
  file_name_list = []
  xml_name_list = []
  file_path_list = []
  xml_path_list = []

  for files in file_list:
    file_name = os.path.basename(files)    
    if os.path.splitext(file_name)[1] == '.png':
      file_name_list.append(file_name)
      under_file_path = files
      under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
      path_list = base_dir + under_file_path
      file_path_list.append(path_list)
    elif os.path.splitext(file_name)[1] == '.xml':
      xml_name_list.append(file_name)
      under_file_path = files
      under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
      path_list = base_dir + under_file_path
      xml_path_list.append(path_list)
  
  return np.array([file_name_list, file_path_list, xml_name_list, xml_path_list], dtype="object")

In [None]:
# get car type
def get_car_type(f):
  tree = etree.parse(f)
  car_type = []
  for meta_tag in tree.xpath("meta"):
    for task_tag in meta_tag.xpath("task"):
      for lables_tag in task_tag.xpath("labels"):
        for lable_tag in lables_tag.xpath("label"):
          for name_tag in lable_tag.xpath("name"):            
            car_type.append(name_tag.text)
  result = []
  truck_cnt = 0
  bus_cnt = 0
  car_cnt = 0
  for dim in tree.xpath("image"):
    for box in dim.xpath("box"):
      cars = box.attrib["label"]
      if cars == car_type[0]:
        truck_cnt = truck_cnt + 1
      elif cars == car_type[1]:
        bus_cnt = bus_cnt + 11
      elif cars == car_type[2]:
        car_cnt = car_cnt + 1
   
#  print("truck_cnt : ", truck_cnt)
#  print("bus_cnt : ", bus_cnt)
#  print("car_cnt : ", car_cnt)

  return np.array([[car_type[0] , truck_cnt] , [car_type[1] , bus_cnt] , [car_type[2] , car_cnt]])

In [None]:
# 파일명, 파일리스트, XML명, XML 리스트 반환
from pathlib import Path

def get_specific_file_n_xml_list_v2(base_dir, file_list, bra):
  file_name_list = []
  xml_name_list = []
  file_path_list = []
  xml_path_list = []

  for files in file_list:
    file_name = os.path.basename(files)
    if file_name.find('_') != -1 and len(file_name.split('_')) > 9:
        branch = file_name.split('_')[1]
        # print("branch : ", branch)
        # print("bra : ", bra)
        ## 필요한 지점, 정체여부, 차선정보, 날씨로 xml 파일리스트를 확인한다.
        if bra == branch:
            # print("branch2 : ", branch)
            # print("bra2 : ", bra)
            if os.path.splitext(file_name)[1] == '.png':
                file_name_list.append(file_name)
                under_file_path = files
                under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
                path_list = str(Path(base_dir)) + str(Path(under_file_path))
                file_path_list.append(path_list)
            elif os.path.splitext(file_name)[1] == '.xml':
                # print("branch3 : ", branch)
                # print("bra3 : ", bra)
                xml_name_list.append(file_name)
                under_file_path = files
                under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
                path_list = str(Path(base_dir)) + str(Path(under_file_path))
                xml_path_list.append(path_list)
  
  return np.array([file_name_list, file_path_list, xml_name_list, xml_path_list], dtype="object")

## Validation에서 작업

In [None]:
val_base_dir = '/home/jupyter/cctv_datasets/cctv_storage_20231120/val/labels/'
os.chdir(val_base_dir)
os.getcwd()

# Validation의 하위폴더에서 모든 파일을 리스트로 만들기
val_file_list = glob.glob('./**', recursive=True)
val_file_name = [os.path.basename(x) for x  in val_file_list]

val_file_name_list = []
val_xml_name_list = []

val_file_path_list = []
val_xml_path_list = []

for file in val_file_list:
  filename = os.path.basename(file)
  if os.path.splitext(filename)[1] == '.png':
    val_file_name_list.append(filename)
    under_file_path = file
    under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
    # path_list = val_base_dir + under_file_path
    path_list = str(Path(val_base_dir)) + str(Path(under_file_path))
    val_file_path_list.append(path_list)
  elif os.path.splitext(filename)[1] == '.xml':
    val_xml_name_list.append(filename)
    under_file_path = file
    under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
    # path_list = val_base_dir + under_file_path
    path_list = str(Path(val_base_dir)) + str(Path(under_file_path))
    val_xml_path_list.append(path_list)

In [None]:
import time

for label_file in val_xml_path_list:
    anns = load_xml_annotations(label_file)
    # print(anns)
    label_files = os.path.split(label_file)
    folderName=os.path.splitext(label_files[1])[0]
    os.makedirs(os.path.join(label_files[0],folderName), exist_ok=True)
    time.sleep(1)
    for ann in anns:
        write_yolov8_txt(os.path.join(label_files[0],folderName), ann)

## Training 에서 작업

In [None]:
train_base_dir = '/home/jupyter/cctv_datasets/cctv_storage_20231120/train/labels/'
os.chdir(train_base_dir)
os.getcwd()

# train 하위폴더에서 모든 파일을 리스트로 만들기
train_file_list = glob.glob('./**', recursive=True)
train_file_name = [os.path.basename(x) for x  in train_file_list]

train_file_name_list = []
train_xml_name_list = []

train_file_path_list = []
train_xml_path_list = []

for file in train_file_list:
  filename = os.path.basename(file)
  if os.path.splitext(filename)[1] == '.png':
    train_file_name_list.append(filename)
    under_file_path = file
    under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
    # path_list = train_base_dir + under_file_path
    path_list = str(Path(train_base_dir)) + str(Path(under_file_path))
    train_file_path_list.append(path_list)
  elif os.path.splitext(filename)[1] == '.xml':
    train_xml_name_list.append(filename)
    under_file_path = file
    under_file_path = under_file_path.replace(".\\", "/").replace("\\", "/").replace("./", "/")
    # path_list = train_base_dir + under_file_path
    path_list = str(Path(train_base_dir)) + str(Path(under_file_path))
    train_xml_path_list.append(path_list)

In [None]:
import time

for label_file in train_xml_path_list:
    anns = load_xml_annotations(label_file)
    # print(anns)
    label_files = os.path.split(label_file)
    folderName=os.path.splitext(label_files[1])[0]
    os.makedirs(os.path.join(label_files[0],folderName), exist_ok=True)
    time.sleep(1)
    for ann in anns:
        write_yolov8_txt(os.path.join(label_files[0],folderName), ann)

In [None]:
# 특정 지점, 정체여부, 차선정보, 날씨로 xml 파일리스트를 확인한다
BRANCH = "CH08"

file_name_list3, file_path_list3, xml_name_list3, xml_path_list3 = get_specific_file_n_xml_list_v2(train_base_dir, train_file_list, BRANCH)

In [None]:
xml_path_list3

In [None]:
## 트럭 버스만 바운딩 박스 사용
# 1. xml 리스트 변경하기 예) xml_path_list3 

import time

for label_file in xml_path_list3:
    anns = load_xml_annotations_without_car(label_file)
    # print(anns)
    label_files = os.path.split(label_file)
    folderName=os.path.splitext(label_files[1])[0]
    os.makedirs(os.path.join(label_files[0],folderName), exist_ok=True)
    time.sleep(1)
    for ann in anns:
        write_yolov8_txt(os.path.join(label_files[0],folderName), ann)

## 레이블 시각화 하기

In [None]:
import glob
import os

def glob_files(path, file_type="*"):
    search_string = os.path.join(path, file_type)
    files = glob.glob(search_string)

    # print('searching ', path)
    paths = []
    for f in files:
      if os.path.isdir(f):
        sub_paths = glob_files(f + '/')
        paths += sub_paths
      else:
        paths.append(f)

    # We sort the images in alphabetical order to match them
    #  to the annotation files
    paths.sort()

    return paths

In [None]:
import cv2
import numpy as np

IMAGE_SIZE = 600

def load_images(path):
  files = glob_files(path, "*001.png")

  # print(files)
  X_data = []
  for file in files:
    image = cv2.imread(file)
    # print(image.shape)
    # x = cv2.resize(image, None, fx=0.5, fy=0.5, interpolation=cv2.INTER_AREA)

    X_data.append(image)
  return np.array(X_data)

X_test = load_images("/home/jupyter/cctv_datasets/cctv_storage_20231120/train/images/Suwon_CH08_20200720_1900_MON_9m_NH_highway_OW5_sunny_FHD")

In [None]:
WIDTH = 1080
HEIGHT = 1920

def load_labels(path):
  files = glob_files(path, "*001.txt")

  Y_data = []
  for file in files:
    with open(file) as f:
      lines = f.readlines()

      boxes = []
      for line in lines:
        tokens = line.split()

        class_id = int(tokens[0])
        xc = float(tokens[1]) * WIDTH
        yc = float(tokens[2]) * HEIGHT
        width = float(tokens[3]) * WIDTH
        height = float(tokens[4]) * HEIGHT

        boxes.append(np.array([class_id, xc, yc, width, height]))
        # print(class_id, xc, yc, width, height)

      Y_data.append(np.array(boxes))
      # print(lines)
  return np.array(Y_data)

Y_test = load_labels("/home/jupyter/cctv_datasets/cctv_storage_20231120/train/labels/Suwon_CH08_20200720_1900_MON_9m_NH_highway_OW5_sunny_FHD")

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

def create_patch_rectangle(y, color):
  # # in yolov5
  width = int(y[2])
  height = int(y[3])
  return patches.Rectangle((int(y[0] - width/2), int(y[1] - height/2)),
                           width, height,
                           edgecolor=color, fill=False)

COLORS = [(0, 255/255, 0), (255/255, 255/255, 0), (255/255, 0, 0)]

def plot_image(image, boxes, axis):
  # # print(boxes.shape)
  for box in boxes:
    # print(box)
    class_id = int(box[0])
    # print(type(class_id), class_id)
    rect = create_patch_rectangle(box[1:], COLORS[class_id])
    axis.add_patch(rect)

  plt.imshow(image)

def plot_images(X, Y, limit=10):
  fig = plt.figure(figsize=(100, 80))

  last_id = min(limit, X.shape[0])
  for id in range(last_id):
    axis = fig.add_subplot(5, 3, id + 1)
    axis.get_xaxis().set_visible(False)
    axis.get_yaxis().set_visible(False)
    plot_image(X[id], Y[id], axis)

plot_images(np.array([X_test[-1]]), np.array([Y_test[-1]]))

## 데이터셋 경로 설정하기

In [None]:
import os
from glob import glob
dataPath = '/home/jupyter/cctv_datasets/cctv_storage_20231120/'
trainPath = os.path.join(dataPath,'train')
validPath = os.path.join(dataPath,'val')
# testPath  = os.path.join(dataPath,'test')

trainImagesPath = os.path.join(trainPath, 'images')
validImagesPath = os.path.join(validPath, 'images')
# testImagesPath  = os.path.join(testPath,  'images')

trainLabelsPath = os.path.join(trainPath, 'labels')
validLabelsPath = os.path.join(validPath, 'labels')
# testLabelsPath  = os.path.join(testPath,  'labels')

In [None]:
trainImagesPath, trainLabelsPath

In [None]:
import torch

In [None]:
from IPython.display import Image, clear_output  # to display images

clear_output()
print(f"Setup complete. Using torch {torch.__version__} \
    ({torch.cuda.get_device_properties(0).name if torch.cuda.is_available() else 'CPU'})")

## Train CCTV Detection Model

In [None]:
%cd {HOME}

In [None]:
%cd ultralytics
!ls -al

In [None]:
from IPython.core.magic import register_line_cell_magic

# yaml에 직접 쓰기를 실행하는 함수
@register_line_cell_magic
def writetemplate(line, cell):
    with open(line, 'w') as f:
        f.write(cell.format(**globals()))

In [None]:
dataPath = '/home/jupyter/cctv_datasets/cctv_storage_20231120/'

In [None]:
dataYaml = os.path.join(dataPath,'data.yaml')
dataYaml

In [None]:
print(trainImagesPath)
print(validImagesPath)

In [None]:
%%writetemplate {dataYaml}
train: /home/jupyter/cctv_datasets/cctv_storage_20231120/train/images
val: /home/jupyter/cctv_datasets/cctv_storage_20231120/val/images

nc: 3
names: ['car', 'bus', 'truck']

In [None]:
%cd {HOME}

!yolo task=detect mode=train model=yolov8m.pt data=/home/jupyter/cctv_datasets/cctv_storage_20231120/data.yaml epochs=75 imgsz=800 plots=True batch=16 cache=True
# !yolo task=detect mode=train model=yolov8l.pt data=/home/jupyter/cctv_datasets/cctv_storage_20231120/data.yaml epochs=100 imgsz=1024 plots=True batch=8 cache=True workers=10

In [None]:
!ls {HOME}/runs/detect/train9/

In [None]:
%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train9/confusion_matrix.png', width=6000)

In [None]:
%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train9/results.png', width=6000)

In [None]:
%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train9/val_batch0_labels.jpg', width=6000)

In [None]:
%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train9/val_batch0_pred.jpg', width=6000)

In [None]:
# %cd {HOME}

# !yolo task=detect mode=predict model=yolov8m.pt data=/home/jupyter/cctv_datasets/cctv_storage_20231120/data.yaml epochs=100 imgsz=800 plots=True batch=16 cache=True

In [None]:
os.getcwd()

In [None]:
from ultralytics import YOLO

# Load a pretrained YOLOv8n model
model = YOLO('/home/jupyter/runs/detect/train11/weights/best.pt')

# Define source as YouTube video URL
source = '/home/jupyter/cctv_datasets/cctv_storage_20231120/IMG_0835.MOV'

# Run inference on the source
results = model(source, stream=True)  # generator of Results objects

In [None]:
model = '/home/jupyter/runs/detect/train11/weights/best.pt'
source = '/home/jupyter/cctv_datasets/cctv_storage_20231120/IMG_0835.MOV'

In [None]:
%cd ultralytics/

In [None]:
!pwd

In [None]:
# !pip install -r requirements.txt

In [None]:
# !pip install pafy

In [None]:
# !sudo apt-get install -y libgtk2.0-dev 
# !apt-get update
# !apt-get install -y qt5-default libxcb-xinerama0-dev

In [None]:
!pip list | grep ultra

In [None]:
!python -V

In [None]:
# !pip uninstall -y ultralytics
# !pip install ultralytics

In [None]:
!yolo task=detect mode=predict show=True model={model} source={source} imgsz=640 save=True name='highway_test'

In [None]:
# 데이터 추가
!unzip /home/jupyter/mount_folder/Training/'[원천]1-2.수도권영동선.zip' -d /home/jupyter/cctv_datasets/cctv_storage_20231120/train/images
!rm -rf Suwon_CH05*
!rm -rf Suwon_CH07*


### 테스트한 자동차 버스 트럭 수 찾기

In [None]:
testImages = '/home/jupyter/cctv_datasets/cctv_storage_20231120/train/images'
from glob import glob

testList =[]
for filename in glob(os.path.join(testImages, '*.png')):
    testList.append(filename)

In [None]:
from ultralytics import YOLO

resultList =[]

# Load a pretrained YOLOv8n model
model = YOLO('/home/jupyter/runs/detect/train11/weights/best.pt')

# Run inference on images
results = model(testList)

# View results
for result in results:
    resultList.append(result.cpu().numpy().boxes.cls)  # print the Boxes object containing the detection bounding boxes

In [None]:
import pandas as pd
sum = [0, 0, 0]

for result in resultList:
    for i in range(len(sum)):
        sum[i] += result.tolist().count(i)
print(sum)

print("전체 {}장의 이미지에서 승용차 {}대, 버스 {}대, 트럭 {}대를 발견하였습니다.".format(len(resultList), sum[0], sum[1], sum[2]))