In [None]:
import cv2
import numpy as np
import os
from IPython.display import HTML, display
import base64

print("✅ 라이브러리가 준비되었습니다.")

In [None]:
import firebase_admin
from firebase_admin import credentials, storage
from google.colab import userdata
import json
import os

# cred = credentials.Certificate("./serviceAccountKey.json")

key_json = userdata.get('firebase') #colab 환경

try:
    with open('firebase_key.json', 'w') as f:
        json.dump(json.loads(key_json), f)
    key_path = 'firebase_key.json'
except json.JSONDecodeError:
    # If the key is not valid JSON, assume it's already a file path
    key_path = key_json


cred = credentials.Certificate(key_path)

if not firebase_admin._apps:
    firebase_admin.initialize_app(cred, {
        'storageBucket': 'fairplayfairy-3e2eb.firebasestorage.app'
    })
    print("Firebase 앱이 성공적으로 초기화되었습니다.")
else:
    print("Firebase 앱이 이미 초기화되어 있습니다.")

# Storage 버킷 객체 가져오기 예시
bucket = storage.bucket()
print("Storage 버킷에 접근 성공:", bucket.name)

In [1]:
def track_aim_in_video(input_path, output_path="result_local.mp4"):
    """
    영상 파일을 입력받아 옵티컬 플로우로 에임을 추적하고,
    결과 영상 저장 및 에임 이동 데이터를 반환합니다.
    output_path가 None이면 결과 영상을 저장하지 않습니다.
    """
    aim_movements = []
    try:
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            print("Error: 영상을 열 수 없습니다.")
            return None

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        
        # 영상 시작 3초 스킵(3,2,1 카운트가 있기 때문에 옵티컬 플로우에서 제외)
        skip_frames = int(fps * 3)
        for _ in range(skip_frames):
            ret, _ = cap.read()
            if not ret:
                cap.release()
                return None
        out = None
        if output_path is not None:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        ret, prev_frame = cap.read()
        if not ret:
            print("Error: 첫 프레임을 읽을 수 없습니다.")
            cap.release()
            if out is not None:
                out.release()
            return None
            
        roi_x, roi_y = int(width * 0.2), int(height * 0.2)
        roi_w, roi_h = int(width * 0.6), int(height * 0.6)

        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        prev_roi = prev_gray[roi_y:roi_y+roi_h, roi_x:roi_x+roi_w]

        frame_count = 0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            roi = gray[roi_y:roi_y+roi_h, roi_x:roi_x+roi_w]

            flow = cv2.calcOpticalFlowFarneback(prev_roi, roi, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            
            dx, dy = np.mean(flow[..., 0]), np.mean(flow[..., 1])
            
            aim_dx, aim_dy = -dx, -dy
            aim_movements.append((aim_dx, aim_dy)) # 에임 이동 데이터 저장
            
            aim_speed = np.sqrt(aim_dx**2 + aim_dy**2)
            
            if aim_speed > 1.0:
                center_x, center_y = width // 2, height // 2
                arrow_end_x = int(center_x + aim_dx * 20)
                arrow_end_y = int(center_y + aim_dy * 20)
                
                cv2.arrowedLine(frame, (center_x, center_y), (arrow_end_x, arrow_end_y),
                                (0, 255, 255), 3, tipLength=0.3)
            
            if out is not None:
                out.write(frame)
            prev_roi = roi


        if output_path is not None:
            print(f"✅ 영상 처리가 완료되었습니다. 결과가 '{output_path}'에 저장되었습니다.")
        else:
            print(f"✅ 영상 처리가 완료되었습니다. (결과 영상 저장 안함)")

        cap.release()
        if out is not None:
            out.release()
        return aim_movements

    except Exception as e:
        print(f"오류가 발생했습니다: {e}")
        return None

# --- 실행 ---
aim_data = None
if 'input_filename' in locals() and os.path.exists(input_filename):
    output_filename = "result_local.mp4"
    aim_data = track_aim_in_video(input_filename, output_filename)
else:
    print("⚠️ 파일 경로를 먼저 확인해주세요.")


⚠️ 파일 경로를 먼저 확인해주세요.


In [None]:
import tempfile
import os
from joblib import Parallel, delayed

def process_video_task(video_blob, total_count, current_index):
    progress = f"[{current_index}/{total_count}]"
    fd, temp_path = tempfile.mkstemp(suffix=".webm")
    os.close(fd)
    try:
        try:
            video_blob.download_to_filename(temp_path)
        except Exception as e:
            print(f"{progress} ❌ 다운로드 실패: {video_blob.name} ({e})")
            return None
        if not os.path.exists(temp_path) or os.path.getsize(temp_path) == 0:
            print(f"{progress} ❌ 빈 파일 → 건너뜀: {video_blob.name}")
            return None
        aim_data = track_aim_in_video(temp_path, output_path=None)
        if aim_data is None:
            print(f"{progress} ⚠️ 분석 실패: {video_blob.name}")
            return None
        print(f"{progress} ✅ 수집: {video_blob.name} ({len(aim_data)} 프레임)")
        return {'name': video_blob.name, 'data': aim_data}
    finally:
        try:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        except Exception as e:
            print(f"{progress} ❗ 임시파일 삭제 실패: {e}")

videos = list(bucket.list_blobs(prefix='videos/'))
webm_files = [v for v in videos if v.name.endswith('.webm')]

if not webm_files:
    print("⚠️ 처리할 .webm 없음")
else:
    total_files = len(webm_files)
    print(f"총 {total_files}개 영상 병렬 처리 시작 (threading)")
    results = Parallel(n_jobs=8, backend='threading')(  # 필요시 n_jobs 조정
        delayed(process_video_task)(blob, total_files, i)
        for i, blob in enumerate(webm_files, start=1)
    )
    all_aim_data = [r for r in results if r is not None]
    print(f"\n--- 완료 ---\n✔️ 성공: {len(all_aim_data)} / {total_files}")

In [None]:
from google.colab import drive
import pandas as pd
import os

# 1. 구글 드라이브 마운트
drive.mount('/content/drive')

# 2. 영상 처리 결과를 DataFrame으로 변환
# 예시: all_aim_data = [{'name': 파일명, 'data': [(dx, dy), ...]}, ...]
rows = []
for item in all_aim_data:
    name = item['name']
    for idx, (dx, dy) in enumerate(item['data']):
        rows.append({'video': name, 'frame': idx, 'dx': dx, 'dy': dy})

df = pd.DataFrame(rows)

# 3. 엑셀로 저장 
save_path = '/content/drive/MyDrive/aim_data.xlsx'
df.to_excel(save_path, index=False)
print(f"엑셀 파일 저장 완료: {save_path}")