# 전처리

In [None]:
import os
import json
from tqdm import tqdm

def convert_imcoords_to_bbox(imcoords):
    # imcoords는 문자열 형태의 좌표를 파싱하여 리스트로 변환
    coords = list(map(float, imcoords.split(',')))
    x_values = coords[0::2]  # 짝수 인덱스는 x 좌표
    y_values = coords[1::2]  # 홀수 인덱스는 y 좌표
    
    # 좌표로부터 bbox의 중심 좌표(cx, cy), 너비(width), 높이(height) 계산
    x_min = min(x_values)
    x_max = max(x_values)
    y_min = min(y_values)
    y_max = max(y_values)
    
    cx = (x_min + x_max) / 2
    cy = (y_min + y_max) / 2
    width = x_max - x_min
    height = y_max - y_min
    
    return cx, cy, width, height

def filter_and_modify_json(directory, output_directory):
    # 지정된 디렉토리 내 모든 JSON 파일 확인
    for filename in tqdm(os.listdir(directory), desc="Processing files", unit="file"):
        if filename.endswith(".json"):
            input_file_path = os.path.join(directory, filename).replace('\\', '/')
            
            with open(input_file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
            
            # 새로운 피처 저장 리스트
            filtered_features = []

            # 'features' 필드가 있는지 확인하고 반복
            if 'features' in data:
                for feature in data['features']:
                    if 'properties' in feature:
                        type_name = feature['properties'].get('type_name')
                        
                        # 'type_name'이 'ship(S)' 또는 'ship(L)'인 경우만 처리
                        if type_name in ['ship(S)', 'ship(L)']:
                            imcoords = feature['properties'].get('object_imcoords')
                            if imcoords:
                                # 좌표 변환 후 새로운 값 추가
                                cx, cy, width, height = convert_imcoords_to_bbox(imcoords)
                                feature['properties']['cx'] = cx
                                feature['properties']['cy'] = cy
                                feature['properties']['width'] = width
                                feature['properties']['height'] = height
                                
                                # 기존 데이터와 함께 새로운 properties로 업데이트
                                filtered_features.append(feature)

            # 새로운 JSON 구조 생성
            if filtered_features:
                new_data = {
                    "type": "FeatureCollection",
                    "features": filtered_features
                }

                # 출력 경로 설정 (출력 폴더에 저장)
                output_file_path = os.path.join(output_directory, filename).replace('\\', '/')
                with open(output_file_path, 'w', encoding='utf-8') as outfile:
                    json.dump(new_data, outfile, ensure_ascii=False, indent=4)

def check_for_other_classes(directory):
    other_classes_found = False
    other_classes = set()

    # 지정된 디렉토리 내 모든 JSON 파일 확인
    for filename in os.listdir(directory):
        if filename.endswith(".json"):
            file_path = os.path.join(directory, filename).replace('\\', '/')
            
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
            
            # 'features' 필드가 있는지 확인하고 반복
            if 'features' in data:
                for feature in data['features']:
                    if 'properties' in feature:
                        type_name = feature['properties'].get('type_name')
                        
                        # 'ship(S)' 또는 'ship(L)' 외의 다른 클래스가 있는지 확인
                        if type_name not in ['ship(S)', 'ship(L)']:
                            other_classes_found = True
                            other_classes.add(type_name)

    # 결과 출력
    if other_classes_found:
        print("다른 클래스가 발견되었습니다:", other_classes)
    else:
        print("모든 JSON 파일이 'ship(S)' 또는 'ship(L)'만 포함하고 있습니다.")
        
# Train
input_directory = './train_objects_labels'  # 입력 JSON 파일들이 있는 폴더
output_directory = './datasets/labels/train'  # 필터링 후 JSON 파일을 저장할 폴더

# 출력 폴더가 없으면 생성
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

# 필터링 및 변환 실행
filter_and_modify_json(input_directory, output_directory)
check_for_other_classes(output_directory)

# valid
input_directory = './vaildate_objects_labels'  # 입력 JSON 파일들이 있는 폴더
output_directory = './datasets/labels/val'  # 필터링 후 JSON 파일을 저장할 폴더

# 출력 폴더가 없으면 생성
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

# 필터링 및 변환 실행
filter_and_modify_json(input_directory, output_directory)
check_for_other_classes(output_directory)

## 네거티브 샘플 추가

In [None]:
import os
import json
import random

def create_empty_ship_json(input_directory, output_directory, num_empty_files=80):
    # JSON 파일 목록에서 num_empty_files 개수만큼 무작위로 선택
    json_files = [f for f in os.listdir(input_directory) if f.endswith(".json")]
    selected_files = random.sample(json_files, min(num_empty_files, len(json_files)))
    
    # 배가 없는 빈 JSON 파일 생성
    for filename in tqdm(selected_files, desc="Creating empty ship files", unit="file"):
        empty_data = {
            "type": "FeatureCollection",
            "features": []
        }
        # 원래 파일 이름을 그대로 사용하여 출력
        output_file_path = os.path.join(output_directory, filename).replace('\\', '/')
        
        with open(output_file_path, 'w', encoding='utf-8') as outfile:
            json.dump(empty_data, outfile, ensure_ascii=False, indent=4)


## 이미지 복사

In [None]:
import os
from tqdm import tqdm
import cv2

# .json 파일을 기반으로 .png 파일을 찾고, 해당 파일을 바로 복사하는 함수
def get_and_copy_img(json_path, img_path, save_dir):
    # save_dir이 없으면 디렉토리 생성
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    
    # .json 파일을 기반으로 .png 파일을 찾아서 복사
    for filename in tqdm(os.listdir(json_path), desc="Processing files", unit="file"):
        # .json 파일명을 .png 파일명으로 변환
        png_file = filename.replace('.json', '.png')
        png_file = os.path.join(img_path, png_file).replace('\\', '/')
        
        # .png 파일이 존재하는지 확인하고, 존재하면 복사
        if os.path.exists(png_file):
            img = cv2.imread(png_file)
            
            cv2.imwrite(os.path.join(save_dir, png_file.split('/')[-1]), img)
            # shutil.copy(png_file, save_dir)
        else:
            print(f"Warning: {png_file} not found!")

## CSV 파일 생성

In [None]:
import csv
# .json 파일과 .png 파일 경로를 리스트로 반환하는 함수
def get_img_json_list(json_path, img_path):
    img_json_list = []
    for filename in tqdm(os.listdir(json_path), desc="Processing files", unit="file"):
        # .json 파일명을 .png 파일명으로 변환
        json_file = os.path.join(json_path, filename).replace('\\', '/')
        png_file = filename.replace('.json', '.png')
        png_file = os.path.join(img_path, png_file).replace('\\', '/')

        # .png 파일이 실제로 존재하는지 확인 후 리스트에 추가
        if os.path.exists(png_file):
            img_json_list.append([png_file, json_file])
        else:
            print(f"Warning: {png_file} not found!")

    return img_json_list

# 추출된 경로 리스트를 CSV 파일로 저장하는 함수
def save_to_csv(data, output_csv):
    # CSV 파일로 저장
    with open(output_csv, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        # 헤더 작성
        writer.writerow(["Image", "JSON"])
        # 데이터 작성
        writer.writerows(data)
        
# Train
label_dir = './datasets/labels/train'  # .json 파일들이 있는 디렉토리
img_dir = './datasets/images/train'  # .png 파일들이 있는 디렉토리
output_csv = 'train.csv'  # 경로 정보를 저장할 CSV 파일명

# .json과 .png 경로 추출
img_json_list = get_img_json_list(label_dir, img_dir)

# 경로 정보를 CSV 파일로 저장
save_to_csv(img_json_list, output_csv)

# Valid
label_dir = './datasets/labels/val'  # .json 파일들이 있는 디렉토리
img_dir = './datasets/images/val'  # .png 파일들이 있는 디렉토리
output_csv = 'valid.csv'  # 경로 정보를 저장할 CSV 파일명

# .json과 .png 경로 추출
img_json_list = get_img_json_list(label_dir, img_dir)

# 경로 정보를 CSV 파일로 저장
save_to_csv(img_json_list, output_csv)

## 라벨 데이터 생성(JSON -> CSV)

In [None]:
import os
import csv
import json
import matplotlib.pyplot as plt
from PIL import Image
import matplotlib.patches as patches
import numpy as np

# 유클리드 거리 계산 함수
def euclidean_distance(p1, p2):
    return np.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)

def manhattan_distance(coords):
    x_values = coords[0::2]  # 짝수 인덱스는 x 좌표
    y_values = coords[1::2]  # 홀수 인덱스는 y 좌표
    
    # 좌표로부터 bbox의 중심 좌표(cx, cy), 너비(width), 높이(height) 계산
    x_min = min(x_values)
    x_max = max(x_values)
    y_min = min(y_values)
    y_max = max(y_values)

    width = x_max - x_min
    height = y_max - y_min
    
    return width, height


# JSON 파일을 읽어들이는 함수 (BBox 좌표와 angle 포함)
def get_bbox_from_json(json_file):
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    results = []
    
    if 'features' in data:
        for feature in data['features']:
            if 'properties' in feature:
                # object_imcoords를 파싱하여 좌표 추출
                imcoords = feature['properties'].get('object_imcoords', '')
                if imcoords:
                    coords = list(map(float, imcoords.split(',')))

                    # 맨해튼 거리 계산
                    # width, height = manhattan_distance(coords)

                    coords = np.array(coords).reshape(-1, 2)  # 2D 좌표로 변환

                    # 중심 좌표 계산
                    cx = np.mean(coords[:, 0])
                    cy = np.mean(coords[:, 1])

                    # 인접한 두 점으로부터 너비와 높이 계산
                    width = euclidean_distance(coords[0], coords[1])
                    height = euclidean_distance(coords[1], coords[2])

                    # 각도 추출
                    angle = feature['properties'].get('object_angle', 0.0)

                    # 회전된 바운딩 박스 좌표 저장 (cx, cy, width, height, angle)
                    results.append((cx, cy, width, height, angle))
    
    return results

def get_coords_from_json(json_file):
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    results = []
    
    if 'features' in data:
        for feature in data['features']:
            if 'properties' in feature:
                # object_imcoords를 파싱하여 좌표 추출
                imcoords = feature['properties'].get('object_imcoords', '')
                class_name = feature['properties'].get('type_name', '')
                
                if imcoords:
                    coords = list(map(float, imcoords.split(',')))

                    results.append((class_name,coords))
    return results

def save_coords_to_txt(coords_list, output_file, img_path):
    with open(output_file, 'w') as f:
        for coords in coords_list:
            class_name, coords = coords
            coords = np.array(coords) / 1024.0
    
             # 각 좌표값이 0 미만이면 0, 1 이상이면 1로 클리핑
            coords = np.clip(coords, 0, 1)

            x1, y1, x2, y2, x3, y3, x4, y4 = coords
            
            if class_name == 'ship(S)':
                f.write(f"0 {x1:.6f} {y1:.6f} {x2:.6f} {y2:.6f} {x3:.6f} {y3:.6f} {x4:.6f} {y4:.6f}\n")
            elif class_name == 'ship(L)': 
                f.write(f"1 {x1:.6f} {y1:.6f} {x2:.6f} {y2:.6f} {x3:.6f} {y3:.6f} {x4:.6f} {y4:.6f}\n")

# 바운딩 박스 정보를 txt 파일로 저장하는 함수
def save_bbox_to_txt(bbox_list, output_file, img_path):
    with open(output_file, 'w') as f:
        for bbox in bbox_list:
            cx, cy, width, height, angle = bbox
            cx, cy, width, height, angle = cx/1024, cy/1024, width/1024, height/1024, angle

            # if cx < 0 or cy < 0 or width < 0 or height < 0 or cx > 1 or cy > 1 or width > 1 or height > 1:
                # print(f"Warning: {img_path} - bbox가 이미지 밖으로 벗어납니다.")
                # visualize_image_with_bbox(img_path, bbox_list)
            if cx < 0 or cx > 1:
                cx = max(0, min(cx, 1))
            if cy < 0 or cy > 1:
                cy = max(0, min(cy, 1))
            if width < 0 or width > 1:
                width = max(0, min(width, 1))
            if height < 0 or height > 1:
                height = max(0, min(height, 1))

            f.write(f"0 {cx:.6f} {cy:.6f} {height:.6f} {width:.6f} {angle:.6f}\n")
            # f.write(f"0 {cx:.6f} {cy:.6f} {height:.6f} {width:.6f}\n")


def visualize_image_with_coords(image_path, coords_list):
    img = Image.open(image_path)
    fig, ax = plt.subplots(1)
    ax.imshow(img)

    for class_name, coords in coords_list:
        # coords: [x1, y1, x2, y2, x3, y3, x4, y4]
        coords = np.array(coords)
        coords = coords.reshape(4, 2)  # 4개의 좌표쌍 (x, y)으로 변환
        
        # 다각형 그리기 (4개의 좌표를 사용한 회전된 바운딩 박스)
        polygon = patches.Polygon(coords, closed=True, linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(polygon)
        
        # 클래스 이름 표시 (옵션)
        cx, cy = np.mean(coords[:, 0]), np.mean(coords[:, 1])  # 중심 좌표
        ax.text(cx, cy, class_name, color='blue', fontsize=12, ha='center')

    plt.axis('off')
    plt.show()


# CSV에서 이미지 경로와 JSON 경로를 불러오는 함수
def load_csv(csv_path):
    data = []
    with open(csv_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # 헤더 스킵
        for row in reader:
            img_path, json_path = row
            data.append((img_path, json_path))
    return data

# Train
csv_path = 'train.csv'  # CSV 파일 경로
data = load_csv(csv_path)  # 이미지와 JSON 경로 불러오기

# 5개의 이미지와 바운딩 박스를 시각화하고, BBox 정보를 txt로 저장
for i, (img_path, json_path) in enumerate(data):
    if os.path.exists(img_path) and os.path.exists(json_path):
        bbox_list = get_coords_from_json(json_path)  # JSON에서 BBox 추출
        
        # .txt 파일로 저장할 파일명 정의
        output_file = json_path.replace('.json', '.txt')  # .json 파일명을 .txt로 변경
        save_coords_to_txt(bbox_list, output_file, img_path)  # bbox 정보를 txt로 저장
        
        if i < 5:
            # 이미지와 BBox 시각화
            visualize_image_with_coords(img_path, bbox_list)
            
# Valid
csv_path = 'valid.csv'  # CSV 파일 경로
data = load_csv(csv_path)  # 이미지와 JSON 경로 불러오기

# 5개의 이미지와 바운딩 박스를 시각화하고, BBox 정보를 txt로 저장
for i, (img_path, json_path) in enumerate(data):
    if os.path.exists(img_path) and os.path.exists(json_path):
        bbox_list = get_coords_from_json(json_path)  # JSON에서 BBox 추출
        
        # .txt 파일로 저장할 파일명 정의
        output_file = json_path.replace('.json', '.txt')  # .json 파일명을 .txt로 변경
        save_coords_to_txt(bbox_list, output_file, img_path)  # bbox 정보를 txt로 저장
        
        if i < 5:
            # 이미지와 BBox 시각화
            visualize_image_with_coords(img_path, bbox_list)


## 데이터셋 병합(AIHub + DataON)

In [None]:
import os
import shutil

# 기존 클래스 ID에서 새 클래스 ID로 매핑
class_mapping = {
    0: 0,  # motorboat -> ship(s)
    1: 0,  # sailboat -> ship(s)
    2: 0,  # tugboat -> ship(L)
    3: 1,  # barge -> ship(L)
    4: 0,  # fishing boat -> ship(s)
    5: 1,  # ferry -> ship(L)
    6: 1,  # container ship -> ship(L)
    7: 1,  # oil tanker -> ship(L)
    8: 1,  # drill ship -> ship(L)
    9: 1,  # warship -> ship(L)
}

# 라벨과 이미지 원본 디렉터리
original_labels_dir = './dataset2/labels'
original_images_dir = './dataset2/images'

# 새로 복사할 디렉터리
new_labels_dir = './datasets/labels'
new_images_dir = './datasets/images'

# 폴더가 없는 경우 생성
if not os.path.exists(new_labels_dir):
    os.makedirs(new_labels_dir, exist_ok=True)
if not os.path.exists(new_images_dir):
    os.makedirs(new_images_dir, exist_ok=True)

# 라벨 파일을 업데이트하여 새 디렉터리에 저장하는 함수
def update_label_file(file_path, new_file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    new_lines = []
    for line in lines:
        parts = line.strip().split()
        original_class_id = int(parts[0])
        if original_class_id in class_mapping:
            new_class_id = class_mapping[original_class_id]
            parts[0] = str(new_class_id)
            new_line = ' '.join(parts)
            new_lines.append(new_line)
        else:
            print(f"경고: 클래스 ID {original_class_id}가 매핑에 없습니다.")
    
    # 새 라벨 파일로 저장
    os.makedirs(os.path.dirname(new_file_path), exist_ok=True)  # 하위 디렉터리 생성
    with open(new_file_path, 'w') as file:
        file.write('\n'.join(new_lines))

# 라벨 파일 업데이트 및 복사
for root, dirs, files in os.walk(original_labels_dir):
    for filename in files:
        if filename.endswith('.txt'):
            old_file_path = os.path.join(root, filename)
            # 기존 경로에서 새로운 라벨 경로로 변환 (dataset/labels/train, dataset/labels/val 등)
            relative_path = os.path.relpath(old_file_path, original_labels_dir)
            new_file_path = os.path.join(new_labels_dir, relative_path)
            update_label_file(old_file_path, new_file_path)
            print(f"{new_file_path} 업데이트 완료")

# 이미지 파일 복사
for root, dirs, files in os.walk(original_images_dir):
    for filename in files:
        if filename.endswith(('.png', '.jpg', '.jpeg')):  # 이미지 확장자에 따라 필터링
            old_file_path = os.path.join(root, filename)
            # 기존 경로에서 새로운 이미지 경로로 변환 (dataset/images/train, dataset/images/val 등)
            relative_path = os.path.relpath(old_file_path, original_images_dir)
            new_file_path = os.path.join(new_images_dir, relative_path)
            os.makedirs(os.path.dirname(new_file_path), exist_ok=True)  # 하위 디렉터리 생성
            shutil.copy2(old_file_path, new_file_path)
            print(f"{new_file_path} 이미지 복사 완료")


# Yaml 파일 생성

In [7]:
import yaml

data = {
    'train': './images/train',  # 학습 데이터셋 경로
    'val': './images/val',  # 검증 데이터셋 경로
    'nc': 2,  # 클래스 개수
    'names': {
        0: 'ship(S)',
        1: 'ship(L)'
    }
}

# Save the data to a YAML file
with open('data_report.yaml', 'w') as file:
    yaml.dump(data, file, default_flow_style=False, sort_keys=False)

print("YAML file 'data_augment.yaml' created successfully.")


YAML file 'data_augment.yaml' created successfully.


# 파인튜닝

In [None]:
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
import json
# 하이퍼파라미터 튜닝
from ultralytics import YOLO
model = YOLO("yolo11x-obb.pt")
best_params = model.tune(
    data="data_report.yaml",
    batch=16,
    epochs=15,
    iterations=50,
    optimizer="AdamW",
    plots=True,
    save=False,   # 모든 iteration에서 저장하지 않도록 설정
    val=True,
    lr0=0.001,
    scale=0.2,
    seed=44,
)

# 최적 하이퍼파라미터 저장
with open("best_params.json", "w") as f:
    json.dump(best_params, f)
print("Best parameters saved to best_params.json.")

# 학습

In [None]:
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
from ultralytics import YOLO

# YOLO 모델 초기화
model = YOLO("yolo11x-obb.pt")
results = model.train(
    data = "data_report.yaml",
    name = "x_s512_best_args_report",
    seed=44,
    epochs=20,
    batch=12,
    imgsz=512,
    patience=15,
    device="0",
    optimizer = "AdamW",
    lr0= 0.00088,
    lrf= 0.01134,   
    momentum= 0.82858,
    weight_decay= 0.0006,
    warmup_epochs= 2.33952,
    warmup_momentum= 0.58988,
    box = 9.1,
    cls= 0.56241,
    dfl= 1.36767,
    hsv_h= 0.01707,
    hsv_s= 0.68688,
    hsv_v= 0.28102,
    degrees= 0.0,
    translate= 0.11263,
    shear= 0.0,
    perspective= 0.0,
    flipud= 0.45936,
    fliplr= 0.46892,
    bgr= 0.1,
    mosaic= 0.828,
    mixup= 0.0,
    copy_paste= 0.0,
    scale = 0.2,
    dropout = 0.3,
    )


# TASK 데이터 전처리, 추론 및 후처리

In [None]:
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True' 
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True  # 이미지 파일이 손상되었을 때 에러 발생 방지
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
from tqdm import tqdm
import shutil
import cv2
from ultralytics import YOLO
from ultralytics.utils.ops import nms_rotated


def get_imglist(dir="/workspace/dataset/"):
    imglist = [os.path.join(dir, f).replace("\\", "/") for f in os.listdir(dir) if f.endswith(('.png', '.npy'))]
    return imglist


def create_temp_dir(base_dir='./temp_cropped_patches'):
    
    # 임시 폴더가 이미 존재하면 삭제하고 새로 생성
    if os.path.exists(base_dir):
        shutil.rmtree(base_dir)
    os.makedirs(base_dir)
    return base_dir


def cluster_and_crop(image_path, k=3, size=1024, crop_num=43, threshold=0.7):
    # 이미지 로드 및 리사이즈
    img = Image.open(image_path)
    img_np = np.array(img)
    img_np = cv2.resize(img_np, (size, size), interpolation=cv2.INTER_LANCZOS4)
    
    
    # 이미지 데이터를 준비 (픽셀 수, 3) 형태로 변형
    img_data = img_np.reshape((-1, 3))
    img_data = np.float32(img_data)

    # K-Means 적용
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2)
    _, labels, centers = cv2.kmeans(img_data, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)

    # 클러스터링 결과를 이미지로 변환
    centers = np.uint8(centers)
    segmented_img = centers[labels.flatten()]
    segmented_img = segmented_img.reshape(img_np.shape)

    # 검은색에 가장 가까운 클러스터 찾기
    black_cluster_index = np.argmin(np.linalg.norm(centers, axis=1))  # 가장 작은 RGB 값이 검은색
    
    # crop 영역 추출
    num_patches = []
    crop_size = segmented_img.shape[1] // crop_num
    for i in range(crop_num):
        for j in range(crop_num):
            
            crop_img = segmented_img[i * crop_size:(i + 1) * crop_size, j * crop_size:(j + 1) * crop_size]

            # 검은색 클러스터 비율 확인
            cluster_0_value = centers[black_cluster_index]
            cluster_0_mask = np.all(crop_img == cluster_0_value, axis=-1)
            if np.mean(cluster_0_mask) > threshold:
                num_patches.append((i, j))

    
    return num_patches

def save_cropped_patches_as_numpy(image_path, 
                                  crop_size, 
                                  resize_size,
                                  cluster_img_size,
                                  cluster_threshold,
                                  save_dir,
                                  is_cluster):
    image_name = os.path.basename(image_path)
    print(f"Processing {image_name}")
    image = Image.open(image_path).convert('RGB')
    
    image_width, image_height = image.size
    
    if is_cluster:
        print("Procssing Clustering...",end="")
        num_patches = cluster_and_crop(image_path, 
                                    k=3, 
                                    size=cluster_img_size, 
                                    crop_num=(image_width + crop_size - 1) // crop_size, 
                                    threshold=cluster_threshold)
        print("Done. ")
    else:
        num_patches = [(i, j) for i in range((image_width + crop_size - 1) // crop_size) for j in range((image_height + crop_size - 1) // crop_size)]
        
    for i, j in tqdm(num_patches):
        top_left_x = i * crop_size
        top_left_y = j * crop_size
        bottom_right_x = min(top_left_x + crop_size, image_width)
        bottom_right_y = min(top_left_y + crop_size, image_height)
        
        if bottom_right_x - top_left_x < crop_size:
            top_left_x = max(image_width - crop_size, 0)
            bottom_right_x = image_width
        if bottom_right_y - top_left_y < crop_size:
            top_left_y = max(image_height - crop_size, 0)
            bottom_right_y = image_height
        
        cropped_image = image.crop((top_left_x, top_left_y, bottom_right_x, bottom_right_y))
        cropped_image = cropped_image.resize((resize_size, resize_size), resample=Image.Resampling.LANCZOS)
        
        # 기존 코드에서 cropped_image를 numpy 배열로 변환
        # OpenCV를 사용하여 리사이즈
        # cropped_image = np.array(cropped_image)
        # cropped_image = cv2.resize(cropped_image, (resize_size, resize_size), interpolation=cv2.INTER_LANCZOS4)

        # NumPy 배열로 저장
        crop_filename = f"{os.path.splitext(image_name)[0]}_{top_left_x}_{top_left_y}.npy"  # NumPy로 저장
        crop_path = os.path.join(save_dir, crop_filename)
        np.save(crop_path, np.array(cropped_image, dtype=np.uint8))  # NumPy 배열로 저장 (dtype 명시)


    del image

def load_numpy_image(npy_path):
    return np.load(npy_path)

    
class CroppedPatchDataset(Dataset):
    def __init__(self, crop_image_paths, resize_size):
        self.crop_image_paths = crop_image_paths
        self.transform = transforms.ToTensor()
        self.resize_size = resize_size

    def __len__(self):
        return len(self.crop_image_paths)

    def __getitem__(self, idx):
        crop_image_path = self.crop_image_paths[idx]
        try:
            cropped_image = load_numpy_image(crop_image_path)
            cropped_image_tensor = self.transform(cropped_image)
        except Exception as e:
            print(f"Error loading image {crop_image_path}: {e}")
            return None  # 이미지 로드 실패 시 None 반환

        # 패치의 위치 정보를 반환
        image_name = os.path.basename(crop_image_path)
        name_parts = image_name.split('_')
        top_left_x = int(name_parts[-2])
        top_left_y = int(name_parts[-1].split('.')[0])
        position = torch.tensor([top_left_x, top_left_y])

        original_image_name = '_'.join(name_parts[:-2]) + '.png'

        return {
            'image_name': original_image_name,
            'image': cropped_image_tensor,
            'top_left_position': position
        }

def collate_fn(batch):
    batch = list(filter(lambda x: x is not None, batch))  # None 제거
    return torch.utils.data.dataloader.default_collate(batch)

def run_model(image_names, images, positions, model, scale_factor, device,result):
    # 모델 예측을 GPU에서 수행
    preds = model.predict(images, conf=0.15, save=False, device=device)  # 예측 결과: [batch_size]

    for img_name, pred, pos in zip(image_names, preds, positions):
        if img_name not in result:
            result[img_name] = []

        top_left_x, top_left_y = pos[0].item(), pos[1].item()

        # pred를 반복문 전에 CPU로 이동
        pred = pred.cpu()

        # 좌표 변환 및 conf 값 포함하여 저장
        for i in range(len(pred)):
            bbox = pred.obb.xywhr[i]  # [x, y, w, h, r]
            confidence = pred.obb.conf[i]  # confidence 값

            # bbox가 비어 있는 경우 해당 항목을 넘김
            if len(bbox) == 0:
                continue

            # 예측된 좌표를 원본 이미지 좌표로 변환
            x = bbox[0].item() * scale_factor + top_left_x
            y = bbox[1].item() * scale_factor + top_left_y
            w = bbox[2].item() * scale_factor
            h = bbox[3].item() * scale_factor
            r = bbox[4].item()  # 각도 값은 변환 불필요

            # 변환된 좌표와 confidence를 결과 리스트에 추가
            result[img_name].append({
                'xywhr': [x, y, w, h, r],
                'conf': confidence.item()
            })
            
import os
import torch
from ultralytics import YOLO
from ultralytics.utils.ops import nms_rotated
from torch.utils.data import DataLoader

# 데이터 경로 설정
directory_path = '/workspace/dataset/'
# directory_path = './'
base_dir = './temp_cropped_patches'
img_list = get_imglist(directory_path)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# 모델 설정
model_name = "./runs/obb/x_s512_best_args_report/weights/best.pt"
crop_size = 256
resize_size = 512
cluster_img_size = 1024
cluster_threshold = 0.0
batch_size = 32*1024*1024 // resize_size**2  
divide_num = 54  # 원하는 분할 수로 설정

model = YOLO(model_name)

result = {}  # 최종 결과를 저장할 딕셔너리

length = len(img_list)
img_list_chunks = [img_list[i:i + length // divide_num] for i in range(0, length, length // divide_num)]

for chunk_idx, img_chunk in enumerate(img_list_chunks):
    # 임시 폴더 생성
    temp_dir = create_temp_dir(base_dir=base_dir)
    print(f"Processing chunk {chunk_idx + 1}/{len(img_list_chunks)}")

    # 현재 청크의 이미지별 결과 리스트 초기화
    for image_path in img_chunk:
        image_name = os.path.basename(image_path)
        result[image_name] = []

    # 모든 이미지를 크롭하여 임시 폴더에 저장
    for image_path in img_chunk:
        image_name = os.path.basename(image_path)
        print(f"Cropping image: {image_name}")
        # 패치 저장 시 일관된 이름 지정
        save_cropped_patches_as_numpy(image_path, 
                                          crop_size, 
                                          resize_size, 
                                          cluster_img_size,
                                          cluster_threshold,
                                          temp_dir,
                                          is_cluster=False)

    # 임시 폴더 내의 모든 패치 이미지 리스트 가져오기
    temp_list = get_imglist(temp_dir)

    # 데이터셋 및 DataLoader 설정
    dataset = CroppedPatchDataset(temp_list, resize_size=resize_size)
    dataloader = DataLoader(dataset, batch_size=batch_size, num_workers=0)  # 필요에 따라 num_workers 조정

    for batch in dataloader:
        images = batch['image']
        positions = batch['top_left_position']
        patch_image_names = batch['image_name']

        # 모델 실행 및 결과 저장
        run_model(patch_image_names, images, positions, model, crop_size / resize_size, device, result)

    # 현재 청크의 모든 이미지에 대해 NMS 적용
    for image_path in img_chunk:
        image_name = os.path.basename(image_path)
        per_image_result = result[image_name]
        if per_image_result:
            # boxes와 scores 추출
            boxes = torch.tensor([pred['xywhr'] for pred in per_image_result])
            scores = torch.tensor([pred['conf'] for pred in per_image_result])

            # NMS 적용
            keep_indices = nms_rotated(boxes, scores, threshold=0.45)

            # NMS 결과를 최종적으로 업데이트
            result[image_name] = [per_image_result[i] for i in keep_indices]

    # 쿠다 캐시 제거 및 불필요한 메모리 제거
    torch.cuda.empty_cache()
    del dataset
    del dataloader
    # 임시 폴더 삭제
    shutil.rmtree(temp_dir)


# Submission 파일 생성

In [None]:
import csv
import math

# 저장할 CSV 경로
csv_file = "./submission.csv"
data = []  # CSV에 저장할 데이터를 담을 리스트

# 이미지 이름별로 데이터 변환
for image_name, predictions in result.items():
    if not predictions:
        continue
    
    # 각 예측 결과를 변환하여 data 리스트에 추가
    for pred in predictions:
        cx, cy, width, height, angle = pred['xywhr']
        
        # 각도 변환: 라디안 -> 도(degrees)
        angle_deg = math.degrees(angle)
        if angle_deg < 0:
            angle_deg += 360
        
        # 예측 결과를 리스트로 추가
        data.append([image_name, cx, cy, width, height, angle_deg])

# CSV 파일로 저장
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    
    # CSV의 헤더 작성
    writer.writerow(['image_name', 'cx', 'cy', 'width', 'height', 'angle'])
    
    # 각 행을 작성
    writer.writerows(data)

print(f"CSV 파일 '{csv_file}'이(가) 성공적으로 생성되었습니다.")

# 제출 코드

In [None]:
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
import aifactory.score as aif
import time

t = time.time()
aif.submit(model_name="top_256_512_00",
           key="128fd22e-34e1-4e7a-b9c9-3423c2e859ce")
print("time:", time.time() - t)