In [120]:
import os
import xgboost as xgb
import pandas as pd
import numpy as np

---

## Autoencoder 模型

In [121]:
import glob
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from tensorflow import keras
from tensorflow.keras import layers

In [117]:
# Load the working dataset into `df`. Judging by the file name it has already
# been imputed (forward fill + mean) — TODO confirm against the step that wrote it.
read_file_path = 'instance/on_system/ffill/PMDI_imputed_ffill_mean_on_system.csv'
df = pd.read_csv(read_file_path)

In [118]:
# --- Columns to impute ---
# NOTE: the same list is assigned again further down, just before
# process_sample_raw is defined; keep the two definitions in sync.
cols = [
    'Air temperature',
    'Process temperature',
    'Rotational speed',
    'Torque',
    'Tool wear'
]

In [119]:
# Seed Python, NumPy and the Keras backend so that weight initialisation and
# batch shuffling (and therefore the trained model) are reproducible on re-run.
keras.utils.set_random_seed(42)

# 1. Standardise the selected columns; the fitted scaler is kept so the same
#    transformation can be applied (and inverted) later.
scaler = StandardScaler()
df_scaled = pd.DataFrame(
    scaler.fit_transform(df[cols]),
    columns=cols
)

# 2. Training matrix: the already-imputed, complete data in standardised units.
train_data = df_scaled.values

# 3. Build the autoencoder: input_dim -> 8 -> 4 -> 8 -> input_dim.
#    The output layer is linear because standardised values can be negative.
input_dim = train_data.shape[1]
input_layer = keras.Input(shape=(input_dim,))
encoded = layers.Dense(8, activation='relu')(input_layer)
encoded = layers.Dense(4, activation='relu')(encoded)
decoded = layers.Dense(8, activation='relu')(encoded)
decoded = layers.Dense(input_dim)(decoded)

autoencoder = keras.Model(inputs=input_layer, outputs=decoded)
autoencoder.compile(optimizer='adam', loss='mse')

# 4. Train the autoencoder to reconstruct its own input.
autoencoder.fit(train_data, train_data, epochs=100, batch_size=64, verbose=1)

Epoch 1/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 1.4346
Epoch 2/100
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 0.8933
Epoch 3/100
[1m147/157[0m [32m━━━━━━━━━━━━━━━━━━[0m[37m━━[0m [1m0s[0m 1ms/step - loss: 0.6414

KeyboardInterrupt: 

In [None]:
# Save the trained model.
# NOTE(review): the .h5 suffix selects the legacy HDF5 format, whereas the GAIN
# generator loaded later in this notebook uses a .keras file — confirm this is intended.
autoencoder.save('best_autoencoder_model.h5')

# Save the fitted scaler alongside the model so inference can reuse the same scaling.
import joblib
joblib.dump(scaler, 'best_autoencoder_scaler.pkl')



['best_autoencoder_scaler.pkl']

## 補值

In [122]:
import tensorflow as tf
# def autoencoder_impute(df, cols, autoencoder, scaler):
#     # Step 1: 標準化
#     df_scaled = pd.DataFrame(
#         scaler.transform(df[cols]),
#         columns=cols
#     )

#     # Step 2: 補值
#     for i, row in df_scaled.iterrows():
#         if row.isna().any():
#             arr = row.values.astype(float)
#             nan_idx = np.isnan(arr)
#             if np.any(nan_idx):
#                 arr_filled = arr.copy()
#                 arr_filled[nan_idx] = 0
#                 arr_filled = arr_filled.reshape(1, -1)
#                 pred = autoencoder.predict(arr_filled, verbose=0)[0]
#                 arr[nan_idx] = pred[nan_idx]
#                 df_scaled.iloc[i] = arr

#     # Step 3: 反標準化
#     df_imputed = pd.DataFrame(
#         scaler.inverse_transform(df_scaled),
#         columns=cols
#     )

#     # Step 4: 把補好的欄位補回原始 df
#     df_out = df.copy()
#     for col in cols:
#         df_out[col] = np.where(df[col].isna(), df_imputed[col], df[col])

#     return df_out

def autoencoder_impute(sample, cols, generator, scaler):
    """Fill the missing entries of a single sample using a generator model.

    Despite the function name, the model is fed the concatenation of the
    standardised values (missing entries set to 0) and a 0/1 observed-mask,
    i.e. an input of width ``2 * D`` for ``D`` features.

    Parameters
    ----------
    sample : pandas Series or one-row DataFrame
        Exactly one row of ``D`` numeric values, ordered like ``cols``.
        NaN marks a missing entry.
    cols : list of str
        Column labels used for the returned frame.
    generator : callable
        Called with a ``(1, 2 * D)`` tensor; its result must expose ``.numpy()``
        returning an array indexable as ``[0, j]`` for each feature ``j``.
    scaler : fitted scaler
        Must provide ``transform`` and ``inverse_transform``.

    Returns
    -------
    pandas.DataFrame
        One row (default index) with columns ``cols``. Missing entries are
        replaced by the generator's prediction in original units; observed
        entries are returned unchanged.
    """
    original = np.asarray(sample.values, dtype=np.float64).reshape(1, -1)  # (1, D)
    nan_idx = np.isnan(original[0])

    # Nothing to impute: hand the values back untouched, without a model call.
    if not nan_idx.any():
        return pd.DataFrame(original.copy(), columns=cols)

    # The scaler lets NaN pass through, so missing entries stay NaN here.
    arr = scaler.transform(original).astype(np.float32)
    mask = (~nan_idx).astype(np.float32).reshape(1, -1)  # 1 = observed, 0 = missing

    arr_filled = arr.copy()
    arr_filled[0, nan_idx] = 0.0

    # Model input is [values | mask], shape (1, 2 * D).
    generator_input = np.concatenate([arr_filled, mask], axis=1)
    generator_input_tf = tf.convert_to_tensor(generator_input)

    # Only the missing positions are taken from the model output.
    x_tilde = generator(generator_input_tf).numpy()
    arr[0, nan_idx] = x_tilde[0, nan_idx]

    imputed = np.asarray(scaler.inverse_transform(arr), dtype=np.float64)
    # Observed entries would otherwise come back after a lossy
    # standardise -> float32 -> inverse round trip; restore them exactly.
    imputed[0, ~nan_idx] = original[0, ~nan_idx]
    return pd.DataFrame(imputed, columns=cols)



---

## 預測

In [126]:
class GainGenerator(keras.Model):
    """Three-layer dense network: two ReLU hidden layers and a linear output.

    ``input_dim`` sets the width of the output layer and ``hidden_dim`` the
    width of both hidden layers. Both have defaults, and any extra keyword
    arguments are forwarded to ``keras.Model`` so that the class can be
    rebuilt from a saved config.
    """

    def __init__(self, input_dim=8, hidden_dim=64, **kwargs):
        super().__init__(**kwargs)
        # Kept as attributes so get_config() can report them.
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.dense1 = layers.Dense(hidden_dim, activation='relu')
        self.dense2 = layers.Dense(hidden_dim, activation='relu')
        self.out_layer = layers.Dense(input_dim, activation=None)

    def call(self, inputs):
        """Run ``inputs`` through dense1 -> dense2 -> out_layer."""
        return self.out_layer(self.dense2(self.dense1(inputs)))

    def get_config(self):
        """Return the base config extended with the two constructor sizes."""
        return {
            **super().get_config(),
            'input_dim': self.input_dim,
            'hidden_dim': self.hidden_dim,
        }

    @classmethod
    def from_config(cls, config):
        """Rebuild an instance by passing the config entries as keyword arguments."""
        return cls(**config)


In [129]:
from tensorflow.keras.models import load_model
# Imported here as well so this cell runs on a fresh kernel even when the
# training/saving cells above (the only other place joblib is imported) are skipped.
import joblib
import xgboost as xgb

# # Autoencoder artefacts (alternative to the GAIN generator below)
# autoencoder = load_model("best_autoencoder_model.h5", compile=False)
# scaler = joblib.load("best_autoencoder_scaler.pkl")

# GAIN generator and its matching scaler.
generator = load_model("gain_generator_model_with_toolwear.keras", custom_objects={"GainGenerator": GainGenerator})
# NOTE: joblib.load unpickles the file and can execute arbitrary code;
# only load scaler files that come from a trusted source.
scaler = joblib.load("gain_scaler_with_toolwear.pkl")

# XGBoost predictive model: binary failure / no failure.
model_bin = xgb.XGBClassifier()
model_bin.load_model("best_predictive_bin_model")
# model_bin.load_model("ffill_mean_bin_model")


# XGBoost predictive model: multi-class failure type.
model_multi = xgb.XGBClassifier()
model_multi.load_model("best_predictive_multi_model")
# model_multi.load_model("ffill_mean_multi_model")

In [130]:
# Feature columns used by every model at prediction time
# (must match the column order the XGBoost models were trained with).
feature_cols = ['Air temperature', 'Rotational speed', 'Tool wear', 'Torque',
                 'Process temperature', 'Power', 'PowerWear', 'TempPerPower']

In [131]:
from exp_tools import *

In [132]:
# --- Columns to impute ---
# NOTE: this repeats the list assigned before the autoencoder training cell;
# process_sample_raw reads this module-level name.
cols = [
    'Air temperature',
    'Process temperature',
    'Rotational speed',
    'Torque',
    'Tool wear'
]

In [133]:
def process_sample_raw(sample_raw):
    """Impute a one-row sample if needed and add the engineered features.

    Works on a copy, so the frame passed in is left untouched and the calling
    cell can be re-run with the same input.

    Parameters
    ----------
    sample_raw : pandas.DataFrame
        A single-row frame containing at least the columns in ``cols``.
        Only the first row is inspected for missing values.

    Returns
    -------
    pandas.DataFrame
        Copy of ``sample_raw`` with missing ``cols`` entries imputed and the
        columns ``Power``, ``PowerWear`` and ``TempPerPower`` added.
    """
    sample = sample_raw.copy()

    # Impute only when the (first) row has a missing value in one of `cols`.
    if sample[cols].isna().any(axis=1).iloc[0]:
        imputed_df = autoencoder_impute(sample[cols], cols, generator, scaler)
        sample[cols] = imputed_df.values

    # Power = speed * torque; an exact 0 is replaced by a tiny value so the
    # division for TempPerPower below never divides by zero.
    power = (sample["Rotational speed"] * sample["Torque"]).replace(0, 1e-6)
    sample["Power"] = power
    sample["PowerWear"] = power * sample["Tool wear"]
    sample["TempPerPower"] = sample["Process temperature"] / power

    return sample
    

In [None]:
def predict_label(sample):
    """Score one processed sample with the binary and multi-class models.

    Parameters
    ----------
    sample : pandas.DataFrame
        Frame containing every column listed in ``feature_cols``; the first
        row is the one that is scored.

    Returns
    -------
    tuple
        ``(failure_probability, ranking)``: the binary model's probability at
        class index 1 for the first row, and a DataFrame with columns
        ``Class`` and ``Probability`` sorted from most to least likely.
    """
    # Select the features in the agreed column order.
    features = sample[feature_cols]

    failure_probability = model_bin.predict_proba(features)[0][1]
    class_probabilities = model_multi.predict_proba(features)[0]

    # Names for the multi-class outputs, in the order the probabilities are returned
    # (assumed to match the label encoding used in training — TODO confirm).
    class_names = ['No Failure','HDF', 'PWF', 'OSF', 'TWF', 'RNF']

    ranking = pd.DataFrame({
        "Class": class_names,
        "Probability": class_probabilities,
    })

    # Most probable class first.
    return failure_probability, ranking.sort_values("Probability", ascending=False)

In [197]:
# Take row 0 as a one-row DataFrame (the double brackets keep it two-dimensional)
# and show the columns that are candidates for imputation.
sample_raw = df.iloc[[0]].copy()
display(sample_raw[cols])

Unnamed: 0,Air temperature,Process temperature,Rotational speed,Torque,Tool wear
0,298.766667,309.104167,1542.510204,42.8,0.0


In [196]:
import numpy as np
# Show every column when displaying wide frames.
pd.set_option("display.max_columns", None)

# Row 50 as a one-row DataFrame; .copy() keeps later edits away from df.
sample_raw = df.iloc[[50]].copy()

# Uncomment any of these lines to blank out a feature and exercise the imputation path.
# sample_raw.loc[:, "Air temperature"] = np.nan
# sample_raw.loc[:, "Process temperature"] = np.nan
# sample_raw.loc[:, "Rotational speed"] = np.nan
# sample_raw.loc[:, "Torque"] = np.nan
# sample_raw.loc[:, "Tool wear"] = np.nan

# Show the imputable columns before and after processing.
display(sample_raw[cols])
processed_sample = process_sample_raw(sample_raw)
display(processed_sample[cols])

Unnamed: 0,Air temperature,Process temperature,Rotational speed,Torque,Tool wear
50,298.766667,309.104167,2861.0,4.6,135.0


Unnamed: 0,Air temperature,Process temperature,Rotational speed,Torque,Tool wear
50,298.766667,309.104167,2861.0,4.6,135.0


In [194]:
# Score the processed sample: print the binary failure probability and
# display the per-class probabilities returned by predict_label.
y_pred_bin, df_result = predict_label(processed_sample)
print(f"Binary Failure Probability: {y_pred_bin:.4f}")
print("Multi Classes Failure Probabilities:")
display(df_result)

Binary Failure Probability: 0.7247
Multi Classes Failure Probabilities:


Unnamed: 0,Class,Probability
2,PWF,0.989386
0,No Failure,0.009918
5,RNF,0.000391
4,TWF,0.000104
1,HDF,0.000102
3,OSF,9.9e-05
