In [4]:
import os 
import numpy as np
import pandas as pd
import librosa
from pathlib import Path

In [10]:
left_data_path = os.path.join(os.path.dirname(os.path.dirname(Path.cwd())), 'data', 'experiment_data', 'results-KFA-left')
left_data_path

'/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left'

In [11]:
resultPath = left_data_path
participant_list = [file for file in os.listdir(resultPath) if "participant" in file]
participant_list.sort()
participant_list


['participant_LY043(X)',
 'participant_LY058',
 'participant_LY059(X)',
 'participant_LY060',
 'participant_LY061',
 'participant_LY062']

In [12]:
def create_textgrid(filename, duration, intervals):
    """
    TextGrid 파일을 생성하는 함수
    
    Parameters:
    - filename: 저장할 파일 경로
    - duration: 음원 길이 (초)
    - intervals: [(시작시간, 종료시간, 전사내용), ...] 형식의 리스트
    """
    with open(filename, 'w', encoding='utf-8') as f:
        # 헤더 작성
        f.write('File type = "ooTextFile"\n')
        f.write('Object class = "TextGrid"\n\n')
        
        # 시간 정보
        f.write(f'xmin = 0\n')
        f.write(f'xmax = {duration}\n')
        f.write('tiers? <exists>\n')
        f.write('size = 1\n')
        
        # IntervalTier 정보
        f.write('item []:\n')
        f.write('    item [1]:\n')
        f.write('        class = "IntervalTier"\n')
        f.write('        name = "transcription"\n')
        f.write(f'        xmin = 0\n')
        f.write(f'        xmax = {duration}\n')
        f.write(f'        intervals: size = {len(intervals)}\n')
        
        # 각 구간 정보 작성
        for i, (start, end, text) in enumerate(intervals, 1):
            f.write(f'        intervals [{i}]:\n')
            f.write(f'            xmin = {start}\n')
            f.write(f'            xmax = {end}\n')
            f.write(f'            text = "{text}"\n')

# 사용 예시
intervals = [
    (0.0, 1.5, "안녕하세요"),
    (1.5, 3.0, "반갑습니다"),
    (3.0, 4.5, "오늘도 좋은 하루 되세요")
]

#create_textgrid(os.path.join("../../../../Desktop/results/participant_LY001(-)", "test.TextGrid"), 4.5, intervals)

In [13]:
def get_wav_duration(wav_path):
    """
    WAV 파일의 길이를 초 단위로 반환
    """
    duration = librosa.get_duration(path=wav_path)
    return duration

In [14]:
for participant in participant_list:
    
    participant_path = os.path.join(resultPath, participant)
    file_list = os.listdir(participant_path)
    
    if len(file_list) == 0:
        raise ValueError(f"No files found in {participant_path}")
    
    excel_path = [file for file in file_list if file.endswith(".xlsx")]
    if len(excel_path) == 0:
        raise ValueError(f"No excel file found in {participant_path}")
    elif len(excel_path) > 1:
        raise ValueError(f"Multiple excel files found in {participant_path}")
    excel_path = os.path.join(participant_path, excel_path[0])
    
    # read excel file
    excel_file = pd.ExcelFile(excel_path)
    sheet_names = excel_file.sheet_names
    for sheet_name in sheet_names:
        transcript = ""
        if not("Stage" in sheet_name):
            continue

        excel_df = pd.read_excel(excel_path, sheet_name=sheet_name)
        target_column_name = "단어" if "단어" in excel_df.columns else "음성파일"

        if target_column_name == "단어":
            transcript = excel_df[target_column_name].to_string(index=False).replace("\n", " ")
        elif target_column_name == "음성파일":
            transcript = excel_df[target_column_name].to_string(index=False).replace(".wav\n", " ").replace(".wav", "")

        #print(transcript)
        sheet_name = sheet_name.lower()
        file_list = os.listdir(participant_path)
        wav_file_name = [file for file in file_list if sheet_name.lower() in file.lower() and file.endswith(".wav")][0]
        
        transcriptPath = os.path.join(participant_path, wav_file_name.replace(".wav", ".txt"))
        print(transcriptPath)
        
        with open(transcriptPath, "w") as f:
            f.write(transcript)

        """
        wav_file_name = [file for file in file_list if sheet_name.lower() in file.lower() and file.endswith(".wav")][0]
        textGrid_name = wav_file_name.replace(".wav", ".TextGrid")
        textGrid_path = os.path.join(participant_path, textGrid_name)
        duration = get_wav_duration(os.path.join(participant_path, wav_file_name))
        start_time = 0.0
        end_time = duration
        intervals = [(start_time, end_time, transcript)]
        
        create_textgrid(textGrid_path, duration, intervals)
        """

/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left/participant_LY043(X)/43_stage1_20250502_1328.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left/participant_LY043(X)/43_stage2_20250502_1328.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left/participant_LY043(X)/43_stage3_20250502_1330.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left/participant_LY043(X)/43_stage4_20250502_1330.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left/participant_LY043(X)/43_stage5_20250502_1335.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left/participant_LY043(X)/43_stage6_20250502_1339.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/data/experiment_data/results-KFA-left/participant_LY058/58_stage1_202506

In [48]:
class TextGridReader:
    def __init__(self, textgrid_path):
        self.textgrid_path = textgrid_path
        self.tiers_data = {}
        self.file_info = {}
        
    def read(self):
        """
        TextGrid 파일을 읽어서 구조화된 데이터로 변환
        """
        with open(self.textgrid_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
            
            # 파일 정보 읽기
            self._read_file_info(lines)
            
            # tier 정보 읽기
            self._read_tiers(lines)
            
        return self.tiers_data
    
    def _read_file_info(self, lines):
        """
        파일의 기본 정보 읽기
        """
        for line in lines:
            line = line.strip()
            if '=' in line:
                key, value = line.split('=', 1)
                key = key.strip()
                value = value.strip().strip('"')
                self.file_info[key] = value
    
    def _read_tiers(self, lines):
        """
        tier 정보 읽기
        """
        current_tier = None
        current_intervals = []
        reading_interval = False
        interval_data = {}
        
        for line in lines:
            line = line.strip()
            
            # tier 시작
            if 'name = "' in line:
                if current_tier is not None:
                    self.tiers_data[current_tier] = current_intervals
                current_tier = line.split('"')[1]
                current_intervals = []
                reading_interval = False
            
            # interval 시작
            elif 'intervals [' in line:
                reading_interval = True
                interval_data = {}
            
            # interval 데이터 읽기
            elif reading_interval and '=' in line:
                key, value = line.split('=', 1)
                key = key.strip()
                value = value.strip().strip('"')
                interval_data[key] = value
                
                # interval 완성
                if 'text' in interval_data:
                    start = float(interval_data['xmin'])
                    end = float(interval_data['xmax'])
                    text = interval_data['text']
                    current_intervals.append((start, end, text))
                    reading_interval = False
        
        # 마지막 tier 추가
        if current_tier is not None:
            self.tiers_data[current_tier] = current_intervals
    
    def get_tier_names(self):
        """
        tier 이름 목록 반환
        """
        return list(self.tiers_data.keys())
    
    def get_intervals_by_tier(self, tier_name):
        """
        특정 tier의 interval 목록 반환
        """
        return self.tiers_data.get(tier_name, [])
    
    def get_total_duration(self):
        """
        전체 음성 길이 반환
        """
        return float(self.file_info.get('xmax', 0))

In [66]:
import parselmouth
import numpy as np

def get_average_formants(wav_path, start_time, end_time, time_step=0.01):
    """
    특정 구간의 평균 포먼트 값 계산
    
    Parameters:
    - wav_path: WAV 파일 경로
    - start_time: 시작 시간 (초)
    - end_time: 종료 시간 (초)
    - time_step: 시간 간격 (초)
    
    Returns:
    - avg_f1, avg_f2: 평균 F1, F2 값 (Hz)
    """
    # 음성 파일 로드
    snd = parselmouth.Sound(wav_path)

    # Formant 객체 생성 (Praat의 Formant settings와 동일한 파라미터 사용)
    formant = snd.to_formant_burg(
        max_number_of_formants=5,  # Praat 기본값: 5
        maximum_formant=5500,      # Praat 기본값: 5500 Hz (여성)
        window_length=0.025,       # Praat 기본값: 0.025초
        pre_emphasis_from=50,      # Praat 기본값: 50 Hz
        time_step=0.025/4 # 시간 간격
    )
    
    # 시간 포인트 생성
    time_points = np.arange(start_time, end_time, time_step)
    
    # 각 시간 포인트에서의 F1, F2 값 수집
    f1_values = []
    f2_values = []
    
    for time in time_points:
        f1 = formant.get_value_at_time(1, time)
        f2 = formant.get_value_at_time(2, time)
        
        # 유효한 값만 수집 (NaN이 아닌 경우)
        if not np.isnan(f1):
            f1_values.append(f1)
        if not np.isnan(f2):
            f2_values.append(f2)
    
    # 평균값 계산
    avg_f1 = np.mean(f1_values) if f1_values else None
    avg_f2 = np.mean(f2_values) if f2_values else None
    
    return avg_f1, avg_f2

In [77]:
def detect_formants(participant_path, textGrid_name):
    if not os.path.exists(participant_path):
        raise ValueError(f"없는 디렉토리: {participant_path}")

    textgrid_path = os.path.join(participant_path, "output-new", textGrid_name)
    reader = TextGridReader(textgrid_path)
    try:
        # TextGrid 파일 읽기
        tiers_data = reader.read()
        
        # 파일 정보 출력
        print("파일 정보:")
        print(f"전체 길이: {reader.get_total_duration():.3f}초")
        print(f"Tier 목록: {reader.get_tier_names()}")
        print("\n" + "="*50)
        
        # 각 tier의 내용 출력
        phoneme_cnt = 0
        phonemes = []
        f1_values = []
        f2_values = []
        for tier_name in reader.get_tier_names():
            # 단어를 분석하는게 아니라, 모음만 포먼트 딸거니까,,
            if tier_name == "words":
                continue

            print(f"\nTier: {tier_name}")
            print("-"*30)
            
            intervals = reader.get_intervals_by_tier(tier_name)
            print(type(intervals))
            for start, end, text in intervals:
                if text == "":
                    continue
            
                if text == "spn":
                    phoneme_cnt+=2
                    print(f"구간 {start:.2f}-{end:.2f}초의 평균 포먼트 값:")
                    print("detection failed.")
                    print(f"F1: None")
                    print(f"F2: None")
                    phonemes.append(None)
                    phonemes.append(None)
                    f1_values.append(None)
                    f1_values.append(None)
                    f2_values.append(None)
                    f2_values.append(None)
                    continue


                elif text not in ["ɐ","ɐː", "ɛ", "ɛː", "i", "iː", "o", "oː", "u", "uː"]:
                    continue
                print(f"phoneme: {text}")
                phoneme_cnt+=1
                wav_file_name = textGrid_name.replace(".TextGrid", ".wav")
                avg_f1, avg_f2 = get_average_formants(os.path.join(participant_path, wav_file_name), start, end)
    
                if avg_f1 is not None and avg_f2 is not None:
                    print(f"구간 {start:.2f}-{end:.2f}초의 평균 포먼트 값:")
                    print(f"F1: {avg_f1:.2f} Hz")
                    print(f"F2: {avg_f2:.2f} Hz")
                    phonemes.append(text)
                    f1_values.append(avg_f1)
                    f2_values.append(avg_f2)
                else:
                    print("유효한 포먼트 값을 찾을 수 없습니다.")
                    phonemes.append(None)
                    f1_values.append(None)
                    f2_values.append(None)
                
                print("-"*20)
        print(f"total detected phonemes: {phoneme_cnt}")
        pandas_data = pd.DataFrame({
            "phoneme": phonemes,
            "f1": f1_values,
            "f2": f2_values
        })
        pandas_data.to_csv(os.path.join(participant_path, textGrid_name.replace(".TextGrid", ".csv")), index=False)
        
    except Exception as e:
        print(f"에러 발생: {str(e)}")

In [84]:
expPath = "../../../../Desktop/results"
participant_list = [file for file in os.listdir(expPath) if "participant_LY" in file]
participant_list.sort()

In [85]:
for participant in participant_list:
    participant_path = os.path.join(expPath, participant)
    file_list = os.listdir(participant_path)
    
    if len(file_list) == 0:
        print(ValueError(f"No files found in {participant_path}"))
        continue

    textgrids_path = os.path.join(participant_path, "output-new")
    textgrids_list = os.listdir(textgrids_path)
    textgrids_list = [file for file in textgrids_list if file.endswith(".TextGrid")]
    textgrids_list.sort()
    for textgrid in textgrids_list:
        detect_formants(participant_path, textgrid)

파일 정보:
전체 길이: 7.656초
Tier 목록: ['words', 'phones']


Tier: phones
------------------------------
<class 'list'>
구간 2.34-2.96초의 평균 포먼트 값:
detection failed.
F1: None
F2: None
phoneme: i
구간 5.15-5.39초의 평균 포먼트 값:
F1: 1344.16 Hz
F2: 2616.98 Hz
--------------------
구간 6.59-7.13초의 평균 포먼트 값:
detection failed.
F1: None
F2: None
total detected phonemes: 5
파일 정보:
전체 길이: 68.587초
Tier 목록: ['words', 'phones']


Tier: phones
------------------------------
<class 'list'>
phoneme: oː
구간 1.97-2.00초의 평균 포먼트 값:
F1: 518.84 Hz
F2: 1116.06 Hz
--------------------
phoneme: u
구간 2.28-2.51초의 평균 포먼트 값:
F1: 520.32 Hz
F2: 2417.09 Hz
--------------------
phoneme: ɐ
구간 3.31-3.37초의 평균 포먼트 값:
F1: 975.47 Hz
F2: 1314.75 Hz
--------------------
phoneme: i
구간 3.67-3.92초의 평균 포먼트 값:
F1: 806.48 Hz
F2: 2467.52 Hz
--------------------
phoneme: ɐ
구간 4.84-5.02초의 평균 포먼트 값:
F1: 902.03 Hz
F2: 1157.42 Hz
--------------------
phoneme: u
구간 5.15-5.23초의 평균 포먼트 값:
F1: 881.84 Hz
F2: 2449.10 Hz
--------------------
phoneme: ɛː
구간 6.42-6.49

KeyboardInterrupt: 

In [89]:
# 개선: 병렬 처리 적용
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def process_participant(participant):
    participant_path = os.path.join(expPath, participant)
    file_list = os.listdir(participant_path)
    
    if len(file_list) == 0:
        print(ValueError(f"No files found in {participant_path}"))
        return
    
    if any(csv_file.endswith(".csv") for csv_file in file_list):
        print(f"{participant} already analyzed.")
        return
    textgrids_path = os.path.join(participant_path, "output-new")
    
    textgrids_list = os.listdir(textgrids_path)
    textgrids_list = [file for file in textgrids_list if file.endswith(".TextGrid")]
    textgrids_list.sort()
    for textgrid in textgrids_list:
        detect_formants(participant_path, textgrid)
    return results

# CPU 코어 수에 맞춰 병렬 처리
with ProcessPoolExecutor(max_workers=int(0.4*multiprocessing.cpu_count())) as executor:
    results = list(executor.map(process_participant, participant_list))

Process SpawnProcess-18:
Process SpawnProcess-16:
Process SpawnProcess-17:
Process SpawnProcess-15:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/concurrent/futures/process.py", line 252, in _process_worker
    call_item = call_que

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

In [91]:
import os
import numpy as np
import pandas as pd
import parselmouth
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
from tqdm import tqdm
import logging
from functools import partial
import time

# 로깅 설정
logging.basicConfig(
    filename='formant_analysis.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def process_single_textgrid(participant_path, textgrid):
    """
    단일 TextGrid 파일 처리
    """
    try:
        textgrid_path = os.path.join(participant_path, "output-new", textgrid)
        reader = TextGridReader(textgrid_path)
        tiers_data = reader.read()
        
        phonemes = []
        f1_values = []
        f2_values = []
        
        for tier_name in reader.get_tier_names():
            if tier_name == "words":
                continue
                
            intervals = reader.get_intervals_by_tier(tier_name)
            for start, end, text in intervals:
                if text == "":
                    continue
                    
                if text == "spn":
                    phonemes.extend([None, None])
                    f1_values.extend([None, None])
                    f2_values.extend([None, None])
                    continue
                    
                if text not in ["ɐ","ɐː", "ɛ", "ɛː", "i", "iː", "o", "oː", "u", "uː"]:
                    continue
                    
                wav_file_name = textgrid.replace(".TextGrid", ".wav")
                wav_path = os.path.join(participant_path, wav_file_name)
                
                # 포먼트 분석
                snd = parselmouth.Sound(wav_path)
                formant = snd.to_formant_burg(
                    max_number_of_formants=5,
                    maximum_formant=5500,
                    window_length=0.025,
                    pre_emphasis_from=50,
                    time_step=0.025/4
                )
                
                time_points = np.arange(start, end, 0.025/4)
                f1_values_temp = []
                f2_values_temp = []
                
                for time in time_points:
                    f1 = formant.get_value_at_time(1, time)
                    f2 = formant.get_value_at_time(2, time)
                    
                    if not np.isnan(f1):
                        f1_values_temp.append(f1)
                    if not np.isnan(f2):
                        f2_values_temp.append(f2)
                
                avg_f1 = np.mean(f1_values_temp) if f1_values_temp else None
                avg_f2 = np.mean(f2_values_temp) if f2_values_temp else None
                
                phonemes.append(text)
                f1_values.append(avg_f1)
                f2_values.append(avg_f2)
        
        # 결과 저장
        pandas_data = pd.DataFrame({
            "phoneme": phonemes,
            "f1": f1_values,
            "f2": f2_values
        })
        
        output_path = os.path.join(participant_path, textgrid.replace(".TextGrid", ".csv"))
        pandas_data.to_csv(output_path, index=False)
        
        return True
    except Exception as e:
        logging.error(f"Error processing {textgrid}: {str(e)}")
        return False

def process_participant(args):
    """
    단일 참가자 처리 함수
    """
    participant, expPath = args
    try:
        participant_path = os.path.join(expPath, participant)
        textgrids_path = os.path.join(participant_path, "output-new")
        
        if not os.path.exists(textgrids_path):
            logging.error(f"Directory not found: {textgrids_path}")
            return False
        
        if any(csv_file.endswith(".csv") for csv_file in os.listdir(textgrids_path)):
            print(f"{participant} already analyzed.")
            return True
            
        textgrids_list = [f for f in os.listdir(textgrids_path) if f.endswith(".TextGrid")]
        textgrids_list.sort()
        
        results = []
        for textgrid in textgrids_list:
            result = process_single_textgrid(participant_path, textgrid)
            results.append(result)
        
        return all(results)
    except Exception as e:
        logging.error(f"Error processing participant {participant}: {str(e)}")
        return False

def main():
    # 기본 경로 설정
    expPath = "/Users/bagjuhyeon/Desktop/results"
    participant_list = [f for f in os.listdir(expPath) if "participant_LY" in f]
    participant_list.sort()
    
    # CPU 코어 수 설정 (전체 코어의 50%만 사용)
    num_cores = max(1, int(multiprocessing.cpu_count() * 0.5))
    
    # 작업 인자 준비
    work_args = [(participant, expPath) for participant in participant_list]
    
    # 진행 상황 표시를 위한 tqdm 설정
    with tqdm(total=len(participant_list), desc="Processing participants") as pbar:
        # ProcessPoolExecutor를 사용한 병렬 처리
        with ProcessPoolExecutor(max_workers=num_cores) as executor:
            # 작업 제출
            futures = [executor.submit(process_participant, args) for args in work_args]
            
            # 결과 처리
            for future in futures:
                try:
                    result = future.result(timeout=3600)  # 1시간 타임아웃 설정
                    if not result:
                        logging.warning("Some processing failed")
                except Exception as e:
                    logging.error(f"Error in parallel processing: {str(e)}")
                finally:
                    pbar.update(1)
                    time.sleep(0.1)  # CPU 부하 감소를 위한 짧은 대기

if __name__ == "__main__":
    # Windows에서 실행할 경우 필요
    multiprocessing.freeze_support()
    main()

Processing participants:   0%|          | 0/37 [00:00<?, ?it/s]Process SpawnProcess-28:
Process SpawnProcess-25:
Process SpawnProcess-26:
Process SpawnProcess-27:
Process SpawnProcess-24:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/phoneticConvergenceAnalysis/lib/python3.12/concurrent/futures/process.py", line 252, in _process_worker
    call_item = call_queue.get(block=True)
                ^^^^^

## File gen

In [12]:
import os
import shutil
import glob

def move_files(source_dir, target_dir, file_pattern="*"):
    """
    특정 폴더 내의 파일들을 다른 폴더로 이동
    
    Args:
        source_dir (str): 소스 폴더 경로
        target_dir (str): 대상 폴더 경로
        file_pattern (str): 이동할 파일 패턴 (기본값: "*")
    """
    try:
        # 대상 폴더가 없으면 생성
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
            
        # 소스 폴더 내의 모든 파일 경로 가져오기
        file_paths = glob.glob(os.path.join(source_dir, file_pattern))
        
        # 파일 이동
        for file_path in file_paths:
            if os.path.isfile(file_path):  # 파일인 경우에만 이동
                file_name = os.path.basename(file_path)
                target_path = os.path.join(target_dir, file_name)
                
                # 파일 이동
                shutil.move(file_path, target_path)
                print(f"이동 완료: {file_name}")
                
        print(f"총 {len(file_paths)}개의 파일 이동이 완료되었습니다.")
        
    except Exception as e:
        print(f"에러 발생: {str(e)}")

def delete_files(directory, file_pattern="*"):
    """
    특정 폴더 내의 파일들을 삭제
    
    Args:
        directory (str): 파일이 있는 폴더 경로
        file_pattern (str): 삭제할 파일 패턴 (기본값: "*")
    """
    try:
        # 폴더 내의 모든 파일 경로 가져오기
        file_paths = glob.glob(os.path.join(directory, file_pattern))
        
        # 파일 삭제
        for file_path in file_paths:
            if os.path.isfile(file_path):  # 파일인 경우에만 삭제
                os.remove(file_path)
                print(f"삭제 완료: {os.path.basename(file_path)}")
                
        print(f"총 {len(file_paths)}개의 파일 삭제가 완료되었습니다.")
        
    except Exception as e:
        print(f"에러 발생: {str(e)}")

def clear_directory(directory_path):
    """
    특정 폴더 내의 모든 파일과 하위 폴더를 삭제
    
    Args:
        directory_path (str): 삭제할 파일들이 있는 폴더 경로
    """
    try:
        # 폴더가 존재하는지 확인
        if not os.path.exists(directory_path):
            print(f"폴더가 존재하지 않습니다: {directory_path}")
            return
            
        # 폴더 내의 모든 항목 삭제
        for item in os.listdir(directory_path):
            item_path = os.path.join(directory_path, item)
            
            if os.path.isfile(item_path):
                # 파일인 경우 삭제
                os.remove(item_path)
                print(f"파일 삭제 완료: {item}")
            elif os.path.isdir(item_path):
                # 하위 폴더인 경우 폴더와 그 내용을 모두 삭제
                shutil.rmtree(item_path)
                print(f"폴더 삭제 완료: {item}")
                
        print(f"\n{directory_path} 폴더의 모든 내용이 삭제되었습니다.")
        
    except Exception as e:
        print(f"에러 발생: {str(e)}")


In [None]:
# 사용 예시
source_directory = "source_folder"  # 소스 폴더 경로
target_directory = "target_folder"  # 대상 폴더 경로
file_pattern = "*.txt"  # 이동할 파일 패턴 (예: 모든 txt 파일)

move_files(source_directory, target_directory, file_pattern)

# 사용 예시
target_directory = "target_folder"  # 삭제할 파일이 있는 폴더 경로
file_pattern = "*.txt"  # 삭제할 파일 패턴 (예: 모든 txt 파일)

delete_files(target_directory, file_pattern)

In [16]:
folderPath = "/Users/bagjuhyeon/Desktop/results-KFA"
participants_list = os.listdir(folderPath)
participants_list.sort()
participants_list = [participant for participant in participants_list if "participant_LY" in participant]

for participant in participants_list:
    participant_path = os.path.join(folderPath, participant)
    
    file_list = os.listdir(participant_path)

    if "KFA" in file_list:
        kfa_path = os.path.join(participant_path, "KFA")
        delete_files(participant_path, "*.wav")
        delete_files(participant_path, "*.TextGrid")
        delete_files(participant_path, "*.lab")
        delete_files(participant_path, "*.txt")
        delete_files(participant_path, "*.csv")
        delete_files(participant_path, "*.xlsx")

        if "output-new" in file_list:
            output_new_path = os.path.join(participant_path, "output-new")
            shutil.rmtree(output_new_path)
        if "temp" in file_list:
            temp_path = os.path.join(participant_path, "temp")
            shutil.rmtree(temp_path)

        move_files(kfa_path, participant_path, "*.wav")
        move_files(kfa_path, participant_path, "*.TextGrid")
        #move_files(kfa_path, participant_path, "*.lab")
        #move_files(kfa_path, participant_path, "*.txt")

        shutil.rmtree(kfa_path)
    else:
        shutil.rmtree(participant_path)


삭제 완료: 02_stage1_20250404_1349.wav
삭제 완료: 02_stage6_20250404_1401.wav
삭제 완료: 02_stage2_20250404_1349.wav
삭제 완료: 02_stage3_20250404_1350.wav
삭제 완료: 02_stage5_20250404_1357.wav
삭제 완료: 02_stage4_20250404_1351.wav
총 6개의 파일 삭제가 완료되었습니다.
삭제 완료: 02_stage4_20250404_1351.TextGrid
삭제 완료: 02_stage5_20250404_1357.TextGrid
삭제 완료: 02_stage3_20250404_1350.TextGrid
삭제 완료: 02_stage6_20250404_1401.TextGrid
삭제 완료: 02_stage2_20250404_1349.TextGrid
삭제 완료: 02_stage1_20250404_1349.TextGrid
총 6개의 파일 삭제가 완료되었습니다.
총 0개의 파일 삭제가 완료되었습니다.
삭제 완료: 02_stage1_20250404_1349.txt
삭제 완료: 02_stage2_20250404_1349.txt
삭제 완료: 02_stage6_20250404_1401.txt
삭제 완료: 02_stage3_20250404_1350.txt
삭제 완료: 02_stage4_20250404_1351.txt
삭제 완료: 02_stage5_20250404_1357.txt
총 6개의 파일 삭제가 완료되었습니다.
삭제 완료: 02_stage3_20250404_1350.csv
삭제 완료: 02_stage4_20250404_1351.csv
삭제 완료: 02_stage5_20250404_1357.csv
삭제 완료: 02_stage1_20250404_1349.csv
삭제 완료: 02_stage6_20250404_1401.csv
삭제 완료: 02_stage2_20250404_1349.csv
총 6개의 파일 삭제가 완료되었습니다.
삭제 완료: 02_experiment

# control 조건 음성 파일 KFA 돌릴 준비

In [6]:
import os

audio_path = "/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA"
audio_files = os.listdir(audio_path)
audio_files = [audio_file for audio_file in audio_files if audio_file.endswith(".wav")]
audio_files.sort()

In [7]:
data = {}

In [9]:
audio_files[0].replace('.wav', '')

'난색'

In [17]:
for audio_file in audio_files:
    audio_file_path = os.path.join(audio_path, audio_file)
    txt_name = audio_file.replace('.wav', '')
    txt_path = os.path.join(audio_path, txt_name + '.txt')
    
    # 명시적으로 UTF-8 인코딩과 LF 줄 끝 문자 사용
    with open(txt_path, 'w', encoding='utf-8', newline='\n') as f:
        f.write(txt_name)
    print(txt_path)

/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/난색.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/난파.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/내빈.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/내사.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/녹각.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/논개.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/누룩.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-version-phonetic/output-KFA/누진.txt
/Users/bagjuhyeon/Documents/WorkSpace/phoneticConvergence/audio-sample/short-ve

In [18]:
for audio_file in audio_files:
    audio_file_path = os.path.join(audio_path, audio_file)
    txt_name = audio_file.replace('.wav', '')
    txt_path = audio_path + '/'+ txt_name + '.txt'
    
    with open(txt_path, 'r') as f:
        print(f.read())

난색
난파
내빈
내사
녹각
논개
누룩
누진
닌자
닝닝
말미
망루
매복
맹호
몰딩
몽매
무골
무운
미비
밀사
박애
발탁
백태
뱃심
복시
봉독
분개
불모
비재
빈축
상기
상록
새참
샐쭉
소조
송부
수하
술책
시문
실족
파군
판본
패물
팽배
포문
폭압
품띠
풍산
피폭
필생


In [19]:
def inspect_file(file_path):
    """
    파일의 바이트 단위 내용을 확인하는 함수
    """
    print(f"\n파일 경로: {file_path}")
    print(f"파일 크기: {os.path.getsize(file_path)} bytes")
    
    # 바이트 단위로 파일 읽기
    with open(file_path, 'rb') as f:
        content = f.read()
    
    print("\n바이트 내용:")
    print(content)
    
    print("\n각 바이트의 16진수 값:")
    print(' '.join(f'{b:02x}' for b in content))

# 사용 예시
# Cursor로 만든 파일
inspect_file("../audio-sample/short-version-phonetic/output-KFA/text.txt")
# Python으로 만든 파일
inspect_file("../audio-sample/short-version-phonetic/output-KFA/난색.txt")


파일 경로: ../audio-sample/short-version-phonetic/output-KFA/text.txt
파일 크기: 6 bytes

바이트 내용:
b'\xed\x8c\xbd\xeb\xb0\xb0'

각 바이트의 16진수 값:
ed 8c bd eb b0 b0

파일 경로: ../audio-sample/short-version-phonetic/output-KFA/난색.txt
파일 크기: 18 bytes

바이트 내용:
b'\xe1\x84\x82\xe1\x85\xa1\xe1\x86\xab\xe1\x84\x89\xe1\x85\xa2\xe1\x86\xa8'

각 바이트의 16진수 값:
e1 84 82 e1 85 a1 e1 86 ab e1 84 89 e1 85 a2 e1 86 a8


In [16]:
def clean_text_file(input_path, output_path=None):
    """
    텍스트 파일의 보이지 않는 문자를 제거하고 정리하는 함수
    
    Args:
        input_path (str): 입력 파일 경로
        output_path (str, optional): 출력 파일 경로 (None이면 입력 파일을 덮어씀)
    """
    try:
        # 파일 읽기
        with open(input_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # 줄 끝 문자를 LF로 통일
        content = content.replace('\r\n', '\n')
        
        # 연속된 공백 제거
        lines = content.split('\n')
        cleaned_lines = []
        
        for line in lines:
            # 줄 끝 공백 제거
            line = line.rstrip()
            # 연속된 공백을 하나로 통일
            line = ' '.join(line.split())
            cleaned_lines.append(line)
        
        # 빈 줄 제거
        cleaned_lines = [line for line in cleaned_lines if line.strip()]
        
        # 정리된 내용을 하나의 문자열로 합치기
        cleaned_content = '\n'.join(cleaned_lines)
        
        # 출력 파일 경로가 지정되지 않은 경우 입력 파일을 덮어씀
        if output_path is None:
            output_path = input_path
        
        # 파일 저장 (BOM 없이)
        with open(output_path, 'w', encoding='utf-8', newline='\n') as f:
            f.write(cleaned_content)
            
        print(f"파일 정리가 완료되었습니다: {output_path}")
        
    except Exception as e:
        print(f"에러 발생: {str(e)}")

# 사용 예시
if __name__ == "__main__":
    # 같은 파일을 덮어쓰기
    clean_text_file("../audio-sample/short-version-phonetic/output-KFA/난색.txt")
    
    # 다른 파일로 저장
    #clean_text_file("input.txt", "cleaned_output.txt")

파일 정리가 완료되었습니다: ../audio-sample/short-version-phonetic/output-KFA/난색.txt
