### Prepare Environment

In [38]:
# speechbrain (used for speaker embedding)
!pip install torch==2.3.1 torchaudio==2.3.1 torch-audiomentations==0.11.1
!pip install -qq speechbrain==1.0.0

# pyannote.audio (used for speaker diarization)
!pip install -qq pyannote.audio==3.3.1

# OpenAI whisper (used for automatic speech recognition)
!pip install -qq git+https://github.com/openai/whisper.git
!pip install -qq ipython==8.12.2

# deVAD
!pip install -U denoiser
!pip install soundfile
!pip install torch torchvision torchaudio
!pip install pydub

In [3]:
import numpy as np
import pandas as pd

import time
import io
import os
import subprocess

import torch
import soundfile as sf
from scipy.io import wavfile
import librosa
from IPython.display import Audio

from subprocess import run, CalledProcessError, PIPE
from concurrent.futures import ThreadPoolExecutor

# pyannote
from pyannote.audio import Pipeline

# STT
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq, AutoTokenizer

## 1. STT, 화자분리 모델 로드

In [39]:
# Hugging Face 토큰
HUGGINGFACE_TOKEN = "token"

# STT 모델, 프로세서, 토크나이저를 각각의 저장소에서 로드
model = AutoModelForSpeechSeq2Seq.from_pretrained("NexoChatFuture/whisper-small-youtube-extra", token=HUGGINGFACE_TOKEN)
processor = AutoProcessor.from_pretrained("NexoChatFuture/whisper-small-youtube-extra", token=HUGGINGFACE_TOKEN)
tokenizer = AutoTokenizer.from_pretrained("NexoChatFuture/whisper-small-youtube-extra", token=HUGGINGFACE_TOKEN)

# 화자분리 모델 로드
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization-3.1', use_auth_token=HUGGINGFACE_TOKEN)

In [40]:
# GPU 사용 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
pipeline.to(device)

## 2. 화자분리 및 STT 전사 진행

### 2-1. 오디오 로드 함수 및 Constants 정의

- whisper input을 위해 sampling rate를 16000로 설정하고, 원 음성을 30분 단위로 나누어 전사한다.

     - SR = 16000
     - CHUNK_DURATION = 30

In [9]:
# Define constants
SR = 16000
CHUNK_DURATION = 30  # 30초 단위

- whisper 모델을 오디오 데이터를 input으로 받고, 화자분리 모델은 오디오의 경로를 필요로 한다.
- 따라서, 오디오 전처리 후 변환된 오디오와 경로를 모두 반환한다.

In [10]:
# 오디오 로드
def load_audio(path):
    output_path = './output_audio.wav'
    cmd = [
        "ffmpeg", "-nostdin",
        "-threads", "0",
        "-i", path,
        "-f", "f32le",
        "-ac", "1",
        "-acodec", "pcm_f32le",
        "-ar", str(SR),
        "-"
    ]
    out = subprocess.run(cmd, capture_output=True, check=True).stdout
    audio = np.frombuffer(out, np.float32)
    sf.write(output_path, audio, SR)
    return audio, output_path

### 2-2. STT 전사 함수 정의

In [11]:
# 30초 단위로 STT 전사 진행
def return_transcription(audio_data):
    audio = np.frombuffer(audio_data.read(), dtype=np.float32)
    total_duration = len(audio) / SR
    transcriptions = []

    for start in range(0, int(total_duration), CHUNK_DURATION):
        end = min(start + CHUNK_DURATION, total_duration)
        chunk = audio[int(start * SR):int(end * SR)]
        inputs = processor(chunk, return_tensors="pt", sampling_rate=SR)

        if torch.cuda.is_available():
            inputs = inputs.to("cuda")

        with torch.no_grad():
            predicted_ids = model.generate(inputs.input_features)

        transcription = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        transcriptions.append(transcription)

    # 30초 단위로 전사 진행 후 결과 합침
    return ' '.join(transcriptions)

### 2-3. 화자분리 함수 정의

`merge_speaker_rows`
- 단일 화자의 연속 발화 중 무음 구간이 감지되면, 화자분리 모델은 해당 구간을 기준으로 음성을 분할한다.
- 결과적으로 동일 화자의 발화도 복수의 세그먼트로 분할될 수 있다.
- STT 모델은 짧은 음성에서 할루시네이션이 자주 발생한다.
- 따라서, STT 모델의 전사 성능과 가독성 향상을 위해 같은 화자의 연속된 발화 세그먼트를 병합한다.

`rename_speakers`
- 프로젝트의 목표인 회의 데이터 처리를 위해, 화자분리 모델에서 출력하는 화자 변수인 'speaker' 를 '참석자' 로 변경한다.

In [15]:
# 같은 화자 병합
def merge_speaker_rows(df):
    merged_data = []
    current_speaker = None
    current_start = None
    current_end = None
    current_audio = []
    current_transcription = []

    for index, row in df.iterrows():
        if row['speakers'] == current_speaker:
            current_end = row['end_timestamp']
            current_audio.append(row['audio'])
            current_transcription.append(row.get('transcription', ''))
        else:
            if current_speaker is not None:
                merged_data.append({
                    'speakers': current_speaker,
                    'start_timestamp': current_start,
                    'end_timestamp': current_end,
                    'audio': b''.join(current_audio),
                    'transcription': ' '.join(current_transcription).strip()
                })
            current_speaker = row['speakers']
            current_start = row['start_timestamp']
            current_end = row['end_timestamp']
            current_audio = [row['audio']]
            current_transcription = [row.get('transcription', '')]

    if current_speaker is not None:
        merged_data.append({
            'speakers': current_speaker,
            'start_timestamp': current_start,
            'end_timestamp': current_end,
            'audio': b''.join(current_audio),
            'transcription': ' '.join(current_transcription).strip()
        })

    return pd.DataFrame(merged_data)

# 'speaker' -> '참석자 n' 으로 변경
def rename_speakers(df):
    unique_speakers = df['speakers'].unique()
    speaker_map = {speaker: f'참석자 {i+1}' for i, speaker in enumerate(unique_speakers)}
    df['speakers'] = df['speakers'].map(speaker_map)
    return df

### 2-4. 화자분리 및 STT 전사 진행

- 화자분리를 먼저 진행한다.
- 분리된 단일 화자의 연속된 세그먼트를 병합하여 STT 모델에 입력시킨다.
- 결측치 제거 후 최종 결과를 반환한다.

In [33]:
# 화자분리 및 STT 전사
def speaker_diarize(path, num_speakers):
    start_time = time.time()
    audio, audio_path = load_audio(path)
    speakers = []
    start_timestamp = []
    end_timestamp = []
    audio_segments = []

    # 화자분리 진행
    diarization = pipeline({'uri': 'unique_audio_identifier', 'audio': audio_path}, num_speakers=num_speakers)
    print(f'Finished with diarization, took {time.time() - start_time} sec')
    start_time = time.time()

    # 화자분리 완료된 timestamp 에 맞춰서 STT 전사 진행
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        cmd = [
            "ffmpeg", "-nostdin",
            "-i", path,
            "-ss", str(turn.start),
            "-to", str(turn.end),
            "-f", "f32le",
            "-ac", "1",
            "-acodec", "pcm_f32le",
            "-ar", str(SR),
            "pipe:1"
        ]
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()

        if process.returncode != 0:
            raise Exception(f"Error occurred during ffmpeg process: {stderr.decode()}")

        audio_data = io.BytesIO(stdout)

        speakers.append(speaker)
        start_timestamp.append(turn.start)
        end_timestamp.append(turn.end)
        audio_segments.append(audio_data.read())

    df = pd.DataFrame({
        'speakers': speakers,
        'start_timestamp': start_timestamp,
        'end_timestamp': end_timestamp,
        'audio': audio_segments
    })

    # 같은 화자의 연속된 발화는 한 음성으로 합친 후 전사

    merged_df = merge_speaker_rows(df)

    # STT 전사 진행
    transcriptions = []
    for _, row in merged_df.iterrows():
        audio_data = io.BytesIO(row['audio'])
        transcription = return_transcription(audio_data)
        transcriptions.append(transcription)

    merged_df['transcription'] = transcriptions

    # Transcription 결측치 제거
    merged_df.dropna(subset=['transcription'], inplace=True)
    merged_df = merged_df[merged_df['transcription'] != '']
    merged_df.reset_index(drop=True, inplace=True)

    # Transcription 결측치 제거 후 화자 병합 재진행
    final_merged_df = merge_speaker_rows(merged_df)
    final_output_df = final_merged_df.drop(columns=[])

    # 'speaker' -> '참석자 n' 으로 변경
    final_output_df = rename_speakers(final_output_df)

    print(f'Finished with transcribing, took {time.time() - start_time} sec\n')

    return final_output_df[['speakers', 'transcription', 'start_timestamp', 'end_timestamp']]

In [41]:
path = "audio_path" # 오디오 파일 경로
num_speakers = 2 # 참석자 수가 2명일 경우

df = speaker_diarize(path, num_speakers)

In [42]:
for idx, row in df.iterrows():
    print(f"{row['speakers']}: {row['transcription']}")