In [None]:
import os
import subprocess
import csv
from dotenv import load_dotenv
import whisperx
from pyannote.audio import Pipeline
import torch

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
print("환경 설정을 시작합니다...")

# .env 파일에서 환경 변수 로드
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")

if not HF_TOKEN:
    raise ValueError("❌ .env 파일에 HF_TOKEN이 설정되지 않았습니다.")

In [None]:
# ==================================================================================
# WhisperX와 Pyannote를 활용한 STT, 화자 분리, 역할 분류 자동화 스크립트
# ==================================================================================
#
# [ 주요 기능 ]
# 1. 오디오 전처리: FFmpeg을 사용하여 모든 오디오 파일(예: MP3)을 STT 모델에 적합한
#    16kHz 모노 WAV 파일로 자동 변환합니다.
#
# 2. 고성능 STT 및 화자 분리:
#    - `WhisperX`: OpenAI의 Whisper 모델을 기반으로 빠르고 정확한 음성 인식(STT) 및
#      단어 단위 시간 정렬(Word-level Alignment)을 수행합니다.
#    - `Pyannote.audio`: Hugging Face의 사전 학습된 모델을 사용하여 오디오에 포함된
#      화자들을 분리(Diarization)합니다. (최대/최소 2명으로 설정)
#
# 3. 키워드 기반 역할 분류:
#    - 분리된 화자의 발화 내용에 특정 키워드('대출', '검찰' 등)가 포함되어 있는지 여부를
#      판단하여 '피싱범'과 '피해자' 역할을 자동으로 분류합니다.
#
# 4. 결과 통합 및 저장:
#    - 모든 처리 결과를 결합하여 [역할, 화자, 시작시간, 발화내용] 형태로 터미널에 출력합니다.
#    - 최종 결과를 원본 파일명에 기반한 CSV 파일로 깔끔하게 저장합니다.
#
# [ 사전 준비 사항 ]
# 1. FFmpeg 설치: 시스템에 FFmpeg이 설치되어 있어야 오디오 변환이 가능합니다.
# 2. Hugging Face Token: Pyannote 모델 사용을 위해 Hugging Face의 인증 토큰이
#    필요합니다. (코드 내 `HF_TOKEN` 변수 설정)
#
# [ 사용 방법 ]
# 1. `dataset` 폴더를 만들고 그 안에 분석할 오디오 파일을 넣습니다.
# 2. 코드 내 `AUDIO_FILENAME` 변수에 해당 파일명을 정확히 입력합니다.
# 3. 스크립트를 실행하면 `output` 폴더에 변환된 WAV 파일과 최종 CSV 결과물이 생성됩니다.
#
# ==================================================================================


DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

print(f"사용할 장치: {DEVICE}")


# 데이터 및 결과 폴더 설정

DATASET_DIR = "dataset"

OUTPUT_DIR = "output"

os.makedirs(OUTPUT_DIR, exist_ok=True)  # 결과 폴더 자동 생성

print(f"✅ 데이터 폴더: '{DATASET_DIR}', 결과 폴더: '{OUTPUT_DIR}'")


# 처리할 오디오 파일 경로 (dataset 폴더 안에 있어야 함)

AUDIO_FILENAME = "1. 기존 대출금 일부 변제해야 저금리 대출 가능(햇살론 사칭)_.mp3"

audio_path = os.path.join(DATASET_DIR, AUDIO_FILENAME)


if not os.path.exists(audio_path):

    raise FileNotFoundError(f"오디오 파일을 찾을 수 없습니다: {audio_path}")

print(f"처리할 오디오 파일: {audio_path}")



# --- 2. 오디오 파일 WAV로 변환 ---

def convert_to_wav(input_path, output_dir, sample_rate=16000):

    """MP3 파일을 STT에 적합한 16kHz 모노 WAV 파일로 변환합니다."""

    filename_without_ext = os.path.splitext(os.path.basename(input_path))[0]

    output_path = os.path.join(output_dir, f"{filename_without_ext}_converted.wav")


    print(f"\n🔹 '{input_path}'를 WAV 파일로 변환 중...")

    try:

        # ffmpeg을 사용하여 변환 (ffmpeg 설치 필요)

        cmd = [

            "ffmpeg",

            "-y",

            "-i",

            input_path,

            "-ac",

            "1",  # 모노 채널

            "-ar",

            str(sample_rate),  # 16kHz 샘플레이트

            output_path,

        ]

        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        print(f"WAV 변환 완료: {output_path}")

        return output_path

    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(

            "'ffmpeg'이 설치되어 있는지 확인해주세요. ffmpeg을 사용하여 오디오 파일을 변환하는 데 실패했습니다."
        )
        raise e



wav_path = convert_to_wav(audio_path, OUTPUT_DIR)



# --- 3. WhisperX STT 및 화자 분리 실행 ---

print("\n WhisperX 모델 로딩 및 음성 인식 시작...")

# 모델 로드 (CPU 사용 시 float32, GPU 사용 시 float16 또는 bfloat16 권장)

model = whisperx.load_model(

    "large-v2", DEVICE, compute_type="float32" if DEVICE == "cpu" else "float16"
)


# 1. 음성 인식 (Transcribe)

stt_result = model.transcribe(wav_path, language="ko")


# 2. 단어 시간 정렬 (Align)

print("단어 시간 정렬 중...")

model_a, metadata = whisperx.load_align_model(language_code="ko", device=DEVICE)

aligned_result = whisperx.align(

    stt_result["segments"], model_a, metadata, wav_path, DEVICE
)


# 3. Pyannote 화자 분리 (Diarize)

print("Pyannote 화자 분리 모델 로딩 및 실행 중...")

diarization_pipeline = Pipeline.from_pretrained(

    "pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN
)

if DEVICE == "cuda":

    diarization_pipeline.to(torch.device(DEVICE))  # GPU 사용 설정


diarization_result = diarization_pipeline(wav_path, min_speakers=2, max_speakers=2)


# 4. STT 결과와 화자 분리 결과 결합

print("STT와 화자 분리 결과 결합 중...")

final_result = whisperx.assign_word_speakers(diarization_result, aligned_result)



# --- 4. 키워드 기반 역할 분류 및 결과 저장 ---

print("\n역할 분류 및 최종 결과 생성...")


# 키워드 설정 (필요에 따라 수정)

PHISHER_KEYWORDS = [

    "대출",

    "검찰",

    "경찰",

    "송금",

    "계좌",

    "수사관",

    "저금리",

    "햇살론",

]

VICTIM_KEYWORDS = ["네", "제가", "저", "어떻게", "진짜요", "몰랐어요"]


results_for_csv = []

current_speaker = None

current_text = ""

current_role = "알 수 없음"

start_time = 0


print("\n" + "=" * 50)

print("              대화 내용            ")

print("=" * 50)


for segment in final_result.get("segments", []):

    speaker = segment.get("speaker", "UNKNOWN")

    text = segment.get("text", "").strip()


    if not text:
        continue


    # 화자가 바뀌면 이전 대화 내용 처리

    if speaker != current_speaker and current_speaker is not None:

        # 역할 분류

        if any(kw in current_text for kw in PHISHER_KEYWORDS):

            current_role = "피싱범"

        elif any(kw in current_text for kw in VICTIM_KEYWORDS):

            current_role = "피해자"

        else:

            current_role = f"{current_speaker}(역할 불분명)"


        # 결과 저장

        entry = {

            "역할": current_role,

            "화자": current_speaker,

            "시작시간": f"{start_time:.2f}s",

            "발화내용": current_text,

        }

        results_for_csv.append(entry)


        # 터미널 출력
        print(

            f"[{entry['역할']} / {entry['화자']}] ({entry['시작시간']}): {entry['발화내용']}"
        )


        # 초기화

        current_text = ""


    # 현재 대화 내용 업데이트

    if current_speaker is None or speaker != current_speaker:

        current_speaker = speaker

        start_time = segment.get("start", 0)


    current_text += text + " "


# 마지막 대화 내용 처리

if current_text:

    if any(kw in current_text for kw in PHISHER_KEYWORDS):

        current_role = "피싱범"

    elif any(kw in current_text for kw in VICTIM_KEYWORDS):

        current_role = "피해자"

    else:

        current_role = f"{current_speaker}(역할 불분명)"


    entry = {

        "역할": current_role,

        "화자": current_speaker,

        "시작시간": f"{start_time:.2f}s",

        "발화내용": current_text,

    }

    results_for_csv.append(entry)
    print(

        f"[{entry['역할']} / {entry['화자']}] ({entry['시작시간']}): {entry['발화내용']}"
    )


print("=" * 50)


# CSV 파일로 저장

csv_filename = os.path.splitext(AUDIO_FILENAME)[0] + "_dialogue.csv"

csv_path = os.path.join(OUTPUT_DIR, csv_filename)


with open(csv_path, "w", newline="", encoding="utf-8-sig") as f:

    writer = csv.DictWriter(f, fieldnames=["역할", "화자", "시작시간", "발화내용"])

    writer.writeheader()

    writer.writerows(results_for_csv)


print(f"\n모든 작업 완료! 최종 결과가 다음 파일에 저장되었습니다: {csv_path}")