In [None]:
from tqdm import tqdm
import json
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any, Optional


class JsonToCsvProcessor:
    """
    Convert a directory tree of JSON files into a single two-column CSV.

    The source directory is walked recursively. Files whose names start with
    ``._`` (macOS system files) are skipped. For every remaining ``.json``
    file the value stored under ``target_key`` is extracted and paired with a
    derived ``.wav`` path; the pairs are written to the output CSV.
    """

    def __init__(self, input_dir: str, output_csv: str, target_key: str = "台羅"):
        """
        Args:
            input_dir: Directory scanned recursively for ``*.json`` files.
            output_csv: Path of the CSV file to create.
            target_key: JSON key whose value is exported; it is also the name
                of the first CSV column.

        Raises:
            FileNotFoundError: If ``input_dir`` is not an existing directory.
        """
        self.input_path = Path(input_dir)
        self.output_path = Path(output_csv)
        self.target_key = target_key
        # Rows collected by the most recent run(); each row is
        # {target_key: value, "pathfile": wav_path}.
        self.processed_data: List[Dict[str, Any]] = []

        if not self.input_path.is_dir():
            raise FileNotFoundError(f"錯誤：來源目錄不存在 -> {self.input_path}")

    def _transform_path(self, json_path: Path) -> str:
        """
        Derive the audio path that corresponds to a JSON file.

        The JSON path is made absolute, the first occurrence of the text
        ``standard_json`` in it is replaced by ``standard``, and the suffix is
        changed to ``.wav``.

        Returns:
            The derived path as a string.
        """
        path_str = str(json_path.resolve())
        transformed_path_str = path_str.replace("standard_json", "standard", 1)
        final_path = Path(transformed_path_str).with_suffix(".wav")
        return str(final_path)

    def _process_single_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """
        Read one JSON file and build its CSV row.

        Returns:
            ``{target_key: value, "pathfile": wav_path}`` when the file is a
            JSON object containing ``target_key``; ``None`` when the key is
            missing or the file cannot be read or parsed (a message is
            printed in those cases).
        """
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = json.load(f)

            # Only a JSON object can carry the key. Arrays, strings and
            # numbers are reported as "key not found" rather than reaching the
            # generic error branch through a TypeError.
            if isinstance(content, dict) and self.target_key in content:
                target_path = self._transform_path(file_path)
                return {
                    self.target_key: content[self.target_key],
                    "pathfile": target_path,
                }
            else:
                print(
                    f"[警告] 檔案 {file_path.name} 中找不到鍵值 '{self.target_key}'，已略過。"
                )
                return None
        except (
            json.JSONDecodeError,
            UnicodeDecodeError,
        ) as e:  # malformed JSON or non-UTF-8 bytes: skip the file
            print(f"[警告] 讀取檔案 {file_path.name} 時發生錯誤: {e}，已略過。")
            return None
        except Exception as e:
            print(f"[錯誤] 處理檔案 {file_path.name} 時發生未知錯誤: {e}")
            return None

    def run(self) -> None:
        """
        Scan the input directory, process every valid JSON file and write the CSV.

        Each call starts from an empty result list, so running the same
        instance again does not duplicate rows. Files are processed in sorted
        path order so the CSV row order is reproducible. No CSV is written
        when no file yields a row.
        """
        # Start fresh: rows from a previous run() must not leak into this one.
        self.processed_data = []

        print(f"[*] 開始遞迴掃描目錄: {self.input_path}")
        # rglob order depends on the filesystem; sort for a stable row order.
        all_files = sorted(self.input_path.rglob("*.json"))

        if not all_files:
            print("[!] 目錄及其子目錄中未找到任何 .json 檔案。")
            return

        print(f"[*] 共找到 {len(all_files)} 個符合 .json 模式的檔案。")

        # Drop macOS system files, whose names start with '._'.
        json_files = [f for f in all_files if not f.name.startswith("._")]

        print(f"[*] 過濾掉系統檔案後，將處理 {len(json_files)} 個有效 JSON 檔案...")

        for file_path in tqdm(json_files, desc="正在處理JSON檔案"):
            result = self._process_single_file(file_path)
            if result:
                self.processed_data.append(result)

        if self.processed_data:
            self._save_to_csv()
        else:
            print("[!] 沒有成功處理任何資料，不產生 CSV 檔案。")

    def _save_to_csv(self) -> None:
        """
        Write the collected rows to ``self.output_path``.

        Columns are ordered ``[target_key, "pathfile"]`` and the file is
        encoded as ``utf-8-sig``. Missing parent directories are created.
        Any error while writing is printed, not raised.
        """
        df = pd.DataFrame(self.processed_data)
        column_order = [self.target_key, "pathfile"]
        df = df[column_order]

        try:
            self.output_path.parent.mkdir(parents=True, exist_ok=True)
            df.to_csv(self.output_path, index=False, encoding="utf-8-sig")
            print(f"\n[+] 處理完成！資料已成功儲存至: {self.output_path}")
        except Exception as e:
            print(f"[錯誤] 儲存 CSV 檔案時發生錯誤: {e}")


# --- Script entry point ---
if __name__ == "__main__":
    # Input tree and output CSV, both relative to the working directory.
    SOURCE_DIRECTORY = "./standard_json"
    OUTPUT_CSV_FILE = "./output/final_audio_paths.csv"

    try:
        processor = JsonToCsvProcessor(SOURCE_DIRECTORY, OUTPUT_CSV_FILE)
        processor.run()
    except Exception as failure:
        if isinstance(failure, FileNotFoundError):
            # Missing source directory: the exception text is already user-facing.
            print(failure)
        else:
            print(f"程式執行時發生未預期的錯誤: {failure}")

[*] 開始遞迴掃描目錄: standard_json
[*] 共找到 163448 個符合 .json 模式的檔案。
[*] 過濾掉系統檔案後，將處理 81724 個有效 JSON 檔案...


正在處理JSON檔案: 100%|██████████| 81724/81724 [04:54<00:00, 277.61it/s] 



[+] 處理完成！資料已成功儲存至: output\final_audio_paths_numTune.csv


In [4]:
# Install tqdm, which the first cell imports. A bare `pip ...` line is not
# Python syntax; presumably this relies on IPython automagic resolving it to
# the %pip magic — TODO confirm before running outside IPython.
pip install tqdm

Note: you may need to restart the kernel to use updated packages.


In [6]:
import pandas as pd

# Load the optimized table and de-duplicate it on the "中文意譯" column.
df = pd.read_csv("output_zh_optimized.csv").drop_duplicates(subset=["中文意譯"])

# Summarise what is left after de-duplication.
df.info()

# Persist the de-duplicated table under a new file name.
df.to_csv(
    "output_zh_optimized_dedup.csv",
    index=False,
    encoding="utf-8-sig",
)
print("output_zh_optimized_dedup.csv")

<class 'pandas.core.frame.DataFrame'>
Index: 32638 entries, 0 to 81692
Data columns (total 3 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   transcription  32638 non-null  object
 1   file           32638 non-null  object
 2   中文意譯           32637 non-null  object
dtypes: object(3)
memory usage: 1019.9+ KB
output_zh_optimized_dedup.csv


In [None]:
import os
import re

# Tâi-lô tone-marked letters (defined here but not used by the search below).
tailo_chars = "āēīōūm̄ńńḡûe̍i̍áâíáìóàâêîôûm̂n̂"
# Base names (extension stripped) of the regular files directly inside audio_files/.
filenames = []
for entry in os.listdir("audio_files"):
    if os.path.isfile(os.path.join("audio_files", entry)):
        filenames.append(os.path.splitext(entry)[0])
# Keep the names that contain the literal marker "POJ" and show them.
result = [stem for stem in filenames if re.search(r"POJ", stem)]
print(result)

[]


In [None]:
import os
import csv

FOLDER = "audio_files"
OUTPUT_CSV = "audio_files_list_2.csv"

# Regular files that sit directly inside FOLDER (sub-directories are ignored).
files = []
for entry in os.listdir(FOLDER):
    if os.path.isfile(os.path.join(FOLDER, entry)):
        files.append(entry)

# One CSV row per file: its path under FOLDER and its name without extension.
with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=["file_path", "transcription"])
    writer.writeheader()
    writer.writerows(
        {
            "file_path": os.path.join(FOLDER, entry),
            "transcription": os.path.splitext(entry)[0],
        }
        for entry in files
    )

In [None]:
import csv
import os


def normalize_file_key(file_path):
    """Return the last component of ``file_path``, lower-cased, without surrounding whitespace."""
    _parent, leaf = os.path.split(file_path)
    return leaf.lower().strip()


# 1. Build the lookup table: normalised audio file name -> transcription.
#    Explicit fieldnames make DictReader read the file positionally
#    (column 1 = transcription, column 2 = file path), so the file's own
#    header line arrives as an ordinary row and has to be dropped by hand.
transcription_dict = {}
with open("output/final_audio_paths_numTune.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f, fieldnames=["transcription", "file"])
    for row_number, row in enumerate(reader):
        # Rows with fewer than two columns carry None for the missing ones.
        raw_file = row.get("file") or ""
        raw_transcription = row.get("transcription") or ""
        if row_number == 0 and raw_file.strip() == "pathfile":
            # Header line as written by JsonToCsvProcessor ("<key>,pathfile");
            # without this check it would become a bogus lookup entry.
            continue
        file_key = normalize_file_key(raw_file)
        transcription = raw_transcription.strip()
        if file_key:
            transcription_dict[file_key] = transcription

# 2. Read the merged master file and add the looked-up transcription as the
#    column "台羅數字調" (empty when the file name has no match).
output_rows = []
with open("output_zh_optimized_dedup_merged.csv", newline="", encoding="utf-8") as f:
    reader = list(csv.DictReader(f))
    if not reader:
        raise Exception("output_zh_optimized_dedup_merged.csv 無資料")
    fieldnames = list(reader[0].keys())
    if "台羅數字調" not in fieldnames:
        fieldnames.append("台羅數字調")
    for row in reader:
        # `or ""` guards against a None value from a short row.
        file_key = normalize_file_key(row.get("file") or "")
        row["台羅數字調"] = transcription_dict.get(file_key, "")
        output_rows.append(row)

# 3. Write the enriched rows to a new file.
with open(
    "output_zh_optimized_dedup_with_tailed.csv",
    "w",
    newline="",
    encoding="utf-8",
) as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(output_rows)

print("合併完成，已另存為 output_zh_optimized_dedup_with_tailed.csv")

合併完成，已另存為 output_zh_optimized_dedup_with_tailed.csv


In [None]:
import csv
import os


def normalize_file_key(file_path):
    """Reduce ``file_path`` to its final component, lower-cased and stripped of surrounding whitespace."""
    leaf = os.path.split(file_path)[1]
    return leaf.lower().strip()


# 1. Build the lookup table: normalised audio file name -> transcription.
#    Explicit fieldnames make DictReader read the file positionally
#    (column 1 = transcription, column 2 = file path), so the file's own
#    header line arrives as an ordinary row and has to be dropped by hand.
transcription_dict = {}
with open("output/final_audio_paths.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f, fieldnames=["transcription", "file"])
    for row_number, row in enumerate(reader):
        # Rows with fewer than two columns carry None for the missing ones.
        raw_file = row.get("file") or ""
        raw_transcription = row.get("transcription") or ""
        if row_number == 0 and raw_file.strip() == "pathfile":
            # Header line as written by JsonToCsvProcessor ("<key>,pathfile");
            # without this check it would become a bogus lookup entry.
            continue
        file_key = normalize_file_key(raw_file)
        transcription = raw_transcription.strip()
        if file_key:
            transcription_dict[file_key] = transcription

# 2. Read the previously enriched file and add the looked-up transcription as
#    the column "台羅" (empty when the file name has no match).
output_rows = []
with open(
    "output_zh_optimized_dedup_with_tailed.csv", newline="", encoding="utf-8"
) as f:
    reader = list(csv.DictReader(f))
    if not reader:
        # Name the file that was actually opened.
        raise Exception("output_zh_optimized_dedup_with_tailed.csv 無資料")
    fieldnames = list(reader[0].keys())
    if "台羅" not in fieldnames:
        fieldnames.append("台羅")
    for row in reader:
        # `or ""` guards against a None value from a short row.
        file_key = normalize_file_key(row.get("file") or "")
        row["台羅"] = transcription_dict.get(file_key, "")
        output_rows.append(row)

# 3. Write the enriched rows to a new file.
with open(
    "output_zh_optimized_dedup_with_tailed_2.csv",
    "w",
    newline="",
    encoding="utf-8",
) as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(output_rows)

print("合併完成，已另存為 output_zh_optimized_dedup_with_tailed_2.csv")

合併完成，已另存為 output_zh_optimized_dedup_with_tailed_2.csv


In [None]:
import csv
import re


def is_all_english(text):
    """
    Tell whether ``text`` is written entirely in English.

    Whitespace and ASCII punctuation are ignored; what remains must be
    non-empty and consist only of the letters A-Z / a-z. Digits and any
    non-ASCII character therefore make the result False.

    Args:
        text: The string to test; ``None`` or ``""`` yields False.

    Returns:
        True when the text is letters-only after removing whitespace and
        ASCII punctuation, otherwise False.
    """
    import string  # local import: the surrounding cell only imports csv and re

    if not text:
        return False
    # Strip whitespace (spaces, tabs, newlines, ...) and ASCII punctuation,
    # then require that only English letters are left.
    remainder = "".join(
        ch for ch in text if not ch.isspace() and ch not in string.punctuation
    )
    return bool(remainder) and re.fullmatch(r"[A-Za-z]+", remainder) is not None


input_file = "output_zh_optimized_dedup_with_tailed_2.csv"
output_file = "output_zh_optimized_dedup_with_tailed_2_no_english.csv"

# Load every row; an input without data rows is treated as an error.
rows_to_keep = []
with open(input_file, newline="", encoding="utf-8") as f:
    reader = list(csv.DictReader(f))
    if not reader:
        raise Exception(f"{input_file} 無資料")
    fieldnames = reader[0].keys()
    # Keep the rows whose chinese_text is NOT purely English.
    rows_to_keep.extend(
        row for row in reader if not is_all_english(row.get("chinese_text", ""))
    )

# Write the surviving rows with the original column layout.
with open(output_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows_to_keep)

print(f"已完成，純英文 chinese_text 的列已刪除，結果輸出為 {output_file}")

已完成，純英文 chinese_text 的列已刪除，結果輸出為 output_zh_optimized_dedup_with_tailed_2_no_english.csv


In [None]:
import os


def get_filename(path):
    """Return the final component of ``path``, lower-cased and stripped of surrounding whitespace."""
    _parent, leaf = os.path.split(path)
    return leaf.lower().strip()


# This cell uses csv but only imports os; import it here so the cell also
# runs on a fresh kernel.
import csv

# Build the lookup table: normalised audio file name -> transcription.
# The two columns are found by header name when the file has "file" and
# "transcription" headers. Otherwise they are taken positionally (column 1 =
# transcription, column 2 = file path), which is the layout written by
# JsonToCsvProcessor ("<key>,pathfile"). utf-8-sig strips the BOM that the
# exporter puts in front of the header.
transcription_dict = {}
with open(
    "output/final_audio_paths_numTune.csv", newline="", encoding="utf-8-sig"
) as f:
    reader = csv.reader(f)
    header = next(reader, [])
    if "file" in header and "transcription" in header:
        file_col = header.index("file")
        text_col = header.index("transcription")
    else:
        text_col, file_col = 0, 1
    for record in reader:
        if len(record) <= max(file_col, text_col):
            continue  # blank or short line: nothing to map
        file_key = get_filename(record[file_col])
        transcription = record[text_col].strip()
        if file_key:
            transcription_dict[file_key] = transcription

# Read the merged master file and add the looked-up transcription as the
# column "台羅數字調" (empty when the file name has no match).
output_rows = []
with open("output_zh_optimized_dedup_merged.csv", newline="", encoding="utf-8") as f:
    reader = list(csv.DictReader(f))
    if not reader:
        raise Exception("output_zh_optimized_dedup_merged.csv 無資料")
    fieldnames = list(reader[0].keys())
    if "台羅數字調" not in fieldnames:
        fieldnames.append("台羅數字調")
    for row in reader:
        # `or ""` guards against a None value from a short row.
        file_key = get_filename(row.get("file") or "")
        row["台羅數字調"] = transcription_dict.get(file_key, "")
        output_rows.append(row)

# Write the enriched rows to a new file.
with open(
    "output_zh_optimized_dedup_with_tailed.csv",
    "w",
    newline="",
    encoding="utf-8",
) as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(output_rows)

print("合併完成，已另存為 output_zh_optimized_dedup_with_tailed.csv")

合併完成，已另存為 output_zh_optimized_dedup_with_tailed.csv


In [None]:
import csv

# Part files to concatenate, in output order.
input_files = [
    "output_zh_optimized_dedup_part1.csv",
    "output_zh_optimized_dedup_part2.csv",
    "output_zh_optimized_dedup_part3.csv",
    "output_zh_optimized_dedup_part4.csv",
]
output_file = "output_zh_optimized_dedup_merged.csv"

all_rows = []
fieldnames = None

for position, file in enumerate(input_files):
    with open(file, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # The column layout of the merged file is taken from the first part only.
        if position == 0:
            fieldnames = reader.fieldnames
        all_rows.extend(reader)

# Write all collected rows under the first part's header.
with open(output_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(all_rows)

print(f"合併完成，已輸出為 {output_file}")

合併完成，已輸出為 output_zh_optimized_dedup_merged.csv


In [None]:
import csv

# Filter out rows that contain Tâi-lô romanisation
# 1. Tone-marked vowels whose presence marks a value as romanised text
tailo_list = [
    "ā",
    "á",
    "à",
    "â",
    "a̍",
    "ē",
    "é",
    "è",
    "ê",
    "e̍",
    "ī",
    "í",
    "ì",
    "i̍",
    "ō",
    "ó",
    "ò",
    "ô",
    "o̍",
    "ū",
    "ú",
    "ù",
    "u̍",
]

import unicodedata  # local to this cell: used for Unicode-normalised matching

csv_file = "output_zh_optimized_v2.csv"
rows_to_keep = []
rows_deleted = []

# 2. Read the CSV. The header comes from DictReader itself, not from the
#    first data row, so a header-only file keeps its header when rewritten.
with open(csv_file, newline="", encoding="utf-8") as f:
    dict_reader = csv.DictReader(f)
    reader = list(dict_reader)
    fieldnames = dict_reader.fieldnames or []

# Tone letters can be stored precomposed or as base letter + combining mark.
# Compare both sides in NFC so canonically equivalent spellings match.
normalized_tones = [unicodedata.normalize("NFC", tone) for tone in tailo_list]

# 3. Check every row: it is removed when its "中文意譯" value contains ANY
#    of the tone letters.
for row in reader:
    tailo_value = unicodedata.normalize("NFC", row.get("中文意譯") or "")
    if any(tone in tailo_value for tone in normalized_tones):
        rows_deleted.append(row)
    else:
        rows_to_keep.append(row)

# 4. Overwrite the CSV in place with the rows that were kept.
with open(csv_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows_to_keep)

# 5. Report the removed rows.
print("已刪除的列數：", len(rows_deleted))
for idx, row in enumerate(rows_deleted, 1):
    print(
        f"{idx}. chinese_text：{row.get('chinese_text', '')} | 台羅：{row.get('台羅', '')}"
    )

已刪除的列數： 1105
1. chinese_texta-kong-á hō͘ lâng chhiáⁿ到家吃飯會帶禮物，拎燒酒比較多。 | 台羅：A-tok-á hōo lâng tshiánn khì tshù tsia̍h-pn̄g-huē Tsah lé-bu̍t , kuānn sio-tsiú khah-tsè .
2. chinese_textchok-chiá: tân hông-hāu | 台羅：tsok-tsiá : Tân Hong-huī
3. chinese_texttī這個五十一歲的生日的轉折點， | 台羅：tī tsit-ê gōo-tsa̍p-it huè senn-ji̍t ê khám-tsām,
4. chinese_textchok-chiá: lîm bí-lêng | 台羅：tsok-tsiá : lîm bí-lē
5. chinese_textthiⁿ ná ē pang--lo̍h-lâi? | 台羅：Thinn ná-ē pang --lo̍h-lâi ?
6. chinese_textná-tio̍h kiaⁿ kúi? | 台羅：ná tio̍h-kiann kuí ?
7. chinese_text人 leh kóng: | 台羅：Lâng leh kóng :
8. chinese_text就是呂副總統所做--ê好事。 | 台羅：tse Tiō-sī Lū hù-tsóng-thóng sóo tsò--ê hó-tāi .
9. chinese_texthō͘它再一次蒸餾。 | 台羅：koh the̍h-tshut-lâi liū --tsi̍t-pái .
10. chinese_texthō͘我再看看， | 台羅：koh tsim-tsiok kā khuànn ,
11. chinese_textchiah hō͘ i an-sim--lo̍h-lâi。 | 台羅：tsiah hōo i an-sim --lo̍h-lâi .
12. chinese_textnā án-ne就湊巧了， | 台羅：án-ne tō thìng-hó tsai-iánn ,
13. chinese_textsī--lah, | 台羅：sī--lah ,
14. chinese_textthiⁿ ná ē pang--l

In [18]:
import csv
import shutil
import re


def extract_poj_content(text):
    """
    Return the text inside the first ``[POJ]...[/POJ]`` pair, stripped of
    surrounding whitespace; when no such pair exists, return ``text`` as is.
    In both cases every double-quote character is removed. Falsy input
    yields an empty string.
    """
    if not text:
        return ""
    tagged = re.search(r"\[POJ\](.*?)\[/POJ\]", text, re.DOTALL)
    body = text if tagged is None else tagged.group(1).strip()
    # Splitting on the quote and re-joining drops every '"' character.
    return "".join(body.split('"'))


def process_csv_extract_poj(csv_file, column):
    """
    Rewrite ``csv_file`` in place with ``column`` passed through
    ``extract_poj_content``.

    A copy of the file is first saved as ``csv_file + ".bak"``. When the file
    has no data rows the function returns right after making that copy,
    leaving the file untouched and printing nothing.

    Args:
        csv_file: Path of the CSV to process (UTF-8).
        column: Name of the column whose values are cleaned.
    """
    # Back up the original before touching it.
    shutil.copyfile(csv_file, csv_file + ".bak")

    with open(csv_file, newline="", encoding="utf-8") as source:
        records = list(csv.DictReader(source))
    if not records:
        return

    # Live key view of the first row: it reflects a column added by the loop below.
    header = records[0].keys()
    for record in records:
        record[column] = extract_poj_content(record.get(column, ""))

    with open(csv_file, "w", newline="", encoding="utf-8") as sink:
        writer = csv.DictWriter(sink, fieldnames=header, quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        writer.writerows(records)
    print(
        "已完成：[POJ]...[/POJ] 內容提取（無則保留原始內容），並去除所有引號，原始檔已備份為 .bak"
    )


# Clean the "中文意譯" column of the working CSV when run as a script.
if __name__ == "__main__":
    process_csv_extract_poj(csv_file="output_zh_optimized_v2.csv", column="中文意譯")

已完成：[POJ]...[/POJ] 內容提取（無則保留原始內容），並去除所有引號，原始檔已備份為 .bak


In [None]:
import csv
import os

# 1. Read the CSV and index its rows by their stripped chinese_text value.
csv_file = "output_zh_optimized_dedup_with_tailed_2_no_english.csv"
audio_dir = "audio_files"
output_file = "audio_matched.csv"

rows_dict = {}
with open(csv_file, newline="", encoding="utf-8") as f:
    reader = list(csv.DictReader(f))
    if not reader:
        raise Exception(f"{csv_file} 無資料")
    fieldnames = list(reader[0].keys())
    for row in reader:
        key = row.get("chinese_text", "").strip()
        if not key:
            continue
        # A later row with the same text replaces the earlier one.
        rows_dict[key] = row

# 2. Output columns: the input columns minus "中文意譯", plus file_path.
new_fieldnames = [fn for fn in fieldnames if fn != "中文意譯"]
if "file_path" not in new_fieldnames:
    new_fieldnames.append("file_path")

# 3. Match each audio file's base name against the chinese_text index.
output_rows = []
for fname in os.listdir(audio_dir):
    full_path = os.path.join(audio_dir, fname)
    if not os.path.isfile(full_path):
        continue
    name = os.path.splitext(fname)[0]
    if name not in rows_dict:
        continue
    row = dict(rows_dict[name])
    row.pop("中文意譯", None)
    row["file_path"] = full_path
    output_rows.append(row)

# 4. Write the matched rows.
with open(output_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=new_fieldnames)
    writer.writeheader()
    writer.writerows(output_rows)

print(f"比對完成，已輸出 {output_file}，共 {len(output_rows)} 筆資料。")

比對完成，已輸出 audio_matched.csv，共 30878 筆資料。
