In [1]:
# Mount Google Drive at /content/drive so the project files become reachable (Colab only).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Switch the working directory to the project folder on the mounted Drive.
# NOTE(review): the path is relative to the current directory, so this presumably
# only works from /content (i.e. once per fresh session) - confirm.
# NOTE(review): later cells use '/content/drive/MyDrive/...' while this uses
# 'My Drive'; verify both spellings resolve to the same folder.
%cd drive/My\ Drive/FinalProject/

/content/drive/My Drive/FinalProject


# Load the YOLOv8 model

In [3]:
# Install the Ultralytics package that provides the YOLO class used below.
# NOTE(review): the version is not pinned, while the inference section imports from
# `ultralytics.yolo.*`; presumably that layout only exists in some releases -
# confirm and pin a matching version.
!pip install ultralytics



In [4]:
from ultralytics import YOLO
import cv2
# Build the detector from the "yolov8s.pt" weights; `model` is used by the webcam cell below.
model = YOLO("yolov8s.pt")

# Real-time detection using a webcam

In [None]:
# Open the default webcam (device index 0).
# NOTE(review): cv2.VideoCapture(0) and cv2.imshow need a machine with a camera and
# a display; presumably this cell is meant to be run locally, not on Colab - confirm.
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        success, image = cap.read()     # read a single frame
        if not success:
            # A failed read ends the loop (no retry).
            print("Ignoring empty camera frame.")
            break

        # Mirror the frame horizontally so the preview behaves like a mirror.
        frame = cv2.flip(image, 1)

        # Run inference on the BGR frame as delivered by OpenCV: Ultralytics treats
        # numpy input as BGR, so converting to RGB first would swap the channels.
        # The call returns a list with one Results object per input image, hence [0].
        result = model(frame,
                       conf=0.5,         # keep only detections with confidence > 0.5
                       verbose=False)[0]  # do not print the inference log

        # Extract box corners, class ids and confidences as numpy arrays.
        xyxy_list = result.boxes.xyxy.to('cpu').numpy().astype('int32')
        cls_list = result.boxes.cls.to('cpu').numpy().astype('int32')
        conf_list = result.boxes.conf.to('cpu').numpy()

        # Draw one rectangle and one caption per detected object.
        for xyxy, cls, conf in zip(xyxy_list, cls_list, conf_list):
            # OpenCV drawing functions expect points as tuples of Python ints.
            pt1 = (int(xyxy[0]), int(xyxy[1]))
            pt2 = (int(xyxy[2]), int(xyxy[3]))
            txt = f'{result.names[int(cls)]} - {conf*100:.3f}%'
            cv2.rectangle(frame, pt1, pt2, color=(255, 0, 0), thickness=2)
            cv2.putText(frame, txt, org=pt1, fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=0.5,
                        color=(200, 0, 0), thickness=1, lineType=cv2.LINE_AA)

        cv2.imshow('frame', frame)
        # Press 'q' in the preview window to stop.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the camera and close the window, even if inference raised.
    cap.release()
    cv2.destroyAllWindows()

# Dataset pre-processing for YOLO Model

In [13]:
##### NOT USED #####
# Every object type appears twice in the class list: first as "<object> 정상"
# (normal) and then as "<object> 파손" (damaged).
_object_names = ['PE드럼', 'PE안내봉', '라바콘', '시선유도봉', '제설함', 'PE입간판', 'PE휀스', 'PE방호벽']
_states = ('정상', '파손')
label = [f'{_obj} {_state}' for _obj in _object_names for _state in _states]

# Map each class name to its position in `label`.
label_dict = dict(zip(label, range(len(label))))

In [1]:
import glob
import json
import pandas as pd

# Load one sample annotation file so its structure can be inspected below.
json_file_path = '/content/drive/MyDrive/FinalProject/dataset/real_sunny_AM_20220614_112456_sedan_Dong-gu_00026.json'
# An explicit UTF-8 encoding keeps decoding independent of the platform default
# and matches the other open() calls in this notebook.
with open(json_file_path, 'r', encoding='UTF-8') as json_file:
    data = json.load(json_file)

In [7]:
# Keep only 'width', 'height' and 'file_name' from every entry of 'images'.
image_keys = ('width', 'height', 'file_name')
image_data = [{key: img[key] for key in image_keys} for img in data['images']]

# Keep only 'category_id' and 'bbox' from every entry of 'annotations'.
annotation_keys = ('category_id', 'bbox')
annotation_data = [{key: ann[key] for key in annotation_keys} for ann in data['annotations']]

df_images = pd.DataFrame(image_data)
df_annotations = pd.DataFrame(annotation_data)

df_images

Unnamed: 0,width,height,file_name
0,1920,1080,real_sunny_AM_20220614_112456_sedan_Dong-gu_00...


In [25]:
# Display the annotation table (one row per annotation: category_id and bbox).
df_annotations

Unnamed: 0,category_id,bbox
0,9,"[545.07, 216.31, 56.87, 208.71]"
1,9,"[167.11, 382.08, 132.67, 411.91]"
2,9,"[669.23, 156.16, 37.19, 149.74]"
3,9,"[409.57, 274.31, 78.65, 282.04]"
4,12,"[153.92, 185.8, 288.22, 139.0]"


### 여러 .json 파일에서 한번에 .txt 추출하기

In [30]:
# bbox layout: top-left x, top-left y, box width, box height
def convert_to_yolov8_format(bbox, image_width, image_height):
    """Convert a top-left/size box in pixels to a normalized centre/size box.

    Args:
        bbox: Indexable with at least four numbers
            ``[x_top_left, y_top_left, box_width, box_height]``.
        image_width: Image width used to normalize the x values.
        image_height: Image height used to normalize the y values.

    Returns:
        Tuple ``(x_center, y_center, width, height)``; x values are divided by
        ``image_width`` and y values by ``image_height``.
    """
    left, top, box_w, box_h = bbox[0], bbox[1], bbox[2], bbox[3]

    return (
        (left + box_w / 2) / image_width,   # centre x
        (top + box_h / 2) / image_height,   # centre y
        box_w / image_width,
        box_h / image_height,
    )

In [None]:
import os  # imported here so the cell does not depend on a later cell's imports

dst_dir = '/content/drive/MyDrive/FinalProject'
dataset_dir = '/content/drive/MyDrive/FinalProject/dataset'


def export_yolo_labels(dataset_dir, dst_dir):
    """Write one YOLO label file per JSON annotation file found in ``dataset_dir``.

    Each ``*.json`` file is read, every annotation that has both a
    ``category_id`` and a ``bbox`` is converted with
    ``convert_to_yolov8_format`` and the resulting lines
    (``<category_id> <x_center> <y_center> <width> <height>``, six decimals)
    are written to ``<dst_dir>/labels/<image name>.txt``.

    The label file is rewritten (not appended to), so running the cell again
    does not duplicate lines. No file is written when a JSON file yields no
    usable annotation.

    Args:
        dataset_dir: Directory containing the JSON annotation files.
        dst_dir: Directory under which the ``labels`` folder is created.

    Returns:
        Number of label files written.
    """
    label_path = os.path.join(dst_dir, "labels")
    os.makedirs(label_path, exist_ok=True)

    written = 0
    for json_file in sorted(os.listdir(dataset_dir)):
        if not json_file.endswith('.json'):
            continue

        json_path = os.path.join(dataset_dir, json_file)
        with open(json_path, 'r', encoding='UTF-8') as f:
            annotation_doc = json.load(f)

        images = annotation_doc.get('images') or []
        if not images:
            print(f'skipped (no image entry): {json_file}')
            continue

        # NOTE(review): every annotation is attributed to the first image entry;
        # this assumes one image per JSON file - confirm against the dataset.
        image_info = images[0]
        # splitext keeps dots that are part of the base name ("a.b.jpg" -> "a.b").
        image_name = os.path.splitext(image_info['file_name'])[0]
        image_width = image_info['width']
        image_height = image_info['height']

        lines = []
        for annotation in annotation_doc.get('annotations', []):
            category_id = annotation.get('category_id', None)
            bbox = annotation.get('bbox', None)
            if category_id is None or bbox is None:
                continue

            x_center, y_center, width, height = convert_to_yolov8_format(bbox=bbox,
                                                                         image_width=image_width,
                                                                         image_height=image_height)
            lines.append(f'{category_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n')

        if lines:
            txt_filename = os.path.join(label_path, f"{image_name}.txt")
            with open(txt_filename, 'w', encoding='UTF-8') as txt_file:
                txt_file.writelines(lines)
            written += 1

        print(f'image_name: {image_name}, image_width: {image_width}, '
              f'image_height: {image_height}, labels: {len(lines)}')

    return written


export_yolo_labels(dataset_dir, dst_dir)

In [46]:
# Sanity check: print one generated label file.
eg = '/content/drive/MyDrive/FinalProject/labels/real_sunny_AM_20220614_112456_sedan_Dong-gu_00026.txt'

with open(eg, mode='r', encoding='UTF-8') as label_file:
    content = label_file.read()

# Each line should read: label, x_center, y_center, w, h.
print(content)

9 0.298701 0.296912 0.029620 0.193250
9 0.121586 0.544477 0.069099 0.381398
9 0.358242 0.213917 0.019370 0.138648
9 0.233799 0.384565 0.040964 0.261148
12 0.155224 0.236389 0.150115 0.128704



### AI Hub dataset (.xml) load - reference

In [32]:
import xmltodict

# Class names of the reference dataset; the position of a name in the list is
# the numeric class id written to the label files.
label = [
    'bicycle', 'bus', 'car', 'carrier', 'cat', 'dog', 'motorcycle',
    'movable_signage', 'person', 'scooter', 'stroller', 'truck', 'wheelchair',
    'barricade', 'bench', 'bollard', 'chair', 'fire_hydrant', 'kiosk', 'parking_meter',
    'pole', 'potted_plant', 'power_controller', 'stop', 'table', 'traffic_light',
    'traffic_light_controller', 'traffic_sign', 'tree_trunk',
]

# name -> class id lookup
label_dict = dict(zip(label, range(len(label))))

In [27]:
# Inspect the reference data: parse the sample XML annotation file.
xml_path = '/content/drive/MyDrive/FinalProject/bbox_sample.xml'
with open(xml_path, mode='rt', encoding="UTF-8") as xml_file:
    xml_text = xml_file.read()
doc = xmltodict.parse(xml_text)

# Round-trip through JSON so the parsed document consists of plain dicts and lists.
json_data = json.loads(json.dumps(doc))

In [28]:
# Display the parsed annotation document.
json_data

{'annotations': {'version': '1.1',
  'meta': {'task': {'id': '1068',
    'name': 'bbox_sample',
    'size': '300',
    'mode': 'annotation',
    'overlap': '0',
    'bugtracker': None,
    'flipped': 'False',
    'created': '2019-08-20 07:51:38.306862+03:00',
    'updated': '2019-09-18 12:26:30.562392+03:00',
    'start_frame': '0',
    'stop_frame': '0',
    'frame_filter': '0',
    'labels': {'label': [{'name': 'traffic_light_controller',
       'attributes': None},
      {'name': 'power_controller', 'attributes': None},
      {'name': 'wheelchair', 'attributes': None},
      {'name': 'truck', 'attributes': None},
      {'name': 'tree_trunk', 'attributes': None},
      {'name': 'traffic_sign', 'attributes': None},
      {'name': 'traffic_light', 'attributes': None},
      {'name': 'table', 'attributes': None},
      {'name': 'stroller', 'attributes': None},
      {'name': 'stop', 'attributes': None},
      {'name': 'scooter', 'attributes': None},
      {'name': 'potted_plant', 'attri

In [30]:
# Project root; the label-export cell below writes into its "labels" sub-folder.
dst_dir = "/content/drive/MyDrive/FinalProject"

In [None]:
# Convert the bounding boxes of every image in the parsed XML document to YOLO
# label lines and write them to "<dst_dir>/labels/<image name>.txt".
import os  # imported here so the cell does not depend on a later cell's imports

# Join dst_dir and "labels" to get the output directory; create it once.
label_path = os.path.join(dst_dir, "labels")
os.makedirs(label_path, exist_ok=True)
print(label_path)

# xmltodict yields a dict for a single child element and a list for repeated
# ones, so normalize both the images and (below) the boxes to lists.
images = json_data['annotations']['image']
if isinstance(images, dict):
    images = [images]

for image in images:
    # splitext keeps dots that are part of the base name ("a.b.jpg" -> "a.b").
    file_name = os.path.splitext(image['@name'])[0]
    image_w = int(image['@width'])
    image_h = int(image['@height'])
    print(f'{file_name} - image_w: {image_w}, image_h: {image_h}')

    boxes = image.get('box') or []   # an image without boxes gets an empty label file
    if isinstance(boxes, dict):      # exactly one box in this image
        boxes = [boxes]

    lines = []
    for box in boxes:
        xl = float(box['@xtl'])      # top-left corner x
        xr = float(box['@xbr'])      # bottom-right corner x
        yt = float(box['@ytl'])      # top-left corner y
        yb = float(box['@ybr'])      # bottom-right corner y

        # Box centre and size, normalized by the image width / height.
        x_center = ((xl + xr) / 2) / image_w
        y_center = ((yt + yb) / 2) / image_h
        w = (xr - xl) / image_w
        h = (yb - yt) / image_h

        class_id = label_dict[box['@label']]   # class name -> numeric id
        lines.append(f'{class_id} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}\n')

    # The context manager closes the file even if writing fails.
    with open(os.path.join(label_path, file_name + ".txt"), 'w', encoding='UTF-8') as label_file:
        label_file.writelines(lines)

In [41]:
# Sanity check: print one label file generated from the reference XML.
eg_ref = '/content/drive/MyDrive/FinalProject/labels/MP_SEL_B027534.txt'

with open(eg_ref, mode='r', encoding='UTF-8') as label_file:
    content = label_file.read()

# Each line should read: label, x_center, y_center, w, h.
print(content)

2 0.831401 0.685509 0.337198 0.628981
21 0.070563 0.710620 0.078250 0.151352
21 0.260490 0.647046 0.024021 0.106833
2 0.465635 0.516472 0.079354 0.111685
20 0.589583 0.265648 0.011458 0.531296
20 0.346458 0.335324 0.037396 0.670648
21 0.326380 0.626958 0.036927 0.098861
2 0.498966 0.473537 0.038318 0.065519
21 0.118172 0.747815 0.138135 0.222519
21 0.295354 0.632574 0.032313 0.100778
0 0.383185 0.533773 0.027276 0.092454
8 0.179016 0.595593 0.059375 0.301852
27 0.506518 0.379431 0.022026 0.026565
2 0.541771 0.484583 0.046250 0.066574



### Final: inference code

In [6]:
import os
import cv2
import torch
import json
import math
from pathlib import Path
#from app.utils import dir_func
import numpy as np

In [None]:
from ultralytics.nn.autobackend import AutoBackend
from ultralytics.yolo.data.dataloaders.stream_loaders import LoadImages
from ultralytics.yolo.utils.checks import check_imgsz
from ultralytics.yolo.utils.plotting import Annotator, colors
from ultralytics.yolo.utils.ops import scale_boxes, non_max_suppression
from ultralytics.yolo.engine.results import Results

In [10]:
# Decide which horizontal third of the image a bounding box falls into.
def find_location_idx(img_w, x_min, x_max):
    """Return the index of the horizontal image third containing the box centre.

    Args:
        img_w: Image width.
        x_min: Smallest x coordinate of the bounding box.
        x_max: Largest x coordinate of the bounding box.

    Returns:
        0 when the (floor-divided) box centre lies left of ``img_w // 3``,
        1 when it lies left of ``img_w * 2 // 3``, otherwise 2.
    """
    mid_x = (x_min + x_max) // 2

    if mid_x < img_w // 3:          # left third
        return 0
    if mid_x < img_w * 2 // 3:      # middle third
        return 1
    return 2                        # right third

In [11]:
# Distance and heading from the bottom-centre of the image to the bottom-centre of a bbox.
def distance_heading(img_w, img_h, x_min, x_max, y_min, y_max):
    """Measure a bounding box relative to the bottom-centre point of the image.

    The reference point is ``(img_w / 2, img_h)``; the measured point is the
    horizontal centre of the box on its ``y_max`` edge.

    Note the parameter order ``x_min, x_max, y_min, y_max``: callers holding a
    box as ``(x_min, y_min, x_max, y_max)`` must reorder the values.

    Args:
        img_w: Image width in pixels.
        img_h: Image height in pixels.
        x_min: Smallest x coordinate of the box.
        x_max: Largest x coordinate of the box.
        y_min: Smallest y coordinate of the box (not used in the computation).
        y_max: Largest y coordinate of the box.

    Returns:
        Tuple ``(distance, heading)``: the Euclidean pixel distance between the
        two points, and ``-atan2(-delta_y, delta_x)`` converted to degrees.
    """
    dx = (x_min + x_max) / 2 - img_w / 2   # horizontal offset from the image centre line
    dy = y_max - img_h                     # offset of the box's y_max edge from the image's bottom edge

    heading = -math.atan2(-dy, dx) * 180 / math.pi
    distance = math.sqrt(dx ** 2 + dy ** 2)
    return distance, heading

In [None]:
def _build_warning_masks(frame_shape):
    """Create the overlay and the two warning-zone masks for one frame shape.

    All zones are upper half-ellipse sectors anchored at the bottom-centre of
    the frame; their radii are fractions of the frame height.

    Args:
        frame_shape: ``(height, width, channels)`` of the frames to process.

    Returns:
        Tuple ``(mask, mask_thres1, mask_thres2)``: ``mask`` is a colour overlay
        with the frame's shape, ``mask_thres1`` is a single-channel mask of the
        outer zone and ``mask_thres2`` a single-channel mask of the inner zone
        (255 inside, 0 outside).
    """
    mask_h, mask_w = frame_shape[:2]

    # threshold line settings (fractions of the frame height)
    far = 0.18
    middle = 0.145
    near = 0.1

    # threshold angle settings (half opening angle in degrees around angle_center)
    angle_far = 15
    angle_middle = 30
    angle_near = 90
    angle_center = 270

    # ellipse ratio (horizontal axis / vertical axis)
    ellipse_value = 2.5

    # color settings (BGR)
    YELLOW = (0, 255, 255)
    RED = (0, 0, 255)

    # masking value
    bitmask = 255

    center = (int(mask_w / 2), mask_h)

    def draw_sector(canvas, ratio, half_angle, color):
        axes = (int(mask_h * ratio * ellipse_value), int(mask_h * ratio))
        cv2.ellipse(canvas, center, axes, 0, angle_center - half_angle, angle_center + half_angle, color, -1)

    mask = np.zeros(frame_shape, np.uint8)
    mask_thres1 = np.zeros((mask_h, mask_w), np.uint8)
    mask_thres2 = np.zeros((mask_h, mask_w), np.uint8)

    # colour overlay: outer zone in yellow, inner zone in red
    draw_sector(mask, far, angle_near, YELLOW)
    draw_sector(mask, near, angle_near, RED)
    draw_sector(mask, middle, angle_middle, RED)
    draw_sector(mask, far, angle_far, RED)

    # mask_thres1: outer zone
    draw_sector(mask_thres1, far, angle_near, bitmask)
    # mask_thres2: inner zone
    draw_sector(mask_thres2, near, angle_near, bitmask)
    draw_sector(mask_thres2, middle, angle_middle, bitmask)
    draw_sector(mask_thres2, far, angle_far, bitmask)

    return mask, mask_thres1, mask_thres2


def detect(src: str, session_id: str, conf_thres=0.25, THRESHOLD_y=0.7):
    """Run the custom YOLOv8 model over ``src`` and report every detection.

    For each frame the detections are drawn onto the frame, blended with the
    warning-zone overlay and saved as ``<tmp>/img_dir/<frame>.jpg``; a text
    line per object is appended to ``<tmp>/dist_degree.txt``.

    Warning levels: 3 by default, 2 when the box overlaps the outer zone,
    1 when it also overlaps the inner zone.

    NOTE(review): ``APP_PATH``, ``MODEL_DIR`` and ``dir_func`` are not defined
    in the visible notebook (the ``app.utils`` import is commented out); they
    must be provided before calling this function.

    Args:
        src: Image / video source handed to ``LoadImages``.
        session_id: Name of the per-session folder below ``APP_PATH/tmp``.
        conf_thres: Confidence threshold passed to non-max suppression.
        THRESHOLD_y: Not used by the function; kept for interface compatibility.

    Returns:
        JSON string mapping the zero-padded frame index to a mapping of
        zero-padded object ids to ``class``, ``warning_lv``, ``location``,
        ``distance`` and ``heading``.
    """
    import time  # only needed for the timing report below

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print('Inference with', device)

    tmp_path = os.path.join(APP_PATH, "tmp", session_id)
    img_dst = os.path.join(tmp_path, "img_dir")
    TXT_FILE = os.path.join(tmp_path, 'dist_degree.txt')
    dir_func(img_dst, rmtree=False, mkdir=True)

    model = AutoBackend(weights=os.path.join(MODEL_DIR, "yolov8", 'yolov8n_custom.pt'), device=device, dnn=False, fp16=False)
    model.eval()

    stride, names, pt = model.stride, model.names, model.pt
    imgsz = check_imgsz((640, 640), stride=stride)
    dataset = LoadImages(src, imgsz=imgsz, stride=stride, auto=pt, vid_stride=1)

    json_obj = {}

    # The masks are built from the first frame inside the main loop instead of in
    # a separate probing loop, so no frame is taken from the loader before
    # processing starts and an empty source no longer raises a NameError.
    mask = mask_thres1 = mask_thres2 = None
    frame_idx = 0

    t1 = time.time()

    for frame_idx, batch in enumerate(dataset, 1):
        _, img_nparr, im0s, vid_cap, s = batch

        # (Re)build the zone masks whenever the frame shape changes.
        if mask is None or mask.shape != im0s.shape:
            mask, mask_thres1, mask_thres2 = _build_warning_masks(im0s.shape)

        annotator = Annotator(im0s, line_width=2, example=str(names))  # disable when time measurement

        # Scale the pre-processed frame to [0, 1] and add a batch dimension if missing.
        img_tensor = torch.from_numpy(img_nparr).float()
        img_tensor /= 255.0
        if len(img_tensor.shape) == 3:
            img_tensor = img_tensor[None]
        img_tensor = img_tensor.to(device)

        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            bboxes_data = non_max_suppression(model(img_tensor), conf_thres=conf_thres, iou_thres=0.45, classes=None, agnostic=False, max_det=1000)

        for i, det in enumerate(bboxes_data):
            shape = im0s[i].shape if isinstance(im0s, list) else im0s.shape
            img_h, img_w, _ = shape
            # Map the boxes from the network input size back to the original frame size.
            det[:, :4] = scale_boxes(img_tensor.shape[2:], det[:, :4], (img_h, img_w)).round()

            frame_key = f'{frame_idx:04d}'
            json_obj[frame_key] = {}
            with open(TXT_FILE, 'a') as f:
                f.write(f'{frame_key}:\n')
                frame_boxes = Results(boxes=det, orig_shape=(img_h, img_w)).boxes
                for obj_id, obj in enumerate(reversed(frame_boxes), 1):
                    box_xyxy = obj.xyxy.squeeze()
                    x_min, y_min, x_max, y_max = box_xyxy.tolist()

                    # distance_heading takes (x_min, x_max, y_min, y_max); passing the
                    # xyxy list positionally would feed y_min in as x_max.
                    dist, angle = distance_heading(img_w, img_h, x_min, x_max, y_min, y_max)

                    f.write(f'{obj_id:02d} {dist:.1f} {angle:.1f} {box_xyxy}\n')

                    # Warning level: does the box overlap the outer / inner zone?
                    rows = slice(int(y_min), int(y_max))
                    cols = slice(int(x_min), int(x_max))
                    warn = 3
                    if np.any(mask_thres1[rows, cols] > 0):
                        warn = 2
                        if np.any(mask_thres2[rows, cols] > 0):
                            warn = 1

                    c = int(obj.cls.squeeze())
                    label = f'{model.names[c]}'
                    annotator.box_label(box_xyxy, label, color=colors(4 * (warn - 1), True))  # disable when time measurement

                    json_obj[frame_key][f'{obj_id:02d}'] = {"class": f'{model.names[c]}',
                                                            "warning_lv": f"{warn}",
                                                            "location": f'{find_location_idx(img_w, x_min, x_max)}',
                                                            "distance": round(dist, 2),
                                                            "heading": round(angle, 1)}

        cv2.imwrite(os.path.join(img_dst, f"{frame_idx:04}.jpg"), cv2.addWeighted(mask, 0.2, annotator.result(), 0.8, 0))  # disable when time measurement
        # cv2.imwrite(os.path.join(img_dst, f"{frame_idx:04}.jpg"), im0s)  # enable when time measurement

    elapsedtime = time.time() - t1
    print(f'Inference time {elapsedtime}/Frames {frame_idx}')  # time measurement

    return json.dumps(json_obj, ensure_ascii=False, indent=None, sort_keys=True)