In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install nptdms

Collecting nptdms
  Downloading nptdms-1.10.0.tar.gz (181 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/181.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m174.1/181.5 kB[0m [31m5.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m181.5/181.5 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: nptdms
  Building wheel for nptdms (pyproject.toml) ... [?25l[?25hdone
  Created wheel for nptdms: filename=nptdms-1.10.0-py3-none-any.whl size=108456 sha256=03b871ee1c3661f483b595468d666904c4c57919e114058370ef06bf01568792
  Stored in directory: /root/.cache/pip/wheels/1b/4b/17/21e8b03b37ea51ce7ec9f5570cdf0decca93f537d61c06880f
Successfully b

In [None]:
import os
import numpy as np
import pandas as pd
from nptdms import TdmsFile
import pywt
from scipy.fft import rfft
from tensorflow.keras.models import load_model

전처리 파이프라인

In [None]:
# 전처리 파이프라인 변수
window_size = 25600
overlap = 0.5
wavelet = 'db4'
level = 3
top_k = 10  # WPT+FFT에서 상위 k개 주파수 특징
sampling_rate = 25600  # 진동 데이터 샘플링 레이트
step = int(window_size * (1 - overlap))
channels_raw = ['CH1', 'CH2', 'CH3', 'CH4']

# TDMS 로드 함수 (진동 채널만, 공백 처리 및 단일 그룹 가정)
def load_tdms_data(folder_path, target_channels_raw):
    all_data_dict = {ch.strip(): [] for ch in target_channels_raw}
    tdms_files = sorted([f for f in os.listdir(folder_path) if f.endswith(".tdms")])

    for file in tdms_files:
        file_path = os.path.join(folder_path, file)
        try:
            tdms = TdmsFile.read(file_path)
            groups = tdms.groups()

            if not groups:
                print(f"[경고] {file}에 그룹이 없습니다. 스킵합니다.")
                continue

            main_group = groups[0]
            channels_in_group = {ch.name.strip(): ch for ch in main_group.channels()} # 공백 제거된 채널명으로 맵핑

            data_per_file = {}
            min_len_file = float('inf') # 파일 내 채널들의 최소 길이를 찾기 위함

            for ch_raw in target_channels_raw:
                ch_stripped = ch_raw.strip()
                if ch_stripped in channels_in_group:
                    data = channels_in_group[ch_stripped].data
                    data_per_file[ch_stripped] = data
                    min_len_file = min(min_len_file, len(data))
                else:
                    print(f"[경고] 파일 '{file}'에서 채널 '{ch_raw}'을(를) 찾을 수 없습니다.")
                    data_per_file[ch_stripped] = np.array([]) # 찾을 수 없으면 빈 배열

            if min_len_file == float('inf') or min_len_file == 0:
                print(f"[경고] 파일 '{file}'에서 유효한 데이터가 없습니다. 스킵합니다.")
                continue

            # 파일 내 모든 채널 데이터를 최소 길이에 맞춰 자르고 저장
            for ch_stripped in all_data_dict.keys():
                if ch_stripped in data_per_file and len(data_per_file[ch_stripped]) >= min_len_file:
                    all_data_dict[ch_stripped].append(data_per_file[ch_stripped][:min_len_file])
                else:
                    # 데이터가 없거나 길이가 짧은 경우, 해당 파일의 모든 채널에 대해 0으로 채워진 배열을 추가
                    all_data_dict[ch_stripped].append(np.zeros(min_len_file))


        except Exception as e:
            print(f"[에러] {file} 처리 중 오류 발생: {e}")

    # 파일별로 로드된 데이터를 채널별로 모두 합치기
    concatenated_data = {}
    for ch_stripped, data_list in all_data_dict.items():
        if data_list:
            concatenated_data[ch_stripped] = np.concatenate(data_list, axis=0)
        else:
            concatenated_data[ch_stripped] = np.array([]) # 데이터가 없으면 빈 배열

    return concatenated_data

# 슬라이딩 윈도우 함수
def sliding_window(data, window_size, overlap):
    step = int(window_size * (1 - overlap))
    return np.array([data[i:i+window_size] for i in range(0, len(data) - window_size + 1, step)])

# WPT+FFT 특징 추출 함수
def extract_wpt_fft_features(signal, wavelet='db4', level=3, top_k=10):
    wp = pywt.WaveletPacket(data=signal, wavelet=wavelet, mode='symmetric', maxlevel=level)
    nodes = [node.path for node in wp.get_level(level, 'freq')]
    features = []
    for node in nodes:
        coeffs = wp[node].data
        if len(coeffs) > 0: # coeffs가 비어있지 않은지 확인
            fft_vals = np.abs(rfft(coeffs))
            if len(fft_vals) >= top_k:
                top_features = np.sort(fft_vals)[-top_k:]
            else: # top_k보다 작으면 있는 그대로 사용하고 나머지는 0으로 채움
                top_features = np.pad(np.sort(fft_vals), (top_k - len(fft_vals), 0), 'constant', constant_values=0)
            features.extend(top_features)
        else: # coeffs가 비어있으면 0으로 채움
            features.extend([0.0] * top_k)
    return np.array(features)

# 전체 전처리 함수 (X, y 반환)
def extract_features_and_rul(folder_path):
    # 진동 데이터만 로드
    all_raw_data = load_tdms_data(folder_path, channels_raw)

    # 모든 진동 채널이 로드되었고 길이가 충분한지 확인
    if not all_raw_data or not all(data.size > 0 for data in all_raw_data.values()):
        print(f"[스킵] {folder_path}: 로드된 데이터가 없거나 유효하지 않습니다.")
        # 반환 형태를 진동 데이터 특징 수에 맞춰 조정
        return np.empty((0, top_k * (2 ** level) * len(channels_raw))), np.empty((0,))

    # 모든 진동 채널 중 가장 짧은 길이에 맞춰 데이터 길이 통일
    min_len_total = min(data.size for data in all_raw_data.values() if data.size > 0)
    if min_len_total < window_size:
        print(f"[스킵] {folder_path}: 총 데이터 길이가 윈도우 크기보다 작습니다.")
        return np.empty((0, top_k * (2 ** level) * len(channels_raw))), np.empty((0,))

    # 진동 데이터만 스택하여 윈도우 생성
    vib_data_stacked = np.stack([all_raw_data[ch.strip()][:min_len_total] for ch in channels_raw], axis=-1)
    vib_windows = sliding_window(vib_data_stacked, window_size, overlap)

    if len(vib_windows) == 0:
        print(f"[스킵] {folder_path}: 슬라이딩 윈도우 결과 없음 (진동 데이터 기준).")
        return np.empty((0, top_k * (2 ** level) * len(channels_raw))), np.empty((0,))

    all_window_features = []
    n_windows = vib_windows.shape[0]

    for i in range(n_windows):
        current_vib_window = vib_windows[i]

        # 진동 채널별 특징 추출
        per_channel_vib_features = []
        for ch_idx in range(current_vib_window.shape[1]):
            vib_feat = extract_wpt_fft_features(current_vib_window[:, ch_idx], wavelet, level, top_k)
            per_channel_vib_features.append(vib_feat)

        # 모든 진동 채널의 특징을 결합
        combined_features = np.concatenate(per_channel_vib_features)
        all_window_features.append(combined_features)

    feature_matrix = np.array(all_window_features)

    # RUL 계산 (초 단위)
    rul_seconds = np.array([(n_windows - i - 1) * step / sampling_rate for i in range(n_windows)])

    return feature_matrix, rul_seconds

평가 파이프라인

In [None]:
# 오차 계산
def calculate_error(ActRUL, PredRUL):
    # error = 100 * (ActRUL - PredRUL) / ActRUL
    # return error
    # ActRUL이 0이거나 매우 작은 값일 경우를 대비하여 epsilon을 더합니다.
    # 이는 RUL이 0인 경우를 '수명이 완전히 다함'으로 간주하고,
    # 해당 시점의 예측 오차를 패널티하는 방식입니다.
    # 그러나 이것은 엄밀히 ActRUL=0일 때의 정의를 변경하는 것이므로 주의가 필요합니다.

    # ActRUL이 0일 때 (진정한 수명 종료 시점) PredRUL도 0이면 오차를 0으로.
    # ActRUL이 0인데 PredRUL이 0이 아니면 예측이 틀린 것이므로, 큰 패널티를 부여.
    # 여기서는 기존 평가 함수에 맞추기 위해 100 * (ActRUL - PredRUL) / ActRUL 형태를 유지하되,
    # ActRUL이 0인 경우 (나눗셈 불가능)를 별도로 처리합니다.

    # Numpy 배열 연산을 위해 np.where 사용
    # ActRUL이 0인 경우 (매우 민감한 지점)
    #   - PredRUL도 0이면 오차 0 (정확히 예측)
    #   - PredRUL이 0이 아니면 (수명이 다했는데도 남았다고 예측) -> 큰 음의 오차 (패널티)
    # ActRUL이 0이 아닌 경우 (일반적인 RUL 예측)
    #   - 정상적인 오차 계산
    error = np.where(ActRUL == 0,
                     np.where(PredRUL == 0, 0, 100 * (-PredRUL) / 1), # PredRUL이 양수면 음의 오차 (큰 패널티), PredRUL이 음수일 일은 없다고 가정
                     100 * (ActRUL - PredRUL) / ActRUL)
    return error

# 정확도 점수 계산
def calculate_accuracy_score(error):
    ln_0_5 = np.log(0.5)
    score = np.where(
        error <= 0,
        np.exp(-ln_0_5 * error / 20),
        np.exp(+ln_0_5 * error / 20)
    )
    return score

# 최종 점수 계산
def calculate_final_score(accuracy_scores):
    return np.mean(accuracy_scores)

# 전체 평가 파이프라인
def evaluate_rul_prediction(ActRUL, PredRUL):
    error = calculate_error(ActRUL, PredRUL)
    accuracy_scores = calculate_accuracy_score(error)
    final_score = calculate_final_score(accuracy_scores)
    return {
        "Error": error,
        "Accuracy_scores": accuracy_scores,
        "Final_Score": final_score
    }

모델 로드 및 예측 수행 후 평가 점수 출력

In [None]:
# 저장된 모델 로드
model_path = "/content/drive/MyDrive/KSPHM-data-challenge/model/cnn_lstm_model.h5"
model = load_model(model_path, compile=False)
print("모델 로드 완료:", model_path)

# Validation 데이터셋 폴더 경로
validation_base_path = "/content/drive/MyDrive/KSPHM-data-challenge/Validation Set"
validation_folders = [f"Validation{i}" for i in range(1, 7)]

results = []

for folder_name in validation_folders:
    folder_path = os.path.join(validation_base_path, folder_name)
    print(f"\n--- {folder_name} 데이터 처리 중 ---")

    # 1. Validation 데이터 로드 및 특징 추출
    # 여기서 actual_rul은 해당 파일의 길이를 기반으로 계산된 RUL입니다.
    # 만약 대회에서 hidden test set에 대한 실제 RUL이 따로 있다면,
    # 이 actual_rul은 사용되지 않고, 채점 서버에서 제공되는 실제 RUL과 여러분의 예측값이 비교됩니다.
    # 하지만 "RUL Score 파일" 제출을 요구하는 것으로 보아, extract_features_and_rul에서 나오는 actual_rul을
    # 사용하여 Score를 계산하고 제출하는 방식일 가능성이 높습니다.
    features, actual_rul = extract_features_and_rul(folder_path)

    if features.shape[0] == 0:
        print(f"Skipping {folder_name} due to no valid features extracted.")
        results.append({
            "File": folder_name,
            "RUL_Score": np.nan  # 특징이 없으면 NaN으로 표시
        })
        continue

    n_samples = features.shape[0]
    n_features_per_window = features.shape[1]

    # 모델의 입력 형태에 맞게 reshape
    reshaped_features = features.reshape(n_samples, n_features_per_window, 1)

    # 2. 저장된 모델로 RUL 예측 수행
    try:
        predicted_rul = model.predict(reshaped_features).flatten()

        # 예측된 RUL이 음수가 되지 않도록 처리
        predicted_rul[predicted_rul < 0] = 0

        # 실제 RUL과 예측 RUL의 길이가 다를 수 있으므로 짧은 길이에 맞춰 자름
        min_len = min(len(actual_rul), len(predicted_rul))
        actual_rul_trimmed = actual_rul[:min_len]
        predicted_rul_trimmed = predicted_rul[:min_len]

        # 3. 평가 점수(Final Score) 계산
        # 수정된 calculate_error 함수를 사용하여 nan 문제 해결
        evaluation_results = evaluate_rul_prediction(actual_rul_trimmed, predicted_rul_trimmed)
        final_score = evaluation_results["Final_Score"]
        print(f"{folder_name} Final Score: {final_score:.4f}")

        results.append({
            "File": folder_name,
            "RUL_Score": final_score
        })
    except Exception as e:
        print(f"[에러] {folder_name} 예측 중 오류 발생: {e}")
        results.append({
            "File": folder_name,
            "RUL_Score": np.nan
        })

# 결과 데이터프레임 생성 및 저장
df_results = pd.DataFrame(results)
team_name = "파머완" # <-- 여기에 팀 이름을 입력하세요
output_excel_path = f"/content/drive/MyDrive/KSPHM-data-challenge/RUL_Score/{team_name}_validation2.xlsx"
df_results.to_excel(output_excel_path, index=False)
print(f"\n평가 점수 저장 완료: {output_excel_path}")

모델 로드 완료: /content/drive/MyDrive/KSPHM-data-challenge/model/cnn_lstm_model.h5

--- Validation1 데이터 처리 중 ---
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 6ms/step


  100 * (ActRUL - PredRUL) / ActRUL)


Validation1 Final Score: 0.0316

--- Validation2 데이터 처리 중 ---
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step
Validation2 Final Score: 0.0322

--- Validation3 데이터 처리 중 ---
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step
Validation3 Final Score: 0.0325

--- Validation4 데이터 처리 중 ---
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step
Validation4 Final Score: 0.0328

--- Validation5 데이터 처리 중 ---
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
Validation5 Final Score: 0.0334

--- Validation6 데이터 처리 중 ---
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step 
Validation6 Final Score: 0.0336

평가 점수 저장 완료: /content/drive/MyDrive/KSPHM-data-challenge/RUL_Score/파머완_validation2.xlsx
