<a href="https://colab.research.google.com/github/jetsonmom/halla_ai/blob/main/Untitled50.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!nvidia-smi

In [None]:
import os
HOME = os.getcwd()
print(HOME)

In [None]:
%pip install "ultralytics<=8.3.40" supervision roboflow
import ultralytics
ultralytics.checks()

In [None]:
!pip install yt-dlp
!pip install ultralytics

In [None]:
# 필요한 라이브러리 설치 (실행 전 필요)
!pip install yt-dlp

# 필요한 라이브러리 임포트
from ultralytics import YOLO
import cv2
import numpy as np
import time
import os
import torch
from google.colab.patches import cv2_imshow
import subprocess

# 홈 디렉토리 설정
HOME = os.getcwd()
print(f"작업 디렉토리: {HOME}")

# weights 폴더 생성 (없는 경우)
weights_dir = os.path.join(HOME, "weights")
if not os.path.exists(weights_dir):
    os.makedirs(weights_dir)
    print(f"weights 폴더 생성됨: {weights_dir}")

def download_youtube_video(youtube_url):
    """yt-dlp를 사용하여 YouTube 비디오 다운로드 함수"""
    try:
        print(f"YouTube 비디오 다운로드 중: {youtube_url}")

        # 출력 파일 경로
        output_path = os.path.join(HOME, "youtube_video.mp4")

        # yt-dlp 명령을 사용하여 비디오 다운로드
        subprocess.check_call([
            "yt-dlp",
            youtube_url,
            "-f", "best[ext=mp4]",
            "-o", output_path,
            "--force-overwrites"
        ])

        if os.path.exists(output_path):
            print(f"다운로드 완료: {output_path}")
            return output_path
        else:
            print(f"다운로드된 파일을 찾을 수 없습니다: {output_path}")
            return None
    except Exception as e:
        print(f"YouTube 비디오 다운로드 중 오류 발생: {e}")
        return None

def count_objects(results):
    """객체 카운트 함수"""
    counts = {}
    for result in results:
        boxes = result.boxes
        for box in boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])
            if conf > 0.25:  # 신뢰도 임계값
                class_name = result.names[cls]
                if class_name in counts:
                    counts[class_name] += 1
                else:
                    counts[class_name] = 1
    return counts

def process_video(video_path):
    """YOLOv10x를 사용한 비디오 처리"""
    # 모델 파일 경로 설정
    model_path = find_model_path()
    if not model_path:
        print("모델 파일을 찾을 수 없습니다. 처리를 중단합니다.")
        return

    print(f"비디오 파일: {video_path}")
    print(f"모델 파일: {model_path}")

    # 파일 존재 확인
    if not os.path.exists(video_path):
        print(f"오류: 비디오 파일이 존재하지 않습니다: {video_path}")
        return

    # PyTorch 호환성 설정
    if torch.__version__.startswith("2.") and float(torch.__version__.split(".")[1]) >= 6:
        print("PyTorch 2.6+ 호환성 문제 해결을 위한 설정 적용...")
        original_torch_load = torch.load

        def patched_torch_load(*args, **kwargs):
            kwargs['weights_only'] = False
            return original_torch_load(*args, **kwargs)

        torch.load = patched_torch_load

    # YOLOv10 모델 로드
    print("YOLOv10 모델 로드 중...")
    try:
        model = YOLO(model_path)
        print("모델 로드 완료!")
    except Exception as e:
        print(f"모델 로드 중 오류 발생: {e}")
        return

    # 비디오 캡처 객체 생성
    cap = cv2.VideoCapture(video_path)

    # 비디오 정보 가져오기
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    print(f"비디오 정보 - 해상도: {width}x{height}, FPS: {fps}, 총 프레임 수: {total_frames}")

    # 결과 비디오 저장을 위한 설정
    output_path = f"{HOME}/output_detection_yolov10.mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # 진행률 표시를 위한 변수
    frame_count = 0
    start_time = time.time()

    # 프레임 스킵 설정 (처리 속도 향상)
    frame_skip = 2  # 2프레임마다 1프레임만 처리

    try:
        while True:
            # 프레임 읽기
            ret, frame = cap.read()
            if not ret:
                break

            # 프레임 카운트 증가
            frame_count += 1

            # 진행률 계산 및 표시
            progress = (frame_count / total_frames) * 100
            elapsed = time.time() - start_time
            fps_processing = frame_count / elapsed if elapsed > 0 else 0
            remaining_time = (elapsed / frame_count) * (total_frames - frame_count) if frame_count > 0 else 0

            print(f"처리 중... {progress:.1f}% (프레임: {frame_count}/{total_frames}, FPS: {fps_processing:.1f}, 남은 시간: {remaining_time:.1f}초)")

            # 프레임 스킵 처리
            if frame_count % frame_skip != 0 and frame_count != 1 and frame_count != total_frames:
                # 스킵된 프레임도 저장 (이전 처리 결과 재사용)
                if 'annotated_frame' in locals():
                    out.write(annotated_frame)
                else:
                    out.write(frame)
                continue

            # 객체 감지
            results = model(frame, conf=0.25)  # confidence threshold 0.25

            # 객체 카운트
            current_counts = count_objects(results)

            # 결과 시각화
            annotated_frame = results[0].plot()

            # 카운트 정보 표시
            y_offset = 30
            for obj, count in sorted(current_counts.items(), key=lambda x: -x[1]):  # 내림차순 정렬
                text = f'{obj}: {count}'
                cv2.putText(annotated_frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX,
                          0.6, (0, 255, 0), 2)
                y_offset += 30

            # 정보 표시
            cv2.putText(annotated_frame, f'Frame: {frame_count}/{total_frames}',
                      (10, height - 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
            cv2.putText(annotated_frame, f'FPS: {fps_processing:.1f}',
                      (10, height - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
            cv2.putText(annotated_frame, f'Progress: {progress:.1f}%',
                      (10, height - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # 결과 프레임 저장
            out.write(annotated_frame)

            # 10프레임마다 또는 특정 조건에서 미리보기 표시
            if frame_count % 10 == 0 or frame_count == 1 or frame_count == total_frames:
                # 이미지 크기 조정 (미리보기용)
                preview_height = 360  # 고정 높이
                preview_width = int(width * (preview_height / height))
                preview_frame = cv2.resize(annotated_frame, (preview_width, preview_height))
                cv2_imshow(preview_frame)

    except Exception as e:
        print(f"비디오 처리 중 오류 발생: {e}")
    finally:
        # 리소스 해제
        cap.release()
        out.release()

        # 처리 통계 출력
        total_time = time.time() - start_time
        avg_fps = frame_count / total_time if total_time > 0 else 0

        print("\n=== YOLOv10 객체 감지 결과 ===")
        print(f"처리 완료: 총 {frame_count}/{total_frames} 프레임")
        print(f"총 처리 시간: {total_time:.2f}초")
        print(f"평균 처리 속도: {avg_fps:.1f} FPS")
        print(f"결과 파일: {output_path}")

        # 결과 파일 다운로드 안내
        print("\n결과 파일을 다운로드하려면 아래 코드를 실행하세요:")
        print("from google.colab import files")
        print(f"files.download('{output_path}')")

        return output_path

def find_model_path():
    """모델 파일 경로 찾기"""
    # 가능한 모든 경로 확인
    possible_paths = [
        os.path.join(HOME, "weights", "yolov10x.pt"),
        os.path.join("/content", "yolov10x.pt"),
        os.path.join("/content", "weights", "yolov10x.pt")
    ]

    # 존재하는 첫 번째 경로 반환
    for path in possible_paths:
        if os.path.exists(path):
            print(f"모델 파일 경로: {path}")

            # weights 폴더에 없으면 복사
            target_path = os.path.join(HOME, "weights", "yolov10x.pt")
            if path != target_path:
                import shutil
                os.makedirs(os.path.dirname(target_path), exist_ok=True)
                shutil.copy(path, target_path)
                print(f"모델 파일을 weights 폴더로 복사했습니다: {target_path}")
                return target_path

            return path

    # 모델 파일을 찾을 수 없음
    return None

# 실행
def main():
    # YouTube 비디오 URL
    youtube_url = "https://www.youtube.com/shorts/pwpGAPI-xEQ"  # 원하는 YouTube URL로 변경 가능

    # 유튜브 영상 다운로드
    video_path = download_youtube_video(youtube_url)

    if video_path and os.path.exists(video_path):
        # 비디오 처리
        process_video(video_path)
    else:
        print("비디오 다운로드 실패. 기본 소스를 찾을 수 없습니다.")

if __name__ == "__main__":
    main()