In [4]:

import pandas as pd
import numpy as np
import logging

# Logging setup: every record goes both to process_log.log and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8 so records containing non-ASCII text (IDs, file
        # names) are written the same way regardless of the OS locale.
        logging.FileHandler("process_log.log", encoding="utf-8"),
        logging.StreamHandler()
    ]
)

def construct_X_matrix(x, y, rx, ry, matrix_type='dx'):
    """
    Build the regression design matrix for the dx or the dy fit.

    Each column is one polynomial term of the inputs; rows follow the order
    of the input samples.

    Parameters:
        x (numpy.ndarray): X coordinate of each sample
        y (numpy.ndarray): Y coordinate of each sample
        rx (numpy.ndarray): field X coordinate of each sample
        ry (numpy.ndarray): field Y coordinate of each sample
        matrix_type (str): 'dx' or 'dy'; selects which term layout is built

    Returns:
        numpy.ndarray: matrix of shape (n_samples, 19) for 'dx' and
        (n_samples, 18) for 'dy' (the 'dy' layout has no rx**3 column).

    Raises:
        ValueError: if matrix_type is neither 'dx' nor 'dy'.
    """
    if matrix_type not in ('dx', 'dy'):
        raise ValueError("matrix_type은 'dx' 또는 'dy'만 가능합니다.")

    if matrix_type == 'dx':
        # Original author's note: WK 1023/1023, RK 1023/511
        # (presumably term-enable masks for dx/dy -- TODO confirm).
        xy_terms = [
            np.ones(len(x)),
            # 1st order (note the sign flip on y)
            x / 1e6,
            -y / 1e6,
            # 2nd order
            (x ** 2) / 1e12,
            (x * y) / 1e12,
            (y ** 2) / 1e12,
            # 3rd order
            (x ** 3) / 1e15,
            (x ** 2 * y) / 1e15,
            (x * y ** 2) / 1e15,
            (y ** 3) / 1e15,
        ]
        field_terms = [
            # 1st order (note the sign flip on ry)
            rx / 1e6,
            -ry / 1e6,
            # 2nd order
            (rx ** 2) / 1e9,
            (rx * ry) / 1e9,
            (ry ** 2) / 1e9,
            # 3rd order
            (rx ** 3) / 1e12,
            (rx ** 2 * ry) / 1e12,
            (rx * ry ** 2) / 1e12,
            (ry ** 3) / 1e12,
        ]
    else:
        # Same layout with the roles of x/y (and rx/ry) swapped and no sign
        # flips; the final cubic field term (rx ** 3) is not included.
        xy_terms = [
            np.ones(len(y)),
            y / 1e6,
            x / 1e6,
            (y ** 2) / 1e12,
            (y * x) / 1e12,
            (x ** 2) / 1e12,
            (y ** 3) / 1e15,
            (y ** 2 * x) / 1e15,
            (y * x ** 2) / 1e15,
            (x ** 3) / 1e15,
        ]
        field_terms = [
            ry / 1e6,
            rx / 1e6,
            (ry ** 2) / 1e9,
            (ry * rx) / 1e9,
            (rx ** 2) / 1e9,
            (ry ** 3) / 1e12,
            (ry ** 2 * rx) / 1e12,
            (ry * rx ** 2) / 1e12,
        ]

    # Stack the terms as rows, then transpose so each term becomes a column.
    return np.vstack(xy_terms + field_terms).T


def prepare_coordinates(group):
    """
    Compute the coordinates used as regression inputs for one group.

    Parameters:
        group (pandas.DataFrame): rows of a single UNIQUE_ID; must contain the
            columns DieX, DieY, STEP_PITCH_X, STEP_PITCH_Y, MAP_SHIFT_X,
            MAP_SHIFT_Y, coordinate_X and coordinate_Y.

    Returns:
        tuple: (x, y, rx, ry) where
            x  = DieX * STEP_PITCH_X + MAP_SHIFT_X,
            y  = DieY * STEP_PITCH_Y + MAP_SHIFT_Y,
            rx = coordinate_X and ry = coordinate_Y (returned unchanged).
    """
    required_columns = (
        'DieX', 'DieY',
        'STEP_PITCH_X', 'STEP_PITCH_Y',
        'MAP_SHIFT_X', 'MAP_SHIFT_Y',
        'coordinate_X', 'coordinate_Y',
    )
    (die_col, die_row,
     pitch_x, pitch_y,
     shift_x, shift_y,
     field_x, field_y) = [group[name] for name in required_columns]

    # Die index times step pitch, offset by the map shift.
    x = die_col * pitch_x + shift_x
    y = die_row * pitch_y + shift_y

    return x, y, field_x, field_y


def create_matrices(x, y, rx, ry):
    """
    Build both design matrices (dx and dy) from the same coordinates.

    Parameters:
        x, y, rx, ry (numpy.ndarray): coordinate data, passed through
            unchanged to construct_X_matrix.

    Returns:
        tuple: (X_dx, X_dy), the 'dx' matrix first and the 'dy' matrix second.
    """
    return tuple(
        construct_X_matrix(x, y, rx, ry, matrix_type=kind)
        for kind in ('dx', 'dy')
    )


def _osr_coefficient_columns():
    """Return the coefficient column names in output order: UNIQUE_ID, WK1..WK20, RK1..RK20."""
    return (
        ['UNIQUE_ID']
        + [f'WK{i}' for i in range(1, 21)]
        + [f'RK{i}' for i in range(1, 21)]
    )


def _osr_coefficient_row(unique_id, coeff_dx, coeff_dy):
    """Map the fitted dx (19 values) and dy (18 values) coefficients onto the WK/RK columns."""
    row = {'UNIQUE_ID': unique_id}

    # WK1..WK20: first ten coefficients of each fit, interleaved
    # (odd WK number = dx coefficient, even WK number = dy coefficient).
    for k in range(10):
        row[f'WK{2 * k + 1}'] = coeff_dx[k]
        row[f'WK{2 * k + 2}'] = coeff_dy[k]

    # RK1 / RK2 have no fitted term and are always reported as 0.
    row['RK1'] = 0
    row['RK2'] = 0

    # RK3..RK18: remaining coefficients, interleaved the same way.
    for k in range(8):
        row[f'RK{2 * k + 3}'] = coeff_dx[10 + k]
        row[f'RK{2 * k + 4}'] = coeff_dy[10 + k]

    # The dx fit has one more term than the dy fit, so RK19 comes from dx
    # and its dy counterpart RK20 is reported as 0.
    row['RK19'] = coeff_dx[18]
    row['RK20'] = 0
    return row


def multi_lot_analysis(df_rawdata):
    """
    Run the dx/dy least-squares regression separately for every UNIQUE_ID.

    For each group, X_reg is fitted with the 'dx' design matrix and Y_reg with
    the 'dy' design matrix. Predictions and residuals are written back into
    df_rawdata (the input frame is modified in place) as the columns pred_x,
    pred_y, residual_x and residual_y. Rows that belong to no group (e.g. a
    missing UNIQUE_ID) keep NaN in those columns.

    Parameters:
        df_rawdata (pandas.DataFrame): input data; must contain UNIQUE_ID,
            X_reg, Y_reg and the columns read by prepare_coordinates.

    Returns:
        tuple: (df_coeff, df_rawdata)
            df_coeff (pandas.DataFrame): one row per UNIQUE_ID with the columns
                UNIQUE_ID, WK1..WK20, RK1..RK20. Empty (columns only) when the
                input has no groups.
            df_rawdata (pandas.DataFrame): the same object that was passed in,
                with the four prediction/residual columns added.
    """
    grouped = df_rawdata.groupby('UNIQUE_ID')
    # Positional row numbers of every group. Results are written by position
    # rather than by index label so the function also works when df_rawdata's
    # index is not unique (e.g. several lots concatenated without
    # ignore_index).
    group_positions = grouped.indices

    n_rows = len(df_rawdata)
    pred_x_all = np.full(n_rows, np.nan)
    pred_y_all = np.full(n_rows, np.nan)
    residual_x_all = np.full(n_rows, np.nan)
    residual_y_all = np.full(n_rows, np.nan)

    coeff_rows = []  # one dict of coefficients per UNIQUE_ID

    for unique_id, group in grouped:
        logging.info("Processing UNIQUE_ID: %s", unique_id)

        # Coordinates and design matrices for this group.
        x, y, rx, ry = prepare_coordinates(group)
        X_dx, X_dy = create_matrices(x, y, rx, ry)

        # Dependent variables.
        Y_dx = group['X_reg'].to_numpy()
        Y_dy = group['Y_reg'].to_numpy()

        # Least-squares solution; only the coefficient vector is used.
        coeff_dx = np.linalg.lstsq(X_dx, Y_dx, rcond=None)[0]
        coeff_dy = np.linalg.lstsq(X_dy, Y_dy, rcond=None)[0]
        coeff_rows.append(_osr_coefficient_row(unique_id, coeff_dx, coeff_dy))

        # Fitted values and residuals, stored at this group's row positions.
        pred_x = X_dx.dot(coeff_dx)
        pred_y = X_dy.dot(coeff_dy)
        positions = group_positions[unique_id]
        pred_x_all[positions] = pred_x
        pred_y_all[positions] = pred_y
        residual_x_all[positions] = Y_dx - pred_x
        residual_y_all[positions] = Y_dy - pred_y

    # Attach the results to the caller's frame (same column order as before).
    df_rawdata['pred_x'] = pred_x_all
    df_rawdata['pred_y'] = pred_y_all
    df_rawdata['residual_x'] = residual_x_all
    df_rawdata['residual_y'] = residual_y_all

    # Building the frame from a list of dicts also covers the "no groups"
    # case, where pd.concat on an empty list would raise ValueError.
    df_coeff = pd.DataFrame(coeff_rows, columns=_osr_coefficient_columns())

    return df_coeff, df_rawdata




# Zernike-style basis matrix builder
def compute_zernike_matrix(r, theta, max_order=5):
    """
    Build the polar basis matrix used for the Zernike fit.

    The first column is the constant term. For every order n = 1..max_order
    and every m in -n, -n+2, ..., n one column is appended:
    r**n * cos(m * theta) when m >= 0, r**n * sin(-m * theta) when m < 0.
    Note that the radial part is the plain power r**n, not the full Zernike
    radial polynomial.

    :param r: radius in polar coordinates (the original author notes it
              should be scaled to 0 <= r <= 1)
    :param theta: angle in polar coordinates (-pi <= theta <= pi)
    :param max_order: highest order n to include
    :return: matrix with one row per sample and one column per basis term
    """
    basis = [np.ones_like(r)]  # constant term
    for order in range(1, max_order + 1):
        radial = r**order
        for freq in range(-order, order + 1, 2):
            # Non-negative m -> cosine term, negative m -> sine term.
            if freq < 0:
                angular = np.sin(-freq * theta)
            else:
                angular = np.cos(freq * theta)
            basis.append(radial * angular)
    # Terms were collected as rows; transpose so each term is a column.
    return np.array(basis).T


# Coordinate conversion + Zernike basis construction
def prepare_zernike_coordinates(group, max_order=5):
    """
    Convert one group's coordinates to polar form and build its basis matrix.

    The position of each sample is DieX * STEP_PITCH_X + MAP_SHIFT_X +
    coordinate_X (and likewise for Y). The radius is the Euclidean norm of
    that position divided by 1e6; the angle is arctan2(y, x).

    :param group: rows of a single UNIQUE_ID
    :param max_order: highest order passed on to compute_zernike_matrix
    :return: basis matrix returned by compute_zernike_matrix
    """
    required_columns = (
        'DieX', 'DieY',
        'STEP_PITCH_X', 'STEP_PITCH_Y',
        'MAP_SHIFT_X', 'MAP_SHIFT_Y',
        'coordinate_X', 'coordinate_Y',
    )
    (die_col, die_row,
     pitch_x, pitch_y,
     shift_x, shift_y,
     field_x, field_y) = [group[name] for name in required_columns]

    # Full position of every sample: die origin plus in-field offset.
    pos_x = die_col * pitch_x + shift_x + field_x
    pos_y = die_row * pitch_y + shift_y + field_y

    # Polar coordinates; the radius is scaled down by 1e6.
    radius = np.sqrt(pos_x**2 + pos_y**2) / 1e6
    angle = np.arctan2(pos_y, pos_x)

    return compute_zernike_matrix(radius, angle, max_order=max_order)


# Zernike analysis
def zernike_analysis(df_rawdata, max_order=5):
    """
    Run the Zernike-basis least-squares fit separately for every UNIQUE_ID.

    X_reg and Y_reg are each fitted against the basis matrix returned by
    prepare_zernike_coordinates. Predictions and residuals are written back
    into df_rawdata (the input frame is modified in place) as Z_pred_x,
    Z_pred_y, Z_residual_x and Z_residual_y. Rows that belong to no group
    keep NaN in those columns.

    :param df_rawdata: input data; must contain UNIQUE_ID, X_reg, Y_reg and
                       the columns read by prepare_zernike_coordinates
    :param max_order: highest order of the basis
    :return: (df_z_coeff, df_rawdata) -- df_z_coeff has one row per UNIQUE_ID
             with columns Z1_dx..Zk_dx and Z1_dy..Zk_dy; df_rawdata is the
             same object that was passed in, with the four columns added
    """
    grouped = df_rawdata.groupby('UNIQUE_ID')
    # Positional row numbers of every group. Results are written by position
    # rather than by index label so the function also works when df_rawdata's
    # index is not unique (e.g. several lots concatenated without
    # ignore_index).
    group_positions = grouped.indices

    n_rows = len(df_rawdata)
    pred_x_all = np.full(n_rows, np.nan)
    pred_y_all = np.full(n_rows, np.nan)
    residual_x_all = np.full(n_rows, np.nan)
    residual_y_all = np.full(n_rows, np.nan)

    coeff_results = []

    for unique_id, group in grouped:
        logging.info("Processing UNIQUE_ID: %s", unique_id)

        # Basis matrix for this group.
        Z = prepare_zernike_coordinates(group, max_order=max_order)

        # Dependent variables.
        Y_dx = group['X_reg'].to_numpy()
        Y_dy = group['Y_reg'].to_numpy()

        # Least-squares solution; only the coefficient vector is used.
        coeff_dx = np.linalg.lstsq(Z, Y_dx, rcond=None)[0]
        coeff_dy = np.linalg.lstsq(Z, Y_dy, rcond=None)[0]

        # One record per group: all dx coefficients, then all dy coefficients.
        coeff_result = {'UNIQUE_ID': unique_id}
        coeff_result.update({f'Z{i+1}_dx': coeff for i, coeff in enumerate(coeff_dx)})
        coeff_result.update({f'Z{i+1}_dy': coeff for i, coeff in enumerate(coeff_dy)})
        coeff_results.append(coeff_result)

        # Fitted values and residuals, stored at this group's row positions.
        pred_x = Z @ coeff_dx
        pred_y = Z @ coeff_dy
        positions = group_positions[unique_id]
        pred_x_all[positions] = pred_x
        pred_y_all[positions] = pred_y
        residual_x_all[positions] = Y_dx - pred_x
        residual_y_all[positions] = Y_dy - pred_y

    # Attach the results to the caller's frame (same column order as before).
    df_rawdata['Z_pred_x'] = pred_x_all
    df_rawdata['Z_pred_y'] = pred_y_all
    df_rawdata['Z_residual_x'] = residual_x_all
    df_rawdata['Z_residual_y'] = residual_y_all

    # Coefficient table, one row per UNIQUE_ID.
    df_z_coeff = pd.DataFrame(coeff_results)
    return df_z_coeff, df_rawdata





if __name__ == "__main__":
    # Load the raw data.
    df_rawdata = pd.read_csv("RawData-1_2lot.csv")
    logging.info(f"Raw data loaded. Shape: {df_rawdata.shape}")

    # Polynomial (WK/RK) regression per UNIQUE_ID. df_residual is the same
    # frame as df_rawdata, now carrying pred_*/residual_* columns.
    logging.info("Starting multi-lot analysis")
    df_coeff, df_residual = multi_lot_analysis(df_rawdata)

    # Save the regression coefficients.
    df_coeff.to_csv("OSR_K_test.csv", index=False)
    logging.info("Regression coefficients saved to OSR_K_test.csv")

    # Zernike regression on the same frame, so the final table holds both the
    # polynomial and the Zernike predictions/residuals.
    max_order = 5
    logging.info("Starting Zernike analysis")
    df_z_coeff, df_rawdata_with_predictions = zernike_analysis(df_rawdata, max_order=max_order)

    # Save the Zernike coefficients.
    df_z_coeff.to_csv("Zernike_Coefficients.csv", index=False)
    logging.info("Zernike coefficients saved to Zernike_Coefficients.csv")

    # Save the combined table. The log message is built from the same path
    # that is written, so it always names the real output file (the previous
    # message referred to a non-existent Z_FIT.csv).
    combined_fit_path = "통합(C+Z)_FIT.csv"
    df_rawdata_with_predictions.to_csv(combined_fit_path, index=False)
    logging.info(f"Zernike predictions and residuals saved to {combined_fit_path}")

    





2024-11-23 05:13:22,591 - INFO - Raw data loaded. Shape: (2494, 38)
2024-11-23 05:13:22,592 - INFO - Starting multi-lot analysis
2024-11-23 05:13:22,595 - INFO - Processing UNIQUE_ID: VH075030_PTVB827_3GJPBVH.VH075P_-_2024-05-30 15:39:37_2024-05-30 17:17:40_B3N049.1_11_E1
2024-11-23 05:13:22,601 - INFO - Processing UNIQUE_ID: WF075030_PTVP841_5G4PPWF.WF075P_-_2024-07-16 23:04:19_2024-07-16 23:55:04_PDS211.1_11_E1
2024-11-23 05:13:22,609 - INFO - Regression coefficients saved to OSR_K_test.csv
2024-11-23 05:13:22,609 - INFO - Starting Zernike analysis
2024-11-23 05:13:22,612 - INFO - Processing UNIQUE_ID: VH075030_PTVB827_3GJPBVH.VH075P_-_2024-05-30 15:39:37_2024-05-30 17:17:40_B3N049.1_11_E1
2024-11-23 05:13:22,620 - INFO - Processing UNIQUE_ID: WF075030_PTVP841_5G4PPWF.WF075P_-_2024-07-16 23:04:19_2024-07-16 23:55:04_PDS211.1_11_E1
2024-11-23 05:13:22,638 - INFO - Zernike coefficients saved to Zernike_Coefficients.csv
2024-11-23 05:13:22,694 - INFO - Zernike predictions and residuals 