In [None]:
# === 步驟一：升級函式庫 ===
!pip install --upgrade pandas pyarrow

## 儲存格 1：掛載 Google Drive 並設定工作目錄

In [3]:
# @title
from google.colab import drive
import os
import pandas as pd
import numpy as np
import glob
from datetime import datetime
import warnings

# 忽略一些 Pandas 未來可能移除的功能的警告，讓輸出乾淨些
warnings.simplefilter(action='ignore', category=FutureWarning)

# 1. 掛載 Drive
drive.mount('/content/drive')

# 2. 建立並切換工作目錄 (請根據你的實際路徑修改)
WORK_DIR = '/content/drive/MyDrive/FRL_Slicing_Sim' # 你可以改成你想要的目錄名稱
if not os.path.exists(WORK_DIR):
    os.makedirs(WORK_DIR)
os.chdir(WORK_DIR)
print(f"目前工作目錄: {os.getcwd()}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
目前工作目錄: /content/drive/MyDrive/FRL_Slicing_Sim


## 儲存格 2：下載資料集

In [4]:
# @title
DATASET_GIT_URL = "https://github.com/wineslab/colosseum-oran-coloran-dataset.git"
DATASET_LOCAL_NAME = 'colosseum-oran-coloran-dataset'

if not os.path.exists(DATASET_LOCAL_NAME):
    print(f"正在下載資料集從 {DATASET_GIT_URL}...")
    # Corrected indentation for the shell command
    !git clone -q {DATASET_GIT_URL} # 使用 -q 來減少輸出
    # Corrected indentation for the print statement
    print("資料集下載完成。")
else:
    # Ensure consistent indentation for the else block as well
    print(f"資料集資料夾 '{DATASET_LOCAL_NAME}' 已存在。")

資料集資料夾 'colosseum-oran-coloran-dataset' 已存在。


## 儲存格 3：設定、參數配置、和來自 README 的資料

In [5]:
# @title
import os

print("\n--- Cell 3: 設定與參數 ---")

# 1️⃣ 定義資料集資料夾與路徑
#    假設你已在 Cell 1 & 2 中設定 WORK_DIR 與 DATASET_LOCAL_NAME
DATASET_DIR            = os.path.join(WORK_DIR, DATASET_LOCAL_NAME)
#    指向 rome_static_medium 資料夾
ROME_STATIC_MEDIUM_DIR = os.path.join(DATASET_DIR, 'rome_static_medium')
#    如果你希望直接使用雲端根目錄下的「201FL研究」資料夾，請改為：
# ROME_STATIC_MEDIUM_DIR = '/content/drive/MyDrive/201FL研究/rome_static_medium'

# 2️⃣ 輸出 Parquet 的路徑（保存在雲端硬碟裡的 201FL研究 資料夾）
OUTPUT_PARQUET_PATH    = '/content/drive/MyDrive/201FL研究/kpi_traces.parquet'

# 3️⃣ README 中的 UE_ID -> slice_id, traffic_type 對應
ue_info_coloran = {
    3:  {'slice_id': 0, 'traffic_type': 'eMBB'},   6: {'slice_id': 0, 'traffic_type': 'eMBB'},
    10: {'slice_id': 0, 'traffic_type': 'eMBB'},  13: {'slice_id': 0, 'traffic_type': 'eMBB'},
    17: {'slice_id': 0, 'traffic_type': 'eMBB'},  20: {'slice_id': 0, 'traffic_type': 'eMBB'},
    24: {'slice_id': 0, 'traffic_type': 'eMBB'},  27: {'slice_id': 0, 'traffic_type': 'eMBB'},
    31: {'slice_id': 0, 'traffic_type': 'eMBB'},  34: {'slice_id': 0, 'traffic_type': 'eMBB'},
    38: {'slice_id': 0, 'traffic_type': 'eMBB'},  41: {'slice_id': 0, 'traffic_type': 'eMBB'},
    45: {'slice_id': 0, 'traffic_type': 'eMBB'},  48: {'slice_id': 0, 'traffic_type': 'eMBB'},
    4:  {'slice_id': 1, 'traffic_type': 'MTC'},    7: {'slice_id': 1, 'traffic_type': 'MTC'},
    11: {'slice_id': 1, 'traffic_type': 'MTC'},   14: {'slice_id': 1, 'traffic_type': 'MTC'},
    18: {'slice_id': 1, 'traffic_type': 'MTC'},   21: {'slice_id': 1, 'traffic_type': 'MTC'},
    25: {'slice_id': 1, 'traffic_type': 'MTC'},   28: {'slice_id': 1, 'traffic_type': 'MTC'},
    32: {'slice_id': 1, 'traffic_type': 'MTC'},   35: {'slice_id': 1, 'traffic_type': 'MTC'},
    39: {'slice_id': 1, 'traffic_type': 'MTC'},   42: {'slice_id': 1, 'traffic_type': 'MTC'},
    46: {'slice_id': 1, 'traffic_type': 'MTC'},   49: {'slice_id': 1, 'traffic_type': 'MTC'},
    2:  {'slice_id': 2, 'traffic_type': 'URLLC'},  5: {'slice_id': 2, 'traffic_type': 'URLLC'},
    9:  {'slice_id': 2, 'traffic_type': 'URLLC'}, 12: {'slice_id': 2, 'traffic_type': 'URLLC'},
    16: {'slice_id': 2, 'traffic_type': 'URLLC'}, 19: {'slice_id': 2, 'traffic_type': 'URLLC'},
    23: {'slice_id': 2, 'traffic_type': 'URLLC'}, 26: {'slice_id': 2, 'traffic_type': 'URLLC'},
    30: {'slice_id': 2, 'traffic_type': 'URLLC'}, 33: {'slice_id': 2, 'traffic_type': 'URLLC'},
    37: {'slice_id': 2, 'traffic_type': 'URLLC'}, 40: {'slice_id': 2, 'traffic_type': 'URLLC'},
    44: {'slice_id': 2, 'traffic_type': 'URLLC'}, 47: {'slice_id': 2, 'traffic_type': 'URLLC'},
}

# 4️⃣ tr_id -> RBG 分配
training_configs_coloran = {
    'tr0':  {'RBG_Alloc_Slice0':  2, 'RBG_Alloc_Slice1': 13, 'RBG_Alloc_Slice2':  2},
    'tr1':  {'RBG_Alloc_Slice0':  4, 'RBG_Alloc_Slice1': 11, 'RBG_Alloc_Slice2':  2},
    'tr2':  {'RBG_Alloc_Slice0':  6, 'RBG_Alloc_Slice1':  9, 'RBG_Alloc_Slice2':  2},
    'tr3':  {'RBG_Alloc_Slice0':  8, 'RBG_Alloc_Slice1':  7, 'RBG_Alloc_Slice2':  2},
    'tr4':  {'RBG_Alloc_Slice0': 10, 'RBG_Alloc_Slice1':  5, 'RBG_Alloc_Slice2':  2},
    'tr5':  {'RBG_Alloc_Slice0': 12, 'RBG_Alloc_Slice1':  3, 'RBG_Alloc_Slice2':  2},
    'tr6':  {'RBG_Alloc_Slice0': 14, 'RBG_Alloc_Slice1':  1, 'RBG_Alloc_Slice2':  2},
    'tr7':  {'RBG_Alloc_Slice0':  2, 'RBG_Alloc_Slice1': 11, 'RBG_Alloc_Slice2':  4},
    'tr8':  {'RBG_Alloc_Slice0':  4, 'RBG_Alloc_Slice1':  9, 'RBG_Alloc_Slice2':  4},
    'tr9':  {'RBG_Alloc_Slice0':  6, 'RBG_Alloc_Slice1':  7, 'RBG_Alloc_Slice2':  4},
    'tr10': {'RBG_Alloc_Slice0':  8, 'RBG_Alloc_Slice1':  5, 'RBG_Alloc_Slice2':  4},
    'tr11': {'RBG_Alloc_Slice0': 10, 'RBG_Alloc_Slice1':  3, 'RBG_Alloc_Slice2':  4},
    'tr12': {'RBG_Alloc_Slice0': 12, 'RBG_Alloc_Slice1':  1, 'RBG_Alloc_Slice2':  4},
    'tr13': {'RBG_Alloc_Slice0':  2, 'RBG_Alloc_Slice1':  9, 'RBG_Alloc_Slice2':  6},
    'tr14': {'RBG_Alloc_Slice0':  4, 'RBG_Alloc_Slice1':  7, 'RBG_Alloc_Slice2':  6},
    'tr15': {'RBG_Alloc_Slice0':  6, 'RBG_Alloc_Slice1':  5, 'RBG_Alloc_Slice2':  6},
    'tr16': {'RBG_Alloc_Slice0':  8, 'RBG_Alloc_Slice1':  3, 'RBG_Alloc_Slice2':  6},
    'tr17': {'RBG_Alloc_Slice0': 10, 'RBG_Alloc_Slice1':  1, 'RBG_Alloc_Slice2':  6},
    'tr18': {'RBG_Alloc_Slice0':  2, 'RBG_Alloc_Slice1':  7, 'RBG_Alloc_Slice2':  8},
    'tr19': {'RBG_Alloc_Slice0':  4, 'RBG_Alloc_Slice1':  5, 'RBG_Alloc_Slice2':  8},
    'tr20': {'RBG_Alloc_Slice0':  6, 'RBG_Alloc_Slice1':  3, 'RBG_Alloc_Slice2':  8},
    'tr21': {'RBG_Alloc_Slice0':  8, 'RBG_Alloc_Slice1':  1, 'RBG_Alloc_Slice2':  8},
    'tr22': {'RBG_Alloc_Slice0':  2, 'RBG_Alloc_Slice1':  5, 'RBG_Alloc_Slice2': 10},
    'tr23': {'RBG_Alloc_Slice0':  4, 'RBG_Alloc_Slice1':  3, 'RBG_Alloc_Slice2': 10},
    'tr24': {'RBG_Alloc_Slice0':  6, 'RBG_Alloc_Slice1':  1, 'RBG_Alloc_Slice2': 10},
    'tr25': {'RBG_Alloc_Slice0':  2, 'RBG_Alloc_Slice1':  3, 'RBG_Alloc_Slice2': 12},
    'tr26': {'RBG_Alloc_Slice0':  4, 'RBG_Alloc_Slice1':  1, 'RBG_Alloc_Slice2': 12},
    'tr27': {'RBG_Alloc_Slice0':  2, 'RBG_Alloc_Slice1':  1, 'RBG_Alloc_Slice2': 14},
}

# 5️⃣ scheduling policy map
scheduling_policy_map = {'sched0': 'RR', 'sched1': 'WF', 'sched2': 'PF'}

# 6️⃣ BS_ID 列表（Cell 6 會自動覆蓋）
bs_ids_from_readme = [1,8,15,22,29,36,43]

# 7️⃣ CSV 欄位名稱
COLS_BS_CSV = ['time','nof_ue','dl_brate','ul_brate']
COLS_UE_CSV = ['time','cc','pci','earfcn','rsrp','pl','cfo','dl_mcs','dl_snr','dl_turbo',
                'dl_brate','dl_bler','ul_ta','ul_mcs','ul_buff','ul_brate','ul_bler',
                'rf_o','rf_u','rf_l','is_attached']
COLS_SLICE_METRICS_CSV_EXPECTED_CLEAN = [
    'Timestamp','num_ues','IMSI','RNTI','slicing_enabled','slice_id',
    'slice_prb','power_multiplier','scheduling_policy',
    'dl_mcs','dl_n_samples','dl_buffer [bytes]',
    'tx_brate downlink [Mbps]','tx_pkts downlink','tx_errors downlink (%)',
    'dl_cqi','ul_mcs','ul_n_samples','ul_buffer [bytes]',
    'rx_brate uplink [Mbps]','rx_pkts uplink','rx_errors uplink (%)',
    'ul_rssi','ul_sinr','phr','sum_requested_prbs','sum_granted_prbs',
    'dl_pmi','dl_ri','ul_n','ul_turbo_iters'
]

# 8️⃣ 最終 KPI 欄位
TARGET_KPI_COLUMNS = [
    'BS_ID','exp_id','Training_Config_ID','Scheduling_Policy_Active','timestamp'
] + COLS_SLICE_METRICS_CSV_EXPECTED_CLEAN

print("設定完成。")



--- Cell 3: 設定與參數 ---
設定完成。


## 儲存格 4：找出所有三種類型的 CSV 檔案

In [6]:
# @title
# === 儲存格 4：尋找所有 CSV 檔案 ===
import os
from glob import glob

print("\n--- Cell 4: 尋找 CSV 檔案 ---")

# 1️⃣ 確認原始資料夾存在
if not os.path.exists(ROME_STATIC_MEDIUM_DIR):
    raise FileNotFoundError(f"錯誤：找不到資料夾：{ROME_STATIC_MEDIUM_DIR}")

# 2️⃣ 定義 glob 模式
bs_pattern    = os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*/tr*/exp*/bs*/bs*.csv")
ue_pattern    = os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*/tr*/exp*/bs*/ue*.csv")
slice_pattern = os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*/tr*/exp*/bs*/slices_bs*/*_metrics.csv")

# 3️⃣ 抓路徑並排序
all_bs_level_csv_files    = sorted(glob(bs_pattern))
all_ue_level_csv_files    = sorted(glob(ue_pattern))
all_slice_level_csv_files = sorted(glob(slice_pattern))

# 4️⃣ 列印結果
print(f"找到了 {len(all_bs_level_csv_files)} 個基地台層級 CSV：")
for f in all_bs_level_csv_files:
    print("  ", f)

print(f"\n找到了 {len(all_ue_level_csv_files)} 個 UE 層級 CSV：")
for f in all_ue_level_csv_files:
    print("  ", f)

print(f"\n找到了 {len(all_slice_level_csv_files)} 個切片層級 CSV：")
for f in all_slice_level_csv_files:
    print("  ", f)


[1;30;43m串流輸出內容已截斷至最後 5000 行。[0m
   /content/drive/MyDrive/FRL_Slicing_Sim/colosseum-oran-coloran-dataset/rome_static_medium/sched2/tr12/exp1/bs6/slices_bs6/1010123456042_metrics.csv
   /content/drive/MyDrive/FRL_Slicing_Sim/colosseum-oran-coloran-dataset/rome_static_medium/sched2/tr12/exp1/bs7/slices_bs7/1010123456044_metrics.csv
   /content/drive/MyDrive/FRL_Slicing_Sim/colosseum-oran-coloran-dataset/rome_static_medium/sched2/tr12/exp1/bs7/slices_bs7/1010123456045_metrics.csv
   /content/drive/MyDrive/FRL_Slicing_Sim/colosseum-oran-coloran-dataset/rome_static_medium/sched2/tr12/exp1/bs7/slices_bs7/1010123456046_metrics.csv
   /content/drive/MyDrive/FRL_Slicing_Sim/colosseum-oran-coloran-dataset/rome_static_medium/sched2/tr12/exp1/bs7/slices_bs7/1010123456047_metrics.csv
   /content/drive/MyDrive/FRL_Slicing_Sim/colosseum-oran-coloran-dataset/rome_static_medium/sched2/tr12/exp1/bs7/slices_bs7/1010123456048_metrics.csv
   /content/drive/MyDrive/FRL_Slicing_Sim/colosseum-oran-coloran-

## 儲存格 5：定義處理三種 CSV 的函式

In [7]:
# @title
print("\n--- Cell 5: 定義 CSV 處理函式 ---")

import os
import re
import pandas as pd

def parse_path_common_info(file_path, sched_map, bs_ids_readme_list):
    parts = os.path.normpath(file_path).split(os.sep)
    info = {}
    try:
        if "slices_bs" in file_path:
            info['file_type_parsed'] = "slice_metrics"
            bs_name, exp, tr, sched = parts[-3], parts[-4], parts[-5], parts[-6]
        elif os.path.basename(file_path).startswith("ue") and file_path.endswith(".csv"):
            info['file_type_parsed'] = "ue_metrics"
            bs_name, exp, tr, sched = parts[-2], parts[-3], parts[-4], parts[-5]
        elif os.path.basename(file_path).startswith("bs") and file_path.endswith(".csv"):
            info['file_type_parsed'] = "bs_metrics"
            bs_name, exp, tr, sched = parts[-2], parts[-3], parts[-4], parts[-5]
        else:
            return {}
        bs_id = int(bs_name.replace("bs", ""))
        if bs_id not in bs_ids_readme_list:
            print(f"  警告：BS_ID {bs_id} 不在 README ({file_path})")
        info.update({
            'BS_ID': bs_id,
            'exp_id': exp,
            'Training_Config_ID': tr,
            'Scheduling_Policy_Active': sched_map.get(sched, "UnknownSched"),
            'file_path_source': file_path
        })
    except Exception:
        return {}
    return info

def process_bs_level_csv(csv_file_path, common_path_info, training_config_map):
    try:
        df = pd.read_csv(csv_file_path, dtype={'time': str})
    except Exception as e:
        print(f"    錯誤讀取 {csv_file_path}: {e}")
        return pd.DataFrame()
    # 加入 metadata & RBG
    for k, v in common_path_info.items(): df[k] = v
    for rk, rv in training_config_map.get(common_path_info['Training_Config_ID'], {}).items():
        df[rk] = rv
    # time → timestamp
    if 'time' in df:
        df['timestamp'] = pd.to_datetime(df['time'].astype(float), unit='ms', errors='coerce')
        df.drop(columns=['time'], inplace=True)
    # 重新命名欄位
    return df.rename(columns={
        'nof_ue': 'BS_Total_UEs',
        'dl_brate': 'BS_Total_DL_Brate_kbps',
        'ul_brate': 'BS_Total_UL_Brate_kbps'
    })

def process_ue_level_csv(csv_file_path, common_path_info, training_config_map, first_abs_timestamps_map):
    try:
        df = pd.read_csv(csv_file_path, dtype={'time': str})
    except Exception as e:
        print(f"    錯誤讀取 {csv_file_path}: {e}")
        return pd.DataFrame()
    # metadata
    for k, v in common_path_info.items(): df[k] = v
    m = re.search(r'ue(\d+)\.csv$', os.path.basename(csv_file_path))
    df['UE_Index_Filename'] = int(m.group(1)) if m else pd.NA
    # RBG
    for rk, rv in training_config_map.get(common_path_info['Training_Config_ID'], {}).items():
        df[rk] = rv
    # 相對 time → 絕對 timestamp
    if 'time' in df:
        rel = df['time'].astype(float)
        key = (common_path_info['BS_ID'], common_path_info['exp_id'],
               common_path_info['Training_Config_ID'], common_path_info['Scheduling_Policy_Active'])
        start = first_abs_timestamps_map.get(key)
        df['timestamp'] = (start + pd.to_timedelta(rel, unit='ms')
                           if start is not None else pd.NaT)
        df.drop(columns=['time'], inplace=True)
    # 後綴 _ue
    meta_keys = list(common_path_info.keys()) + ['UE_Index_Filename','timestamp']
    rename_map = {c: f"{c}_ue" for c in df.columns if c not in meta_keys}
    df.rename(columns=rename_map, inplace=True)
    # kbps→Mbps & buffer
    if 'dl_brate_ue' in df: df['dl_brate_ue'] /= 1000
    if 'ul_brate_ue' in df: df['ul_brate_ue'] /= 1000
    if 'ul_buff_ue' in df: df.rename(columns={'ul_buff_ue':'Buffer_UL_bytes_ue'}, inplace=True)
    return df

def process_slice_level_csv(csv_file_path, common_path_info, ue_info_readme_map, training_config_map):
    try:
        df = pd.read_csv(csv_file_path)
    except Exception as e:
        print(f"    錯誤讀取 {csv_file_path}: {e}")
        return pd.DataFrame()
    # 移除 Unnamed
    df.drop(columns=[c for c in df if 'Unnamed' in c], inplace=True)
    # metadata
    for k, v in common_path_info.items(): df[k] = v
    # Timestamp → timestamp
    if 'Timestamp' in df:
        df['timestamp'] = pd.to_datetime(df['Timestamp'].astype(float), unit='ms', errors='coerce')
        df.drop(columns=['Timestamp'], inplace=True)
    # 重命名 ID 欄位
    df.rename(columns={'RNTI':'UE_RNTI','IMSI':'UE_IMSI','slice_id':'Slice_ID'}, inplace=True)
    # traffic type
    df['Traffic_Type'] = df['Slice_ID'].map({0:'eMBB',1:'MTC',2:'URLLC'}).fillna(pd.NA)
    # RBG
    for rk, rv in training_config_map.get(common_path_info['Training_Config_ID'], {}).items():
        df[rk] = rv
    # 重命名量測欄
    df.rename(columns={
        'tx_brate downlink [Mbps]':'Throughput_DL_Mbps',
        'rx_brate uplink [Mbps]':'Throughput_UL_Mbps',
        'dl_buffer [bytes]':'Buffer_DL_bytes',
        'ul_buffer [bytes]':'Buffer_UL_bytes',
        'slice_prb':'Slice_PRB_Allocated',
        'sum_granted_prbs':'PRB_Usage_DL',
        'dl_mcs':'UE_DL_MCS', 'ul_mcs':'UE_UL_MCS',
        'dl_cqi':'UE_DL_CQI', 'ul_sinr':'UE_UL_SINR',
        'phr':'UE_PHR',
        'tx_errors downlink (%)':'Packet_Loss_DL_pct',
        'rx_errors uplink (%)':'Packet_Loss_UL_pct'
    }, inplace=True)
    # PRB 利用率
    if 'sum_requested_prbs' in df and 'PRB_Usage_DL' in df:
        df['PRB_Utilization'] = (df['PRB_Usage_DL'] / df['sum_requested_prbs'])\
            .where(df['sum_requested_prbs']>0, pd.NA)
    return df

print("CSV 處理函式定義完成。")



--- Cell 5: 定義 CSV 處理函式 ---
CSV 處理函式定義完成。


## 儲存格 6：迭代讀取所有 CSV 並初步整理

In [8]:
# @title
# === 儲存格 6：逐檔處理並增量寫入 Parquet，降低記憶體佔用 ===
import os, glob, pandas as pd, pyarrow as pa
import pyarrow.parquet as pq
from concurrent.futures import ProcessPoolExecutor, as_completed

print("\n--- Cell 6: 優化版：逐檔處理並增量寫入 Parquet ---")

# 1️⃣ 先取出所有檔案清單（同你原本的寫法）
bs_dirs = glob.glob(os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*/tr*/exp*/bs*"))
bs_ids_from_readme = sorted({int(os.path.basename(d).replace("bs", "")) for d in bs_dirs})

all_bs_level_csv_files    = []
all_ue_level_csv_files    = []
all_slice_level_csv_files = []

for sched_path in glob.glob(os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*")):
    for tr_path in glob.glob(os.path.join(sched_path, "tr*")):
        for exp_path in glob.glob(os.path.join(tr_path, "exp*")):
            for bs_path in glob.glob(os.path.join(exp_path, "bs*")):
                bs_name = os.path.basename(bs_path)
                csv_bs = os.path.join(bs_path, f"{bs_name}.csv")
                if os.path.isfile(csv_bs):
                    all_bs_level_csv_files.append(csv_bs)
                all_ue_level_csv_files.extend(glob.glob(os.path.join(bs_path, "ue*.csv")))
                slice_dir = os.path.join(bs_path, f"slices_{bs_name}")
                all_slice_level_csv_files.extend(glob.glob(os.path.join(slice_dir, "*_metrics.csv")))

print(f"  ▸ BS-level  CSV：{len(all_bs_level_csv_files)}")
print(f"  ▸ UE-level  CSV：{len(all_ue_level_csv_files)}")
print(f"  ▸ Slice-level CSV：{len(all_slice_level_csv_files)}")

# 2️⃣ 計算每組 (BS,exp,tr,sched) 的起始 Timestamp（可保留並行：只有 Timestamp 欄，記憶體佔用低）
first_slice_metric_timestamps = {}

def _scan_min_timestamp(path):
    try:
        df = pd.read_csv(
            path,
            usecols=['Timestamp'],
            dtype={'Timestamp': 'Int64'},
            engine='python'
        )
        if df.empty: return None
        min_ts = pd.to_datetime(df['Timestamp'], unit='ms', errors='coerce').min()
        if pd.isna(min_ts): return None
        info = parse_path_common_info(path, scheduling_policy_map, bs_ids_from_readme)
        return ( (info['BS_ID'], info['exp_id'], info['Training_Config_ID'], info['Scheduling_Policy_Active']),
                 min_ts )
    except:
        return None

print("▶ 掃描 slice-level CSV 的起始 Timestamp …")
with ProcessPoolExecutor(max_workers=2) as ex:
    futures = [ex.submit(_scan_min_timestamp, p) for p in all_slice_level_csv_files]
    for fut in as_completed(futures):
        res = fut.result()
        if not res: continue
        key, ts = res
        prev = first_slice_metric_timestamps.get(key)
        if prev is None or ts < prev:
            first_slice_metric_timestamps[key] = ts

print(f"  已記錄 {len(first_slice_metric_timestamps)} 組起始 Timestamp")

# 3️⃣ 準備 ParquetWriter，一邊寫入一邊清釋記憶體
os.makedirs(os.path.dirname(OUTPUT_PARQUET_PATH), exist_ok=True)
_parquet_writer = None

def _process_and_append(path, kind):
    global _parquet_writer
    info = parse_path_common_info(path, scheduling_policy_map, bs_ids_from_readme)
    if not info:
        return
    # 依 kind 讀取最小欄位
    if kind == 'bs':
        usecols = ['time','nof_ue','dl_brate','ul_brate']
        df = pd.read_csv(path, usecols=usecols, dtype={'time': 'Int64'}, engine='python')
        # 你的 process_bs_level_csv 內部邏輯：把 time 轉 timestamp、加 metadata、rename
        df['timestamp'] = pd.to_datetime(df['time'], unit='ms', errors='coerce')
        df.drop(columns=['time'], inplace=True)
        df.rename(columns={
            'nof_ue':'BS_Total_UEs',
            'dl_brate':'BS_Total_DL_Brate_kbps',
            'ul_brate':'BS_Total_UL_Brate_kbps'
        }, inplace=True)
        for k,v in info.items(): df[k] = v
        rbg = training_configs_coloran.get(info['Training_Config_ID'],{})
        for rk,rv in rbg.items(): df[rk] = rv

    elif kind == 'slice':
        # 只讀必要欄位：Timestamp, RNTI, IMSI, slice_id，還有 metrics
        slice_cols = ['Timestamp','RNTI','IMSI','slice_id'] + COLS_SLICE_METRICS_CSV_EXPECTED_CLEAN
        df = pd.read_csv(path, usecols=slice_cols, dtype={'Timestamp':'Int64'}, engine='python')
        df['timestamp'] = pd.to_datetime(df['Timestamp'], unit='ms', errors='coerce')
        df.drop(columns=['Timestamp'], inplace=True)
        df.rename(columns={'slice_id':'Slice_ID'}, inplace=True)
        for k,v in info.items(): df[k] = v
        rbg = training_configs_coloran.get(info['Training_Config_ID'],{})
        for rk,rv in rbg.items(): df[rk] = rv

    else:  # kind == 'ue'
        usecols = ['time']  # 你可以選擇只讀 time；其餘若真的要，也可加
        df = pd.read_csv(path, usecols=usecols, dtype={'time':'Int64'}, engine='python')
        # 將 ue-level 放到單獨檔案或略過，以節省記憶體
        return

    # append to Parquet
    table = pa.Table.from_pandas(df, preserve_index=False)
    if _parquet_writer is None:
        _parquet_writer = pq.ParquetWriter(OUTPUT_PARQUET_PATH, table.schema)
    _parquet_writer.write_table(table)
    # 立刻釋放
    del df, table

print(f"▶ 逐檔處理並追加到 {OUTPUT_PARQUET_PATH} …")
with ProcessPoolExecutor(max_workers=2) as ex:
    # 只處理 bs-level & slice-level
    jobs = []
    for p in all_bs_level_csv_files:
        jobs.append(ex.submit(_process_and_append, p, 'bs'))
    for p in all_slice_level_csv_files:
        jobs.append(ex.submit(_process_and_append, p, 'slice'))
    for fut in as_completed(jobs):
        pass

# close writer
if _parquet_writer:
    _parquet_writer.close()

print("✅ Cell 6 完成 —— Parquet 已增量產出，過程中記憶體佔用大幅降低。")



--- Cell 6: 優化版：逐檔處理並增量寫入 Parquet ---
  ▸ BS-level  CSV：3080
  ▸ UE-level  CSV：18473
  ▸ Slice-level CSV：18329
▶ 掃描 slice-level CSV 的起始 Timestamp …
  已記錄 3080 組起始 Timestamp
▶ 逐檔處理並追加到 /content/drive/MyDrive/201FL研究/kpi_traces.parquet …
✅ Cell 6 完成 —— Parquet 已增量產出，過程中記憶體佔用大幅降低。


## Cell 7

In [35]:
# === Cell 6: 生成並保存 Parquet 檔案（分開 BS & Slice） ===
import os
import glob
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from concurrent.futures import ProcessPoolExecutor, as_completed

print("\n--- Cell 6: 生成並保存 Parquet 檔案（分開 BS & Slice） ---")

# 1️⃣ 檔案清單
bs_dirs = glob.glob(os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*/tr*/exp*/bs*"))
bs_ids_from_readme = sorted({int(os.path.basename(d).replace("bs", "")) for d in bs_dirs})
all_bs_level_csv = []
all_slice_level_csv = []
for sched in glob.glob(os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*")):
    for tr in glob.glob(os.path.join(sched, "tr*")):
        for exp in glob.glob(os.path.join(tr, "exp*")):
            for bs in glob.glob(os.path.join(exp, "bs*")):
                name = os.path.basename(bs)
                path_bs = os.path.join(bs, f"{name}.csv")
                if os.path.isfile(path_bs):
                    all_bs_level_csv.append(path_bs)
                slice_dir = os.path.join(bs, f"slices_{name}")
                all_slice_level_csv.extend(glob.glob(os.path.join(slice_dir, "*_metrics.csv")))
print(f"  ▸ BS-level CSV：{len(all_bs_level_csv)}")
print(f"  ▸ Slice-level CSV：{len(all_slice_level_csv)}")

# 2️⃣ 處理並保存 BS-level 整張 Parquet
bs_dfs = []
for f in all_bs_level_csv:
    info = parse_path_common_info(f, scheduling_policy_map, bs_ids_from_readme)
    if not info:
        continue
    df = process_bs_level_csv(f, info, training_configs_coloran)
    if not df.empty:
        bs_dfs.append(df)
if bs_dfs:
    df_bs_all = pd.concat(bs_dfs, ignore_index=True)
    bs_parquet = OUTPUT_PARQUET_PATH.replace('.parquet','_bs.parquet')
    print(f"Saving BS-level Parquet to {bs_parquet} ...")
    df_bs_all.to_parquet(bs_parquet, index=False, engine='pyarrow')
    print("✅ BS-level Parquet 完成！")
else:
    print("⚠️ 沒有 BS-level 資料可保存。")

# 3️⃣ 逐檔處理並增量寫入 Slice-level Parquet（Memory friendly）
slice_parquet = OUTPUT_PARQUET_PATH.replace('.parquet','_slice.parquet')
# 先取第一份來建立 schema
first_schema = None
with ProcessPoolExecutor(max_workers=1) as ex:
    # 只取第一個文件來推斷 schema
    sample = all_slice_level_csv[0] if all_slice_level_csv else None
    if sample:
        info = parse_path_common_info(sample, scheduling_policy_map, bs_ids_from_readme)
        df_sample = process_slice_level_csv(sample, info, ue_info_coloran, training_configs_coloran)
        if 'Timestamp' in df_sample.columns:
            df_sample.rename(columns={'Timestamp':'timestamp'}, inplace=True)
        df_sample['timestamp'] = pd.to_datetime(df_sample['timestamp'], unit='ms', errors='coerce')
        # 確保所有欲存欄位存在
        cols = ['BS_ID','exp_id','Training_Config_ID','Scheduling_Policy_Active','timestamp'] + COLS_SLICE_METRICS_CSV_EXPECTED_CLEAN
        for c in cols:
            if c not in df_sample: df_sample[c] = pd.NA
        df_sample = df_sample[cols]
        first_schema = pa.Table.from_pandas(df_sample, preserve_index=False).schema

# 初始化 ParquetWriter
if first_schema is None:
    raise RuntimeError("無 Slice-level 範例可用於建立 Parquet schema。")
writer = pq.ParquetWriter(slice_parquet, first_schema, compression='snappy')

print(f"▶ Incremental write to {slice_parquet} …")
for f in all_slice_level_csv:
    info = parse_path_common_info(f, scheduling_policy_map, bs_ids_from_readme)
    if not info:
        continue
    df = process_slice_level_csv(f, info, ue_info_coloran, training_configs_coloran)
    if df.empty:
        continue
    # timestamp
    if 'Timestamp' in df.columns:
        df.rename(columns={'Timestamp':'timestamp'}, inplace=True)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
    # 補全欄位並排序
    cols = ['BS_ID','exp_id','Training_Config_ID','Scheduling_Policy_Active','timestamp'] + COLS_SLICE_METRICS_CSV_EXPECTED_CLEAN
    for c in cols:
        if c not in df:
            df[c] = pd.NA
    df = df[cols]
    # 寫入
    tbl = pa.Table.from_pandas(df, schema=first_schema, preserve_index=False)
    writer.write_table(tbl)
writer.close()
print("✅ Slice-level Parquet 完成！")



--- Cell 6: 生成並保存 Parquet 檔案（分開 BS & Slice） ---
  ▸ BS-level CSV：3080
  ▸ Slice-level CSV：18329
Saving BS-level Parquet to /content/drive/MyDrive/201FL研究/kpi_traces_bs.parquet ...
✅ BS-level Parquet 完成！
▶ Incremental write to /content/drive/MyDrive/201FL研究/kpi_traces_slice.parquet …
✅ Slice-level Parquet 完成！


## 分割線

In [2]:
# @title
# === Cell 7: 數據準備 - 從 CSV 讀取、清理並生成中間 Parquet 檔案 ===
import os
import pandas as pd
import numpy as np
import re
import glob
from tqdm.notebook import tqdm

print("\n--- Cell 7: 數據準備流程啟動 ---")

# --- STAGE 1: 參數與路徑定義 ---
print("[1/4] 正在定義參數與路徑...")
# 確保這些路徑與您之前的設定一致
WORK_DIR = '/content/drive/MyDrive/FRL_Slicing_Sim'
DATASET_LOCAL_NAME = 'colosseum-oran-coloran-dataset'
DATASET_DIR = os.path.join(WORK_DIR, DATASET_LOCAL_NAME)
ROME_STATIC_MEDIUM_DIR = os.path.join(DATASET_DIR, 'rome_static_medium')
# 輸出中間檔案的路徑
BS_PARQUET_PATH = os.path.join(WORK_DIR, 'kpi_traces_bs.parquet')
SLICE_PARQUET_PATH = os.path.join(WORK_DIR, 'kpi_traces_slice.parquet')

# 從之前儲存格來的配置
scheduling_policy_map = {'sched0': 'RR', 'sched1': 'WF', 'sched2': 'PF'}

# --- STAGE 2: 探索檔案 ---
print("\n[2/4] 正在探索所有相關的 CSV 檔案路徑...")
all_slice_level_csv_files = sorted(glob.glob(os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*/tr*/exp*/bs*/slices_bs*/*_metrics.csv")))
all_bs_level_csv_files = sorted(glob.glob(os.path.join(ROME_STATIC_MEDIUM_DIR, "sched*/tr*/exp*/bs*/bs*.csv")))
print(f"      找到 {len(all_slice_level_csv_files)} 個 slice-level CSV 和 {len(all_bs_level_csv_files)} 個 bs-level CSV。")

# --- STAGE 3: 處理與轉換 ---
print("\n[3/4] 正在處理所有 CSV 檔案...")
processed_bs_dfs = []
processed_slice_dfs = []

# 解析路徑以獲取元數據的輔助函式
def parse_path_info(file_path):
    try:
        match = re.search(r'/(sched\d+)/(tr\d+)/(exp\d+)/(bs\d+)/', file_path)
        if match:
            sched, tr, exp, bs = match.groups()
            return {
                'Scheduling_Policy_Active': scheduling_policy_map.get(sched, 'Unknown'),
                'Training_Config_ID': tr,
                'exp_id': exp,
                'BS_ID': int(bs.replace('bs', ''))
            }
    except (ValueError, TypeError):
        return {}
    return {}

# 處理 BS-Level CSV
for file in tqdm(all_bs_level_csv_files, desc="Processing BS files"):
    info = parse_path_info(file)
    if not info: continue
    try:
        df = pd.read_csv(file)
        df.rename(columns={'time': 'timestamp'}, inplace=True)
        for k, v in info.items():
            df[k] = v
        processed_bs_dfs.append(df)
    except Exception:
        continue

# 處理 Slice-Level CSV
for file in tqdm(all_slice_level_csv_files, desc="Processing Slice files"):
    info = parse_path_info(file)
    if not info: continue
    try:
        df = pd.read_csv(file, low_memory=False)
        # 清理因 `,,` 產生的空欄位
        df.drop(columns=[col for col in df.columns if 'Unnamed:' in str(col)], inplace=True)
        df.rename(columns={'Timestamp': 'timestamp', 'slice_id': 'Slice_ID'}, inplace=True)
        for k, v in info.items():
            df[k] = v
        processed_slice_dfs.append(df)
    except Exception:
        continue

# --- STAGE 4: 合併與儲存中間檔案 ---
print("\n[4/4] 正在合併並儲存中間 Parquet 檔案...")
if processed_bs_dfs:
    df_bs_all = pd.concat(processed_bs_dfs, ignore_index=True)
    df_bs_all.to_parquet(BS_PARQUET_PATH, index=False)
    print(f"      ✅ BS-level 中間檔案已儲存至: {BS_PARQUET_PATH} (Shape: {df_bs_all.shape})")
else:
    print("      ⚠️ 未能處理任何 BS-level 檔案。")

if processed_slice_dfs:
    df_slice_all = pd.concat(processed_slice_dfs, ignore_index=True)
    df_slice_all.to_parquet(SLICE_PARQUET_PATH, index=False)
    print(f"      ✅ Slice-level 中間檔案已儲存至: {SLICE_PARQUET_PATH} (Shape: {df_slice_all.shape})")
else:
    print("      ⚠️ 未能處理任何 Slice-level 檔案。")

print("\n--- Cell 7 數據準備完成 ---")


--- Cell 7: 數據準備流程 ---
[1/4] 正在定義參數與路徑...

[2/4] 正在探索所有 CSV 檔案路徑...
      找到 18329 個 slice-level CSV 和 3080 個 bs-level CSV。

[3/4] 正在處理所有 CSV 檔案...


Processing BS files:   0%|          | 0/3080 [00:00<?, ?it/s]

Processing Slice files:   0%|          | 0/18329 [00:00<?, ?it/s]


[4/4] 正在合併並儲存中間 Parquet 檔案...
      ✅ BS-level 中間檔案已儲存: /content/drive/MyDrive/FRL_Slicing_Sim/kpi_traces_bs.parquet (Shape: (6534544, 8))
      ✅ Slice-level 中間檔案已儲存: /content/drive/MyDrive/FRL_Slicing_Sim/kpi_traces_slice.parquet (Shape: (35512393, 35))

--- Cell 7 數據準備完成 ---


In [2]:
# @title
# === 最終解決方案：分組執行 As-Of Merge ===
import pandas as pd
from tqdm.notebook import tqdm

print("\n--- 最終解決方案：啟動分組合併流程 ---")

# 1. 檔案路徑定義
BS_PARQUET_PATH = '/content/drive/MyDrive/FRL_Slicing_Sim/kpi_traces_bs.parquet'
SLICE_PARQUET_PATH = '/content/drive/MyDrive/FRL_Slicing_Sim/kpi_traces_slice.parquet'
FINAL_OUTPUT_PATH = '/content/drive/MyDrive/FRL_Slicing_Sim/kpi_traces_final_v_robust.parquet'

try:
    # --- STAGE 1: 載入並執行所有準備工作 ---
    print("\n[1/4] 正在載入並執行所有數據準備步驟...")

    # 載入數據
    df_bs = pd.read_parquet(BS_PARQUET_PATH)
    df_slice = pd.read_parquet(SLICE_PARQUET_PATH)

    # 定義合併鍵
    by_keys = ['BS_ID', 'exp_id', 'Training_Config_ID', 'Scheduling_Policy_Active']
    on_key = 'timestamp'
    all_sort_keys = by_keys + [on_key]

    # 執行類型轉換、清理、排序、去重
    for df in [df_bs, df_slice]:
        df[on_key] = pd.to_datetime(df[on_key], unit='ms', errors='coerce')
        if not pd.api.types.is_numeric_dtype(df['exp_id']):
            df['exp_id'] = pd.to_numeric(df['exp_id'].str.replace('exp', ''), errors='coerce')
        if not pd.api.types.is_numeric_dtype(df['Training_Config_ID']):
            df['Training_Config_ID'] = pd.to_numeric(df['Training_Config_ID'].str.replace('tr', ''), errors='coerce')

        df.dropna(subset=all_sort_keys, inplace=True)

        for col in ['BS_ID', 'exp_id', 'Training_Config_ID']:
            df[col] = df[col].astype(int)

        df.sort_values(by=all_sort_keys, inplace=True)
        df.drop_duplicates(subset=all_sort_keys, keep='first', inplace=True)

    print("      數據準備完成。")

    # --- STAGE 2: 創建用於快速查找的索引 ---
    print("\n[2/4] 正在為右表(df_bs)創建多重索引以便快速查找...")
    df_bs_indexed = df_bs.set_index(by_keys)
    print("      索引創建完成。")

    # --- STAGE 3: 分組執行 merge_asof (核心 workaround) ---
    print("\n[3/4] 正在遍歷各個數據組，逐一執行合併...")

    all_merged_groups = []
    # 按照 by_keys 對左表進行分組
    grouped_slice = df_slice.groupby(by_keys)

    for group_name, left_group in tqdm(grouped_slice, total=len(grouped_slice)):
        try:
            # 從已建立索引的右表中，快速找到對應的組
            # 使用 .get() 避免因找不到鍵而引發的 KeyError
            right_group = df_bs_indexed.loc[group_name]

            # 對這個小組執行 merge_asof
            merged_group = pd.merge_asof(
                left=left_group.reset_index(drop=True),
                right=right_group.reset_index(drop=True),
                on='timestamp',
                # 此處不再需要 'by'，因為我們已經在一個組內了
                direction='nearest',
                tolerance=pd.Timedelta('250ms'),
                suffixes=('', '_bs')
            )
            all_merged_groups.append(merged_group)

        except KeyError:
            # 如果在右表中找不到對應的組，就跳過這個左表組
            # print(f"      提示：在 df_bs 中找不到組 {group_name}，已跳過。")
            all_merged_groups.append(left_group) # 或者保留原始的左表組
            continue
        except Exception as e:
            print(f"      ❌ 在處理組 {group_name} 時發生錯誤: {e}")
            # 如果某個特定組出錯，我們就跳過它，並繼續處理下一個
            continue

    # --- STAGE 4: 合併所有結果並儲存 ---
    print("\n[4/4] 正在將所有合併後的分組結果拼接起來...")
    if all_merged_groups:
        final_df = pd.concat(all_merged_groups, ignore_index=True)

        print(f"      合併完成！最終 DataFrame shape: {final_df.shape}")
        print(f"      正在將結果儲存至: {FINAL_OUTPUT_PATH}")
        final_df.to_parquet(FINAL_OUTPUT_PATH, index=False, engine='pyarrow')
        print("✅✅✅ 流程成功結束！ ✅✅✅")
        display(final_df.head())
    else:
        print("⚠️ 未能成功合併任何數據組。")

except Exception as e:
    print(f"\n處理過程中發生致命錯誤: {e}")
    import traceback
    traceback.print_exc()


--- 最終解決方案：啟動分組合併流程 ---

[1/4] 正在載入並執行所有數據準備步驟...
      數據準備完成。

[2/4] 正在為右表(df_bs)創建多重索引以便快速查找...
      索引創建完成。

[3/4] 正在遍歷各個數據組，逐一執行合併...


  0%|          | 0/3080 [00:00<?, ?it/s]


[4/4] 正在將所有合併後的分組結果拼接起來...
      合併完成！最終 DataFrame shape: (6183234, 38)
      正在將結果儲存至: /content/drive/MyDrive/FRL_Slicing_Sim/kpi_traces_final_v_robust.parquet
✅✅✅ 流程成功結束！ ✅✅✅


Unnamed: 0,timestamp,num_ues,UE_IMSI,UE_RNTI,slicing_enabled,Slice_ID,slice_prb,power_multiplier,scheduling_policy,dl_mcs,...,dl_ri,ul_n,ul_turbo_iters,Scheduling_Policy_Active,Training_Config_ID,exp_id,BS_ID,BS_Total_UEs,BS_Total_DL_Brate_kbps,BS_Total_UL_Brate_kbps
0,2021-02-17 21:34:29.447,1,1010123456002,70,1,2,5,1,2,3.17647,...,0,0,1.0,PF,0,1,1,1.0,9536.0,113376.0
1,2021-02-17 21:34:29.697,1,1010123456002,70,1,2,5,1,2,0.0,...,0,0,1.0,PF,0,1,1,1.0,2176.0,37856.0
2,2021-02-17 21:34:29.981,2,1010123456002,70,1,2,5,1,2,0.0,...,0,0,0.0,PF,0,1,1,2.0,0.0,1166.666626
3,2021-02-17 21:34:30.197,4,1010123456002,70,1,2,5,1,2,0.0,...,0,0,0.0,PF,0,1,1,4.0,3047.619141,989.0
4,2021-02-17 21:34:30.447,4,1010123456002,70,1,2,5,1,2,0.0,...,0,0,0.0,PF,0,1,1,4.0,0.0,0.0


## 下方區段用於測試

In [None]:
# @title
# === 最終診斷儲存格：對 df_slice 進行法醫分析 ===
import pandas as pd
import numpy as np

print("--- 法醫分析啟動：正在深入檢查 df_slice ---")

# 1. 檔案路徑定義 (請確保與您儲存的檔案路徑一致)
SLICE_PARQUET_PATH = '/content/drive/MyDrive/FRL_Slicing_Sim/kpi_traces_slice.parquet'
by_keys = ['BS_ID', 'exp_id', 'Training_Config_ID', 'Scheduling_Policy_Active']
on_key = 'timestamp'
all_sort_keys = by_keys + [on_key]

try:
    # --- STAGE 1: 載入並執行所有已知的轉換 ---
    print("\n[1/4] 正在載入並重新執行所有轉換步驟...")
    df_slice = pd.read_parquet(SLICE_PARQUET_PATH)

    # 執行類型轉換
    df_slice[on_key] = pd.to_datetime(df_slice[on_key], unit='ms', errors='coerce')
    if not pd.api.types.is_numeric_dtype(df_slice['exp_id']):
        df_slice['exp_id'] = pd.to_numeric(df_slice['exp_id'].str.replace('exp', ''), errors='coerce')
    if not pd.api.types.is_numeric_dtype(df_slice['Training_Config_ID']):
        df_slice['Training_Config_ID'] = pd.to_numeric(df_slice['Training_Config_ID'].str.replace('tr', ''), errors='coerce')

    # 移除NA並設定最終類型
    df_slice.dropna(subset=all_sort_keys, inplace=True)
    df_slice['BS_ID'] = df_slice['BS_ID'].astype(np.int64)
    df_slice['exp_id'] = df_slice['exp_id'].astype(np.int64)
    df_slice['Training_Config_ID'] = df_slice['Training_Config_ID'].astype(np.int64)
    df_slice['Scheduling_Policy_Active'] = df_slice['Scheduling_Policy_Active'].astype('category')

    # 執行排序與去重
    df_slice.sort_values(by=all_sort_keys, inplace=True)
    df_slice.drop_duplicates(subset=all_sort_keys, keep='first', inplace=True)
    df_slice.reset_index(drop=True, inplace=True)

    print("      所有已知的轉換與排序步驟已完成。")

    # --- STAGE 2: 深度類型與NA檢查 ---
    print("\n[2/4] 正在進行深度類型與NA檢查...")
    print("      排序鍵的最終資料類型：")
    df_slice[all_sort_keys].info()
    na_sum = df_slice[all_sort_keys].isna().sum()
    if na_sum.sum() > 0:
        print("\n      ❌ 警告：即使在 dropna 之後，排序鍵中仍存在 NA 值！")
        print(na_sum)
    else:
        print("\n      ✅ 排序鍵中無 NA 值。")

    # --- STAGE 3: 手動逐行排序驗證 ---
    print("\n[3/4] 正在手動逐行驗證 'by' 鍵的全局排序...")

    # 將 'by' 鍵轉換為一個元組列表，以便進行比較
    key_tuples = [tuple(x) for x in df_slice[by_keys].to_numpy()]

    is_by_sorted = True
    first_error_index = -1

    # 遍歷元組，比較相鄰的兩個
    for i in range(1, len(key_tuples)):
        if key_tuples[i-1] > key_tuples[i]:
            is_by_sorted = False
            first_error_index = i
            break

    if not is_by_sorted:
        print(f"      ❌ 致命錯誤：'by' 鍵的全局排序在索引 {first_error_index} 處被破壞！")
        print("      這表示即使在 sort_values 之後，數據的順序仍然是錯誤的。")
        print("      前一行的鍵:", df_slice.iloc[first_error_index - 1][by_keys].to_dict())
        print("      出錯行的鍵:", df_slice.iloc[first_error_index][by_keys].to_dict())
    else:
        print("      ✅ 'by' 鍵已按字典序正確排序。")

    # --- STAGE 4: 組內時間戳單調性驗證 ---
    print("\n[4/4] 正在驗證組內 'timestamp' 的單調性...")
    if not is_by_sorted:
        print("      由於 'by' 鍵排序失敗，跳過此步驟。")
    else:
        # 使用 groupby().diff() 檢查時間戳是否總是增加
        ts_diffs = df_slice.groupby(by_keys, sort=False)['timestamp'].diff()

        # 負的時間差表示順序錯誤
        non_monotonic_rows = ts_diffs[ts_diffs < pd.Timedelta(0)]

        if not non_monotonic_rows.empty:
            error_idx = non_monotonic_rows.index[0]
            print(f"      ❌ 致命錯誤：在索引 {error_idx} 處發現時間戳不是單調遞增的！")
            print("      這直接違反了 merge_asof 的核心要求。")
            print("\n      --- 問題數據點的前後文 ---")
            display(df_slice.iloc[error_idx-2 : error_idx+2][all_sort_keys])
        else:
            print("      ✅ 組內 'timestamp' 已驗證為單調遞增。")

except Exception as e:
    print(f"\n診斷過程中發生錯誤: {e}")
    import traceback
    traceback.print_exc()

In [1]:
# @title
# === 步驟三：執行最終合併與儲存 ===

print("\n--- Cell 8: 最終合併流程 (函式庫升級後) ---")

# 檢查 Cell 7 的變數是否存在
if 'df_slice' not in locals() or 'df_bs' not in locals():
    print("錯誤：找不到 df_slice 或 df_bs。請確保您已重新執行了載入與準備數據的儲存格。")
else:
    try:
        # 再次確認排序，以防萬一
        by_keys = ['BS_ID', 'exp_id', 'Training_Config_ID', 'Scheduling_Policy_Active']
        on_key = 'timestamp'
        all_sort_keys = by_keys + [on_key]

        df_slice.sort_values(by=all_sort_keys, inplace=True)
        df_bs.sort_values(by=all_sort_keys, inplace=True)

        df_slice.drop_duplicates(subset=all_sort_keys, keep='first', inplace=True)
        df_bs.drop_duplicates(subset=all_sort_keys, keep='first', inplace=True)

        df_slice.reset_index(drop=True, inplace=True)
        df_bs.reset_index(drop=True, inplace=True)

        print("數據準備就緒，正在執行 merge_asof...")

        # 執行合併
        final_df = pd.merge_asof(
            left=df_slice,
            right=df_bs,
            on='timestamp',
            by=by_keys,
            direction='nearest',
            tolerance=pd.Timedelta('250ms'),
            suffixes=('', '_bs')
        )

        # 儲存
        FINAL_OUTPUT_PATH = '/content/drive/MyDrive/201FL研究/kpi_traces_final.parquet'
        final_df.to_parquet(FINAL_OUTPUT_PATH, index=False, engine='pyarrow')

        print("\n✅✅✅ 流程成功結束！ ✅✅✅")
        print(f"   最終合併檔案已儲存至: {FINAL_OUTPUT_PATH}")
        print(f"   最終 shape: {final_df.shape}")
        display(final_df.head())

    except Exception as e:
        print(f"\n處理過程中發生未預期的錯誤: {e}")
        import traceback
        traceback.print_exc()


--- Cell 8: 最終合併流程 (函式庫升級後) ---
錯誤：找不到 df_slice 或 df_bs。請確保您已重新執行了載入與準備數據的儲存格。
