작업 디렉토리 설정

In [None]:
!pip install git+https://github.com/wkentaro/labelme.git

In [None]:
!pip install ultralytics
import os
import cv2
import numpy as np
import pandas as pd
import shutil
from google.colab import drive
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from skimage.feature import local_binary_pattern
from ultralytics import YOLO
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
import requests
from PIL import Image
import torch
from torch.utils.data import Dataset
import torchvision.transforms as transforms
import json
import labelme
import PIL.Image
from labelme import utils

# 📁 Mount Google Drive and set the base project path
drive.mount('/content/drive')
base_dir = '/content/drive/MyDrive/SmartSan_Project/OpenImages_YOLO'
# Create the dataset root if it does not exist yet.
os.makedirs(base_dir, exist_ok=True)

필요한 Open Images 파일 다운로드

In [None]:
# Class description file
!wget https://storage.googleapis.com/openimages/2018_04/class-descriptions-boxable.csv -P {base_dir}

# Validation bounding boxes
!wget https://storage.googleapis.com/openimages/2018_04/validation/validation-annotations-bbox.csv -P {base_dir}

# Validation image list
!wget https://storage.googleapis.com/openimages/2018_04/validation/validation-images-with-rotation.csv -P {base_dir}

- 필요한 라벨만 필터링해서 저장
- 해당 라벨에 포함된 이미지들만 리스트업

In [None]:
# 🧹 Keep only the bounding boxes that belong to the target classes.
# Keys are Open Images label IDs, values are the human-readable names.
target_classes = {"/m/0c_jw": "sink"}

bbox_csv_path = f"{base_dir}/validation-annotations-bbox.csv"
bbox_df = pd.read_csv(bbox_csv_path)

# Boolean row mask: True where the row's label is one of the target IDs.
is_target = bbox_df['LabelName'].isin(target_classes.keys())
filtered_df = bbox_df[is_target]

# Persist the filtered subset next to the source CSV.
filtered_df.to_csv(f"{base_dir}/filtered_validation_boxes.csv", index=False)
print(f"✅ 대상 클래스만 추출 완료: {filtered_df.shape[0]}개 bounding box")

이미지 다운로드 + 저장

In [None]:
# 🧳 Download images
image_ids = filtered_df['ImageID'].unique()[:500]  # at most 500 images
image_dir = os.path.join(base_dir, 'images', 'train')
os.makedirs(image_dir, exist_ok=True)

base_img_url = "https://open-images-dataset.s3.amazonaws.com/validation"
for img_id in tqdm(image_ids):
    img_url = f"{base_img_url}/{img_id}.jpg"
    img_path = os.path.join(image_dir, f"{img_id}.jpg")
    try:
        r = requests.get(img_url, timeout=10)
        # Without this check a 403/404 error body would be written to disk
        # as if it were a JPEG and later fed to training.
        r.raise_for_status()
        with open(img_path, 'wb') as f:
            f.write(r.content)
    except (requests.RequestException, OSError) as e:
        # Best-effort download: report the failure and move on.
        print(f"❌ {img_id} 다운로드 실패: {e}")

YOLO 포맷 라벨 파일 생성


In [None]:
# 🏷️ Generate YOLO-format label files (one .txt per image)
label_dir = os.path.join(base_dir, 'labels', 'train')
os.makedirs(label_dir, exist_ok=True)
class_id_map = {"/m/0c_jw": 0}  # class index for "sink"

# Build the lookup set once; testing membership against the NumPy array
# inside the loop would rescan the whole array for every image group.
selected_ids = set(image_ids)

grouped = filtered_df.groupby('ImageID')
for img_id, group in grouped:
    if img_id not in selected_ids:
        continue
    label_path = os.path.join(label_dir, f"{img_id}.txt")
    # Convert corner coordinates (XMin/XMax/YMin/YMax) to the
    # "class x_center y_center width height" layout.
    lines = []
    for _, row in group.iterrows():
        class_id = class_id_map[row['LabelName']]
        x_center = (row['XMin'] + row['XMax']) / 2
        y_center = (row['YMin'] + row['YMax']) / 2
        width = row['XMax'] - row['XMin']
        height = row['YMax'] - row['YMin']
        lines.append(f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n")
    with open(label_path, 'w') as f:
        f.writelines(lines)

data.yaml 생성 (YOLO 학습 설정)

In [None]:
# 📄 Write data.yaml (dataset config consumed by YOLO training)
yaml_path = os.path.join(base_dir, "data.yaml")

# The file starts with an empty line and ends with a trailing newline.
# NOTE(review): train and val point at the same folder.
yaml_lines = [
    "",
    f"path: {base_dir}",
    "train: images/train",
    "val: images/train",
    "",
    "names:",
    "  0: sink",
    "",
]
with open(yaml_path, 'w') as f:
    f.write("\n".join(yaml_lines))
print("✅ data.yaml 생성 완료!")

YOLOv8 학습 시작

In [None]:
# 🧳 Copy hand-labelled photos and their label files into the training set
zip_extracted_path = '/content/drive/MyDrive/sink_labels/'  # where the labelling zip was extracted
img_train_path = os.path.join(base_dir, 'images/train')
label_train_path = os.path.join(base_dir, 'labels/train')

# Extension -> destination folder; files with any other extension are ignored.
destinations = {'.jpg': img_train_path, '.txt': label_train_path}

for entry in os.listdir(zip_extracted_path):
    target_dir = next(
        (dest for ext, dest in destinations.items() if entry.endswith(ext)),
        None,
    )
    if target_dir is not None:
        shutil.copy(os.path.join(zip_extracted_path, entry), target_dir)

print("✅ 실사 이미지 및 라벨 정리 완료!")

In [None]:
# 🧳 Copy hand-labelled photos and their label files into the training set
zip_extracted_path = '/content/drive/MyDrive/sink_labels/'  # where the labelling zip was extracted
img_train_path = os.path.join(base_dir, 'images/train')
label_train_path = os.path.join(base_dir, 'labels/train')

for name in os.listdir(zip_extracted_path):
    # Pick the destination from the extension; skip everything else.
    if name.endswith('.jpg'):
        dest = img_train_path
    elif name.endswith('.txt'):
        dest = label_train_path
    else:
        continue
    shutil.copy(os.path.join(zip_extracted_path, name), dest)
print("✅ 실사 이미지 및 라벨 정리 완료!")

# 🔁 Start fine-tuning
model = YOLO('yolov8n.pt')  # start from the lightweight YOLO weights instead of a previous best.pt
train_kwargs = dict(
    data=yaml_path,
    epochs=25,
    imgsz=960,
    batch=8,
)
model.train(**train_kwargs)
print("✅ Fine-tuning 완료!")

탐지 모델 로딩 및 적용(테스트 데이터)

In [None]:
# Reset the crop and output folders
crop_dir = '/content/drive/MyDrive/SmartSan_Project/cropped_dataset/sink'
output_dir = '/content/drive/MyDrive/SmartSan_Project/output'

for path in (crop_dir, output_dir):
    # Wipe any previous contents, then recreate the empty folder.
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)

print("✅ crop_dir 및 output_dir 초기화 완료")

# Load the trained YOLO weights
model = YOLO('runs/detect/train/weights/best.pt')

# Paths
sink_labels_dirs = [
    '/content/drive/MyDrive/sink_labels',  # first label folder
    '/content/drive/MyDrive/sink_labels/sink_labels2',  # second label folder
    '/content/drive/MyDrive/sink_labels/sink_labels3'  # third label folder
]
raw_images_dir = '/content/drive/MyDrive/SmartSan_Project/raw_images'
output_dir = '/content/drive/MyDrive/SmartSan_Project/output'
for needed_dir in (raw_images_dir, output_dir):
    os.makedirs(needed_dir, exist_ok=True)

# For every .txt label file, copy the same-named .jpg into raw_images
for directory in sink_labels_dirs:
    for label_file in os.listdir(directory):
        if not label_file.endswith('.txt'):
            continue

        # Derive the image file name from the label file name
        img_name = label_file.replace('.txt', '.jpg')
        img_path = os.path.join(raw_images_dir, img_name)

        # Already present in raw_images: nothing to copy
        if os.path.exists(img_path):
            print(f"✅ {img_name} 이미지가 이미 존재합니다.")
            continue

        # The source image is expected next to its label file
        src_img_path = os.path.join(directory, img_name)
        if not os.path.exists(src_img_path):
            print(f"❌ {img_name} 이미지를 찾을 수 없습니다.")
            continue

        shutil.copy(src_img_path, img_path)
        print(f"✅ {img_name} 이미지 복사 완료.")

# Collect the images to run detection on
image_files = [f for f in os.listdir(raw_images_dir) if f.endswith('.jpg')]

# Report whether there is anything to process
if image_files:
    print(f"✅ 총 {len(image_files)}개의 이미지가 존재합니다.")
else:
    print("❌ raw_images 폴더에 이미지 파일이 없습니다.")

# Run YOLO detection and save the annotated result per image
for img_file in image_files:
    img_path = os.path.join(raw_images_dir, img_file)

    if not os.path.exists(img_path):
        print(f"❌ {img_file} 없음 - 스킵")
        continue

    results = model(img_path)

    # Save the rendered detection for the first (only) result
    save_path = os.path.join(output_dir, f"result_{img_file}")
    results[0].save(filename=save_path)

    print(f"✅ 탐지 완료: {img_file} 저장됨 -> {save_path}")

mAP 학습 결과 시각화

In [None]:
# NOTE: this rebinds the name `Image` to IPython's display class, replacing
# the PIL `Image` imported earlier in the notebook.
from IPython.display import Image, display

# Show the training-curve image written under the YOLO run folder
result_graph_path = 'runs/detect/train/results.png'
display(Image(filename=result_graph_path))

YOLO 기반 자동 crop 코드

In [None]:
from PIL import Image

# Load the trained YOLO weights
model = YOLO('runs/detect/train/weights/best.pt')

# Folders
raw_dir = '/content/drive/MyDrive/SmartSan_Project/raw_images'
crop_dir = '/content/drive/MyDrive/SmartSan_Project/cropped_dataset/sink'
os.makedirs(crop_dir, exist_ok=True)

# Every .jpg file in raw_images
image_files = [f for f in os.listdir(raw_dir) if f.endswith('.jpg')]

# Report whether there is anything to process
if not image_files:
    print("❌ raw_images 폴더에 이미지가 없습니다.")
else:
    print(f"✅ 총 {len(image_files)}개의 이미지가 존재합니다.")

# Detect and crop, image by image
for img_file in image_files:
    img_path = os.path.join(raw_dir, img_file)

    if not os.path.exists(img_path):
        print(f"❌ {img_file} 없음 - 스킵")
        continue

    # YOLO detection
    results = model(img_path)
    stem = os.path.splitext(img_file)[0]

    # Context manager closes the file handle after each image instead of
    # leaking one open handle per processed file.
    with Image.open(img_path) as img:
        for j, box in enumerate(results[0].boxes):
            # Only class 0 (sink) is cropped
            if int(box.cls[0]) != 0:
                continue

            # Convert the box to a plain integer (left, top, right, bottom) tuple.
            left, top, right, bottom = (int(round(v)) for v in box.xyxy[0].tolist())
            cropped = img.crop((left, top, right, bottom))

            # JPEG has no alpha channel, so force RGB before saving
            cropped = cropped.convert("RGB")

            crop_path = os.path.join(crop_dir, f'{stem}_crop{j}.jpg')
            cropped.save(crop_path)
            print(f"✅ 저장됨: {crop_path}")

U-Net 모델 정의

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):
    """Four-level encoder/decoder segmentation network with skip connections.

    The encoder widens channels 64 -> 128 -> 256 -> 512 with a 2x max-pool
    between levels, a 1024-channel bottleneck follows, and the decoder
    mirrors the encoder with 2x transposed convolutions. Each decoder level
    concatenates the matching encoder output along the channel axis.

    Because of the four 2x poolings, the skip concatenations only line up
    when the input height and width are multiples of 16.

    Args:
        in_channels: Number of channels of the input image.
        out_channels: Number of channels of the predicted map.
    """

    def __init__(self, in_channels=3, out_channels=1):
        super().__init__()

        # Encoder path.
        self.enc1 = self._double_conv(in_channels, 64)
        self.enc2 = self._double_conv(64, 128)
        self.enc3 = self._double_conv(128, 256)
        self.enc4 = self._double_conv(256, 512)

        self.pool = nn.MaxPool2d(2)
        self.bottleneck = self._double_conv(512, 1024)

        # Decoder path: each dec* block receives upsampled + skip channels,
        # hence its input width is twice its output width.
        self.upconv4 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.dec4 = self._double_conv(1024, 512)
        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec3 = self._double_conv(512, 256)
        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec2 = self._double_conv(256, 128)
        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec1 = self._double_conv(128, 64)

        # 1x1 convolution projecting to the requested number of output maps.
        self.out = nn.Conv2d(64, out_channels, kernel_size=1)

    @staticmethod
    def _double_conv(in_ch, out_ch):
        """Return two 3x3 same-padding convolutions, each followed by ReLU."""
        layers = [
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return per-pixel values in [0, 1] (sigmoid already applied)."""
        skip1 = self.enc1(x)
        skip2 = self.enc2(self.pool(skip1))
        skip3 = self.enc3(self.pool(skip2))
        skip4 = self.enc4(self.pool(skip3))
        features = self.bottleneck(self.pool(skip4))

        # Walk back up, deepest level first.
        decoder_levels = (
            (self.upconv4, self.dec4, skip4),
            (self.upconv3, self.dec3, skip3),
            (self.upconv2, self.dec2, skip2),
            (self.upconv1, self.dec1, skip1),
        )
        for upsample, refine, skip in decoder_levels:
            features = refine(torch.cat((upsample(features), skip), dim=1))

        return torch.sigmoid(self.out(features))

Dice Loss + BCEDiceLoss

In [None]:
# No sigmoid inside DiceLoss: the inputs are expected to be probabilities already.
class DiceLoss(nn.Module):
    """Soft Dice loss: ``1 - (2*|X∩Y| + smooth) / (|X| + |Y| + smooth)``.

    Args:
        smooth: Constant added to numerator and denominator so the ratio
            stays defined when both tensors are all zeros.
    """

    def __init__(self, smooth=1.0):
        super(DiceLoss, self).__init__()
        self.smooth = smooth

    def forward(self, inputs, targets):
        """Return the scalar Dice loss computed over all elements.

        Args:
            inputs: Predicted values; any shape.
            targets: Ground-truth tensor with the same number of elements.
        """
        # reshape() instead of view(): view() raises on non-contiguous
        # tensors (e.g. after a transpose/permute), reshape() copies if needed.
        inputs = inputs.reshape(-1)
        targets = targets.reshape(-1)

        intersection = (inputs * targets).sum()
        dice_score = (2. * intersection + self.smooth) / (inputs.sum() + targets.sum() + self.smooth)
        return 1 - dice_score


class BCEDiceLoss(nn.Module):
    """Binary cross-entropy plus a weighted Dice term.

    Args:
        dice_weight: Multiplier applied to the Dice loss before it is added
            to the BCE loss.
    """

    def __init__(self, dice_weight=1.5):
        super().__init__()
        self.bce = nn.BCELoss()
        self.dice = DiceLoss()
        self.dice_weight = dice_weight

    def forward(self, inputs, targets):
        """Return ``BCE(inputs, targets) + dice_weight * Dice(inputs, targets)``."""
        bce_term = self.bce(inputs, targets)
        weighted_dice = self.dice_weight * self.dice(inputs, targets)
        return bce_term + weighted_dice

json → mask 변환 시작

In [None]:
pip install albumentations

In [None]:
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms.functional as TF
import random

class JointTransform:
    """Apply the same random flip/rotation to an image and its mask."""

    def __call__(self, image, mask):
        """Return ``(image, mask)`` after the shared random augmentations.

        Each augmentation fires independently when its random draw exceeds
        0.5: a horizontal flip, then a rotation by an angle drawn uniformly
        from [-15, 15].
        """
        do_flip = random.random() > 0.5
        if do_flip:
            image, mask = TF.hflip(image), TF.hflip(mask)

        do_rotate = random.random() > 0.5
        if do_rotate:
            # One angle for both, so image and mask stay aligned.
            angle = random.uniform(-15, 15)
            image, mask = TF.rotate(image, angle), TF.rotate(mask, angle)

        return image, mask


# Module-level 256x256 resize transform.
# NOTE(review): not referenced anywhere else in the visible source.
resize = transforms.Resize((256, 256))

class SinkSegmentationDataset(Dataset):
    """Image/mask pairs for sink segmentation.

    Images are the files in ``image_dir`` whose names end in jpg/png/jpeg,
    in sorted order. The mask for ``<stem>.<ext>`` is read from
    ``mask_dir/<stem>_mask.png``.

    Args:
        image_dir: Folder containing the input images.
        mask_dir: Folder containing the ``*_mask.png`` files.
        transform: Optional callable invoked as
            ``transform(image=..., mask=...)`` that returns a mapping with
            ``'image'`` and ``'mask'`` entries.
        augment: Stored on the instance; not read by this class.
    """

    def __init__(self, image_dir, mask_dir, transform=None, augment=False):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.augment = augment
        self.joint_transform = JointTransform()

        image_suffixes = ('jpg', 'png', 'jpeg')
        self.image_filenames = sorted(
            name for name in os.listdir(image_dir) if name.endswith(image_suffixes)
        )

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_filenames)

    def __getitem__(self, idx):
        """Return the ``(image, mask)`` tensors for the ``idx``-th file."""
        img_name = self.image_filenames[idx]
        stem, _ = os.path.splitext(img_name)

        img_path = os.path.join(self.image_dir, img_name)
        mask_path = os.path.join(self.mask_dir, stem + '_mask.png')

        # Load as arrays: RGB image, single-channel mask.
        image = np.array(Image.open(img_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"))

        if not self.transform:
            # No transform supplied: plain tensor conversion for both.
            to_tensor = transforms.ToTensor()
            return to_tensor(image), to_tensor(mask)

        augmented = self.transform(image=image, mask=mask)
        # Add a channel axis and scale the mask from 0..255 down to 0..1.
        mask_tensor = augmented['mask'].unsqueeze(0).float() / 255.0
        return augmented['image'], mask_tensor

데이터셋 로딩

In [None]:
# 📊 Prepare training data
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader, random_split
from torch.utils.data import Subset

# Albumentations transforms (training adds random augmentation)
train_transform = A.Compose([
    A.Resize(256, 256),
    A.HorizontalFlip(p=0.5),
    A.Rotate(limit=15, p=0.5),
    A.ColorJitter(brightness=0.2, contrast=0.2, p=0.5),
    A.ISONoise(color_shift=(0.01, 0.05), intensity=(0.1, 0.5), p=0.3),
    A.Normalize(),
    ToTensorV2()
])
val_transform = A.Compose([
    A.Resize(256, 256),
    A.Normalize(),
    ToTensorV2()
])

# Paths
image_dir = "/content/drive/MyDrive/SmartSan_Project/segmentation_labelme/images"
mask_dir = "/content/drive/MyDrive/SmartSan_Project/segmentation_labelme/masks"

# Two views of the same files: augmented for training, plain for validation
train_dataset_full = SinkSegmentationDataset(image_dir, mask_dir, transform=train_transform)
val_dataset_full = SinkSegmentationDataset(image_dir, mask_dir, transform=val_transform)

# Split indices over the dataset length, not over os.listdir(): the dataset
# keeps only image files, so any other file in the folder would otherwise
# produce indices past the end of the dataset.
all_indices = list(range(len(train_dataset_full)))
train_indices, val_indices = train_test_split(all_indices, test_size=0.2, random_state=42)

train_dataset = Subset(train_dataset_full, train_indices)
val_dataset = Subset(val_dataset_full, val_indices)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Training setup
import torch.optim as optim
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet().to(device)
criterion = BCEDiceLoss(dice_weight=1.5)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Debugging aid
# NOTE(review): set after the model was already moved to the device, so it
# may not take effect for this process — confirm.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# Training loop
from tqdm import tqdm
num_epochs = 30
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for images, masks in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
        images = images.to(device)
        # Drop the channel axis on both sides so shapes match for the loss.
        masks = masks.to(device).squeeze(1)
        outputs = model(images).squeeze(1)
        loss = criterion(outputs, masks)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    avg_loss = train_loss / len(train_loader)
    print(f"✅ Epoch {epoch+1} - 평균 학습 손실: {avg_loss:.4f}")

    # Validation pass on the held-out split (no gradient tracking).
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, masks in val_loader:
            images = images.to(device)
            masks = masks.to(device).squeeze(1)
            outputs = model(images).squeeze(1)
            val_loss += criterion(outputs, masks).item()
    avg_val_loss = val_loss / max(len(val_loader), 1)
    print(f"📉 Epoch {epoch+1} - 평균 검증 손실: {avg_val_loss:.4f}")

캐시 및 불필요한 객체 정리

In [None]:
import gc
import torch

# Run a garbage-collection pass, then ask PyTorch to release its cached
# CUDA memory.
gc.collect()
torch.cuda.empty_cache()

모델 시각화 코드

In [None]:
# Switch to evaluation mode
model.eval()

# File paths
img_path = '/content/drive/MyDrive/SmartSan_Project/segmentation_labelme/images/test41.jpg'
mask_path = '/content/drive/MyDrive/SmartSan_Project/segmentation_labelme/masks/test41_mask.png'

# Preprocessing. The training pipeline ends with A.Normalize(), so inference
# must apply the same mean/std (albumentations defaults); otherwise the
# network sees inputs on a different scale than it was trained on.
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])

# Load the image
img = Image.open(img_path).convert("RGB")
input_tensor = transform(img).unsqueeze(0).to(device)

# Load the ground-truth mask at the network's resolution
mask = Image.open(mask_path).convert("L").resize((256, 256))
mask_tensor = transforms.ToTensor()(mask).squeeze().numpy()

# Inference
with torch.no_grad():
    pred = model(input_tensor)
    # Binarize the model output at 0.5
    pred_mask = (pred.squeeze().cpu().numpy() > 0.5).astype(int)

# Plot input, ground truth and prediction side by side
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.imshow(img)
plt.title("Input Image")
plt.axis("off")

plt.subplot(1, 3, 2)
plt.imshow(mask_tensor, cmap="gray")
plt.title("Ground Truth")
plt.axis("off")

plt.subplot(1, 3, 3)
plt.imshow(pred_mask, cmap="gray")
plt.title("Predicted Mask")
plt.axis("off")

plt.tight_layout()
plt.show()

전체 예측 시각화 + 저장

In [None]:
def visualize_with_status(model, dataset, index, device, save_dir, show=True, threshold=0.1):
    """Plot input, ground truth and predicted mask for one sample and save it.

    The predicted contamination is the percentage of pixels whose model
    output exceeds ``threshold``. It is mapped to a status label
    (<15 Clean, <30 Moderate, otherwise Dirty) shown in the title and as a
    coloured frame around the input image. The figure is written to
    ``save_dir/result_<index>.png``.

    Args:
        model: Network called on a single-image batch.
        dataset: Indexable returning ``(image, mask)`` tensors; the image is
            indexed as (channels, height, width).
        index: Sample index into ``dataset``; also used in the file name.
        device: Device the input batch is moved to.
        save_dir: Output folder, created if missing.
        show: When True the figure is displayed before being closed.
        threshold: Cut-off applied to the model output to binarize the mask.
    """
    model.eval()
    image, mask = dataset[index]

    with torch.no_grad():
        image_input = image.unsqueeze(0).to(device)
        output = model(image_input).squeeze().cpu()

    pred_mask = (output > threshold).float()
    contamination = pred_mask.sum().item() / pred_mask.numel() * 100

    # 🎯 Three contamination levels (0~15 clean, 15~30 moderate, 30~ dirty)
    if contamination < 15:
        status = "Clean"
        box_color = 'blue'
    elif contamination < 30:
        status = "Moderate"
        box_color = 'orange'
    else:
        status = "Dirty"
        box_color = 'red'

    # imshow clips float RGB data to [0, 1]; a normalized image has values
    # outside that range, so stretch it back for display only.
    display_img = image.permute(1, 2, 0).numpy()
    lo, hi = float(display_img.min()), float(display_img.max())
    if (lo < 0.0 or hi > 1.0) and hi > lo:
        display_img = (display_img - lo) / (hi - lo)

    # Plot
    fig, axs = plt.subplots(1, 3, figsize=(12, 4))

    axs[0].imshow(display_img)
    axs[0].set_title(f"Input Image\n[{status}] {contamination:.2f}%")
    # Frame around the whole image, coloured by status.
    axs[0].add_patch(plt.Rectangle(
        (0, 0), image.shape[2], image.shape[1],
        linewidth=4, edgecolor=box_color, facecolor='none'
    ))

    axs[1].imshow(mask.squeeze().numpy(), cmap='gray')
    axs[1].set_title("Ground Truth")

    axs[2].imshow(pred_mask.squeeze().numpy(), cmap='gray')
    axs[2].set_title("Predicted Mask")

    for ax in axs:
        ax.axis("off")

    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, f"result_{index}.png")
    plt.tight_layout()
    plt.savefig(save_path)

    if show:
        plt.show()
    # Always release the figure; calling this in a loop would otherwise keep
    # every figure alive.
    plt.close(fig)

# Build an un-augmented view of the whole dataset and render every sample.
save_dir = "/content/drive/MyDrive/SmartSan_Project/results"
full_dataset = SinkSegmentationDataset(image_dir, mask_dir, transform=val_transform)

print(f"📊 전체 데이터셋 이미지 수: {len(full_dataset)}")

# Visualize and save each image in turn
sample_count = len(full_dataset)
for i in range(sample_count):
    print(f"🔄 이미지 {i} 처리 중...")
    visualize_with_status(
        model,
        full_dataset,
        index=i,
        device=device,
        save_dir=save_dir,
        show=True,
    )

Confusion Matrix + Classification Report

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

# Fixed class order shared by the matrix and the report.
class_ids = [0, 1, 2]
class_names = ["Clean", "Moderate", "Dirty"]


def _contamination_class(percent):
    """Map a contamination percentage to 0 (<=15), 1 (<=30) or 2 (above)."""
    if percent <= 15:
        return 0  # Clean
    if percent <= 30:
        return 1  # Moderate
    return 2  # Dirty


true_labels = []
pred_labels = []

for i in range(len(full_dataset)):
    image, mask = full_dataset[i]
    image_input = image.unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(image_input).squeeze().cpu()

    # Predicted contamination (%)
    pred_mask = (output > 0.1).float()
    contamination = 100.0 * pred_mask.sum().item() / pred_mask.numel()
    pred_labels.append(_contamination_class(contamination))

    # Ground-truth contamination (%)
    true_contam = 100.0 * mask.sum().item() / mask.numel()
    true_labels.append(_contamination_class(true_contam))

# 🔢 Confusion Matrix
# labels= pins the matrix to 3x3 even when a class never occurs; without it
# the matrix shrinks and no longer matches the three display labels.
cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap="Blues")
plt.title("📉 Confusion Matrix")
plt.show()

# 📄 Classification Report
# Same reason for labels=; zero_division=0 reports 0 for classes with no
# samples instead of emitting warnings.
report = classification_report(
    true_labels,
    pred_labels,
    labels=class_ids,
    target_names=class_names,
    zero_division=0,
)
print("📋 Classification Report:\n")
print(report)

In [None]:
from google.colab import drive
# Mount Google Drive so the project folder is reachable under /content/drive.
drive.mount('/content/drive')

# Recursively zip the whole project folder into one archive on Drive.
!zip -r /content/drive/MyDrive/SmartSan_Project.zip /content/drive/MyDrive/SmartSan_Project