In [2]:
import os
from pprint import pprint
import json
from tqdm.notebook import tqdm
import shutil
from random import shuffle
import cv2
import numpy as np

## 디렉토리 생성

In [28]:
image_root = '/home/work/road_mark/data_pothole/train_image/'
label_root = '/home/work/road_mark/data_pothole/train_json_label/'
preprocessed_image_root = '/home/work/road_mark/preprocessed_data_pothole/images/'
preprocessed_label_root = '/home/work/road_mark/preprocessed_data_pothole/labels/'
os.makedirs(preprocessed_image_root, exist_ok=True)
os.makedirs(preprocessed_label_root, exist_ok=True)

dataset_root = '/home/work/road_mark/data/pothole_data/'
train_image_root = f'{dataset_root}train/images/'
train_label_root = f'{dataset_root}train/labels/'
valid_image_root = f'{dataset_root}valid/images/'
valid_label_root = f'{dataset_root}valid/labels/'
test_image_root = f'{dataset_root}test/images/'
test_label_root = f'{dataset_root}test/labels/'
os.makedirs(train_image_root, exist_ok=True)
os.makedirs(train_label_root, exist_ok=True)
os.makedirs(valid_image_root, exist_ok=True)
os.makedirs(valid_label_root, exist_ok=True)
os.makedirs(test_image_root, exist_ok=True)
os.makedirs(test_label_root, exist_ok=True)

## 데이터 통계

In [30]:
count = 0
pothole_file_path_list = []
for file_name in tqdm(os.listdir(label_root)):
    file_path = os.path.join(label_root, file_name)
    with open(file_path, 'r', encoding='utf-8') as fp:
        data = json.load(fp)
    for annotation in data['annotations']:
        if annotation['category_id'] == 1:
            pothole_file_path_list.append(file_path)
            count += 1

unique_file_path_list = list(set(pothole_file_path_list))
unique_file_path_list.sort()
print(f'(포트홀 이미지 개수) {len(unique_file_path_list)}')
print(f'(포트홀 라벨 개수) {count}')

  0%|          | 0/140000 [00:00<?, ?it/s]

(포트홀 이미지 개수) 139235
(포트홀 라벨 개수) 625577


## 포트홀 라벨링 데이터 추출 및 복사

In [15]:
for file_path in tqdm(unique_file_path_list):
    shutil.copyfile(file_path, file_path.replace('train_json_label', 'train_pot_json_label'))
    shutil.copyfile(file_path.replace('json_label', 'image').replace('json', 'jpg'), file_path.replace('json_label', 'pot_image').replace('json', 'jpg'))

  0%|          | 0/13015 [00:00<?, ?it/s]

## 데이터 확인

### 데이터 통계

In [20]:
label_root = '/home/work/road_mark/data_pothole/train_pot_json_label/'
image_root = '/home/work/road_mark/data_pothole/train_pot_image/'

img_count = 0
label_count = 0
for file_name in tqdm(os.listdir(label_root)):
    with open(f'{label_root}{file_name}', 'r') as fp:
        data = json.load(fp)
    img_count += 1
    for annotation in data['annotations']:
        if annotation['category_id'] == 2:
            label_count += 1

print(f'(포트홀 이미지 개수) {img_count}')
print(f'(포트홀 라벨 개수) {label_count}')

  0%|          | 0/13015 [00:00<?, ?it/s]

(포트홀 이미지 개수) 13015
(포트홀 라벨 개수) 42075


### 이미지 상에 라벨 표시 및 확인

In [17]:
label_root = '/home/work/road_mark/data_pothole/train_pot_json_label/'
image_root = '/home/work/road_mark/data_pothole/train_pot_image/'
output_root = '/home/work/road_mark/data_pothole/train_pot_draw_image/'

label_name_list = os.listdir(label_root)
for file_name in tqdm(label_name_list[:10]):
    with open(f'{label_root}{file_name}', 'r') as fp:
        data = json.load(fp)
    
    image_file_name = file_name.replace('.json', '.jpg')
    image_path = os.path.join(image_root, image_file_name)
    image = cv2.imread(image_path)
    
    if image is None:
        print(f"Image {image_file_name} not found.")
        continue
    
    for annotation in data['annotations']:
        if annotation['category_id'] == 2:
            segmentation = annotation['segmentation'][0]
            x, y, w, h = segmentation
            start_point = (int(x), int(y))
            end_point = (int(x + w), int(y + h))
            color = (0, 255, 0)  # Green color
            thickness = 2
            image = cv2.rectangle(image, start_point, end_point, color, thickness)

    output_path = os.path.join(output_root, image_file_name)
    
    cv2.imwrite(output_path, image)

  0%|          | 0/10 [00:00<?, ?it/s]

## 데이터 전처리

### BBOX 좌표 YOLO 포맷으로 변환

In [18]:
def convert_to_yolo_format(bbox, image_width, image_height):
    yolo_bboxes = []
    x_min, y_min, box_width, box_height = bbox
    x_center = (x_min + box_width / 2) / image_width
    y_center = (y_min + box_height / 2) / image_height
    norm_width = box_width / image_width
    norm_height = box_height / image_height
    yolo_bboxes = [0, x_center, y_center, norm_width, norm_height]
    
    return yolo_bboxes

In [21]:
label_name_list = os.listdir(label_root)
label_name_list.sort()

for json_file_name in tqdm(label_name_list):
    with open(f'{label_root}{json_file_name}', 'r') as fp:
        data = json.load(fp)
    
    image_width = data['images'][0]['width']
    image_height = data['images'][0]['height']
    image_file_name = json_file_name.replace('.json', '.jpg')
    image_path = os.path.join(image_root, image_file_name)
    preprocessed_image_path = os.path.join(preprocessed_image_root, image_file_name)
    preprocessed_label_path = os.path.join(preprocessed_label_root, json_file_name.replace('.json', '.txt'))
    
    if image is None:
        print(f"Image {image_file_name} not found.")
        continue

    shutil.copyfile(image_path, preprocessed_image_path)

    yolo_label_list = []
    for annotation in data['annotations']:
        if annotation['category_id'] == 2:
            bbox = annotation['segmentation'][0]
            yolo_label = convert_to_yolo_format(bbox, image_width, image_height)
            yolo_label_list.append(yolo_label)

    with open(preprocessed_label_path, 'w') as fp:
        for val in yolo_label_list:
            input_data = list(map(str, val))
            fp.write(' '.join(input_data) + '\n')

  0%|          | 0/13015 [00:00<?, ?it/s]

### 데이터 분리

In [25]:
def split_list(lst, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    n = len(lst)
    train_end = int(n * train_ratio)
    val_end = train_end + int(n * val_ratio)
    
    train_list = lst[:10000]
    val_list = lst[10000:11000]
    test_list = lst[11000:12000]
    
    return train_list, val_list, test_list

In [26]:
label_root = '/home/work/road_mark/preprocessed_data_pothole/labels/'
image_root = '/home/work/road_mark/preprocessed_data_pothole/images/'

label_name_list = os.listdir(label_root)
shuffle(label_name_list)

train_label_name_list, valid_label_name_list, test_label_name_list = split_list(label_name_list)

for label_file_name in tqdm(train_label_name_list):
    image_file_name = label_file_name.replace('.txt', '.jpg')

    label_path = os.path.join(label_root, label_file_name)
    image_path = os.path.join(image_root, image_file_name)

    ouput_label_path = os.path.join(train_label_root, label_file_name)
    ouput_image_path = os.path.join(train_image_root, image_file_name)

    shutil.copyfile(label_path, ouput_label_path)
    shutil.copyfile(image_path, ouput_image_path)

for label_file_name in tqdm(valid_label_name_list):
    image_file_name = label_file_name.replace('.txt', '.jpg')

    label_path = os.path.join(label_root, label_file_name)
    image_path = os.path.join(image_root, image_file_name)

    ouput_label_path = os.path.join(valid_label_root, label_file_name)
    ouput_image_path = os.path.join(valid_image_root, image_file_name)

    shutil.copyfile(label_path, ouput_label_path)
    shutil.copyfile(image_path, ouput_image_path)

for label_file_name in tqdm(test_label_name_list):
    image_file_name = label_file_name.replace('.txt', '.jpg')

    label_path = os.path.join(label_root, label_file_name)
    image_path = os.path.join(image_root, image_file_name)

    ouput_label_path = os.path.join(test_label_root, label_file_name)
    ouput_image_path = os.path.join(test_image_root, image_file_name)

    shutil.copyfile(label_path, ouput_label_path)
    shutil.copyfile(image_path, ouput_image_path)

  0%|          | 0/10000 [00:00<?, ?it/s]

  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/1000 [00:00<?, ?it/s]