In [4]:
import os
import xml.etree.ElementTree as ET
import numpy as np
import pandas as pd
import re
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsRegressor

# Location of the weather XML file on the local machine.
folder_path = r'C:\Users\maggi\Downloads\ML\Weather'
file_name = 'O-A0038-003.xml'
full_path = os.path.join(folder_path, file_name)

# Grid dimensions: nx points along longitude, ny points along latitude.
nx, ny = 67, 120

print("正在讀取 XML 資料...")
tree = ET.parse(full_path)
root = tree.getroot()
ns = 'urn:cwa:gov:tw:cwacommon:0.1'

# Keep the text of the first namespaced <Content> element that actually
# holds comma-separated data; elements with no text or no comma are skipped.
content_text = None
for content in root.findall(f'.//{{{ns}}}Content'):
    if not content.text or "," not in content.text:
        continue
    content_text = content.text
    break
if content_text is None:
    raise ValueError("找不到有效的 <Content> 區塊!")

print("正在清理資料...")
# Rows of the grid are separated by line breaks.  Deleting the line breaks
# outright would fuse the last value of one row with the first value of the
# next whenever a row does not end in a comma, so every whitespace run
# (including newlines) is turned into a separator instead.
clean_text = re.sub(r'\s+', ',', content_text.strip())
# Safety net for values that are glued together inside the source itself:
# an exponent such as "E+00" directly followed by "-" starts a new number.
clean_text = re.sub(r'(E[+-]\d{2})(-)', r'\1,\2', clean_text)
# Empty tokens (from trailing or doubled separators) are dropped.
numbers = [float(item) for item in clean_text.split(",") if item.strip() != ""]

# The flat list must fill the ny-by-nx grid exactly.
if len(numbers) != nx * ny:
    raise ValueError(f"資料大小 ({len(numbers)}) 與網格 ({ny}x{nx}) 不符!")

print(f"資料清理完成，總數: {len(numbers)}")

# Lay the flat value list out as ny rows by nx columns.
values_grid = np.array(numbers).reshape((ny, nx))

# Coordinates of the first grid point and the spacing between neighbours
# (presumably degrees -- confirm against the data set description).
lon_start, lat_start, step = 120.0, 21.88, 0.03
lon_list = [lon_start + col * step for col in range(nx)]
lat_list = [lat_start + row * step for row in range(ny)]
lon_grid, lat_grid = np.meshgrid(lon_list, lat_list)

# One DataFrame row per grid point, in row-major order of the grid.
df = pd.DataFrame({
    'longitude': lon_grid.ravel(),
    'latitude': lat_grid.ravel(),
    'value': values_grid.ravel(),
})

print("正在準備分類和迴歸資料...")

# Classification set: every grid point, labelled 1 when its value differs
# from the -999.0 sentinel and 0 when it equals it.
df_class = df.assign(label=df['value'].ne(-999.0).astype(int))
X_class = df_class[['longitude', 'latitude']]
y_class = df_class['label']
Xc_train, Xc_test, yc_train, yc_test = train_test_split(
    X_class, y_class, test_size=0.3, random_state=42
)


# Regression set: only the points whose value is not the -999.0 sentinel.
df_reg = df.loc[df['value'].ne(-999.0)].copy()
X_reg = df_reg[['longitude', 'latitude']]
y_reg = df_reg['value']
Xr_train, Xr_test, yr_train, yr_test = train_test_split(
    X_reg, y_reg, test_size=0.3, random_state=42
)

print("資料準備完成。\n")


print("--- 第二題：組合迴歸模型 ---")
print("依賴項 (A): 定義 GDA Class (C(x))...")

class GDA:
    """Gaussian discriminant analysis with a covariance matrix shared by all classes.

    After ``fit`` the instance exposes:
      classes   -- sorted unique labels seen during fitting
      phi       -- class priors (fraction of training samples per class)
      mu        -- per-class feature means, shape (n_classes, n_features)
      sigma     -- pooled covariance, shape (n_features, n_features)
      inv_sigma -- inverse (or pseudo-inverse) of ``sigma``
    """

    def fit(self, X, y):
        """Estimate priors, class means and the pooled covariance.

        Args:
            X: array-like of shape (n_samples, n_features).
            y: array-like of shape (n_samples,) holding the class labels.

        Returns:
            The fitted instance, so calls can be chained.

        Raises:
            ValueError: if ``X`` is not 2-D, is empty, or ``y`` does not
                provide exactly one label per row of ``X``.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError(f"X must be 2-D, got {X.ndim} dimension(s)")
        if X.shape[0] == 0:
            raise ValueError("X must contain at least one sample")
        if y.ndim != 1 or y.shape[0] != X.shape[0]:
            raise ValueError("y must be 1-D with one label per row of X")

        n_samples, n_features = X.shape
        self.classes = np.unique(y)
        n_classes = len(self.classes)
        self.phi = np.zeros(n_classes)
        self.mu = np.zeros((n_classes, n_features))
        scatter = np.zeros((n_features, n_features))

        for idx, c in enumerate(self.classes):
            X_c = X[y == c]
            self.phi[idx] = X_c.shape[0] / n_samples
            self.mu[idx, :] = X_c.mean(axis=0)
            centered = X_c - self.mu[idx, :]
            # Within-class scatter is pooled over all classes.
            scatter += centered.T @ centered

        self.sigma = scatter / n_samples
        try:
            self.inv_sigma = np.linalg.inv(self.sigma)
        except np.linalg.LinAlgError:
            # An exactly singular covariance (e.g. a constant feature) has no
            # inverse; the pseudo-inverse ignores the zero-variance directions
            # instead of aborting the fit.
            self.inv_sigma = np.linalg.pinv(self.sigma)
        return self

    def predict(self, X):
        """Return the most probable class label for every row of ``X``.

        Because the covariance is shared, terms that are identical for every
        class are omitted; only the log prior and the quadratic distance term
        are compared.

        Args:
            X: array-like of shape (n_samples, n_features).

        Returns:
            1-D array of labels taken from ``self.classes``.

        Raises:
            ValueError: if ``X`` is not 2-D or its feature count differs from
                the one seen by ``fit``.
        """
        X = np.asarray(X, dtype=float)
        if X.ndim != 2 or X.shape[1] != self.mu.shape[1]:
            raise ValueError(
                f"X must have shape (n_samples, {self.mu.shape[1]}), got {X.shape}"
            )

        scores = np.empty((X.shape[0], len(self.classes)))
        for idx in range(len(self.classes)):
            diff = X - self.mu[idx, :]
            # Row-wise quadratic form diff @ inv_sigma @ diff.T.
            mahalanobis_sq = np.sum(diff @ self.inv_sigma * diff, axis=1)
            scores[:, idx] = np.log(self.phi[idx]) - 0.5 * mahalanobis_sq

        return self.classes[np.argmax(scores, axis=1)]

print("訓練 C(x) (GDA)...")
# C(x): the classifier is trained on plain NumPy arrays taken from the
# classification split.
gda_model = GDA()
gda_model.fit(X=Xc_train.to_numpy(), y=yc_train.to_numpy())

print("依賴項 (B): 訓練 R(x) (KNeighborsRegressor)...")
# R(x): 5-nearest-neighbour regressor trained on the valid points only.
# NOTE(review): it is fitted on a DataFrame while combined_model() later calls
# predict() with a NumPy array; scikit-learn may emit a feature-name warning
# for that mismatch -- confirm whether that matters here.
knn_reg = KNeighborsRegressor(n_neighbors=5).fit(Xr_train, yr_train)
print("依賴模型訓練完成。\n")

print("組合函數 h(x)...")
def combined_model(X_input, classification_model, regression_model,
                   *, valid_label=1, invalid_value=-999.0):
    """Evaluate the piecewise model h(x) built from a classifier and a regressor.

        h(x) = R(x)            if C(x) == valid_label
        h(x) = invalid_value   otherwise

    Args:
        X_input: array-like of shape (n_samples, n_features); a DataFrame,
            ndarray or nested list are all accepted.
        classification_model: object whose ``predict`` returns one label per row.
        regression_model: object whose ``predict`` returns one value per row.
        valid_label: label that marks a row as valid (default 1).
        invalid_value: value emitted for rows not classified as valid
            (default -999.0).

    Returns:
        1-D float array with one entry per input row.

    Raises:
        ValueError: if either model does not return exactly one prediction
            per input row.
    """
    X_input_np = np.asarray(X_input)
    n_rows = X_input_np.shape[0]

    # Flatten so that column-vector outputs of shape (n, 1) are handled too.
    class_preds = np.asarray(classification_model.predict(X_input_np)).ravel()
    reg_preds = np.asarray(regression_model.predict(X_input_np)).ravel()
    if class_preds.shape[0] != n_rows or reg_preds.shape[0] != n_rows:
        raise ValueError("each model must return exactly one prediction per input row")

    # Start from "invalid" everywhere, then overwrite the rows the classifier accepts.
    h_x_output = np.full(n_rows, invalid_value, dtype=float)
    is_valid = class_preds == valid_label
    h_x_output[is_valid] = reg_preds[is_valid]

    return h_x_output

print("組合函數 h(x) 已定義。")


print(" 應用組合模型")

# Evaluate each stage on the classification test split: the classifier alone,
# the regressor alone (on every point, valid or not), and the combined h(x).
c_predictions = gda_model.predict(Xc_test.to_numpy())
r_predictions = knn_reg.predict(Xc_test)
h_predictions = combined_model(Xc_test, gda_model, knn_reg)


print("\n--- (d) 展示模型行為的強化表格 ---")

# Side-by-side table: coordinates, true label, and the three predictions.
results_df = Xc_test[['longitude', 'latitude']].assign(**{
    'Actual_Label': yc_test.values,
    'C(x)_GDA_Pred': c_predictions,
    'R(x)_KNN_Pred': r_predictions,
    'h(x)_Final_Output': h_predictions,
})

print("\n【展示 1】: 當真實標籤 (Actual_Label) = 0 時 (無效點)")
print(results_df.loc[results_df['Actual_Label'].eq(0)].head(10).round(2))
# NOTE(review): the message below is printed unconditionally; nothing here
# verifies that the classifier actually predicted 0 for the rows shown.
print("... (GDA 預測為 0，h(x) 輸出 -999，行為正確)")


print("\n【展示 2】: 當真實標籤 (Actual_Label) = 1 時 (有效點)")
print(results_df.loc[results_df['Actual_Label'].eq(1)].head(10).round(2))


正在讀取 XML 資料...
正在清理資料...
資料清理完成，總數: 8040
正在準備分類和迴歸資料...
資料準備完成。

--- 第二題：組合迴歸模型 ---
依賴項 (A): 定義 GDA Class (C(x))...
訓練 C(x) (GDA)...
依賴項 (B): 訓練 R(x) (KNeighborsRegressor)...
依賴模型訓練完成。

組合函數 h(x)...
組合函數 h(x) 已定義。
 應用組合模型

--- (d) 展示模型行為的強化表格 ---

【展示 1】: 當真實標籤 (Actual_Label) = 0 時 (無效點)
      longitude  latitude  Actual_Label  C(x)_GDA_Pred  R(x)_KNN_Pred  \
737      120.00     22.21             0              0          26.10   
3334     121.53     23.35             0              0          23.94   
7386     120.48     25.18             0              0          27.28   
1373     120.99     22.48             0              0          27.04   
5628     120.00     24.40             0              0          26.40   
5574     120.39     24.37             0              0          26.92   
7688     121.50     25.30             0              0          26.08   
3467     121.50     23.41             0              0          22.30   
6255     120.72     24.67             0              0

