# 한국어 말뭉치 합성 데이터

### Json 타임스탬프에 맞춰서 오디오 통합

In [None]:
import os
import json
from pydub import AudioSegment

def process_json_file(json_path):
    """JSON 파일을 읽고 start_time 리스트를 반환"""
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    time_data = []
    endtime = -1
    
    for speech in data.get('document', [{}])[0].get('utterance', []):
        start_time = speech.get('start', 0)
        end_time = speech.get('end', 0)

        if start_time < endtime:
            print(f"{os.path.basename(json_path)}: speech overlap at {start_time}")

        time_data.append(start_time)
        endtime = end_time
    
    return time_data

def sort_wav_files(directory):
    wav_files = [f for f in os.listdir(directory) if f.endswith('.wav')]
    wav_files.sort(key=lambda x: int(x.split('.')[-2]))
    return wav_files

def merge_wav_files(json_file, json_dir, wav_base_dir, output_dir):
    """JSON 파일과 같은 SDRW ID 폴더의 WAV 파일을 start_time 순서로 합쳐 저장"""
    sdrw_id = json_file.replace(".json", "")  # SDRW ID 추출
    json_path = os.path.join(json_dir, json_file)
    wav_dir = os.path.join(wav_base_dir, sdrw_id)  # 해당 SDRW ID 폴더
    output_file = os.path.join(output_dir, f"{sdrw_id}_merged.wav")

    if not os.path.exists(wav_dir):
        print(f"Warning: WAV directory not found for {sdrw_id}")
        return

    # JSON에서 start_time 리스트 가져오기
    start_times = process_json_file(json_path)

    # WAV 파일 불러오기
    wav_files = sort_wav_files(wav_dir)
    
    print(f"{sdrw_id}: {len(wav_files)} WAV files, {len(start_times)} timestamps")

    if len(wav_files) != len(start_times):
        print(f"⚠ Warning: Mismatch in WAV count for {sdrw_id}. {len(wav_files)} wavs vs {len(start_times)} timestamps.")

    # 초기 combined_audio 길이 조정
    combined_audio = AudioSegment.silent(duration=max(start_times) * 1000 + 1000)
    max_duration = 0  # 전체 길이를 추적하기 위한 변수

    for wav_file, start_time in zip(wav_files, start_times):
        wav_path = os.path.join(wav_dir, wav_file)
        try:
            audio = AudioSegment.from_wav(wav_path)
        except Exception as e:
            print(f"Error loading {wav_path}: {e}")
            continue

        start_ms = round(float(start_time) * 1000)  # 반올림 사용
        print(f"Processing {wav_file}, start_ms: {start_ms}, audio length: {len(audio)}") #디버깅 정보 추가

        combined_audio = combined_audio.overlay(audio, position=start_ms)
        max_duration = max(max_duration, start_ms + len(audio))
        print(f"Combined audio length: {len(combined_audio)}") #디버깅 정보 추가

    # 최종 길이 맞추기 (마지막 오디오 이후에 무음 추가)
    if len(combined_audio) < max_duration:
        combined_audio += AudioSegment.silent(duration=max_duration - len(combined_audio))

    # 최종 오디오 저장
    os.makedirs(output_dir, exist_ok=True)
    combined_audio.export(output_file, format="wav")
    print(f"✅ Merged {sdrw_id} saved as {output_file}")

# 디렉토리 설정
json_dir = "kor/json/"
wav_base_dir = "kor/kor_corpus/"
output_dir = "kor/output/"

json_files = [f for f in os.listdir(json_dir) if f.startswith('SDRW') and f.endswith('.json')]

for json_file in json_files:
    merge_wav_files(json_file, json_dir, wav_base_dir, output_dir)

### 한국어 말뭉치 RTTM Formatting

In [None]:
import os
import json

def json_to_rttm(json_file, json_dir, output_dir):
    """JSON 파일을 읽고 RTTM 포맷으로 변환 후 저장"""
    json_path = os.path.join(json_dir, json_file)
    
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    rttm_lines = []
    data_name = os.path.splitext(json_file)[0]  

    for speech in data.get('document', [{}])[0].get('utterance', []):
        start_ms = float(speech.get('start', 0))
        end_ms = float(speech.get('end', 0))
        duration = round(end_ms - start_ms, 5)
        speaker_id = speech.get('speaker_id', 'unknown')

        # RTTM 포맷 라인 생성
        rttm_line = f"SPEAKER {data_name} 1 {start_ms:.3f} {duration:.3f} <NA> <NA> {speaker_id} <NA>"
        rttm_lines.append(rttm_line)

    # RTTM 파일 저장
    rttm_file = os.path.join(output_dir, f"{data_name}.rttm")
    with open(rttm_file, 'w', encoding='utf-8') as f:
        f.write("\n".join(rttm_lines) + "\n")

    print(f"✅ RTTM 파일 생성 완료: {rttm_file}")

# 디렉토리 설정
json_dir = "kor/json/"
output_dir = "kor/rttm/"

# 출력 디렉토리 생성 (존재하지 않으면)
os.makedirs(output_dir, exist_ok=True)

# JSON 파일 리스트 가져오기
json_files = [f for f in os.listdir(json_dir) if f.startswith('SDRW') and f.endswith('.json')]

# 변환 실행
for json_file in json_files:
    json_to_rttm(json_file, json_dir, output_dir)

### Alimeeting RTTM Formatting

In [None]:
import os
from praatio import textgrid

def tg_to_rttm(tg_file, text_dir, output_dir):
    tg_path = os.path.join(text_dir, tg_file)
    
    tg = textgrid.openTextgrid(tg_path, False)
    all_log = []
    for name in tg.tierNames:
        entries = tg._tierDict[name].entries
        for entry in entries:
            duration = round(float(entry.end) - float(entry.start), 2)
            all_log.append((entry.start, duration, name, entry.label))

    all_log.sort(key=lambda x: x[0])

    rttm_lines = []
    data_name = tg_file.replace(".TextGrid", "")

    for i, log in enumerate(all_log):
        start_time, duration, speaker_id, label = log
        rttm_line = f"SPEAKER {data_name} 1 {start_time:.2f} {duration:.2f} <NA> <NA> {speaker_id} <NA>"
        rttm_lines.append(rttm_line)

    # RTTM 파일 저장
    rttm_file = os.path.join(output_dir, f"{data_name}.rttm")
    with open(rttm_file, 'w', encoding='utf-8') as f:
        f.write("\n".join(rttm_lines) + "\n")

    print(f"✅ RTTM 파일 생성 완료: {rttm_file}")
    
text_dir = "Train_Ali_far/textgrid_dir/"
output_dir = "Train_Ali_far/rttm/"

os.makedirs(output_dir, exist_ok=True)

tg_files = [f for f in os.listdir(text_dir) if f.endswith('.TextGrid')]

for tg_file in tg_files:
    tg_to_rttm(tg_file, text_dir, output_dir)



### Overlap 합성
- 총 데이터의 30%
- 각 발화의 30% ~ 40% 오버랩
- 랜덤 샘플링
- 포맷 : 발화자, 발화시작, 발화끝, 발화내용

In [11]:
import os

text_dir = "Ali/Train_Ali_far/rttm/"
train_dir = "Ali/Train_Ali_far/rttm/train/"
test_dir = "Ali/Train_Ali_far/rttm/test/"

os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

tg_files = [f for f in os.listdir(text_dir) if f.endswith('.rttm')]

# split dataset
train_ratio = 0.9 
train_count = int(len(tg_files) * train_ratio)

train_files = tg_files[:train_count]
test_files = tg_files[train_count:]


with open("Ali/Train_Ali_far/train_list.txt", "w", encoding="utf-8") as f:
    f.write("\n".join(train_files))

# test list txt file
with open("Ali/Train_Ali_far/test_list.txt", "w", encoding="utf-8") as f:
    f.write("\n".join(test_files))

# move files
for f in train_files:
    os.rename(os.path.join(text_dir, f), os.path.join(train_dir, f))

for f in test_files:
    os.rename(os.path.join(text_dir, f), os.path.join(test_dir, f))

