# **데이터 전처리 코드**

## **1. 드라이브 연결**

In [1]:
from google.colab import drive
import os

drive.mount('/content/drive')

Mounted at /content/drive


## **2. 데이터 로드**

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from scipy.interpolate import interp1d

# Machine Data 로드 (구글 드라이브 내 파일 혹은 컴퓨터 내 파일의 경로로 변경하기 )
file_path = "/content/drive/MyDrive/ASK_2026_FDC/Dataset/MACHINE_Data.xlsx"

# df 변수에 파일 로드 (데이터셋은 엑셀 파일로 공유드렸기 때문에 xlsx, csv 확장자로 이루어져야 함)
if os.path.exists(file_path):
    print(f"{file_path}")
    if file_path.endswith('.xlsx'):
        df = pd.read_excel(file_path)
    elif file_path.endswith('.csv'):
        df = pd.read_csv(file_path)
    else:
        print("지원하지 않는 파일 형식입니다.")
else:
    raise FileNotFoundError(f"File not found: {file_path}")

# Machine Data의 전체 행열 표기
print(f"Machine Data 로드 완료. Shape: {df.shape}")

# 식별 및 정답 Column
meta_cols = ['Wafer_ID', 'Time_Step', 'Label', 'Fault_Type', 'Time', 'Step Number']

# 학습에 사용할 센서 변수 선택 (센서 데이터만 선택)
sensor_cols = [c for c in df.columns if c not in meta_cols]
# 문자열 데이터 필터링 추가
sensor_cols = df[sensor_cols].select_dtypes(include=[np.number]).columns.tolist()

print(f"선택된 센서 변수 ({len(sensor_cols)}개):")
print(sensor_cols)

/content/drive/MyDrive/ASK_2026_FDC/Dataset/MACHINE_Data.xlsx
Machine Data 로드 완료. Shape: (12829, 25)
선택된 센서 변수 (19개):
['BCl3 Flow', 'Cl2 Flow', 'RF Btm Pwr', 'RF Btm Rfl Pwr', 'Endpt A', 'He Press', 'Pressure', 'RF Tuner', 'RF Load', 'RF Phase Err', 'RF Pwr', 'RF Impedance', 'TCP Tuner', 'TCP Phase Err', 'TCP Impedance', 'TCP Top Pwr', 'TCP Rfl Pwr', 'TCP Load', 'Vat Valve']


## **3. 데이터 전처리**



In [3]:
# 설정 변수
FIXED_LENGTH = 100  # Time Steps 통일
n_features = len(sensor_cols)   # 센서 변수의 총 개수

# 전처리 함수 정의
# df: 데이터셋 저장 변수
# wafer_col: 웨이퍼를 구별하는 ID Column 이름(Wafer_ID)
# sensor_cols: 센서 데이터의 고유 이름이 저장되어 있는 리스트
# target_len: 시간 길이(FIXED_LENGTH)
def preprocess_to_3d(df, wafer_col, sensor_cols, target_len):

    # Wafer_ID 추출
    wafer_ids = df[wafer_col].unique()
    data_list = []   # 센서 데이터 저장 (X축)
    label_list = []  # 정상/불량 라벨 저장 (Y축)

    print(f"총 웨이퍼 수: {len(wafer_ids)}개")

    # 하나의 단일 웨이퍼 씩 처리
    for wid in wafer_ids:
        # 특정 웨이퍼의 데이터 추출
        wafer_df = df[df[wafer_col] == wid]

        # 1. 라벨링 저장 (Normal/Fault)
        label = 1 if wafer_df['Label'].iloc[0] == 'Normal' else 0
        label_list.append(label)

        # 2. 센서 데이터의 보간법 (길이 맞추기)
        sensor_data = wafer_df[sensor_cols].values # (Current_Len, Features)
        current_len = len(sensor_data)

        if current_len == 0: continue

        # 시간 축 정규화(0 ~ 1로 변환)(Interporation)(93s의 공정이라면)
        x_old = np.linspace(0, 1, current_len)  # 센서 데이터를 0 ~ 93으로 나타냄
        x_new = np.linspace(0, 1, target_len)   # 센서 데이터를 0 ~ 100으로 보간함

        # 보간 함수 생성 및 데이터 재생성
        f = interp1d(x_old, sensor_data, axis=0, kind='linear')   # 보간 함수(f)를 통해 x_old의 데이터 패턴을 저장함
        data_resized = f(x_new) # x_new를 입력하여 x_old의 데이터 패턴을 반영된 데이터를 재생성함

        data_list.append(data_resized)

    return np.array(data_list), np.array(label_list)

# 함수 실행 (3차원 배열 생성)
# X_raw: 3차원 센서 데이터, y_labels: 라벨 데이터
X_raw, y_labels = preprocess_to_3d(df, 'Wafer_ID', sensor_cols, FIXED_LENGTH)

print(f"\n변환 완료 형태 (Shape): {X_raw.shape}")
print(f"(Samples: {X_raw.shape[0]}, TimeSteps: {X_raw.shape[1]}, Features: {X_raw.shape[2]})")

총 웨이퍼 수: 129개

변환 완료 형태 (Shape): (129, 100, 19)
(Samples: 129, TimeSteps: 100, Features: 19)


In [4]:
# 1. 3차원 -> 2차원 변환 (MinMax스케일러는 2차원 데이터로만 입력받을 수 있음)
n_samples, n_steps, n_feats = X_raw.shape
X_2d = X_raw.reshape(-1, n_feats)       # 변환 후: (12900행, 19열) = (129웨이퍼 x 100초, 19개의 센서데이터)

# 2. MinMax 스케일링 (센서 데이터의 최소값, 최대값을 기준으로 0부터 1까지 스케일링함)
scaler = MinMaxScaler()
X_scaled_2d = scaler.fit_transform(X_2d)

# 3. 다시 3차원으로 복원
X_final = X_scaled_2d.reshape(n_samples, n_steps, n_feats)

print("정규화 완료. 데이터 범위:", np.min(X_final), "~", np.max(X_final))

정규화 완료. 데이터 범위: 0.0 ~ 1.0


## **+. 실험별 학습셋, 테스트셋 분류**

- 전체 데이터셋을 각 실험 별로 학습셋, 테스트셋으로 나누었음
- 학습셋과 테스트셋은 정상 데이터로만 이루어져 있음

In [5]:
# Wafer_ID 저장
wafer_ids = df['Wafer_ID'].unique()

# Wafer_ID의 개수가 X_final의 샘플 수와 같아야 함
print(f"전체 웨이퍼 개수: {len(wafer_ids)}")
print(f"데이터셋 크기: {X_final.shape[0]}")

전체 웨이퍼 개수: 129
데이터셋 크기: 129


In [6]:
from sklearn.model_selection import train_test_split

# 테스트셋과 학습셋은 정상 데이터로만 이루어져 있고 불량 데이터은 따로 X_fault로 출력됨
def create_dataset_for_experiment(exp_num, X_all, y_all, ids_all):
    # 특정 실험 번호에 해당하는 데이터만 선별하여 학습셋, 테스트셋을 만듭니다.

    print(f"\n======== [Experiment {exp_num}] 데이터셋 생성 ========")

    # 1. 해당 실험에 속하는 인덱스 찾기 (예: 'l29'로 시작하는 ID 찾기)
    # startswith를 사용하여 실험 번호 구분
    exp_indices = [i for i, wid in enumerate(ids_all) if wid.startswith(exp_num)]

    if len(exp_indices) == 0:
        print("해당 실험 데이터를 찾을 수 없습니다.")
        return None, None, None

    # 2. 해당 실험의 데이터만 추출
    X_exp = X_all[exp_indices]
    y_exp = y_all[exp_indices]

    print(f"  > 전체 샘플 수: {len(X_exp)}개")

    # 3. 정상(1) / 불량(0) 분리
    normal_idx = np.where(y_exp == 1)[0]
    fault_idx = np.where(y_exp == 0)[0]

    X_normal = X_exp[normal_idx]
    X_fault = X_exp[fault_idx]

    print(f"  > 정상: {len(X_normal)}개, 불량: {len(X_fault)}개")

    # 4. 학습셋 / 테스트셋 분할 (정상 데이터만 8:2로 분할)
    # random_state를 고정하여 재현성 확보
    X_train, X_test_normal = train_test_split(X_normal, test_size=0.2, random_state=42)

    # 5. 불량 데이터는 전량 테스트용으로 사용 (학습하지 않음)
    # 반환값: 학습셋(정상), 테스트셋(정상), 테스트셋(불량)
    return X_train, X_test_normal, X_fault

# -----------------------------------------------------------
# 실행: 실험 29, 31, 33에 대해 각각 데이터셋 생성
# -----------------------------------------------------------

# 결과를 저장할 딕셔너리
datasets = {}

# 데이터의 실험 번호 확인: 'l29', 'l31', 'l33' (소문자 l + 실험 번호)
experiments = ['l29', 'l31', 'l33']

for exp_id in experiments:
    # 함수 실행
    X_train_exp, X_test_norm_exp, X_test_fault_exp = create_dataset_for_experiment(
        exp_id, X_final, y_labels, wafer_ids
    )

    # 딕셔너리에 저장
    datasets[exp_id] = {
        'train': X_train_exp,
        'test_normal': X_test_norm_exp,
        'test_fault': X_test_fault_exp
    }


  > 전체 샘플 수: 43개
  > 정상: 34개, 불량: 9개

  > 전체 샘플 수: 43개
  > 정상: 37개, 불량: 6개

  > 전체 샘플 수: 43개
  > 정상: 37개, 불량: 6개


In [7]:
print("\n======== 최종 데이터셋 구성 확인 ========")

for exp_id in experiments:
    data = datasets[exp_id]
    if data['train'] is not None:
        print(f"[{exp_id} 실험]")
        print(f"  - 학습용 (Normal): {data['train'].shape}")
        print(f"  - 테스트 (Normal): {data['test_normal'].shape}")
        print(f"  - 테스트 (Fault) : {data['test_fault'].shape}")

        print(f"  - 학습 데이터는 순수 정상 데이터입니다.")
    else:
        print(f"[{exp_id} 실험] 데이터 없음 (ID 확인 필요)")


[l29 실험]
  - 학습용 (Normal): (27, 100, 19)
  - 테스트 (Normal): (7, 100, 19)
  - 테스트 (Fault) : (9, 100, 19)
  - 학습 데이터는 순수 정상 데이터입니다.
[l31 실험]
  - 학습용 (Normal): (29, 100, 19)
  - 테스트 (Normal): (8, 100, 19)
  - 테스트 (Fault) : (6, 100, 19)
  - 학습 데이터는 순수 정상 데이터입니다.
[l33 실험]
  - 학습용 (Normal): (29, 100, 19)
  - 테스트 (Normal): (8, 100, 19)
  - 테스트 (Fault) : (6, 100, 19)
  - 학습 데이터는 순수 정상 데이터입니다.
