### 라이브러리 정리

In [6]:
# 1. 필수 라이브러리 설치
!pip install -q transformers torch accelerate sentencepiece
!pip install -q yt-dlp pydub SpeechRecognition webrtcvad deepface
!pip install -q opencv-python ultralytics


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/66.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.2/66.2 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for webrtcvad (setup.py) ... [?25l[?25hdone


### LLM(LG EXAONE) 켜기

In [2]:
from huggingface_hub import notebook_login


notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [7]:

# 2. 필요한 라이브러리 import
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# 3. 모델 및 토크나이저 불러오기
# Hugging Face에 공개된 LG EXAONE 모델 ID
model_id = "LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct"

print(f"'{model_id}' 모델 로드를 시작합니다. 몇 분 정도 소요될 수 있습니다...")

# 토크나이저 로드
tokenizer = AutoTokenizer.from_pretrained(model_id)

# 모델 로드
# device_map="auto": 사용 가능한 GPU를 자동으로 할당
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    trust_remote_code=True,
    device_map="auto"
)

print("모델 로드가 완료되었습니다.")

'LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct' 모델 로드를 시작합니다. 몇 분 정도 소요될 수 있습니다...


Loading checkpoint shards:   0%|          | 0/7 [00:00<?, ?it/s]

모델 로드가 완료되었습니다.


### 영상에서 정보 수집

In [8]:
import os
import yt_dlp
import glob
from pydub import AudioSegment
import speech_recognition as sr
import wave, contextlib, webrtcvad
import json

YOUTUBE_URL = "https://www.youtube.com/watch?v=aG8ZUcikQcE"


# --- 2. 오디오 다운로드 ---
def download_audio_from_youtube(url, out_template='downloaded_audio.%(ext)s'):
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': out_template,
        'quiet': False,
        'no_warnings': True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        candidates = glob.glob('downloaded_audio.*')
        if candidates:
            candidates.sort(key=os.path.getmtime, reverse=True)
            return candidates[0]
        return None
    except Exception as e:
        print("다운로드 에러:", e)
        return None


# --- 3. WAV 변환 ---
def convert_to_wav(input_filepath, wav_path='converted_audio.wav'):
    try:
        audio = AudioSegment.from_file(input_filepath)
        audio = audio.set_channels(1).set_frame_rate(16000)
        audio.export(wav_path, format='wav', parameters=["-acodec", "pcm_s16le"])
        return wav_path
    except Exception as e:
        print("변환 에러:", e)
        return None


# --- 4. VAD 기반 발화 단위 분리 ---
def read_wave(path):
    with contextlib.closing(wave.open(path, 'rb')) as wf:
        num_channels = wf.getnchannels()
        assert num_channels == 1
        sample_width = wf.getsampwidth()
        assert sample_width == 2
        sample_rate = wf.getframerate()
        assert sample_rate in (8000, 16000, 32000, 48000)
        pcm_data = wf.readframes(wf.getnframes())
        return pcm_data, sample_rate


def vad_split(audio_path, aggressiveness=2):
    vad = webrtcvad.Vad(aggressiveness)
    audio, sample_rate = read_wave(audio_path)
    frame_duration = 30  # ms
    frame_size = int(sample_rate * frame_duration / 1000) * 2
    segments = []
    voiced_frames = []
    idx = 0
    start_time = None

    for i in range(0, len(audio), frame_size):
        frame = audio[i:i+frame_size]
        if len(frame) < frame_size:
            break
        is_speech = vad.is_speech(frame, sample_rate)
        if is_speech:
            if start_time is None:
                start_time = (i / frame_size) * (frame_duration / 1000.0)
            voiced_frames.append(frame)
        else:
            if voiced_frames:
                segment = b''.join(voiced_frames)
                out_path = f"vad_segment_{idx}.wav"
                with contextlib.closing(wave.open(out_path, 'wb')) as wf:
                    wf.setnchannels(1)
                    wf.setsampwidth(2)
                    wf.setframerate(sample_rate)
                    wf.writeframes(segment)

                end_time = (i / frame_size) * (frame_duration / 1000.0)
                segments.append((out_path, start_time, end_time))

                voiced_frames = []
                start_time = None
                idx += 1
    return segments


# --- 5. VAD 세그먼트 STT ---
def transcribe_vad_segments(wav_path, language='ko-KR'):
    segment_infos = vad_split(wav_path, aggressiveness=2)
    print("발화 단위 segment 개수:", len(segment_infos))

    r = sr.Recognizer()
    transcripts = []
    for seg_file, start, end in segment_infos:
        with sr.AudioFile(seg_file) as source:
            audio_data = r.record(source)
            try:
                text = r.recognize_google(audio_data, language=language)
            except sr.UnknownValueError:
                text = ""
            except sr.RequestError as e:
                text = f"[API 요청 실패: {e}]"

        transcripts.append({
            "file": seg_file,
            "text": text,
            "start": round(start, 2),
            "end": round(end, 2)
        })
    return transcripts


# --- 6. 실행 ---
if __name__ == "__main__":
    print("유튜브 오디오 다운로드...")
    downloaded = download_audio_from_youtube(YOUTUBE_URL)
    if not downloaded:
        raise RuntimeError("다운로드 실패")
    print("다운로드 완료:", downloaded)

    print("WAV 변환...")
    wav = convert_to_wav(downloaded)
    if not wav:
        raise RuntimeError("WAV 변환 실패")
    print("WAV 변환 완료:", wav)

    print("발화 단위 인식 시작...")
    transcripts = transcribe_vad_segments(wav, language="ko-KR")

    with open("vad_transcript.json", "w", encoding="utf-8") as f:
        json.dump(transcripts, f, ensure_ascii=False, indent=2)

    print("완료: vad_transcript.json 생성됨")


유튜브 오디오 다운로드...
[youtube] Extracting URL: https://www.youtube.com/watch?v=aG8ZUcikQcE
[youtube] aG8ZUcikQcE: Downloading webpage
[youtube] aG8ZUcikQcE: Downloading tv simply player API JSON
[youtube] aG8ZUcikQcE: Downloading tv client config
[youtube] aG8ZUcikQcE: Downloading player 0e6689e2-main
[youtube] aG8ZUcikQcE: Downloading tv player API JSON
[info] aG8ZUcikQcE: Downloading 1 format(s): 251
[download] Sleeping 3.00 seconds as required by the site...
[download] Destination: downloaded_audio.webm
[download] 100% of    4.30MiB in 00:00:00 at 12.94MiB/s  
다운로드 완료: downloaded_audio.webm
WAV 변환...
WAV 변환 완료: converted_audio.wav
발화 단위 인식 시작...
발화 단위 segment 개수: 44
완료: vad_transcript.json 생성됨


In [9]:
import yt_dlp

YOUTUBE_URL = "https://www.youtube.com/watch?v=aG8ZUcikQcE"

def download_video(url, out_template="downloaded_video.%(ext)s"):
    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',  # 영상+음성 합쳐서 MP4
        'merge_output_format': 'mp4',
        'outtmpl': out_template,
        'quiet': False,
        'no_warnings': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        return ydl.prepare_filename(info)

if __name__ == "__main__":
    video_path = download_video(YOUTUBE_URL)
    print(f"✅ 다운로드 완료: {video_path}")


[youtube] Extracting URL: https://www.youtube.com/watch?v=aG8ZUcikQcE
[youtube] aG8ZUcikQcE: Downloading webpage
[youtube] aG8ZUcikQcE: Downloading tv simply player API JSON
[youtube] aG8ZUcikQcE: Downloading tv client config
[youtube] aG8ZUcikQcE: Downloading player 0004de42-main
[youtube] aG8ZUcikQcE: Downloading tv player API JSON
[info] aG8ZUcikQcE: Downloading 1 format(s): 399+251
[download] Sleeping 2.00 seconds as required by the site...
[download] Destination: downloaded_video.f399.mp4
[download] 100% of   13.22MiB in 00:00:00 at 16.01MiB/s  
[download] Destination: downloaded_video.f251.webm
[download] 100% of    4.30MiB in 00:00:00 at 29.07MiB/s  
[Merger] Merging formats into "downloaded_video.mp4"
Deleting original file downloaded_video.f399.mp4 (pass -k to keep)
Deleting original file downloaded_video.f251.webm (pass -k to keep)
✅ 다운로드 완료: downloaded_video.mp4


In [10]:
!pip install

Collecting deepface
  Downloading deepface-0.0.95-py3-none-any.whl.metadata (35 kB)
Collecting flask-cors>=4.0.1 (from deepface)
  Downloading flask_cors-6.0.1-py3-none-any.whl.metadata (5.3 kB)
Collecting mtcnn>=0.1.0 (from deepface)
  Downloading mtcnn-1.0.0-py3-none-any.whl.metadata (5.8 kB)
Collecting retina-face>=0.0.14 (from deepface)
  Downloading retina_face-0.0.17-py3-none-any.whl.metadata (10 kB)
Collecting fire>=0.4.0 (from deepface)
  Downloading fire-0.7.1-py3-none-any.whl.metadata (5.8 kB)
Collecting gunicorn>=20.1.0 (from deepface)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting lz4>=4.3.3 (from mtcnn>=0.1.0->deepface)
  Downloading lz4-4.4.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.8 kB)
Downloading deepface-0.0.95-py3-none-any.whl (128 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m128.3/128.3 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fire-0.7.1-py3-none-any.whl (115 kB)


In [12]:
!ffmpeg -i downloaded_video.mp4 -vf "fps=25" -c:v libx264 -preset fast -crf 23 -c:a copy fixed_video.mp4


ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enab

In [14]:
from deepface import DeepFace


def analyze_frames_deepface(video_path, start_sec, end_sec, num_frames_to_sample=5, save_debug=True):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"⚠️ 비디오 열기 실패: {video_path}")
        return {"emotion_scores": {"neutral": 1.0}, "dominant_emotion": "neutral"}

    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        fps = 25  # fallback

    start_frame = int(start_sec * fps)
    end_frame = int(end_sec * fps)
    frame_indices = np.linspace(start_frame, end_frame, num_frames_to_sample, dtype=int)

    all_scores = []
    os.makedirs("debug_frames", exist_ok=True)

    for idx, f_idx in enumerate(frame_indices):
        cap.set(cv2.CAP_PROP_POS_FRAMES, f_idx)
        ret, frame = cap.read()
        if not ret or frame is None:
            print(f"⚠️ 프레임 추출 실패 (frame={f_idx})")
            continue

        # 저장
        if save_debug:
            fname = f"debug_frames/frame_{round(start_sec,2)}_{round(end_sec,2)}_{idx}.jpg"
            cv2.imwrite(fname, frame)

        try:
            analysis = DeepFace.analyze(
                frame,
                actions=["emotion"],
                detector_backend="opencv",
                enforce_detection=False
            )
            emo = analysis[0]["emotion"]
            all_scores.append(emo)
        except Exception as e:
            print(f"⚠️ DeepFace 분석 실패 (frame={f_idx}): {e}")

    cap.release()

    if all_scores:
        keys = all_scores[0].keys()
        final_scores = {k: float(np.mean([emo[k] for emo in all_scores])) for k in keys}
        dominant = max(final_scores, key=final_scores.get)
    else:
        final_scores = {"neutral": 1.0}
        dominant = "neutral"

    return {"emotion_scores": final_scores, "dominant_emotion": dominant}



def map_emotions_to_transcript(transcript_json, video_path):
    with open(transcript_json, "r", encoding="utf-8") as f:
        transcripts = json.load(f)

    results = []

    for seg in transcripts:
        text = seg.get("text", "").strip()
        if not text:
            continue  # 빈 텍스트 스킵

        start_sec = float(seg.get("start", 0.0))
        end_sec = float(seg.get("end", start_sec + 0.5))

        emo_scores = analyze_frames_deepface(
            video_path,
            start_sec=start_sec,
            end_sec=end_sec,
            num_frames_to_sample=5,
            save_debug=True  # 프레임 저장 활성화
        )

        results.append({
            "file": seg.get("file"),
            "text": text,
            "start": round(start_sec, 2),
            "end": round(end_sec, 2),
            "emotion_scores": emo_scores["emotion_scores"],
            "dominant_emotion": emo_scores["dominant_emotion"]
        })

    return results


if __name__ == "__main__":
    transcript_path = "/content/vad_transcript.json"
    video_path = "/content/fixed_video.mp4"

    mapped_results = map_emotions_to_transcript(transcript_path, video_path)

    with open("mapped_transcript.json", "w", encoding="utf-8") as f:
        json.dump(mapped_results, f, ensure_ascii=False, indent=2)

    print("✅ 완료: mapped_transcript.json 생성됨")
    print("📸 디버그 프레임은 debug_frames/ 폴더에 저장됨")


25-09-22 12:05:24 - Directory /root/.deepface has been created
25-09-22 12:05:24 - Directory /root/.deepface/weights has been created
25-09-22 12:05:27 - 🔗 facial_expression_model_weights.h5 will be downloaded from https://github.com/serengil/deepface_models/releases/download/v1.0/facial_expression_model_weights.h5 to /root/.deepface/weights/facial_expression_model_weights.h5...


Downloading...
From: https://github.com/serengil/deepface_models/releases/download/v1.0/facial_expression_model_weights.h5
To: /root/.deepface/weights/facial_expression_model_weights.h5
100%|██████████| 5.98M/5.98M [00:00<00:00, 176MB/s]


✅ 완료: mapped_transcript.json 생성됨
📸 디버그 프레임은 debug_frames/ 폴더에 저장됨


### 프롬포트 엔지니어링 가능한 코드

In [23]:
import torch
import json
import re
from transformers import AutoModelForCausalLM, AutoTokenizer


def create_prompt_for_llm(text: str):
    """
    입력: 한국어 문장 (STT 결과)
    출력: JSON 한 줄 (ksl_gloss 변환 결과)
    """
    prompt = f"""
문장을 한국수어 글로스(KSL Gloss)로 변환하세요.
규칙:
- 조사(은/는, 이/가, 을/를 등) 제거
- 명사·동사·형용사만 남김
- 단어는 '/'로 구분
- 반드시 JSON 한 줄만 출력 (다른 설명 금지)

입력: "{text}"

출력:
{{"ksl_gloss": "..."}}
"""
    return prompt.strip()


def load_llm(model_id):
    print(f"'{model_id}' 모델 로드를 시작합니다...")
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.bfloat16,
        trust_remote_code=True,
        device_map="auto"
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    print("모델 로드가 완료되었습니다.")
    return model, tokenizer


def parse_llm_output(raw_output: str):
    """LLM 출력에서 JSON만 추출"""
    try:
        match = re.search(r"\{.*\}", raw_output, re.DOTALL)
        if match:
            return json.loads(match.group(0))
        return None
    except Exception:
        return None


def fallback_gloss(text: str) -> str:
    """
    LLM 실패 시 기본 전처리 방식으로 변환
    """
    text = re.sub(r"(은|는|이|가|을|를|과|와|에|에서|으로|에게|도|만|까지|부터)", "", text)  # 조사 제거
    text = re.sub(r"[^가-힣\s]", "", text)  # 특수문자 제거
    words = text.strip().split()
    return "/".join(words)


def run_llm_on_segments(model, tokenizer, segments, max_new_tokens=64):
    results = []
    for i, seg in enumerate(segments):
        stt_text = seg.get("text", "")
        prompt = create_prompt_for_llm(stt_text)

        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.eos_token_id
        )
        raw_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

        parsed = parse_llm_output(raw_output)
        if parsed is None or "ksl_gloss" not in parsed:
            gloss = fallback_gloss(stt_text)
        else:
            gloss = parsed.get("ksl_gloss", "")

        result = {
            "text": stt_text,
            "ksl_gloss": gloss
        }
        results.append(result)

        # 👉 출력: 원문 텍스트와 변환된 수어 글로스
        print(f"[{i+1}/{len(segments)}] {result['text']} → {result['ksl_gloss']}")

    return results


if __name__ == "__main__":
    transcript_path = "/content/mapped_transcript.json"
    model_id = "LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct"

    # 1. 모델 로드
    model, tokenizer = load_llm(model_id)

    # 2. transcript 불러오기
    with open(transcript_path, "r", encoding="utf-8") as f:
        segments = json.load(f)

    # 3. LLM 실행
    gloss_results = run_llm_on_segments(model, tokenizer, segments)

    # 4. 결과 저장
    with open("ksl_gloss_results.json", "w", encoding="utf-8") as f:
        json.dump(gloss_results, f, ensure_ascii=False, indent=2)

    print("✅ 완료: ksl_gloss_results.json 생성됨")


'LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct' 모델 로드를 시작합니다...


Loading checkpoint shards:   0%|          | 0/7 [00:00<?, ?it/s]

모델 로드가 완료되었습니다.


KeyboardInterrupt: 