<a href="https://colab.research.google.com/github/jiyeonjin/0624_new/blob/main/0731_Ensamble.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# 1. 기본 패키지 업데이트
!apt update -qq

# 2. FFmpeg 설치 (비디오 처리용)
!apt install -y ffmpeg

# 3. Python 패키지 설치
!pip install ultralytics  # YOLOv8
!pip install yt-dlp       # YouTube 다운로더
!pip install opencv-python
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# 4. 추가 의존성 패키지
!pip install numpy matplotlib pillow

36 packages can be upgraded. Run 'apt list --upgradable' to see them.
[1;33mW: [0mSkipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)[0m
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 36 not upgraded.
Collecting ultralytics
  Downloading ultralytics-8.3.170-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_ru

In [None]:
# 비디오 파일 직접 업로드 방식 앙상블 객체 탐지
import subprocess
import sys
import os

def install_packages():
    """필요한 패키지 설치"""
    packages = ['ultralytics', 'opencv-python', 'numpy', 'matplotlib']

    for package in packages:
        try:
            if package == 'opencv-python':
                import cv2
            elif package == 'ultralytics':
                from ultralytics import YOLO
            else:
                __import__(package)
            print(f"✅ {package} 설치됨")
        except ImportError:
            print(f"📦 {package} 설치 중...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package, '--quiet'])

# 패키지 설치
install_packages()

# 라이브러리 import
import cv2
import numpy as np
from collections import defaultdict
import zipfile
import warnings
warnings.filterwarnings('ignore')

from ultralytics import YOLO
from google.colab import files
import matplotlib.pyplot as plt
from IPython.display import display, HTML

print("✅ 모든 라이브러리 로딩 완료!")

class VideoEnsembleDetector:
    def __init__(self):
        """비디오 앙상블 객체 탐지기 초기화"""
        print("🤖 앙상블 모델 로딩 중...")

        try:
            # 가벼운 모델 하나로 다중 예측 앙상블
            self.model = YOLO('yolov8n.pt')
            print("✅ YOLOv8n 모델 로딩 완료")

            # 앙상블 설정: 다중 confidence threshold 사용
            self.ensemble_configs = [
                {'conf': 0.15, 'weight': 0.2},
                {'conf': 0.25, 'weight': 0.3},
                {'conf': 0.35, 'weight': 0.3},
                {'conf': 0.45, 'weight': 0.2}
            ]

        except Exception as e:
            print(f"❌ 모델 로딩 실패: {e}")
            return

        # 도로 주행 관련 클래스
        self.target_classes = {
            'person': 0, 'bicycle': 1, 'car': 2, 'motorcycle': 3,
            'bus': 5, 'truck': 7, 'traffic light': 9, 'stop sign': 11
        }

        # 색상 매핑
        self.colors = {
            'person': (0, 255, 0),      # 초록
            'bicycle': (255, 0, 0),     # 파랑
            'car': (0, 0, 255),         # 빨강
            'motorcycle': (255, 255, 0), # 시안
            'bus': (128, 0, 128),       # 보라
            'truck': (255, 165, 0),     # 주황
            'traffic light': (0, 255, 255), # 노랑
            'stop sign': (255, 0, 255)  # 마젠타
        }

        self.iou_threshold = 0.5
        print("🎯 앙상블 탐지기 초기화 완료!")

    def upload_video(self):
        """Colab에서 비디오 파일 업로드"""
        print("📁 비디오 파일을 선택해주세요...")
        print("💡 권장: MP4, AVI, MOV 등의 일반적인 비디오 형식")
        print("💡 권장: 파일 크기 100MB 이하, 길이 5분 이하")

        try:
            uploaded = files.upload()

            if not uploaded:
                print("❌ 파일이 업로드되지 않았습니다.")
                return None, None

            # 업로드된 파일 확인
            filename = list(uploaded.keys())[0]
            filesize_mb = len(uploaded[filename]) / (1024 * 1024)

            print(f"✅ 업로드 완료: {filename}")
            print(f"📏 파일 크기: {filesize_mb:.1f}MB")

            # 비디오 파일인지 확인
            video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv']
            if not any(filename.lower().endswith(ext) for ext in video_extensions):
                print("❌ 비디오 파일이 아닙니다. MP4, AVI, MOV 등을 사용해주세요.")
                return None, None

            return filename, filename.split('.')[0]

        except Exception as e:
            print(f"❌ 업로드 오류: {e}")
            return None, None

    def ensemble_predict(self, frame):
        """앙상블 예측 수행"""
        all_detections = []

        # 각 confidence threshold로 예측
        for config in self.ensemble_configs:
            try:
                results = self.model(frame, conf=config['conf'], verbose=False)
                weight = config['weight']

                for result in results:
                    if result.boxes is not None:
                        for box in result.boxes:
                            cls_id = int(box.cls[0])
                            conf = float(box.conf[0]) * weight  # 가중치 적용
                            bbox = box.xyxy[0].cpu().numpy()

                            all_detections.append({
                                'bbox': bbox,
                                'conf': conf,
                                'cls_id': cls_id,
                                'threshold': config['conf']
                            })
            except Exception:
                continue

        return all_detections

    def weighted_nms(self, detections):
        """가중치 기반 NMS"""
        if not detections:
            return []

        # 클래스별로 분리
        class_detections = defaultdict(list)
        for det in detections:
            class_detections[det['cls_id']].append(det)

        final_detections = []

        for cls_id, cls_dets in class_detections.items():
            # confidence 내림차순 정렬
            cls_dets.sort(key=lambda x: x['conf'], reverse=True)

            kept = []
            for det in cls_dets:
                bbox1 = det['bbox']
                should_keep = True

                # 기존 kept 박스들과 IoU 비교
                for kept_det in kept:
                    bbox2 = kept_det['bbox']
                    if self.calculate_iou(bbox1, bbox2) > self.iou_threshold:
                        should_keep = False
                        break

                if should_keep:
                    kept.append(det)
                    if len(kept) >= 15:  # 클래스당 최대 15개
                        break

            final_detections.extend(kept)

        return final_detections

    def calculate_iou(self, bbox1, bbox2):
        """IoU 계산"""
        try:
            x1_1, y1_1, x2_1, y2_1 = bbox1
            x1_2, y1_2, x2_2, y2_2 = bbox2

            # 교집합
            xi1, yi1 = max(x1_1, x1_2), max(y1_1, y1_2)
            xi2, yi2 = min(x2_1, x2_2), min(y2_1, y2_2)

            if xi2 <= xi1 or yi2 <= yi1:
                return 0.0

            inter = (xi2 - xi1) * (yi2 - yi1)
            union = (x2_1 - x1_1) * (y2_1 - y1_1) + (x2_2 - x1_2) * (y2_2 - y1_2) - inter

            return inter / union if union > 0 else 0.0
        except:
            return 0.0

    def draw_detections(self, frame, detections):
        """탐지 결과 시각화"""
        class_names = self.model.names

        for det in detections:
            bbox = det['bbox']
            conf = det['conf']
            cls_id = det['cls_id']

            class_name = class_names.get(cls_id, 'unknown')
            if class_name not in self.target_classes:
                continue

            x1, y1, x2, y2 = map(int, bbox)
            color = self.colors.get(class_name, (255, 255, 255))

            # 바운딩 박스 그리기
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # 라벨 그리기
            label = f"{class_name}: {conf:.2f}"
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)

            # 라벨 배경
            cv2.rectangle(frame, (x1, y1 - h - 10), (x1 + w, y1), color, -1)
            cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

        return frame

    def process_video(self, video_path, max_frames=900):
        """비디오 처리 (최대 900프레임 = 30초@30fps)"""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print("❌ 비디오 파일을 열 수 없습니다.")
            return None, {}

        # 비디오 정보
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # 처리할 프레임 수 결정
        process_frames = min(total_frames, max_frames)

        print(f"🎬 비디오 정보:")
        print(f"   해상도: {width}x{height}")
        print(f"   FPS: {fps}")
        print(f"   총 프레임: {total_frames}")
        print(f"   처리할 프레임: {process_frames}")

        # 출력 비디오 설정
        output_path = "ensemble_detected_video.mp4"
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_count = 0
        detection_stats = defaultdict(int)

        print(f"\n🔍 앙상블 객체 탐지 시작 (4개 threshold 사용)...")

        while frame_count < process_frames:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # 진행률 표시
            if frame_count % max(1, process_frames // 20) == 0:
                progress = (frame_count / process_frames) * 100
                print(f"⏳ {progress:.0f}% 완료 ({frame_count}/{process_frames})")

            # 앙상블 예측 수행
            detections = self.ensemble_predict(frame)
            filtered_detections = self.weighted_nms(detections)

            # 통계 업데이트
            class_names = self.model.names
            for det in filtered_detections:
                class_name = class_names.get(det['cls_id'], 'unknown')
                if class_name in self.target_classes:
                    detection_stats[class_name] += 1

            # 탐지 결과 그리기
            result_frame = self.draw_detections(frame.copy(), filtered_detections)

            # 프레임 정보 추가
            info = f"Frame: {frame_count}/{process_frames} | Ensemble: 4 thresholds"
            cv2.putText(result_frame, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            # 현재 프레임 탐지 수 표시
            current_detections = len(filtered_detections)
            det_info = f"Current detections: {current_detections}"
            cv2.putText(result_frame, det_info, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

            out.write(result_frame)

        cap.release()
        out.release()

        print(f"✅ 비디오 처리 완료!")
        print(f"📁 출력 파일: {output_path}")

        return output_path, detection_stats

    def create_final_package(self, video_path, stats, title):
        """최종 결과 패키지 생성"""
        # 통계 파일 생성
        stats_file = "ensemble_detection_report.txt"

        with open(stats_file, 'w', encoding='utf-8') as f:
            f.write("=" * 50 + "\n")
            f.write("앙상블 객체 탐지 결과 리포트\n")
            f.write("=" * 50 + "\n\n")

            f.write(f"비디오 파일: {title}\n")
            f.write(f"사용 모델: YOLOv8n\n")
            f.write(f"앙상블 방식: 다중 Confidence Threshold\n")
            f.write(f"사용된 Threshold: {[cfg['conf'] for cfg in self.ensemble_configs]}\n")
            f.write(f"가중치: {[cfg['weight'] for cfg in self.ensemble_configs]}\n\n")

            f.write("탐지 결과 통계:\n")
            f.write("-" * 30 + "\n")

            total = sum(stats.values())
            for class_name, count in sorted(stats.items(), key=lambda x: x[1], reverse=True):
                percentage = (count / total * 100) if total > 0 else 0
                f.write(f"{class_name:15} : {count:4d}회 ({percentage:5.1f}%)\n")

            f.write("-" * 30 + "\n")
            f.write(f"총 탐지 횟수: {total}회\n")

            # 클래스별 설명
            f.write("\n클래스 설명:\n")
            class_desc = {
                'person': '보행자/사람',
                'bicycle': '자전거',
                'car': '승용차',
                'motorcycle': '오토바이',
                'bus': '버스',
                'truck': '트럭',
                'traffic light': '신호등',
                'stop sign': '정지 표지판'
            }

            for cls, desc in class_desc.items():
                if cls in stats:
                    f.write(f"- {cls}: {desc}\n")

        # ZIP 파일 생성
        zip_filename = f"{title}_ensemble_detection.zip"

        with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # 비디오 파일 추가
            if os.path.exists(video_path):
                zipf.write(video_path, "ensemble_detected_video.mp4")

            # 통계 파일 추가
            zipf.write(stats_file, "detection_report.txt")

        # 임시 파일 정리
        if os.path.exists(stats_file):
            os.remove(stats_file)

        return zip_filename

    def run_detection(self):
        """전체 탐지 프로세스 실행"""
        print("🎯 비디오 파일 앙상블 객체 탐지기")
        print("=" * 60)
        print("여러 confidence threshold를 사용한 앙상블 방식으로")
        print("더 정확한 객체 탐지를 수행합니다.")
        print("=" * 60)

        try:
            # 1. 비디오 파일 업로드
            print("\n1️⃣ 비디오 파일 업로드")
            video_path, title = self.upload_video()

            if not video_path:
                print("❌ 비디오 업로드 실패")
                return

            # 2. 앙상블 객체 탐지 수행
            print(f"\n2️⃣ 앙상블 객체 탐지 수행")
            output_video, stats = self.process_video(video_path)

            if not output_video:
                print("❌ 객체 탐지 실패")
                return

            # 3. 결과 분석 출력
            print(f"\n3️⃣ 탐지 결과 분석")
            total_detections = sum(stats.values())
            print(f"📊 총 탐지 횟수: {total_detections}회")

            if stats:
                print("📈 클래스별 탐지 결과:")
                for class_name, count in sorted(stats.items(), key=lambda x: x[1], reverse=True):
                    percentage = (count / total_detections * 100) if total_detections > 0 else 0
                    emoji = {'person': '👤', 'car': '🚗', 'bicycle': '🚲', 'motorcycle': '🏍️',
                            'bus': '🚌', 'truck': '🚛', 'traffic light': '🚦', 'stop sign': '🛑'}.get(class_name, '🎯')
                    print(f"   {emoji} {class_name}: {count}회 ({percentage:.1f}%)")

            # 4. 결과 패키지 생성 및 다운로드
            print(f"\n4️⃣ 결과 패키지 생성")
            zip_file = self.create_final_package(output_video, stats, title)

            print(f"📦 결과 패키지: {zip_file}")
            print("💾 파일 다운로드 중...")
            files.download(zip_file)

            print(f"\n🎉 앙상블 객체 탐지 완료!")
            print("📁 다운로드된 파일 내용:")
            print("   - ensemble_detected_video.mp4 (탐지 결과 영상)")
            print("   - detection_report.txt (상세 분석 리포트)")

        except Exception as e:
            print(f"❌ 오류 발생: {e}")
            print("💡 파일을 다시 업로드하거나 Colab을 재시작해보세요.")

# 실행
detector = VideoEnsembleDetector()
detector.run_detection()