In [None]:
# Cell 1: 라이브러리 설치 및 import
# 필요한 라이브러리 설치
%pip install fastapi uvicorn python-multipart jinja2 aiofiles
%pip install torch torchvision transformers ultralytics opencv-python pillow numpy

# 1

In [1]:
# 기본 라이브러리 import
from fastapi import FastAPI, Request, File, UploadFile, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import os
import threading
import asyncio
import io
import base64
import json
import datetime
import time
from threading import Lock
from fastapi import WebSocket, WebSocketDisconnect
import asyncio


# AI 모델 라이브러리
import cv2
import torch
from PIL import Image
import numpy as np
from transformers import AutoImageProcessor, AutoModelForImageClassification, ViTImageProcessor, ViTForImageClassification, pipeline
from ultralytics import YOLO

print("✅ 라이브러리 import 완료")


  from .autonotebook import tqdm as notebook_tqdm


✅ 라이브러리 import 완료


# 2

In [2]:
# Cell 2: 모델 초기화 및 설정
# 글로벌 변수로 모델들 저장
# 전역 변수에 실시간 처리용 변수들 추가
emotion_model = None
yolo_model = None
processor = None
device = None
emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise']

# 감정 분석 결과 저장용 전역 변수
all_emotions = []
emotion_statistics = {
    'Angry': 0, 'Disgust': 0, 'Fear': 0, 'Happy': 0,
    'Neutral': 0, 'Sad': 0, 'Surprise': 0
}
analysis_history = []

# 실시간 처리용 전역 변수
is_processing = False  # 실시간 처리 상태
processing_thread = None  # 처리 스레드
frame_count = 0  # 프레임 카운터
data_lock = Lock()  # 데이터 동기화용 락
webcam_capture = None  # 웹캠 캡처 객체


# 통합된 감정 예측 함수 (1:1 패딩 + 224x224)
def predict_emotion(face_image):
    """
    얼굴 이미지에서 감정을 예측하는 함수
    Args:
        face_image: 입력 얼굴 이미지 (numpy array, RGB)
    Returns:
        [{'label': str, 'score': float}, ...] 상위 2개 감정 예측 리스트 (가중치 적용 용도도)
    """
    try:
        # 1. 이미지를 1:1 비율로 정사각형 패딩
        h, w = face_image.shape[:2]
        max_size = max(h, w)
        
        # 정사각형 패딩
        square_img = np.zeros((max_size, max_size, 3), dtype=np.uint8)
        y_offset = (max_size - h) // 2
        x_offset = (max_size - w) // 2
        square_img[y_offset:y_offset+h, x_offset:x_offset+w] = face_image
        
        # 2. PIL 이미지로 변환
        pil_image = Image.fromarray(square_img)
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')
        
        # 3. ViT 전처리 (자동으로 224x224로 리사이즈됨)
        inputs = processor(pil_image, return_tensors="pt").to(device)
        
        # 4. 추론
        with torch.no_grad():
            outputs = emotion_model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
        
        # 5. 상위 2개 결과만 반환 (가중치 적용을 위해)
        top2_idx = torch.topk(predictions, 2).indices[0]
        top2_scores = torch.topk(predictions, 2).values[0]
        
        results = []
        for idx, score in zip(top2_idx, top2_scores):
            results.append({
                'label': emotion_labels[idx.item()],
                'score': float(score.item())
            })
        
        return results
        
    except Exception as e:
        print(f"감정 예측 오류: {e}")
        return [{'label': 'Error', 'score': 0.0}]

print("✅ 감정 예측 함수 정의 완료")

✅ 감정 예측 함수 정의 완료


# 3

In [3]:
# Cell 3: 모델 로드 함수
VIT_MODEL_PATH = "./ViT_model.pth"
YOLO_MODEL_PATH = "./yolov12n-face.pt" 

async def load_models():
    """모델들을 비동기적으로 로드하는 함수"""
    global emotion_model, yolo_model, processor, device
    
    try:
        print("🔄 모델 로딩 중...")
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        
        # ViT 모델과 프로세서 로드
        processor = ViTImageProcessor.from_pretrained("mo-thecreator/vit-Facial-Expression-Recognition")
        emotion_model = ViTForImageClassification.from_pretrained("mo-thecreator/vit-Facial-Expression-Recognition")
        checkpoint = torch.load(VIT_MODEL_PATH, map_location=torch.device(device))
        emotion_model.load_state_dict(checkpoint)
        emotion_model.to(device)
        emotion_model.eval()
        
        # YOLO 얼굴 탐지 모델 로드
        yolo_model = YOLO(YOLO_MODEL_PATH)
        
        print("✅ 모델 로딩 완료!")
        print(f"사용 장치: {device}")
        
    except Exception as e:
        print(f"❌ 모델 로딩 실패: {e}")
        print("기본 모델로 대체합니다.")
        try:
            global pipe
            pipe = pipeline("image-classification", model="j-hartmann/emotion-english-distilroberta-base")
            print("✅ 대체 모델 로드 완료")
        except:
            print("❌ 대체 모델도 실패")

print("✅ 모델 로드 함수 정의 완료")

✅ 모델 로드 함수 정의 완료


# 4

In [4]:
# Cell 4: FastAPI 앱 설정
app = FastAPI(title="웹캠 감정 분석 API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

os.makedirs("templates", exist_ok=True)
templates = Jinja2Templates(directory="templates")

@app.on_event("startup")
async def startup_event():
    await load_models()

print("✅ FastAPI 앱 설정 완료")

✅ FastAPI 앱 설정 완료


        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("startup")


# 5

In [5]:
# Cell 5: AI 분석 함수 (YOLO 필수)
def analyze_webcam_image(image_bytes):
    """웹캠 이미지를 분석하여 감정을 반환하는 함수 (가중치 적용 및 데이터 저장)"""
    global all_emotions, emotion_statistics, analysis_history
    
    try:
        # 바이트 데이터를 numpy 배열로 변환
        nparr = np.frombuffer(image_bytes, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        
        if frame is None:
            return "이미지를 읽을 수 없습니다."
        
        # RGB로 변환
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        analysis_results = []
        current_analysis_emotions = []  # 현재 분석의 감정들 (가중치 적용)
        
        # YOLO로 얼굴 탐지
        try:
            results = yolo_model(frame, verbose=False)
            face_detected = False
            
            for result in results:
                boxes = result.boxes
                if boxes is not None and len(boxes) > 0:
                    for box in boxes:
                        # 신뢰도 0.8 이상만 처리 (비디오 코드와 동일)
                        if box.cls[0] == 0 and box.conf[0] > 0.8:
                            x1, y1, x2, y2 = map(int, box.xyxy[0])
                            confidence = float(box.conf[0])
                            
                            # 얼굴 영역 추정 (상반신의 상위 1/3 정도)
                            face_height = int((y2 - y1) * 0.3)
                            face_y1 = y1
                            face_y2 = y1 + face_height
                            face_x1 = x1 + int((x2 - x1) * 0.2)
                            face_x2 = x2 - int((x2 - x1) * 0.2)
                            
                            # 유효한 영역인지 확인
                            if (face_y2 > face_y1 and face_x2 > face_x1 and 
                                face_y1 >= 0 and face_x1 >= 0 and 
                                face_y2 < frame.shape[0] and face_x2 < frame.shape[1]):
                                
                                face_crop = frame_rgb[face_y1:face_y2, face_x1:face_x2]
                                
                                if face_crop.size > 0:
                                    # 감정 예측
                                    emotion_results = predict_emotion(face_crop)
                                    face_detected = True
                                    
                                    # 상위 2개 감정에 가중치 적용 (비디오 코드와 동일)
                                    if len(emotion_results) >= 2:
                                        first_emotion = emotion_results[0]['label']
                                        second_emotion = emotion_results[1]['label']
                                        first_score = emotion_results[0]['score']
                                        second_score = emotion_results[1]['score']
                                        
                                        # 신뢰도 차이가 클수록 첫 번째에 더 큰 가중치
                                        score_ratio = first_score / (second_score + 0.01)
                                        if score_ratio > 2.0:  # 2배 이상 차이나면
                                            current_analysis_emotions.extend([first_emotion] * 3)  # 3배 가중치
                                            current_analysis_emotions.extend([second_emotion])
                                        else:
                                            current_analysis_emotions.extend([first_emotion])
                                            current_analysis_emotions.extend([second_emotion])
                                    
                                    analysis_results.append(f"👤 사람 탐지됨 (신뢰도: {confidence:.2f})")
                                    analysis_results.append(f"📊 감정 분석 결과:")
                                    
                                    for i, emotion in enumerate(emotion_results[:3]):
                                        emoji_map = {
                                            'Angry': '😡', 'Disgust': '🤢', 'Fear': '😨',
                                            'Happy': '😄', 'Neutral': '😐', 'Sad': '😭', 'Surprise': '😮'
                                        }
                                        emoji = emoji_map.get(emotion['label'], '❓')
                                        analysis_results.append(
                                            f"  {i+1}. {emoji} {emotion['label']}: {emotion['score']:.1%}"
                                        )
                                    break
            
            # 감정 데이터 저장 (얼굴이 감지된 경우만)
            if face_detected and current_analysis_emotions:
                # 전체 감정 리스트에 추가
                all_emotions.extend(current_analysis_emotions)
                
                # 감정별 카운트 업데이트
                for emotion in current_analysis_emotions:
                    if emotion in emotion_statistics:
                        emotion_statistics[emotion] += 1
                
                # 분석 히스토리에 추가
                analysis_history.append({
                    'timestamp': datetime.datetime.now().isoformat(),
                    'emotions': current_analysis_emotions.copy(),
                    'confidence': confidence
                })
                
                # 가중치 적용된 감정 정보 추가
                analysis_results.append(f"\n🔢 가중치 적용된 감정: {', '.join(current_analysis_emotions)}")
            
            if not face_detected:
                analysis_results.append("👤 사람이 감지되지 않았습니다.")
                analysis_results.append("💡 카메라에 얼굴이 잘 보이도록 조정해주세요.")
                
        except Exception as e:
            analysis_results.append(f"❌ 얼굴 탐지 오류: {str(e)}")
        
        # 분석 시간 추가
        analysis_results.append(f"\n⏰ 분석 시간: {datetime.datetime.now().strftime('%H:%M:%S')}")
        
        # 누적 통계 정보 추가
        if all_emotions:
            analysis_results.append(f"\n📊 누적 분석 통계:")
            for emotion, count in emotion_statistics.items():
                if count > 0:
                    emoji_map = {
                        'Angry': '😡', 'Disgust': '🤢', 'Fear': '😨',
                        'Happy': '😄', 'Neutral': '😐', 'Sad': '😭', 'Surprise': '😮'
                    }
                    emoji = emoji_map.get(emotion, '❓')
                    analysis_results.append(f"  {emoji} {emotion}: {count}회")
        
        return "\n".join(analysis_results)
        
    except Exception as e:
        return f"❌ 이미지 분석 중 오류 발생: {str(e)}"

print("✅ AI 분석 함수 정의 완료")

✅ AI 분석 함수 정의 완료


# 6

In [None]:
# Cell 6: FastAPI 라우트 정의 
@app.get("/", response_class=HTMLResponse)
async def main_page(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...)):
    try:
        if not file.content_type.startswith("image/"):
            raise HTTPException(status_code=400, detail="이미지 파일만 업로드 가능합니다.")
        
        image_data = await file.read()
        print(f"[DEBUG] 이미지 바이트 크기: {len(image_data)}")
        analysis_result = analyze_webcam_image(image_data)
        print(f"[DEBUG] 분석 결과:\n{analysis_result}")
        
        return JSONResponse(content={
            "status": "success",
            "analysis": analysis_result,
            "image_size": len(image_data)
        })
    except Exception as e:
        print(f"[ERROR] 분석 중 오류: {e}")
        return JSONResponse(content={
            "status": "error",
            "analysis": f"분석 중 오류가 발생했습니다: {str(e)}"
        }, status_code=500)

@app.get("/model-status")
async def model_status():
    global emotion_model, yolo_model
    status = {
        "emotion_model": emotion_model is not None,
        "yolo_model": yolo_model is not None,
        "device": device
    }
    return JSONResponse(content=status)

@app.get("/emotion-statistics")
async def get_emotion_statistics():
    """감정 분석 통계 조회"""
    global all_emotions, emotion_statistics, analysis_history
    
    return JSONResponse(content={
        "total_analyses": len(analysis_history),
        "total_emotions": len(all_emotions),
        "emotion_counts": emotion_statistics,
        "recent_analyses": analysis_history[-10:] if len(analysis_history) > 10 else analysis_history
    })

@app.post("/reset-statistics")
async def reset_statistics():
    """감정 분석 통계 초기화"""
    global all_emotions, emotion_statistics, analysis_history
    
    all_emotions.clear()
    analysis_history.clear()
    for emotion in emotion_statistics:
        emotion_statistics[emotion] = 0
    
    return JSONResponse(content={"status": "success", "message": "통계가 초기화되었습니다."})

@app.get("/export-emotions")
async def export_emotions():
    """감정 데이터 내보내기 (데이터베이스 저장 전 확인용)"""
    global all_emotions, emotion_statistics, analysis_history
    
    return JSONResponse(content={
        "all_emotions": all_emotions,
        "statistics": emotion_statistics,
        "history": analysis_history,
        "export_time": datetime.datetime.now().isoformat()
    })

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # JSON 형태로 받기
            data = await websocket.receive_json()
            
            # base64 이미지 디코딩
            if 'image' in data:
                image_b64 = data['image']
                image_bytes = base64.b64decode(image_b64)
                
                # 기존 분석 함수 사용
                result = analyze_webcam_image(image_bytes)
                
                # 결과 전송
                await websocket.send_json({"analysis": result})
            
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"WebSocket 오류: {e}")
        await websocket.send_json({"analysis": f"처리 중 오류: {str(e)}"})

print("✅ FastAPI 라우트가 설정되었습니다.")

# 서버 실행 함수
def run_server():
    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")

# 백그라운드에서 서버 실행
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()

print("🚀 AI 감정 분석 서버가 시작되었습니다!")
print("📱 웹페이지 접속: http://127.0.0.1:8000")
print("📊 모델 상태 확인: http://127.0.0.1:8000/model-status")
print("📚 API 문서: http://127.0.0.1:8000/docs")
print("\n🤖 지원 기능:")
print("- 실시간 웹캠 캡처")
print("- YOLO 기반 얼굴/사람 탐지 (필수)")
print("- ViT 기반 7가지 감정 분석 (1:1 패딩 + 224x224)")
print("- 실시간 결과 표시")
print("\n⚠️  주의: 첫 실행 시 모델 다운로드로 시간이 소요될 수 있습니다.")

✅ FastAPI 라우트가 설정되었습니다.
🚀 AI 감정 분석 서버가 시작되었습니다!
📱 웹페이지 접속: http://127.0.0.1:8000
📊 모델 상태 확인: http://127.0.0.1:8000/model-status
📚 API 문서: http://127.0.0.1:8000/docs

🤖 지원 기능:
- 실시간 웹캠 캡처
- YOLO 기반 얼굴/사람 탐지 (필수)
- ViT 기반 7가지 감정 분석 (1:1 패딩 + 224x224)
- 실시간 결과 표시

⚠️  주의: 첫 실행 시 모델 다운로드로 시간이 소요될 수 있습니다.


INFO:     Started server process [29144]
INFO:     Waiting for application startup.


🔄 모델 로딩 중...


INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


✅ 모델 로딩 완료!
사용 장치: cpu
INFO:     127.0.0.1:7610 - "GET / HTTP/1.1" 200 OK




INFO:     127.0.0.1:4684 - "GET /ws HTTP/1.1" 404 Not Found




INFO:     127.0.0.1:3629 - "GET /ws HTTP/1.1" 404 Not Found




INFO:     127.0.0.1:9305 - "GET /ws HTTP/1.1" 404 Not Found


In [None]:
import os
import signal

# 현재 프로세스의 PID 얻기
current_pid = os.getpid()
print(f"현재 프로세스 PID: {current_pid}")

# 자신을 종료
os.kill(current_pid, signal.SIGKILL)