In [1]:
import torch
import cv2 as cv
from ultralytics import YOLO
import numpy as np
import os
import json
from pathlib import Path
import shutil
import random
from datetime import datetime
import yaml
import matplotlib.pyplot as plt
import time

### 1단계

In [2]:
# 1단계: 설치 확인

def check_installation():
    """설치된 패키지들 확인"""
    print("=" * 50)
    print("설치 확인 중...")
    print("=" * 50)
    
    # PyTorch 확인
    print(f"PyTorch: {torch.__version__}")
    print(f"Device: {'GPU' if torch.cuda.is_available() else 'CPU'}")
    
    # OpenCV 확인
    print(f"OpenCV: {cv.__version__}")
    
    print("\nYOLO 모델 테스트 중...")
    
    # YOLO 모델 로드 테스트
    try:
        model = YOLO('yolov8n.pt')
        print("YOLO 모델 로드 성공!")
        
        # 더미 이미지로 추론 테스트
        dummy_image = np.zeros((640, 640, 3), dtype=np.uint8)
        results = model(dummy_image)
        print("모델 추론 테스트 성공!")
        
        return True
        
    except Exception as e:
        print(f"YOLO 모델 테스트 실패: {e}")
        return False

if __name__ == "__main__":
    if check_installation():
        print("\n모든 설치가 완료되었습니다!")
        print("다음: python step2_augmented_labeling.py 실행")
    else:
        print("\n설치에 문제가 있습니다. 에러 메시지를 확인해주세요.")

설치 확인 중...
PyTorch: 2.5.1
Device: CPU
OpenCV: 4.12.0

YOLO 모델 테스트 중...
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 2.1MB/s 3.0s.9s<0.1s5sss
YOLO 모델 로드 성공!

0: 640x640 (no detections), 232.0ms
Speed: 23.0ms preprocess, 232.0ms inference, 1.2ms postprocess per image at shape (1, 3, 640, 640)
모델 추론 테스트 성공!

모든 설치가 완료되었습니다!
다음: python step2_augmented_labeling.py 실행


### 2단계

In [3]:
# 2단계: 데이터 준비 (데이터 증강 포함)

def detect_object_in_image(image):
    """이미지에서 객체 위치 자동 탐지"""
    height, width = image.shape[:2]
    
    gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
    blurred = cv.GaussianBlur(gray, (5, 5), 0)
    _, binary = cv.threshold(blurred, 0, 255, cv.THRESH_BINARY + cv.THRESH_OTSU)
    
    kernel = np.ones((3,3), np.uint8)
    cleaned = cv.morphologyEx(binary, cv.MORPH_CLOSE, kernel)
    cleaned = cv.morphologyEx(cleaned, cv.MORPH_OPEN, kernel)
    
    contours, _ = cv.findContours(cleaned, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)
    
    if contours:
        largest_contour = max(contours, key=cv.contourArea)
        x, y, w, h = cv.boundingRect(largest_contour)
        
        if w * h < width * height * 0.01:
            margin = 0.1
            x, y = int(width * margin), int(height * margin)
            w, h = int(width * 0.8), int(height * 0.8)
        
        center_x = (x + w / 2) / width
        center_y = (y + h / 2) / height
        norm_w = w / width
        norm_h = h / height
        
        return center_x, center_y, norm_w, norm_h
    else:
        return 0.5, 0.5, 0.8, 0.8

def augment_brightness(image):
    """밝기 변화 - 다양한 조명 환경 시뮬레이션"""
    variations = []
    
    variations.append(("original", image))
    variations.append(("dark", cv.convertScaleAbs(image, alpha=0.6, beta=0)))
    variations.append(("very_dark", cv.convertScaleAbs(image, alpha=0.4, beta=0)))
    variations.append(("bright", cv.convertScaleAbs(image, alpha=1.3, beta=10)))
    variations.append(("low_contrast", cv.convertScaleAbs(image, alpha=0.8, beta=0)))
    
    return variations

def add_noise(image, noise_level=10):
    """노이즈 추가 - 저조도 환경 시뮬레이션"""
    noise = np.random.normal(0, noise_level, image.shape).astype(np.int16)
    noisy = np.clip(image.astype(np.int16) + noise, 0, 255).astype(np.uint8)
    return noisy

def setup_augmented_dataset(source_path="trashnet", augment=True):
    """데이터 증강을 포함한 데이터셋 생성"""
    source = Path(source_path)
    target = Path("dataset")
    
    if target.exists():
        response = input("기존 dataset 폴더를 삭제하고 다시 만들까요? (y/n): ")
        if response.lower() == 'y':
            shutil.rmtree(target)
        else:
            print("취소됨")
            return False
    
    # 폴더 생성
    for split in ['train', 'val']:
        (target / "images" / split).mkdir(parents=True, exist_ok=True)
        (target / "labels" / split).mkdir(parents=True, exist_ok=True)
    
    classes = {'can': 0, 'glass': 1, 'paper': 2, 'plastic': 3, 'trash': 4, 'vinyl': 5}
    
    # 이미지 수집
    all_files = []
    print("\n이미지 수집 중...")
    for class_name, class_id in classes.items():
        class_dir = source / class_name
        if class_dir.exists():
            count = 0
            for ext in ['.jpg', '.jpeg', '.png']:
                images = list(class_dir.glob(f"*{ext}"))
                count += len(images)
                for img_path in images:
                    all_files.append((img_path, class_id, class_name))
            print(f"  {class_name}: {count}장")
    
    if len(all_files) < 10:
        print("이미지가 너무 적습니다 (최소 10장 필요)")
        return False
    
    print(f"\n총 {len(all_files)}장의 원본 이미지")
    
    random.shuffle(all_files)
    split_idx = int(len(all_files) * 0.8)
    
    processed = 0
    
    print("\n데이터 처리 중...")
    if augment:
        print("  증강 옵션: ON (원본 + 어두운/밝은 버전 생성)")
    else:
        print("  증강 옵션: OFF (원본만)")
    
    for i, (img_path, class_id, class_name) in enumerate(all_files):
        try:
            img = cv.imread(str(img_path))
            if img is None:
                continue
            
            img = cv.resize(img, (640, 640))
            split = 'train' if i < split_idx else 'val'
            
            # 증강 적용
            if augment and split == 'train':
                variations = augment_brightness(img)
                selected = [variations[0]]
                if len(variations) > 1:
                    selected.extend(random.sample(variations[1:], min(2, len(variations)-1)))
            else:
                selected = [("original", img)]
            
            # 저장
            for aug_type, aug_img in selected:
                if "dark" in aug_type and random.random() > 0.5:
                    aug_img = add_noise(aug_img, noise_level=15)
                
                cx, cy, w, h = detect_object_in_image(aug_img)
                img_name = f"{processed:05d}.jpg"
                
                cv.imwrite(str(target / "images" / split / img_name), aug_img)
                
                with open(target / "labels" / split / f"{processed:05d}.txt", 'w') as f:
                    f.write(f"{class_id} {cx:.6f} {cy:.6f} {w:.6f} {h:.6f}\n")
                
                processed += 1
            
            if (i + 1) % 50 == 0:
                print(f"  진행: {i+1}/{len(all_files)} ({processed}개 생성)")
                
        except Exception as e:
            continue
    
    print(f"\n처리 완료: {processed}개 이미지 생성")
    
    train_count = len(list((target / "images" / "train").glob("*.jpg")))
    val_count = len(list((target / "images" / "val").glob("*.jpg")))
    print(f"  훈련: {train_count}장")
    print(f"  검증: {val_count}장")
    
    # dataset.yaml 생성
    yaml_content = """path: ./dataset
train: images/train
val: images/val
nc: 6
names: ['can', 'glass', 'paper', 'plastic', 'trash', 'vinyl']
"""
    
    with open("dataset.yaml", "w") as f:
        f.write(yaml_content)
    
    print("\ndataset.yaml 생성 완료")
    return True

if __name__ == "__main__":
    print("데이터 증강 포함 데이터셋 생성")
    print("=" * 50)
    
    source = input("trashnet 폴더 경로 (엔터=trashnet): ").strip() or "trashnet"
    use_augment = input("데이터 증강 사용? (y/n, 엔터=y): ").strip().lower() or 'y'
    
    if setup_augmented_dataset(source, use_augment == 'y'):
        print("\n2단계 완료!")
        print("다음: python step3_train.py 실행")

데이터 증강 포함 데이터셋 생성



이미지 수집 중...
  can: 359장
  glass: 504장
  paper: 605장
  plastic: 466장
  trash: 82장
  vinyl: 119장

총 2135장의 원본 이미지

데이터 처리 중...
  증강 옵션: ON (원본 + 어두운/밝은 버전 생성)
  진행: 50/2135 (150개 생성)
  진행: 100/2135 (300개 생성)
  진행: 150/2135 (450개 생성)
  진행: 200/2135 (600개 생성)
  진행: 250/2135 (750개 생성)
  진행: 300/2135 (900개 생성)
  진행: 350/2135 (1050개 생성)
  진행: 400/2135 (1200개 생성)
  진행: 450/2135 (1350개 생성)
  진행: 500/2135 (1500개 생성)
  진행: 550/2135 (1650개 생성)
  진행: 600/2135 (1800개 생성)
  진행: 650/2135 (1950개 생성)
  진행: 700/2135 (2100개 생성)
  진행: 750/2135 (2250개 생성)
  진행: 800/2135 (2400개 생성)
  진행: 850/2135 (2550개 생성)
  진행: 900/2135 (2700개 생성)
  진행: 950/2135 (2850개 생성)
  진행: 1000/2135 (3000개 생성)
  진행: 1050/2135 (3150개 생성)
  진행: 1100/2135 (3300개 생성)
  진행: 1150/2135 (3450개 생성)
  진행: 1200/2135 (3600개 생성)
  진행: 1250/2135 (3750개 생성)
  진행: 1300/2135 (3900개 생성)
  진행: 1350/2135 (4050개 생성)
  진행: 1400/2135 (4200개 생성)
  진행: 1450/2135 (4350개 생성)
  진행: 1500/2135 (4500개 생성)
  진행: 1550/2135 (4650개 생성)
  진행: 1600/2135 (4800개 생성)
  진행

### 3단계

In [4]:
# 3단계: 모델 훈련
class WasteModelTrainer:
    def __init__(self, dataset_yaml="dataset.yaml"):
        self.dataset_yaml = dataset_yaml
        self.device = 'cpu'
        self.results_path = Path("training_results")
        self.results_path.mkdir(exist_ok=True)
        
        print("YOLO 모델 훈련 준비")
        print(f"Device: {self.device}")
    
    def check_dataset(self):
        print("\n데이터셋 상태 확인 중...")
        print("=" * 50)
        
        if not Path(self.dataset_yaml).exists():
            print(f"{self.dataset_yaml} 파일이 없습니다!")
            return False
        
        with open(self.dataset_yaml, 'r') as f:
            config = yaml.safe_load(f)
        
        dataset_path = Path(config['path'])
        train_images = dataset_path / config['train']
        val_images = dataset_path / config['val']
        train_labels = dataset_path / "labels" / "train"
        val_labels = dataset_path / "labels" / "val"
        
        folders_ok = True
        for folder, name in [(train_images, "훈련 이미지"), (val_images, "검증 이미지"), 
                            (train_labels, "훈련 라벨"), (val_labels, "검증 라벨")]:
            if folder.exists():
                count = len(list(folder.glob("*")))
                print(f"{name}: {count}개")
            else:
                print(f"{name}: 폴더 없음")
                folders_ok = False
        
        print(f"클래스 수: {config['nc']}")
        print(f"클래스: {config['names']}")
        print("=" * 50)
        
        if not folders_ok:
            print("필요한 폴더가 없습니다. 2단계를 먼저 완료해주세요.")
            return False
        
        print("데이터셋 준비 완료!")
        return True
    
    def train_model(self, model_size='n', epochs=50, batch_size=8, img_size=640):
        print(f"\n모델 훈련 시작!")
        print(f"모델 크기: YOLOv8{model_size}")
        print(f"에포크: {epochs}")
        print(f"배치 크기: {batch_size}")
        print(f"이미지 크기: {img_size}")
        print("=" * 50)
        
        model = YOLO(f'yolov8{model_size}.pt')
        print(f"YOLOv8{model_size} 사전훈련 모델 로드 완료")
        
        start_time = time.time()
        
        try:
            results = model.train(
                data=self.dataset_yaml,
                epochs=epochs,
                imgsz=img_size,
                batch=batch_size,
                device=self.device,
                project='runs/detect',
                name='waste_classification',
                save=True,
                plots=True,
                patience=10,
                save_period=10,
                workers=2,
                verbose=True,
                val=True,
                cache=False,
                amp=False,
            )
            
            training_time = time.time() - start_time
            hours = int(training_time // 3600)
            minutes = int((training_time % 3600) // 60)
            seconds = int(training_time % 60)
            
            print(f"\n훈련 완료!")
            print(f"훈련 시간: {hours}시간 {minutes}분 {seconds}초")
            print(f"결과 저장: runs/detect/waste_classification")
            
            best_model_path = Path("runs/detect/waste_classification/weights/best.pt")
            if best_model_path.exists():
                shutil.copy2(best_model_path, "best_waste_model.pt")
                print("최적 모델을 'best_waste_model.pt'로 복사했습니다.")
            
            return results
            
        except Exception as e:
            print(f"훈련 중 오류 발생: {e}")
            return None

def main():
    print("3단계: YOLO 쓰레기 분류 모델 훈련")
    print("=" * 50)
    
    trainer = WasteModelTrainer()
    
    if not trainer.check_dataset():
        print("\n데이터셋 문제가 있습니다. 2단계를 먼저 완료해주세요.")
        return
    
    print("\n빠른 훈련 설정 (30 에포크)")
    trainer.train_model(model_size='n', epochs=30, batch_size=4, img_size=416)
    
    print("\n3단계 완료!")
    print("다음: python step4_folder_test.py 실행")

if __name__ == "__main__":
    main()

3단계: YOLO 쓰레기 분류 모델 훈련
YOLO 모델 훈련 준비
Device: cpu

데이터셋 상태 확인 중...
훈련 이미지: 5124개
검증 이미지: 427개
훈련 라벨: 5124개
검증 라벨: 427개
클래스 수: 6
클래스: ['can', 'glass', 'paper', 'plastic', 'trash', 'vinyl']
데이터셋 준비 완료!

빠른 훈련 설정 (30 에포크)

모델 훈련 시작!
모델 크기: YOLOv8n
에포크: 30
배치 크기: 4
이미지 크기: 416
YOLOv8n 사전훈련 모델 로드 완료
New https://pypi.org/project/ultralytics/8.3.204 available  Update with 'pip install -U ultralytics'
Ultralytics 8.3.203  Python-3.11.13 torch-2.5.1 CPU (Intel Core Ultra 7 258V)
[34m[1mengine\trainer: [0magnostic_nms=False, amp=False, augment=False, auto_augment=randaugment, batch=4, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, compile=False, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=dataset.yaml, degrees=0.0, deterministic=True, device=cpu, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=30, erasing=0.4, exist_ok=False, fliplr=0.5, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False

### 4단계

In [None]:
# 4단계: 저장된 이미지 분석

class WasteImageAnalyzer:
    def __init__(self, model_path="best_waste_model.pt"):
        if not Path(model_path).exists():
            print(f"모델 파일이 없습니다: {model_path}")
            print("3단계를 먼저 완료하세요")
            return
        
        self.model = YOLO(model_path)
        print("모델 로드 완료")
    
    def analyze_folder(self, input_folder, output_folder):
        input_path = Path(input_folder)
        output_path = Path(output_folder)
        output_path.mkdir(exist_ok=True)
        
        images = list(input_path.glob('*.jpg'))
        images.extend(list(input_path.glob('*.jpeg')))
        images.extend(list(input_path.glob('*.png')))
        
        if not images:
            print(f"'{input_folder}'에 이미지가 없습니다")
            return
        
        print(f"\n{len(images)}개 이미지 분석 시작...")
        
        for img_path in images:
            print(f"\n분석 중: {img_path.name}")
            
            results = self.model(str(img_path), conf=0.3)
            
            detections = []
            for box in results[0].boxes:
                detections.append({
                    'class': self.model.names[int(box.cls[0])],
                    'confidence': round(float(box.conf[0]), 2),
                    'bbox': [int(x) for x in box.xyxy[0].tolist()]
                })
            
            if detections:
                for det in detections:
                    print(f"  발견: {det['class']} ({det['confidence']})")
            else:
                print("  탐지된 객체 없음")
            
            annotated = results[0].plot()
            result_img_path = output_path / f"{img_path.stem}_result.jpg"
            cv.imwrite(str(result_img_path), annotated)
            
            result_json = {
                'original': str(img_path),
                'result_image': str(result_img_path),
                'detections': detections,
                'count': len(detections)
            }
            
            json_path = output_path / f"{img_path.stem}_result.json"
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(result_json, f, indent=2, ensure_ascii=False)
        
        print(f"\n완료! 결과: {output_folder}")

if __name__ == "__main__":
    analyzer = WasteImageAnalyzer()
    
    input_folder = input("분석할 이미지 폴더 경로 (기본: ./test_images): ").strip() or "./test_images"
    output_folder = input("결과 저장 폴더 경로 (기본: ./results): ").strip() or "./results"
    
    Path(input_folder).mkdir(exist_ok=True)
    print(f"\n'{input_folder}' 폴더에 테스트 이미지를 넣어주세요")
    
    input("준비되면 Enter를 누르세요...")
    
    analyzer.analyze_folder(input_folder, output_folder)

모델 로드 완료

'./test_images' 폴더에 테스트 이미지를 넣어주세요

6개 이미지 분석 시작...

분석 중: paper491.jpg

image 1/1 c:\Users\jk316\OneDrive\Desktop\practice4\test_images\paper491.jpg: 320x416 1 trash, 45.2ms
Speed: 1.7ms preprocess, 45.2ms inference, 0.7ms postprocess per image at shape (1, 3, 320, 416)
  발견: trash (0.82)

분석 중: paper559.jpg

image 1/1 c:\Users\jk316\OneDrive\Desktop\practice4\test_images\paper559.jpg: 320x416 1 plastic, 36.4ms
Speed: 0.7ms preprocess, 36.4ms inference, 0.5ms postprocess per image at shape (1, 3, 320, 416)
  발견: plastic (0.8)

분석 중: plastic171.jpg

image 1/1 c:\Users\jk316\OneDrive\Desktop\practice4\test_images\plastic171.jpg: 320x416 1 vinyl, 36.5ms
Speed: 0.6ms preprocess, 36.5ms inference, 0.6ms postprocess per image at shape (1, 3, 320, 416)
  발견: vinyl (0.79)

분석 중: plastic313.jpg

image 1/1 c:\Users\jk316\OneDrive\Desktop\practice4\test_images\plastic313.jpg: 320x416 1 vinyl, 36.4ms
Speed: 0.8ms preprocess, 36.4ms inference, 0.5ms postprocess per image at shape (1, 3, 