In [None]:
# build_shap.py
import os, joblib, pandas as pd, shap

BASE    = os.path.dirname(__file__)
DATA    = os.path.join(BASE, 'data')
MODEL   = os.path.join(DATA, 'model.pkl')
VAL_CSV = os.path.join(DATA, 'val_results.csv')
OUT     = os.path.join(DATA, 'shap_values.pkl')

# 1) 필수 파일 체크
for p in (MODEL, VAL_CSV):
    if not os.path.exists(p):
        raise FileNotFoundError(f"파일 없음: {p}")

# 2) 데이터 로드 & feature만 골라 ndarray 생성
df    = pd.read_csv(VAL_CSV)
drops = ['unit','RUL','y_true','y_pred']
feats = [c for c in df.columns if c not in drops]
X     = df[feats].values    # (n_samples, n_features)

# 3) 모델 로드
model = joblib.load(MODEL)

# 4) KernelExplainer 생성
#    - 모델 예측 함수: model.predict (ndarray → ndarray)
#    - background: X 중 랜덤 100개 (속도 조절 가능)
bg_idx    = list(range(min(100, X.shape[0])))
background = X[bg_idx]
explainer  = shap.KernelExplainer(model.predict, background)

# 5) SHAP 값 계산
n = X.shape[0]
print(f"SHAP 계산 시작: {n} samples, {X.shape[1]} features")
# 속도 위해 nsamples=100 (필요시 늘리세요)
shap_out = explainer.shap_values(X, nsamples=100)
# shap_out이 리스트면 첫 번째 엘리먼트 사용
shap_vals = shap_out[0] if isinstance(shap_out, (list,tuple)) else shap_out

# 6) unit별 dict 정리
units     = df['unit'].tolist()
shap_data = {
    u: {'feature_names': feats, 'values': shap_vals[i].tolist()}
    for i, u in enumerate(units)
}

# 7) 저장
joblib.dump(shap_data, OUT)
print(f"✔ shap_values.pkl 생성 완료: {OUT}")


NameError: name '__file__' is not defined

In [4]:
# build_shap.py (수정)
import os, joblib, pandas as pd, shap

BASE    = os.getcwd()
DATA    = os.path.join(BASE, 'data')
MODEL   = os.path.join(DATA, 'model.pkl')
# val_results.csv 대신 FD001 test cleaned 사용
TEST_CSV= os.path.join(DATA, 'test_FD001_cleaned.csv')
OUT     = os.path.join(DATA, 'shap_values.pkl')

# 1) 파일 체크
for p in (MODEL, TEST_CSV):
    if not os.path.exists(p):
        raise FileNotFoundError(f"파일 없음: {p}")

# 2) test CSV 로드 & 마지막 타임스텝만 추출
df_test = pd.read_csv(TEST_CSV)
latest  = df_test.groupby('unit').last().reset_index()

# 3) feature_cols 자동 감지 (unit, RUL 제외)
drop_cols = ['unit','RUL']
feats     = [c for c in latest.columns if c not in drop_cols]
print("▶ test_cleaned feature_cols:", feats)

X = latest[feats].values  # ndarray (n_units, n_features)

# 4) 모델 로드 & KernelExplainer 생성
model = joblib.load(MODEL)
background = X[: min(100, X.shape[0])]
explainer  = shap.KernelExplainer(model.predict, background)

# 5) SHAP 계산
print(f"▶ SHAP 계산 시작: samples={X.shape[0]}, features={X.shape[1]}")
shap_out = explainer.shap_values(X, nsamples=100)
shap_vals = shap_out[0] if isinstance(shap_out, (list,tuple)) else shap_out

# 6) dict 정리 & 저장
units = latest['unit'].tolist()
shap_data = {
    u: {'feature_names': feats, 'values': shap_vals[i].tolist()}
    for i, u in enumerate(units)
}
joblib.dump(shap_data, OUT)
print("✔ shap_values.pkl 생성 완료")


▶ test_cleaned feature_cols: ['time', 'op1', 'op2', 'op3', 's1', 's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11', 's12', 's13', 's14', 's15', 's16', 's17', 's18', 's19', 's20', 's21']
Provided model function fails when applied to the provided data set.


ValueError: Exception encountered when calling Sequential.call().

[1mInvalid input shape for input Tensor("data:0", shape=(32, 25), dtype=float32). Expected shape (None, 50, 24), but input has incompatible shape (32, 25)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(32, 25), dtype=float32)
  • training=False
  • mask=None
  • kwargs=<class 'inspect._empty'>

In [None]:
# build_shap.py
import os, joblib, pandas as pd, shap

# 1) 경로 설정
BASE     = os.getcwd()
DATA_DIR = os.path.join(BASE, 'data')
MODEL_PKL= os.path.join(DATA_DIR, 'model.pkl')
TEST_CSV = os.path.join(DATA_DIR, 'test_FD001_cleaned.csv')  # FD001 예시
OUT_PKL  = os.path.join(DATA_DIR, 'shap_values.pkl')

# 2) 필수 파일 확인
for p in (MODEL_PKL, TEST_CSV):
    if not os.path.exists(p):
        raise FileNotFoundError(f"파일이 없습니다: {p}")

# 3) 테스트 데이터 로드 & 마지막 타임스텝만
df_test = pd.read_csv(TEST_CSV)
latest  = df_test.groupby('unit').last().reset_index()

# 4) feature 컬럼 자동 감지
drop_cols = ['unit','RUL']
feats     = [c for c in latest.columns if c not in drop_cols]
print("▶ 감지된 features:", feats)

X = latest[feats].values   # numpy array (n_units, n_features)

# 5) 모델 로드
model = joblib.load(MODEL_PKL)

# 6) KernelExplainer 준비
#    - model.predict의 결과를 1D array로 flatten
predict_fn = lambda x: model.predict(x).flatten()
bg_size    = min(100, X.shape[0])
background = X[:bg_size]
explainer  = shap.KernelExplainer(predict_fn, background)

# 7) SHAP 값 계산
print(f"▶ SHAP 계산: samples={X.shape[0]}, features={X.shape[1]}")
shap_out = explainer.shap_values(X, nsamples=100)
# list 반환이면 첫 번째 요소 사용
shap_vals = shap_out[0] if isinstance(shap_out, (list, tuple)) else shap_out

# 8) shap_data dict 생성 & 저장
units     = latest['unit'].tolist()
shap_data = {
    u: {'feature_names': feats, 'values': shap_vals[i].tolist()}
    for i, u in enumerate(units)
}
joblib.dump(shap_data, OUT_PKL)
print(f"✔ shap_values.pkl 생성 완료: {OUT_PKL}")


FileNotFoundError: [Errno 2] No such file or directory: 'c:\\Users\\temp\\nasa_cmas\\cmaps\\data\\data\\test_FD001_cleaned.csv'

In [1]:
# build_shap.py
import os, joblib, pandas as pd, shap
import tensorflow as tf

# 1) 경로 설정
BASE      = os.getcwd()
DATA_DIR  = os.path.join(BASE, 'data')
MODEL_PKL = os.path.join(DATA_DIR, 'model.pkl')
TEST_CSV  = os.path.join(DATA_DIR, 'test_FD001_cleaned.csv')
OUT_PKL   = os.path.join(DATA_DIR, 'shap_values.pkl')

# 2) 필수 파일 확인
for p in (MODEL_PKL, TEST_CSV):
    if not os.path.exists(p):
        raise FileNotFoundError(f"파일이 없습니다: {p}")

# 3) 테스트 데이터 로드 & 마지막 타임스텝만
df_test = pd.read_csv(TEST_CSV)
latest  = df_test.groupby('unit').last().reset_index()

# 4) feature 컬럼 자동 감지 & ndarray 변환
drop_cols = ['unit','RUL']
feats     = [c for c in latest.columns if c not in drop_cols]
print("▶ feature_cols:", feats)
X = latest[feats].values.astype(float)  # ensure float32/64

# 5) 모델 로드
#    만약 joblib로 Keras 모델을 저장/불러왔으면, 
#    대신 아래처럼 load_model 쓰세요:
# from tensorflow.keras.models import load_model
# model = load_model(MODEL_KERAS_H5)
model = joblib.load(MODEL_PKL)  

# 6) DeepExplainer 생성
#    DeepExplainer는 내부적으로 gradient 기반 SHAP을 쓰며,
#    TF/Keras 모델에 최적화되어 있습니다.
bg_size    = min(100, X.shape[0])
background = X[:bg_size]
explainer  = shap.DeepExplainer(model, background)

# 7) SHAP 값 계산
print(f"▶ Deep SHAP 계산: samples={X.shape[0]}, features={X.shape[1]}")
shap_out   = explainer.shap_values(X)  
# shap_out도 list일 수 있으니:
shap_vals  = shap_out[0] if isinstance(shap_out, (list, tuple)) else shap_out

# 8) shap_data dict 생성 & 저장
units     = latest['unit'].tolist()
shap_data = {
    u: {'feature_names': feats, 'values': shap_vals[i].tolist()}
    for i, u in enumerate(units)
}
joblib.dump(shap_data, OUT_PKL)
print(f"✔ shap_values.pkl 생성 완료: {OUT_PKL}")


▶ feature_cols: ['time', 'op1', 'op2', 'op3', 's1', 's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11', 's12', 's13', 's14', 's15', 's16', 's17', 's18', 's19', 's20', 's21']


Expected: input_layer_3
Received: inputs=['Tensor(shape=(100, 25))']


ValueError: Exception encountered when calling Sequential.call().

[1mInvalid input shape for input [[ 3.10000e+01 -6.00000e-04  4.00000e-04 ...  1.00000e+02  3.88100e+01
   2.33552e+01]
 [ 4.90000e+01  1.80000e-03 -1.00000e-04 ...  1.00000e+02  3.88100e+01
   2.32618e+01]
 [ 1.26000e+02 -1.60000e-03  4.00000e-04 ...  1.00000e+02  3.89300e+01
   2.32740e+01]
 ...
 [ 1.21000e+02  1.70000e-03  1.00000e-04 ...  1.00000e+02  3.87600e+01
   2.33608e+01]
 [ 9.70000e+01  4.70000e-03 -0.00000e+00 ...  1.00000e+02  3.89500e+01
   2.33595e+01]
 [ 1.98000e+02  1.30000e-03  3.00000e-04 ...  1.00000e+02  3.87000e+01
   2.31855e+01]]. Expected shape (None, 50, 24), but input has incompatible shape (100, 25)[0m

Arguments received by Sequential.call():
  • inputs=['tf.Tensor(shape=(100, 25), dtype=float32)']
  • training=None
  • mask=['None']
  • kwargs=<class 'inspect._empty'>