### Label과 Class 정보 뽑아서 백업 만들기

In [None]:
##### 0번 label과 class만 뽑아 오기 #####
import os
import shutil

def copy_directories_and_clean(labels_dir, images_dir, new_labels_dir, new_images_dir):
    os.makedirs(new_labels_dir, exist_ok=True)
    os.makedirs(new_images_dir, exist_ok=True)

    # 기존 디렉토리의 모든 파일을 새로운 디렉토리로 복사
    for label_file in os.listdir(labels_dir):
        if label_file.endswith('.txt'):
            # 라벨 파일 복사
            old_label_path = os.path.join(labels_dir, label_file)
            new_label_path = os.path.join(new_labels_dir, label_file)
            shutil.copy2(old_label_path, new_label_path)

            # 대응하는 이미지 파일 복사
            image_file = label_file.replace('.txt', '.jpg')
            old_image_path = os.path.join(images_dir, image_file)
            new_image_path = os.path.join(new_images_dir, image_file)
            
            if os.path.exists(old_image_path):
                shutil.copy2(old_image_path, new_image_path)

    # 복사된 파일들을 대상으로 클래스 0만 남기고 나머지 삭제
    for label_file in os.listdir(new_labels_dir):
        if label_file.endswith('.txt'):
            label_path = os.path.join(new_labels_dir, label_file)
            
            with open(label_path, 'r') as f:
                lines = f.readlines()

            # 클래스 0인 라인만 남기기
            class_0_lines = [line for line in lines if line.strip().split()[0] == '0']

            if class_0_lines:
                # 클래스 0이 있는 경우: 라벨 파일을 덮어쓰기
                with open(label_path, 'w') as f:
                    f.writelines(class_0_lines)
            else:
                # 클래스 0이 없는 경우, 해당 라벨과 이미지 파일 삭제
                os.remove(label_path)
                image_file = label_file.replace('.txt', '.jpg')
                image_path = os.path.join(new_images_dir, image_file)
                if os.path.exists(image_path):
                    os.remove(image_path)

# 기존 디렉토리 경로
train_labels_dir = 'ultralytics_dataset/train/labels'
train_images_dir = 'ultralytics_dataset/train/images'
new_train_labels_dir = 'ultralytics_clustering/train/labels_class_0'
new_train_images_dir = 'ultralytics_clustering/train/images_class_0'

val_labels_dir = 'ultralytics_dataset/val/labels'
val_images_dir = 'ultralytics_dataset/val/images'
new_val_labels_dir = 'ultralytics_clustering/val/labels_class_0'
new_val_images_dir = 'ultralytics_clustering/val/images_class_0'

# 클래스 0만 추출하여 새로운 디렉토리에서 정제 (train set)
copy_directories_and_clean(train_labels_dir, train_images_dir, new_train_labels_dir, new_train_images_dir)

# 클래스 0만 추출하여 새로운 디렉토리에서 정제 (validation set)
copy_directories_and_clean(val_labels_dir, val_images_dir, new_val_labels_dir, new_val_images_dir)

### FPN 출력 결합

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
from ultralytics import RTDETR

model = RTDETR('rtdetr-l.pt')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.eval()

# 후크를 사용하여 FPN 출력 추출
fpn_features = []

# 후크를 등록할 함수 (FPN 레이어의 출력을 추출)
def get_fpn_features(module, input, output):
    fpn_features.append(output)

# FPN 관련된 RepC3 레이어들에 후크 등록
fpn_layer_indices = [16, 21, 24, 27]  # FPN에 해당하는 레이어들로 추정
for idx in fpn_layer_indices:
    module = model.model.model[idx]
    print(f"Registering hook on RepC3 at layer {idx}")
    module.register_forward_hook(get_fpn_features)
    
# 이미지 전처리 함수
def preprocess_image(image_path, input_size=1024):
    img = Image.open(image_path).convert('RGB')
    transform = transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    img = transform(img).unsqueeze(0)  # 배치 차원 추가
    img = torch.clamp(img, 0.0, 1.0)

    return img
    
# 라벨 파일에서 바운딩 박스 정보 읽기 (YOLO 형식 -> 픽셀 좌표 변환)
def read_bboxes(label_path, img_width, img_height):
    bboxes = []
    with open(label_path, 'r') as f:
        lines = f.readlines()
        for line in lines:
            parts = line.strip().split()
            class_id = int(parts[0])
            if class_id == 0:  # general trash (클래스 번호 0번)
                # YOLO 형식 (x_center, y_center, width, height)을 픽셀 좌표로 변환
                x_center, y_center, width, height = map(float, parts[1:])
                x_center *= img_width
                y_center *= img_height
                width *= img_width
                height *= img_height
                x_min = int(x_center - width / 2)
                y_min = int(y_center - height / 2)
                x_max = int(x_center + width / 2)
                y_max = int(y_center + height / 2)
                bboxes.append([x_min, y_min, x_max, y_max])
    return bboxes

# 바운딩 박스 좌표를 feature map 크기에 맞게 스케일링
def scale_bboxes(bboxes, img_size, feature_map_size):
    img_width, img_height = img_size
    fpn_width, fpn_height = feature_map_size

    scaled_bboxes = []
    for bbox in bboxes:
        x_min, y_min, x_max, y_max = bbox
        x_min = int(x_min * fpn_width / img_width)
        y_min = int(y_min * fpn_height / img_height)
        x_max = int(x_max * fpn_width / img_width)
        y_max = int(y_max * fpn_height / img_height)
        scaled_bboxes.append([x_min, y_min, x_max, y_max])
    
    return scaled_bboxes

def extract_features(image_path, label_path):
    img = preprocess_image(image_path)
    img = img.to(model.device)  # 모델 장치로 이미지 이동
    
    # 원본 이미지 크기 (전처리된 이미지 크기가 아님)
    original_img = Image.open(image_path)
    img_width, img_height = original_img.size
    
    # 라벨 파일에서 바운딩 박스 정보 추출
    bboxes = read_bboxes(label_path, img_width, img_height)
    print(f"Found {len(bboxes)} bounding boxes in {image_path}")
    # 모델 추론을 수행하여 후크에서 FPN 출력 추출
    model(img)
    
    # FPN 출력 결합
    if fpn_features:  # 4개의 FPN 출력이 저장되어 있음
        print(f"Extracting features from {len(fpn_features)} FPN outputs")
        bbox_features = []
        
        for fpn_output in fpn_features:
            # FPN 피쳐맵 크기
            fpn_height, fpn_width = fpn_output.shape[2], fpn_output.shape[3]
            
            # 바운딩 박스 좌표를 feature map 크기에 맞게 변환
            scaled_bboxes = scale_bboxes(bboxes, (img_width, img_height), (fpn_width, fpn_height))
            
            # 바운딩 박스 내의 특징을 추출하여 결합
            for bbox in scaled_bboxes:
                x_min, y_min, x_max, y_max = bbox
                # 바운딩 박스 영역의 피쳐맵을 추출
                bbox_feature = fpn_output[:, :, y_min:y_max, x_min:x_max]
                # Global Average Pooling
                pooled_feature = torch.mean(bbox_feature, dim=[2, 3])  # (N, C)
                bbox_features.append(pooled_feature)

        fpn_features.clear()  # 다음 이미지를 위해 초기화

        if bbox_features:
            # 바운딩 박스별로 추출된 특징을 평균하여 하나의 벡터로 결합
            final_feature = torch.mean(torch.stack(bbox_features), dim=0)
            print(f"Feature extracted from {image_path}, shape: {final_feature.shape}")
            return final_feature.cpu().detach().numpy()
        else:
            print(f"No features extracted for {image_path}")
            return None
    else:
        print(f"No FPN features extracted for {image_path}")
        return None


### Clustering, 시각화 및 label 수정

In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
from tqdm import tqdm  
from ultralytics import RTDETR
from PIL import Image

# 모델 로드 및 전처리 함수는 이전 코드 재사용

# 이미지 경로 및 라벨 경로 설정
train_image_dir = 'ultralytics_clustering/train/images_class_0'
train_label_dir = 'ultralytics_clustering/train/labels_class_0'

# 서브 클래스로 나누기 위한 클러스터링 수행 함수
def perform_clustering(image_paths, label_paths, n_clusters=3):
    all_features = []
    all_bboxes = []
    all_image_info = []

    # 각 이미지에 대해 특징 추출
    for image_path, label_path in zip(image_paths, label_paths):
        feature_vector = extract_features(image_path, label_path)
        if feature_vector is not None:
            all_features.append(feature_vector)
            all_bboxes.append(read_bboxes(label_path, *Image.open(image_path).size))  # 바운딩 박스 정보 저장
            all_image_info.append((image_path, label_path))  # 이미지와 라벨 정보 저장

    # all_features를 numpy 배열로 변환
    all_features = np.array(all_features)
    
    # 3D 배열을 2D로 변환 (예: (N, C, H, W) -> (N, C*H*W))
    if len(all_features) > 0:
        flattened_features = all_features.reshape(all_features.shape[0], -1)  # (N, C*H*W)
        
        # NaN 값이 포함된 샘플을 찾기
        nan_mask = np.isnan(flattened_features).any(axis=1)
        
        # NaN 값이 있는 샘플을 제거
        flattened_features_clean = flattened_features[~nan_mask]
        all_bboxes_clean = [bbox for bbox, is_nan in zip(all_bboxes, nan_mask) if not is_nan]
        all_image_info_clean = [info for info, is_nan in zip(all_image_info, nan_mask) if not is_nan]
        
        # NaN이 모두 제거된 후 KMeans 클러스터링 수행
        if len(flattened_features_clean) > 0:
            kmeans = KMeans(n_clusters=n_clusters, random_state=7)
            clusters = kmeans.fit_predict(flattened_features_clean)
            return clusters, all_bboxes_clean, all_image_info_clean, flattened_features_clean
        else:
            raise ValueError("No valid features left after NaN removal.")
    else:
        raise ValueError("No features extracted for clustering.")


# 모든 이미지 경로 및 라벨 경로 수집
train_image_paths = [os.path.join(train_image_dir, img) for img in os.listdir(train_image_dir) if img.endswith('.jpg')]
train_label_paths = [os.path.join(train_label_dir, lbl) for lbl in os.listdir(train_label_dir) if lbl.endswith('.txt')]

# Train 데이터를 클러스터링
train_clusters, train_bboxes, train_image_info, train_features = perform_clustering(train_image_paths, train_label_paths)


# 클러스터링 결과를 바탕으로 라벨 파일 수정
def update_labels_with_clusters(clusters, bboxes, image_info):
    for cluster, bbox_list, (image_path, label_path) in zip(clusters, bboxes, image_info):
        with open(label_path, 'r') as f:
            lines = f.readlines()

        new_lines = []
        for i, line in enumerate(lines):
            parts = line.strip().split()

            cluster_to_class = {0: 11, 1: 12, 2: 13}
            new_class_id = cluster_to_class.get(cluster)
            parts[0] = str(new_class_id)

            new_lines.append(' '.join(parts) + '\n')

        # 변경된 내용을 다시 라벨 파일에 저장
        with open(label_path, 'w') as f:
            f.writelines(new_lines)

# Train 라벨 업데이트
update_labels_with_clusters(train_clusters, train_bboxes, train_image_info)

# 클러스터링 결과 시각화
def visualize_clusters(features, clusters, n_clusters=3):
    # 데이터 샘플 수
    n_samples = features.shape[0]
    
    if n_samples <= 1:
        print("ERROR: Not enough samples for clustering visualization.")
        return

    # 차원 축소 (t-SNE 사용하여 2D로 축소)
    perplexity = min(30, n_samples - 1)  # perplexity는 항상 n_samples보다 작아야 함
    print(f"Using perplexity = {perplexity} for {n_samples} samples.")
    
    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    reduced_features = tsne.fit_transform(features)

    # 클러스터별로 색상 지정
    plt.figure(figsize=(10, 8))
    colors = ['r', 'g', 'b']  # 클러스터 수에 맞는 색상 선택
    for i in range(n_clusters):
        cluster_points = reduced_features[clusters == i]
        plt.scatter(cluster_points[:, 0], cluster_points[:, 1], c=colors[i], label=f'Cluster {i}', alpha=0.6)

    plt.title('Clustering Visualization (t-SNE)')
    plt.xlabel('Dimension 1')
    plt.ylabel('Dimension 2')
    plt.legend()
    plt.grid(True)
    plt.show()

# Train 데이터의 클러스터링 결과 시각화
visualize_clusters(train_features, train_clusters)

print("클러스터링 및 라벨 업데이트 완료!")


### 각 클래스 별로 TSNE

In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from tqdm import tqdm  
from ultralytics import RTDETR
from PIL import Image

# 모델 로드 및 전처리 함수는 이전 코드 재사용

# 이미지 경로 및 라벨 경로 설정
train_image_dir = 'ultralytics_dataset/train(copy)/images'
train_label_dir = 'ultralytics_dataset/train(copy)/labels'

# 라벨 파일에서 클래스 정보를 추출하는 함수
def read_classes(label_path):
    with open(label_path, 'r') as f:
        lines = f.readlines()
        classes = [int(line.split()[0]) for line in lines]  # 첫 번째 요소가 클래스 레이블
    return classes

# 각 이미지에 대해 특징을 추출하고 클래스별로 시각화 준비
def extract_features_by_class(image_paths, label_paths):
    all_features = []
    all_labels = []
    all_image_info = []

    # 각 이미지에 대해 특징 추출
    for image_path, label_path in zip(image_paths, label_paths):
        feature_vector = extract_features(image_path, label_path)
        if feature_vector is not None:
            all_features.append(feature_vector)
            labels = read_classes(label_path)
            all_labels.append(labels)  # 각 이미지에 있는 모든 클래스 레이블 저장
            all_image_info.append((image_path, label_path))  # 이미지와 라벨 정보 저장

    # all_features를 numpy 배열로 변환
    all_features = np.array(all_features)

    # 3D 배열을 2D로 변환 (예: (N, C, H, W) -> (N, C*H*W))
    if len(all_features) > 0:
        flattened_features = all_features.reshape(all_features.shape[0], -1)  # (N, C*H*W)
        
        # NaN 값이 포함된 샘플을 찾기
        nan_mask = np.isnan(flattened_features).any(axis=1)
        
        # NaN 값이 있는 샘플을 제거
        flattened_features_clean = flattened_features[~nan_mask]
        all_labels_clean = [lbl for lbl, is_nan in zip(all_labels, nan_mask) if not is_nan]
        all_image_info_clean = [info for info, is_nan in zip(all_image_info, nan_mask) if not is_nan]
        
        return flattened_features_clean, all_labels_clean, all_image_info_clean
    else:
        raise ValueError("No features extracted for visualization.")

# 모든 이미지 경로 및 라벨 경로 수집
train_image_paths = [os.path.join(train_image_dir, img) for img in os.listdir(train_image_dir) if img.endswith('.jpg')]
train_label_paths = [os.path.join(train_label_dir, lbl) for lbl in os.listdir(train_label_dir) if lbl.endswith('.txt')]

# Train 데이터에서 특징 추출 및 클래스별 라벨 수집
train_features, train_labels, train_image_info = extract_features_by_class(train_image_paths, train_label_paths)

# 특징 시각화
def visualize_by_class(features, labels, class_names, n_components=2):
    # 데이터를 2D로 축소 (t-SNE 사용)
    n_samples = features.shape[0]
    perplexity = min(30, n_samples - 1)  # perplexity는 항상 n_samples보다 작아야 함
    
    print(f"Using perplexity = {perplexity} for {n_samples} samples.")
    tsne = TSNE(n_components=n_components, perplexity=perplexity, random_state=42)
    reduced_features = tsne.fit_transform(features)

    # 각 클래스에 대해 색상 지정
    plt.figure(figsize=(10, 8))
    unique_labels = sorted(set([lbl for label_list in labels for lbl in label_list]))  # 모든 고유한 클래스 추출
    colors = plt.cm.get_cmap('tab10', len(unique_labels))  # 고유 클래스 수에 맞는 색상 선택
    
    for class_idx in unique_labels:
        class_points = [reduced_features[i] for i, label_list in enumerate(labels) if class_idx in label_list]
        class_points = np.array(class_points)
        plt.scatter(class_points[:, 0], class_points[:, 1], label=class_names[class_idx], alpha=0.6, color=colors(class_idx))

    plt.title('Class-wise Feature Visualization (t-SNE)')
    plt.xlabel('Dimension 1')
    plt.ylabel('Dimension 2')
    plt.legend(loc='best')
    plt.grid(True)
    plt.show()

# 클래스 이름 목록 (예시로 추가, 사용자 환경에 맞게 변경)
class_names = ['General trash', 'Paper', 'Paper pack', 'Metal', 'Glass', 'Plastic', 'Styrofoam', 'Plastic bag', 'Battery', 'Clothing']

# Train 데이터의 특징을 클래스별로 시각화
visualize_by_class(train_features, train_labels, class_names)

print("클래스별 TSNE 시각화 완료!")


In [None]:
import os

# 기존 라벨 파일 경로 설정
train_label_dir = 'ultralytics_dataset/train/labels'
val_label_dir = 'ultralytics_dataset/val/labels'

# 클러스터링된 결과가 저장된 경로
clustered_label_dir = 'ultralytics_clustering/train/labels_class_0'

# 클러스터링된 레이블을 반영하여 기존 0번 클래스를 수정하는 함수
def update_labels_with_clustering(clustered_label_dir, original_label_dir):
    print("Train label files:", sorted(os.listdir(train_label_dir)))
    # 클러스터링된 레이블 파일 목록
    clustered_label_files = [f for f in os.listdir(clustered_label_dir) if f.endswith('.txt')]
    
    for clustered_file in clustered_label_files:
        # 기존 라벨 파일 경로
        original_label_path = os.path.join(original_label_dir, clustered_file)
        clustered_label_path = os.path.join(clustered_label_dir, clustered_file)
        
        # 기존 라벨 파일이 존재하는지 확인
        if not os.path.exists(original_label_path):
            print(f"WARNING: {original_label_path} not found. Skipping this file.")
            continue  # 파일이 없으면 스킵

        # 클러스터링된 라벨 파일 열기
        with open(clustered_label_path, 'r') as f_clustered:
            clustered_lines = f_clustered.readlines()
        
        # 기존 라벨 파일 열기
        with open(original_label_path, 'r') as f_original:
            original_lines = f_original.readlines()

        # 기존 라벨 파일에서 0번 클래스(General trash)만 클러스터링된 결과로 대체
        new_lines = []
        for original_line in original_lines:
            parts = original_line.strip().split()
            class_id = int(parts[0])

            # 0번 클래스인 경우 클러스터링된 레이블로 대체
            if class_id == 0:
                # 클러스터링된 레이블과 바운딩 박스 정보로 교체
                new_lines.extend(clustered_lines)
            else:
                # 다른 클래스는 그대로 유지
                new_lines.append(original_line)

        # 변경된 내용을 기존 라벨 파일에 저장
        with open(original_label_path, 'w') as f_original:
            f_original.writelines(new_lines)

# Train 데이터의 라벨 수정
update_labels_with_clustering(clustered_label_dir, train_label_dir)

# Val 데이터의 라벨 수정
update_labels_with_clustering(clustered_label_dir, val_label_dir)

print("0번 클래스에 대한 라벨이 클러스터링된 결과로 수정되었습니다.")


In [None]:
results = model.train(data='dataset_yaml/rtdetr.yaml', batch=8, epochs=10, imgsz=1024)
results = model.predict(source='ultralytics_dataset/test/images', batch=8, save=True)

import os
import pandas as pd
from tqdm import tqdm

# 저장할 폴더 경로
save_dir = 'results'

# 결과 저장 폴더가 없으면 생성
if not os.path.exists(save_dir):
    os.makedirs(save_dir)
    
# COCO 형식으로 저장할 데이터를 담을 딕셔너리
csv_data = {}

# 각 결과 처리
for result in tqdm(results):
    image_id = result.path.split('/')[-3] + '/' + result.path.split('/')[-1]  # 파일 이름에서 image_id 추출
    width, height = result.orig_shape[1], result.orig_shape[0]  # 이미지의 원본 크기 (width, height)
    prediction_string = ''
    for box in result.boxes:
        # YOLO 형식에서 COCO 형식으로 변환 (상대 좌표를 절대 좌표로 변환)
        bbox = box.xywh.tolist()[0]  # YOLO 형식에서 [x_center, y_center, width, height] 가져옴 (상대 좌표)
        # 상대 좌표 (비율)를 절대 좌표 (픽셀 단위)로 변환
        x_center, y_center, w, h = bbox
        x_min = max(0, x_center - w / 2)
        y_min = max(0, y_center - h / 2)
        x_max = min(width, x_center + w / 2)
        y_max = min(height, y_center + h / 2)

        
        category_id = int(box.cls.item())
        score = box.conf.item()

        # 11, 12, 13번 클래스를 0번으로 변경
        if category_id in [11, 12, 13]:
            category_id = 0

        # COCO 형식으로 bbox 저장
        bbox_str = f"{x_min} {y_min} {x_max} {y_max}"
        
        # 같은 image_id의 예측 결과를 한 줄로 합침
        prediction_string += f"{category_id} {score} {bbox_str} "

    # image_id에 대해 기존 예측이 있으면 이어 붙임
    if image_id in csv_data:
        csv_data[image_id] += prediction_string
    else:
        csv_data[image_id] = prediction_string

# 최종적으로 image_id와 PredictionString을 CSV로 저장
df = pd.DataFrame([{"PredictionString": pred_str, "image_id": image_id}  for image_id, pred_str in csv_data.items()])

df.to_csv(os.path.join(save_dir, 'rtdetr_l_epoch10_speicific_class.csv'), index=False)

print(f"Predictions saved to {save_dir}/rtdetr_l_epoch10_speicific_class.csv")
