In [1]:
import glob
import os
from pyspark.sql import SparkSession

# 1. 啟動 Spark Session
spark = SparkSession.builder \
    .appName("Local-Audio-Pipeline") \
    .master("local[*]") \
    .getOrCreate()

# 2. 找到你所有的 .mp3 檔案
# 使用 os.path.expanduser 來處理 '~'
data_directory = os.path.expanduser("~/data/dataset-acm-mirum")
local_files = glob.glob(os.path.join(data_directory, "*.mp3"))

if not local_files:
    print(f"在 {data_directory} 中找不到 *.mp3 檔案")
else:
    print(f"找到了 {len(local_files)} 個 .mp3 檔案")

    # 3. 將 Python 列表轉換為 Spark DataFrame
    # 為了讓 Spark 知道這是一個檔案路徑，使用 "file://" 協定
    # struct 'StructField' 允許我們定義欄位名稱
    from pyspark.sql.types import StructType, StructField, StringType
    
    # 將 ['path1.mp3', 'path2.mp3'] 轉換為 [('path1.mp3',), ('path2.mp3',)]
    path_data = [(f,) for f in local_files] 
    path_schema = StructType([StructField("local_path", StringType(), False)])

    df = spark.createDataFrame(path_data, schema=path_schema)

    df.show(5, truncate=False)
    # +-----------------------------------------------+
    # | local_path                                    |
    # +-----------------------------------------------+
    # | /Users/your_user/data/dataset-acm-mirum/1.mp3 |
    # | /Users/your_user/data/dataset-acm-mirum/2.mp3 |
    # ...
    # +-----------------------------------------------+

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/13 14:29:23 WARN Utils: Your hostname, MacBook-Pro-M5.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.232 instead (on interface en0)
25/11/13 14:29:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/13 14:29:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


找到了 1410 個 .mp3 檔案


[Stage 0:>                                                          (0 + 1) / 1]

+------------------------------------------------------------+
|local_path                                                  |
+------------------------------------------------------------+
|/Users/chenghungyeh/data/dataset-acm-mirum/2169604.clip.mp3 |
|/Users/chenghungyeh/data/dataset-acm-mirum/6930048.clip.mp3 |
|/Users/chenghungyeh/data/dataset-acm-mirum/11812770.clip.mp3|
|/Users/chenghungyeh/data/dataset-acm-mirum/3048624.clip.mp3 |
|/Users/chenghungyeh/data/dataset-acm-mirum/2204435.clip.mp3 |
+------------------------------------------------------------+
only showing top 5 rows


                                                                                

In [2]:
import librosa
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType

# 1. 這是我們的 UDF 函式。它非常乾淨。
def process_local_audio(local_path):
    try:
        # Librosa 直接處理本地路徑
        y, sr = librosa.load(local_path, sr=None) # sr=None 讀取原始採樣率
        duration = librosa.get_duration(y=y, sr=sr)
        
        return (local_path, float(duration), int(sr), None) # (路徑, 時長, 採樣率, 錯誤)

    except Exception as e:
        return (local_path, None, None, str(e))

# 2. 定義 UDF 的輸出 Schema
output_schema = StructType([
    StructField("path", StringType(), False),
    StructField("duration", FloatType(), True),
    StructField("sample_rate", IntegerType(), True),
    StructField("error", StringType(), True)
])

# 3. 註冊 UDF
extract_metadata_udf = udf(process_local_audio, output_schema)

# 4. 應用 UDF
#    'df' 是我們在 Day 1/2 建立的 DataFrame
metadata_df = df.withColumn("metadata", extract_metadata_udf(col("local_path"))) \
                .select("metadata.*") # 展平 struct

print("元資料提取完成:")
metadata_df.show()

# +--------------------+--------+-------------+-----+
# | path               | duration | sample_rate | error|
# +--------------------+--------+-------------+-----+
# | /.../1.mp3         | 30.12    | 44100       | null |
# | /.../2.mp3         | 15.5     | 44100       | null |
# | /.../broken.mp3    | null     | null        | ...  |
# +--------------------+--------+-------------+-----+

元資料提取完成:


[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+---------+-----------+-----+
|                path| duration|sample_rate|error|
+--------------------+---------+-----------+-----+
|/Users/chenghungy...|     30.0|      22050| NULL|
|/Users/chenghungy...|     30.0|      44100| NULL|
|/Users/chenghungy...|     30.0|      44100| NULL|
|/Users/chenghungy...| 30.02882|      44100| NULL|
|/Users/chenghungy...|29.947392|      22050| NULL|
|/Users/chenghungy...|29.988209|      22050| NULL|
|/Users/chenghungy...| 59.94739|      22050| NULL|
|/Users/chenghungy...|     30.0|      44100| NULL|
|/Users/chenghungy...|30.040817|      44100| NULL|
|/Users/chenghungy...|     30.0|      44100| NULL|
|/Users/chenghungy...|     30.0|      44100| NULL|
|/Users/chenghungy...| 59.94739|      22050| NULL|
|/Users/chenghungy...|     30.0|      44100| NULL|
|/Users/chenghungy...|29.947392|      22050| NULL|
|/Users/chenghungy...|     30.0|      44100| NULL|
|/Users/chenghungy...| 30.02882|      44100| NULL|
|/Users/chenghungy...| 30.02882

                                                                                

步驟 2：在 Spark DataFrame 上產生統計資料

In [None]:
import tensorflow_data_validation as tfdv
from pyspark.sql.functions import col # 確保 col 被 import

# 這是 Day 3 (Cell 2) 產生的 metadata_df
print("正在 TFDV... (可能需要幾分鐘)")

# TFDV 可以直接在 Spark DataFrame 上運作
# 丟掉有 null 的行，避免 TFDV 報錯
stats = tfdv.generate_statistics_from_spark_dataframe(metadata_df.na.drop()) 
print("統計資料產生完畢。")

# 推斷 Schema
schema = tfdv.infer_schema(statistics=stats)

# 在 Jupyter Notebook 中顯示互動式 UI
# (注意: 你可能需要安裝 ipywidgets 才能看到 UI，如 Cell 5 的警告所示)
tfdv.display_schema(schema)

# --- 接著是過濾 (Filter) ---
print("TFDV Schema 已顯示。現在根據規則過濾：")

# 根據 Cell 2 的輸出，我們看到 sample_rate 有 22050 和 44100
# 假設我們的 ML 模型只接受 44100
clean_df = metadata_df.filter(
    (col("duration") > 1.0) &  # 假設我們要 > 1 秒
    (col("sample_rate") == 44100) & # 假設我們只要 44.1k
    (col("error").isNull())
)

print("過濾前的總數:", metadata_df.count())
print("過濾後的乾淨資料:", clean_df.count())
clean_df.show()

ModuleNotFoundError: No module named 'tensorflow_data_validation'

In [9]:
import ray
import librosa
import time
import os # 確保 os 被 import

# 1. (重新) 確保 Ray 已初始化
if ray.is_initialized():
    ray.shutdown()
ray.init(ignore_reinit_error=True)

# 2. 這是 Day 3 (Cell 2) 的函式，現在加上 @ray.remote
@ray.remote
def ray_process_local_audio(local_path):
    try:
        y, sr = librosa.load(local_path, sr=None) 
        duration = librosa.get_duration(y=y, sr=sr)
        return (local_path, float(duration), int(sr), None)
    except Exception as e:
        return (local_path, None, None, str(e))

# 3. 'local_files' 列表來自 Cell 1
#    我們在這裡重新載入它，以確保 cell 可以獨立執行
if 'local_files' not in globals():
    print("重新載入 'local_files' 列表...")
    data_directory = os.path.expanduser("~/data/dataset-acm-mirum")
    local_files = glob.glob(os.path.join(data_directory, "*.mp3"))

if not local_files:
    print("錯誤: 'local_files' 列表為空。")
else:
    print(f"正在提交 {len(local_files)} 個 Ray tasks...")
    start_time = time.time()
    
    # 提交所有任務
    futures = [ray_process_local_audio.remote(p) for p in local_files]
    
    # 等待並取得所有結果
    results = ray.get(futures)
    
    end_time = time.time()
    # 在 1410 個檔案上，這應該比 Spark UDF 快非常多
    print(f"Ray 處理完成！總耗時: {end_time - start_time:.2f} 秒")
    
    # 4. 顯示前 5 個結果
    print("\n--- Ray 處理結果 (前 5 筆) ---")
    for res in results[:5]:
        print(res)

ray.shutdown()

2025-11-13 14:39:51,714	INFO worker.py:1951 -- Started a local Ray instance.


正在提交 1410 個 Ray tasks...


[36m(ray_process_local_audio pid=4938)[0m [src/libmpg123/layer3.c:INT123_do_layer3():1804] error: dequantization failed!


Ray 處理完成！總耗時: 22.34 秒

--- Ray 處理結果 (前 5 筆) ---
('/Users/chenghungyeh/data/dataset-acm-mirum/2169604.clip.mp3', 30.0, 22050, None)
('/Users/chenghungyeh/data/dataset-acm-mirum/6930048.clip.mp3', 30.0, 44100, None)
('/Users/chenghungyeh/data/dataset-acm-mirum/11812770.clip.mp3', 30.0, 44100, None)
('/Users/chenghungyeh/data/dataset-acm-mirum/3048624.clip.mp3', 30.028820861678003, 44100, None)
('/Users/chenghungyeh/data/dataset-acm-mirum/2204435.clip.mp3', 29.947392290249432, 22050, None)


In [10]:
!pip install transformers torch accelerate

Collecting transformers
  Downloading transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
Collecting accelerate
  Downloading accelerate-1.11.0-py3-none-any.whl.metadata (19 kB)
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Downloading huggingface_hub-0.36.0-py3-none-any.whl.metadata (14 kB)
Collecting tokenizers<=0.23.0,>=0.22.0 (from transformers)
  Downloading tokenizers-0.22.1-cp39-abi3-macosx_10_12_x86_64.whl.metadata (6.8 kB)
Collecting safetensors>=0.4.3 (from transformers)
  Downloading safetensors-0.6.2-cp38-abi3-macosx_10_12_x86_64.whl.metadata (4.1 kB)
Collecting hf-xet<2.0.0,>=1.1.3 (from huggingface-hub<1.0,>=0.34.0->transformers)
  Downloading hf_xet-1.2.0-cp37-abi3-macosx_10_12_x86_64.whl.metadata (4.9 kB)
Downloading transformers-4.57.1-py3-none-any.whl (12.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.0/12.0 MB[0m [31m17.9 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hDownloading huggingface_hub-0.36.0-py3-none-any.wh

這是完整的 Day 6 程式碼，用於執行你的 Spark 標註管線：

In [17]:
import librosa
import warnings
import os
import torch
from transformers import pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType

# 1. 這是 Day 6 (Cell 6) 的函式
def process_and_label_local_audio(local_path):
    # 這些 import 必須在 UDF 內部
    import librosa
    import warnings
    from transformers import pipeline
    import torch

    # 1. 初始化 Whisper
    try:
        # 為了加速，使用 tiny.en
        transcriber = pipeline(
            "automatic-speech-recognition", 
            model="openai/whisper-tiny.en",
            device=0 if torch.cuda.is_available() else -1 # 自動使用 GPU
        )
    except Exception as e:
        transcriber = None

    # 2. 處理元資料
    try:
        y, sr = librosa.load(local_path, sr=None)
        duration = librosa.get_duration(y=y, sr=sr)
        
        # 3. 處理標註 (Whisper)
        transcription = None
        if transcriber:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                # Whisper 可以直接讀取本地檔案路徑
                result = transcriber(local_path)
                transcription = result['text'] if result else None
        else:
            transcription = "Transcriber failed to load"

        return (local_path, float(duration), int(sr), transcription, None)

    except Exception as e:
        return (local_path, None, None, None, str(e))

# 2. 定義 *新* 的 UDF Schema (包含 transcription)
label_schema = StructType([
    StructField("path", StringType(), False),
    StructField("duration", FloatType(), True),
    StructField("sample_rate", IntegerType(), True),
    StructField("transcription", StringType(), True), # 新欄位
    StructField("error", StringType(), True)
])

# 3. 註冊 UDF
label_udf = udf(process_and_label_local_audio, label_schema)

# 4. 應用 UDF
#    'df' 是來自 Cell 1 的原始 DataFrame
#    'spark' 是來自 Cell 1 的 SparkSession
if 'spark' not in globals() or spark.sparkContext._jsc.sc().isStopped():
    print("Spark Session 已停止，正在重啟...")
    spark = SparkSession.builder.appName("Local-Audio-Pipeline-Day6").master("local[*]").getOrCreate()
else:
    print("使用來自 Cell 1 的現有 Spark Session。")

print("開始執行 Spark + Whisper 標註管線...")
print(f"警告：這將在 {df.count()} 個檔案上執行 Whisper。")
print("這會非常非常慢，因為 Spark 會為 *每個任務* 載入一次模型。")
print("強烈建議先用 .limit(10) 進行測試。")

# --- 測試 (建議先取消註解這部分) ---
print("--- 正在執行 5 個檔案的測試 ---")
test_df = df.limit(5)
final_labeled_df = test_df.withColumn("data", label_udf(col("local_path"))) \
                          .select("data.*")

# --- 完整執行 (如果測試成功，請註解掉上面的測試) ---
# print("--- 正在執行完整管線 ---")
# final_labeled_df = df.withColumn("data", label_udf(col("local_path"))) \
#                      .select("data.*")


print("標註處理完成。")

# 5. 過濾與顯示
final_clean_df = final_labeled_df.filter(col("error").isNull())
final_clean_df.show()

# 6. (可選) 儲存到 Parquet
print("正在儲存到本地 Parquet 檔案...")
output_parquet_path = os.path.expanduser("~/data/dataset-acm-mirum/labeled_output.parquet")
final_clean_df.write.mode("overwrite").parquet(output_parquet_path)
print(f"資料已儲存到: {output_parquet_path}")

使用來自 Cell 1 的現有 Spark Session。
開始執行 Spark + Whisper 標註管線...


                                                                                

警告：這將在 1410 個檔案上執行 Whisper。
這會非常非常慢，因為 Spark 會為 *每個任務* 載入一次模型。
強烈建議先用 .limit(10) 進行測試。
--- 正在執行 5 個檔案的測試 ---
標註處理完成。


25/11/13 15:05:31 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 109)]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/pg/lfngft2164d29lfxxj3c_c840000gn/T/ipykernel_1810/1159670147.py", line 15, in process_and_label_local_audio
  File "/Users/chenghungyeh/anaconda3/envs/py311/lib/python3.11/site-packages/transformers/utils/import_utils.py", line 2320, in __getattr__
    raise ModuleNotFoundError(
ModuleNotFoundError: Could not import module 'pipeline'. Are this object's requirements defined correctly?

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:581)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:107)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:90)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/pg/lfngft2164d29lfxxj3c_c840000gn/T/ipykernel_1810/1159670147.py", line 15, in process_and_label_local_audio
  File "/Users/chenghungyeh/anaconda3/envs/py311/lib/python3.11/site-packages/transformers/utils/import_utils.py", line 2320, in __getattr__
    raise ModuleNotFoundError(
ModuleNotFoundError: Could not import module 'pipeline'. Are this object's requirements defined correctly?


使用 Ray Actor（Day 5 的進階版）來更高效地執行這個標註任務

In [18]:
import ray
import librosa
import warnings
import os
import glob
import time
import torch
from transformers import pipeline
from ray.util.actor_pool import ActorPool

# 1. 定義 Ray Actor (這是一個有狀態的 '類別')
@ray.remote(num_cpus=1) # 每個 Actor 限制使用 1 個 CPU 核心
class WhisperActor:
    
    # __init__ (初始化) 只會在 Actor 建立時執行一次
    def __init__(self, model_size="openai/whisper-tiny.en"):
        print(f"Actor (PID: {os.getpid()}) 正在載入模型: {model_size}...")
        
        # 決定是使用 CPU 還是 GPU
        # device = 0 if torch.cuda.is_available() else -1 
        # 為了穩定並行，有時在 CPU-only 的 Actor 上 'device=None' (預設 CPU) 更好
        self.transcriber = pipeline(
            "automatic-speech-recognition", 
            model=model_size,
            device=None, # 讓 Actor 在 CPU 上運行
            return_timestamps=True   # 這行是關鍵
        )
        print(f"Actor (PID: {os.getpid()}) 模型載入完畢。")

    # 這是 Actor 的主要工作函式
    # 它可以被遠端呼叫 (actor.transcribe.remote())
    def transcribe_audio(self, local_path):
        # 函式內部的 import 保持良好習慣
        import librosa
        import warnings
        
        try:
            # --- 執行 Day 3 + Day 6 的所有工作 ---
            
            # 1. 處理元資料 (Day 3 邏輯)
            y, sr = librosa.load(local_path, sr=None)
            duration = librosa.get_duration(y=y, sr=sr)
            
            # 2. 處理標註 (Day 6 邏輯)
            transcription = None
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                result = self.transcriber(local_path)
                transcription = result['text'] if result else None

            # 3. 返回完整結果
            return (local_path, float(duration), int(sr), transcription, None)

        except Exception as e:
            return (local_path, None, None, None, str(e))

# --- Main 執行緒 ---

if ray.is_initialized():
    ray.shutdown()
ray.init(ignore_reinit_error=True)

# 1. 載入 Cell 1 的檔案列表
if 'local_files' not in globals():
    print("重新載入 'local_files' 列表...")
    data_directory = os.path.expanduser("~/data/dataset-acm-mirum")
    local_files = glob.glob(os.path.join(data_directory, "*.mp3"))



if not local_files:
    print("錯誤: 'local_files' 列表為空。")
else:
    # --- 測試 (建議先取消註解這部分) ---
    print("--- 正在執行 5 個檔案的測試 ---")
    local_files = local_files[:5]

    print(f"找到了 {len(local_files)} 個 .mp3 檔案。")

    # 2. 決定 Actor 數量
    #    使用 CPU 核心的一半，避免過度飽和 (例如 8 核心就用 4 個)
    num_actors = max(1, os.cpu_count() // 2)
    print(f"正在啟動 {num_actors} 個 WhisperActor... (這會載入 {num_actors} 次模型)")

    # 3. 建立 Actor
    #    這一步會觸發每個 Actor 的 __init__，開始載入模型
    start_load = time.time()
    actor_pool_list = [WhisperActor.remote() for _ in range(num_actors)]
    pool = ActorPool(actor_pool_list)
    print(f"Actor Pool 建立完畢 (耗時: {time.time() - start_load:.2f} s)")

    # 4. 提交工作 (使用 Actor Pool)
    #    pool.map_unordered 會自動將 1410 個檔案分配給可用的 Actor
    #    這會返回一個 *generator*，它會 *即時* 產生結果
    print(f"開始提交 {len(local_files)} 個檔案到 Actor Pool...")
    start_process = time.time()
    
    results_generator = pool.map_unordered(
        lambda actor, path: actor.transcribe_audio.remote(path), 
        local_files
    )

    # 5. 收集結果
    results = []
    processed_count = 0
    for result in results_generator:
        results.append(result)
        processed_count += 1
        if processed_count % 100 == 0: # 每處理 100 個檔案就回報一次
            print(f"  已處理 {processed_count} / {len(local_files)} 個檔案...")

    end_process = time.time()
    print(f"--- Ray Actor 處理完畢！---")
    print(f"總耗時: {end_process - start_process:.2f} 秒")
    print(f"平均每個檔案: {(end_process - start_process) / len(local_files):.4f} 秒")
    
    # 6. 顯示前 5 個結果
    print("\n--- 處理結果 (前 5 筆) ---")
    for res in results[:5]:
        print(res)

ray.shutdown()

2025-11-13 15:09:10,676	INFO worker.py:1951 -- Started a local Ray instance.


--- 正在執行 5 個檔案的測試 ---
找到了 5 個 .mp3 檔案。
正在啟動 5 個 WhisperActor... (這會載入 5 次模型)
Actor Pool 建立完畢 (耗時: 0.01 s)
開始提交 5 個檔案到 Actor Pool...
[36m(WhisperActor pid=21500)[0m Actor (PID: 21500) 正在載入模型: openai/whisper-tiny.en...


[36m(WhisperActor pid=21500)[0m Device set to use mps:0


[36m(WhisperActor pid=21500)[0m Actor (PID: 21500) 模型載入完畢。


[36m(WhisperActor pid=21500)[0m Using custom `forced_decoder_ids` from the (generation) config. This is deprecated in favor of the `task` and `language` flags/config options.
[36m(WhisperActor pid=21496)[0m `return_token_timestamps` is deprecated for WhisperFeatureExtractor and will be removed in Transformers v5. Use `return_attention_mask` instead, as the number of frames can be inferred from it.
[36m(WhisperActor pid=21496)[0m Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
[36m(WhisperActor pid=21496)[0m Device set to use mps:0[32m [repeated 4x across cluster][0m
[36m(WhisperActor pid=21495)[0m Using custom `forced_decoder_ids` from the (generation) config. This is deprecated in favor of the `task` and `language` flags/config options.[32m [repeated 4x across cluster][0m


--- Ray Actor 處理完畢！---
總耗時: 51.36 秒
平均每個檔案: 10.2719 秒

--- 處理結果 (前 5 筆) ---
('/Users/chenghungyeh/data/dataset-acm-mirum/11812770.clip.mp3', 30.0, 44100, " Yeah, but then I let you go And now it's only better that I should let you know What you should know I can't live When the limit is without you", None)
('/Users/chenghungyeh/data/dataset-acm-mirum/2169604.clip.mp3', 30.0, 22050, " Love to be with you, if only I could. She wrecked the car and she was sad and so afraid that I'd be mad, but what the heck? Though I pretended hard to be, guess you could say she saw through me and hugged my neck.........................................", None)
('/Users/chenghungyeh/data/dataset-acm-mirum/6930048.clip.mp3', 30.0, 44100, " So what if it hurts me, so what if I break down, so what if this world just goes beyond the edge, my feet run out of ground, I'll have fun my place, I wanna hear myself, don't care about all the pain in front of me. Thank you very much. Thank you very much. Thank you very