# 데이터 파이프라인

## 함수목록

### 로그 변환

In [1]:
# ============================================================
#    로그 변환 함수: Series 값에 log1p를 적용
# ============================================================

def log_transforming(series):
    """
    주어진 Pandas Series의 값을 자연로그 변환(log1p)을 수행하여 반환합니다.

    Parameters:
        series (pd.Series): 로그 변환할 데이터. 모든 값은 0 이상의 양수여야 합니다.

    Returns:
        pd.Series: 원본 인덱스와 이름을 유지한 로그 변환된 데이터.

    Raises:
        TypeError: 입력값이 Pandas Series가 아닐 경우.
        ValueError: Series에 음수 값이 포함된 경우.
    """
    # import pandas as pd
    # import numpy as np
    # import logging

    logging.info(f"✅ [log_transforming] 로그 변환 시작 - Column: {series.name}")

    # 입력 데이터 타입 확인
    if not isinstance(series, pd.Series):
        raise TypeError("입력값은 pandas Series여야 합니다.")

    # 숫자형 데이터인지 확인 (불필요한 변환 방지)
    if not np.issubdtype(series.dtype, np.number):
        raise ValueError("로그 변환을 위해 숫자형 데이터가 필요합니다.")

    # 음수 값 확인
    if (series < 0).any():
        raise ValueError("로그 변환할 데이터에 음수 값이 포함되어 있습니다.")

    # float 타입이 아닌 경우만 변환
    if not np.issubdtype(series.dtype, np.floating):
        series = series.astype(float)

    # 자연로그 변환
    result_series = np.log1p(series)

    logging.info(f"✅ [log_transforming] 로그 변환 완료 - 결과 샘플: {result_series.head(3).tolist()}")

    return result_series

### 정규화

In [2]:
def normalizing(series):
    """
    주어진 Pandas Series의 값을 표준화(Standardization)하여 평균 0, 표준편차 1의 분포로 변환합니다.

    Parameters:
        series (pd.Series): 표준화할 데이터.

    Returns:
        pd.Series: 표준화된 데이터 (원본 인덱스와 이름 유지).

    Raises:
        TypeError: 입력값이 Pandas Series가 아닐 경우.
    """
    # import pandas as pd
    # import numpy as np
    # import logging
    from sklearn.preprocessing import StandardScaler

    logging.info(f"✅ [normalizing] 표준화 시작 - Column: {series.name}")

    if not isinstance(series, pd.Series):
        logging.error("❌입력값이 pandas Series가 아닙니다!")
        raise TypeError("입력값은 pandas Series여야 합니다.")

    scaler = StandardScaler()

    # 1차원 데이터를 2차원 배열로 변환 후 표준화
    result_array = scaler.fit_transform(series.values.reshape(-1, 1)).flatten()

    # 원본 인덱스 및 컬럼명 유지
    result_series = pd.Series(result_array, index=series.index, name=series.name)

    logging.info(f"✅ [normalizing] 표준화 완료 - 결과 샘플: {result_series.head(3).tolist()}")

    return result_series

### 원-핫 인코딩

In [3]:
def one_hot_encoding(series, delimiter="/"):
    """
    주어진 Pandas Series에 대해 지정된 구분자(delimiter)를 기준으로 원-핫 인코딩을 수행합니다.

    Parameters:
        series (pd.Series): 원본 문자열 데이터. 각 셀에 여러 값이 포함되어 있을 수 있음.
        delimiter (str): 값들 간의 구분자 (기본값: "/").

    Returns:
        pd.DataFrame: 원-핫 인코딩 결과를 포함하는 DataFrame.

    Raises:
        TypeError: 입력값이 Pandas Series가 아닐 경우.
        ValueError: Series의 데이터 타입이 문자열이 아닐 경우.
    """
    # import pandas as pd
    # import logging

    logging.info(f"✅ [one_hot_encoding] 시작 - Column: {series.name}")

    if not isinstance(series, pd.Series):
        logging.error("❌ 입력값이 pandas Series가 아닙니다!")
        raise TypeError("❌ 입력값은 pandas Series여야 합니다!")

    # 문자열 타입 확인
    if not series.dtype == "object":
        logging.error("❌ 원-핫 인코딩을 위해 Series는 문자열 타입이어야 합니다!")
        raise ValueError("❌ 원-핫 인코딩을 위해 Series는 문자열 타입이어야 합니다!")

    # 문자열을 delimiter로 분리하여 dummies 생성
    result_df = series.str.get_dummies(sep=delimiter)

    # 빈 문자열 컬럼이 생성될 경우 제거
    if "" in result_df.columns:
        result_df = result_df.drop(columns="")

    # 기존 컬럼명을 접두사로 추가하여 결과 DataFrame 생성
    result_df = result_df.add_prefix(f"{series.name}_")

    # ✅ 결과 컬럼 수 & 컬럼 리스트 출력
    logging.info(f"✅ [one_hot_encoding] 완료 - 생성된 컬럼 수: {len(result_df.columns)}, 컬럼 목록: {result_df.columns.tolist()}")

    return result_df

### 텍스트 임베딩

In [4]:
def load_bge_model(model_name="BAAI/bge-m3"):
    """
    지정된 모델 이름의 BGE-M3 모델과 토크나이저를 로드합니다.

    Parameters:
        model_name (str, optional): 사용할 모델 이름 (기본값: "BAAI/bge-m3").

    Returns:
        tuple: (tokenizer, model, device) - 로드된 토크나이저, 모델, 실행 디바이스 (GPU/CPU).

    Raises:
        RuntimeError: 모델 로드 중 오류가 발생한 경우.
    """
    # import logging
    import torch
    from transformers import AutoTokenizer, AutoModel

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"✅ [load_bge_model] 모델 로드 시작: {model_name}")

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name)
        model.to(device)  # 모델을 GPU/CPU에 로드
        model.eval()  # 평가 모드 설정
        logging.info(f"✅ [load_bge_model] 모델 '{model_name}'이(가) {device}에서 성공적으로 로드됨")

    except Exception as e:
        logging.error(f"❌ 모델 로드 중 오류 발생: {e}")
        raise RuntimeError(f"❌ 모델 로드 중 오류 발생: {e}")

    return tokenizer, model, device

In [5]:
def get_embedding_vector(texts, tokenizer, model, device, max_length=512):
    """
    텍스트 리스트를 받아 BGE-M3 모델을 사용하여 임베딩 벡터 배열을 생성합니다.

    Parameters:
        texts (list of str): 임베딩할 텍스트들의 리스트.
        tokenizer: 로드된 토크나이저.
        model: 로드된 임베딩 모델.
        device: 실행 디바이스 (GPU/CPU).
        max_length (int, optional): 최대 토큰 길이 (기본값: 512).

    Returns:
        np.ndarray: (배치 크기, 임베딩 차원) 형태의 임베딩 벡터 배열.

    Raises:
        TypeError: texts가 문자열 리스트가 아닐 경우.
    """
    # import numpy as np
    # import pandas as pd
    import torch
    import logging

    logging.info(f"✅ [get_embedding_vector] 텍스트 개수: {len(texts)}")

    if not isinstance(texts, list):
        logging.error("❌ 입력값이 문자열 리스트가 아닙니다!")
        raise TypeError("❌ 입력값은 문자열 리스트여야 합니다!")

    if not texts:
        logging.warning("⚠️ [get_embedding_vector] 빈 리스트 입력됨. 빈 배열 반환.")
        return np.array([])

    # 결측값을 빈 문자열로 변환
    texts = [str(text) if pd.notnull(text) else "" for text in texts]

    # 텍스트 토큰화 및 모델 입력 데이터 준비
    inputs = tokenizer(texts, return_tensors="pt", truncation=True, padding=True, max_length=max_length)
    inputs = {key: value.to(device) for key, value in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    # CLS 토큰 임베딩 추출
    embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()

    logging.info(f"✅ [get_embedding_vector] 임베딩 완료 - 결과 shape: {embeddings.shape}")

    return embeddings

In [6]:
def embedding(series, tokenizer, model, device, batch_size=32):
    """
    주어진 Pandas Series의 텍스트를 배치 단위로 임베딩하여,
    각 텍스트에 해당하는 임베딩 벡터 리스트를 반환합니다.

    Parameters:
        series (pd.Series): 임베딩할 텍스트 데이터가 포함된 컬럼.
        tokenizer: 로드된 토크나이저.
        model: 로드된 임베딩 모델.
        device: 실행 디바이스 (GPU/CPU).
        batch_size (int, optional): 배치 크기 (기본값: 32).

    Returns:
        pd.Series: 각 텍스트의 임베딩 벡터(리스트)를 포함하는 Series.

    Raises:
        TypeError: 입력값이 Pandas Series가 아닐 경우.
    """
    # import numpy as np
    # import pandas as pd
    # import logging
    import torch
    from tqdm import tqdm

    logging.info(f"✅ [embedding] 시작 - Column: {series.name}, 길이: {len(series)}")

    if not isinstance(series, pd.Series):
        logging.error("❌ 입력 값이 Pandas Series가 아닙니다!")
        raise TypeError("❌ 입력 값은 Pandas Series여야 합니다!")

    if series.empty:
        logging.warning("⚠️ [embedding] 빈 Series 입력됨. 빈 결과 반환.")
        return pd.Series([], index=series.index, dtype=object)

    # 결측값을 빈 문자열로 대체
    series = series.fillna("")

    embeddings = []
    num_batches = (len(series) + batch_size - 1) // batch_size

    for i in tqdm(range(0, len(series), batch_size), total=num_batches, desc="임베딩 진행 중"):
        batch_texts = series.iloc[i:i+batch_size].tolist()
        batch_embeddings = get_embedding_vector(batch_texts, tokenizer, model, device)

        if batch_embeddings is None or len(batch_embeddings) == 0:
            logging.warning(f"⚠️ [embedding] {i}번째 배치에서 빈 결과 발생. 빈 배열 추가.")
            batch_embeddings = np.zeros((len(batch_texts), model.config.hidden_size))

        embeddings.extend(batch_embeddings)

    # 임베딩 벡터를 리스트 형식으로 저장
    result_series = pd.Series([emb.tolist() for emb in embeddings], index=series.index, dtype=object)

    logging.info("✅ [embedding] 완료")

    return result_series

### 차원축소

In [7]:
# ============================================================
#  SVD 차원 축소 함수: Truncated SVD를 사용한 차원 축소
# ============================================================
def dimension_reducing_SVD(df, prefix, components_n=2, random_state=42):
    """
    주어진 DataFrame을 Truncated SVD를 사용하여 차원 축소합니다.

    Parameters:
        df (pd.DataFrame): 차원 축소할 입력 데이터.
        prefix (str): 결과 컬럼 이름에 사용할 접두사.
        components_n (int, optional): 목표 차원 수 (기본값: 2).
        random_state (int, optional): SVD의 랜덤 시드 값 (기본값: 42).

    Returns:
        pd.DataFrame: SVD 차원 축소 후 결과 DataFrame (원본 인덱스 유지).

    Raises:
        TypeError: 입력값이 Pandas DataFrame이 아닐 경우.
        ValueError: components_n이 1보다 작을 경우.
    """
    # import pandas as pd
    # import logging
    from sklearn.decomposition import TruncatedSVD

    logging.info(f"✅ [dimension_reducing_SVD] 시작 - 입력 DataFrame shape: {df.shape}")

    # 입력 타입 검증
    if not isinstance(df, pd.DataFrame):
        logging.error("❌ 입력값은 pandas DataFrame이어야 합니다!")
        raise TypeError("❌ 입력값은 pandas DataFrame이어야 합니다!")

    # components_n 값 검증
    if components_n < 1:
        logging.error("❌ components_n 값은 1 이상이어야 합니다!")
        raise ValueError("❌ components_n 값은 1 이상이어야 합니다!")

    if components_n > df.shape[1]:
        components_n = df.shape[1]
        logging.warning("⚠️ [PCA] components_n이 입력 차원보다 크므로 입력 차원으로 설정합니다.")

    # Truncated SVD 실행
    svd = TruncatedSVD(n_components=components_n, random_state=random_state)
    reduced_data = svd.fit_transform(df)

    # 결과 컬럼명 생성
    reduced_columns = [f"SVD_{prefix}_{i+1}" for i in range(components_n)]

    # 결과 DataFrame 생성 (원본 인덱스 유지)
    result_df = pd.DataFrame(reduced_data, columns=reduced_columns, index=df.index)

    logging.info(f"✅ [dimension_reducing_SVD] 완료 - 결과 shape: {result_df.shape}")

    return result_df

In [8]:
# ============================================================
#  PCA 차원 축소 함수: PCA를 사용한 차원 축소
# ============================================================
def dimension_reducing_PCA(df, prefix, components_n=2):
    """
    주어진 DataFrame을 PCA를 사용하여 차원 축소합니다.

    Parameters:
        df (pd.DataFrame): 차원 축소할 입력 데이터.
        prefix (str): 결과 컬럼 이름에 사용할 접두사.
        components_n (int, optional): 목표 차원 수 (기본값: 2).

    Returns:
        pd.DataFrame: PCA 차원 축소 후 결과 DataFrame (원본 인덱스 유지).

    Raises:
        TypeError: 입력값이 Pandas DataFrame이 아닐 경우.
        ValueError: components_n이 1보다 작을 경우.
    """
    # import pandas as pd
    # import logging
    from sklearn.decomposition import PCA

    logging.info(f"✅ [dimension_reducing_PCA] 시작 - 입력 DataFrame shape: {df.shape}")

    # 입력 타입 검증
    if not isinstance(df, pd.DataFrame):
        logging.error("❌ 입력값은 pandas DataFrame이어야 합니다!")
        raise TypeError("❌ 입력값은 pandas DataFrame이어야 합니다!")

    # components_n 값 검증
    if components_n < 1:
        logging.error("❌ components_n 값은 1 이상이어야 합니다!")
        raise ValueError("❌ components_n 값은 1 이상이어야 합니다!")

    if components_n > df.shape[1]:
        components_n = df.shape[1]
        logging.warning("⚠️ [PCA] components_n이 입력 차원보다 크므로 입력 차원으로 설정합니다.")

    # PCA 실행
    pca = PCA(n_components=components_n)
    reduced_data = pca.fit_transform(df)

    # 결과 컬럼명 생성
    reduced_columns = [f"PCA_{prefix}_{i+1}" for i in range(components_n)]

    # 설명된 분산 계산
    explained_variance = sum(pca.explained_variance_ratio_) * 100

    # 결과 DataFrame 생성 (원본 인덱스 유지)
    result_df = pd.DataFrame(reduced_data, columns=reduced_columns, index=df.index)

    logging.info(f"✅ [dimension_reducing_PCA] 완료 - 결과 shape: {result_df.shape}, 설명된 분산: {explained_variance:.2f}%")

    return result_df

In [9]:
# ============================================================
# UMAP 차원 축소 함수: UMAP을 사용한 비선형 차원 축소
# ============================================================
def dimension_reducing_UMAP(df, prefix, components_n=2, n_neighbors=15, metric="euclidean", random_state=42):
    """
    주어진 DataFrame을 UMAP을 사용하여 비선형 차원 축소합니다.

    Parameters:
        df (pd.DataFrame): 차원 축소할 입력 데이터.
        prefix (str): 결과 컬럼 이름에 사용할 접두사.
        components_n (int, optional): 목표 차원 수 (기본값: 2).
        n_neighbors (int, optional): UMAP의 이웃 수 (기본값: 15).
        metric (str, optional): 거리 측정 방식 (기본값: "euclidean").
        random_state (int, optional): 랜덤 시드 값 (기본값: 42).

    Returns:
        pd.DataFrame: UMAP 차원 축소 후 결과 DataFrame (원본 인덱스 유지).

    Raises:
        TypeError: 입력값이 Pandas DataFrame이 아닐 경우.
        ValueError: components_n이 1보다 작을 경우.
    """
    # import pandas as pd
    # import logging
    import umap

    logging.info(f"✅[dimension_reducing_UMAP] 시작 - 입력 DataFrame shape: {df.shape}")

    # 입력 타입 검증
    if not isinstance(df, pd.DataFrame):
        logging.error("❌입력값은 pandas DataFrame이어야 합니다!")
        raise TypeError("❌입력값은 pandas DataFrame이어야 합니다!")

    # components_n 값 검증
    if components_n < 1:
        logging.error("❌components_n 값은 1 이상이어야 합니다!")
        raise ValueError("❌components_n 값은 1 이상이어야 합니다!")

    if components_n > df.shape[1]:
        components_n = df.shape[1]
        logging.warning("⚠️ [PCA] components_n이 입력 차원보다 크므로 입력 차원으로 설정합니다.")

    # UMAP 실행
    umap_model = umap.UMAP(n_components=components_n, n_neighbors=n_neighbors, metric=metric, random_state=random_state)
    reduced_data = umap_model.fit_transform(df)

    # 결과 컬럼명 생성
    reduced_columns = [f"UMAP_{prefix}_{i+1}" for i in range(components_n)]

    # 결과 DataFrame 생성 (원본 인덱스 유지)
    result_df = pd.DataFrame(reduced_data, columns=reduced_columns, index=df.index)

    logging.info(f"✅[dimension_reducing_UMAP] 완료 - 결과 shape: {result_df.shape}")

    return result_df

### 데이터 분리, 가공

In [10]:
# ============================================================
#   데이터 분리 함수: '예가범위' 기준 그룹 분류
# ============================================================
def separate_data(df):
    """
    '예가범위' 컬럼을 기준으로 데이터를 세 그룹(예: range3, range2, others)으로 분류합니다.

    Parameters:
        df (pd.DataFrame): 입력 데이터. '예가범위' 컬럼이 포함되어 있어야 합니다.

    Returns:
        dict: {'range3': DataFrame, 'range2': DataFrame, 'others': DataFrame} 형태로 그룹별 DataFrame 반환.

    Raises:
        TypeError: 입력 데이터가 Pandas DataFrame이 아닐 경우.
        KeyError: '예가범위' 컬럼이 DataFrame에 없을 경우.
    """
    # import pandas as pd
    # import logging

    logging.info(f"✅[separate_data] 시작 - 입력 DataFrame shape: {df.shape}")

    # 입력 타입 검증
    if not isinstance(df, pd.DataFrame):
        logging.error("❌입력 데이터는 pandas DataFrame이어야 합니다!")
        raise TypeError("❌입력 데이터는 pandas DataFrame이어야 합니다!")

    # '예가범위' 컬럼 존재 여부 확인
    if "예가범위" not in df.columns:
        logging.error("❌'예가범위' 컬럼이 존재하지 않습니다!")
        raise KeyError("❌'예가범위' 컬럼이 존재하지 않습니다!")

    # 지정한 예가범위 값을 기준으로 그룹화
    range_values = ["+3%~-3%", "+2%~-2%"]
    range3_df = df[df["예가범위"] == "+3%~-3%"]
    range2_df = df[df["예가범위"] == "+2%~-2%"]
    others_df = df[~df["예가범위"].isin(range_values)]

    logging.info(f"✅[separate_data] 분류 완료 - range3: {len(range3_df)}, range2: {len(range2_df)}, others: {len(others_df)}")

    return {"range3": range3_df, "range2": range2_df, "others": others_df}

In [11]:
# ============================================================
#   데이터 재구성 함수: 공고 데이터와 투찰 데이터 병합 및 정리
# ============================================================
def restructure_data(df1, df2):
    """
    공고 데이터와 투찰 데이터를 정리 한 후 병합 및 재구성합니다.

    Parameters:
        df1 (pd.DataFrame): 공고 데이터.
        df2 (pd.DataFrame): 투찰 데이터.

    Returns:
        pd.DataFrame: 병합된 최종 데이터.

    Raises:
        TypeError: 입력 데이터가 Pandas DataFrame이 아닐 경우.
        KeyError: 필요한 컬럼이 존재하지 않을 경우.
    """
    # import pandas as pd
    # import logging

    logging.info("✅ [restructure_data] 데이터 재구성 시작...")

    # 입력 데이터 검증
    if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
        logging.error("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")

    # 공고번호 컬럼 확인
    if "공고번호" not in df1.columns or "공고번호" not in df2.columns:
        logging.error("❌ '공고번호' 컬럼이 두 데이터프레임에 모두 존재해야 합니다!")
        raise KeyError("❌ '공고번호' 컬럼이 누락되었습니다.")

    notices_df = df1.copy()
    bids_df = df2.copy()

    # 투찰 데이터 사정률 변환 및 정리
    bids_df["사정률"] = parse_sajeong_rate(bids_df["기초대비 사정률(%)"])
    bids_df.dropna(subset=["사정률"], inplace=True)
    bids_df = bids_df[["공고번호", "사정률"]]
    bids_df = group_to_list(bids_df, "공고번호", "사정률")

    # 필요한 컬럼 정의 및 존재 여부 확인
    required_columns = ['공고번호', '공고제목', '발주처(수요기관)', '지역제한', '기초금액',
                        '예정가격', '예가범위', 'A값', '투찰률(%)', '참여업체수', '공고구분표시']

    missing_columns = [col for col in required_columns if col not in df1.columns]
    if missing_columns:
        logging.error(f"❌ 공고 데이터에서 누락된 컬럼: {missing_columns}")
        raise KeyError(f"❌ 공고 데이터에서 누락된 컬럼: {missing_columns}")

    notices_df = notices_df[required_columns]

    # 결측값 처리
    notices_df.dropna(subset=["예가범위"], inplace=True)
    notices_df["투찰률(%)"] = notices_df["투찰률(%)"].fillna(notices_df["투찰률(%)"].mean(numeric_only=True))
    notices_df["공고구분표시"] = notices_df["공고구분표시"].fillna("")

    # 문자열 컬럼 정리 (공백 제거)
    str_columns = ["예가범위", "발주처(수요기관)", "지역제한", "공고구분표시"]
    for col in str_columns:
        notices_df[col] = notices_df[col].astype(str).str.replace(r"\s+", "", regex=True)

    # 데이터 병합
    merged_data = pd.merge(notices_df, bids_df, on="공고번호", how="inner")

    logging.info(f"✅ [restructure_data] 데이터 재구성 완료! - 데이터 shape: {merged_data.shape}")

    return merged_data

In [12]:
# ============================================================
#   Series를 DataFrame으로 변환 함수: 리스트 확장
# ============================================================
def series_to_dataframe(series):
    """
    Pandas Series의 각 원소가 리스트인 경우, 이를 개별 컬럼으로 확장하여 DataFrame으로 변환합니다.

    Parameters:
        series (pd.Series): 변환할 데이터.

    Returns:
        pd.DataFrame: 각 원소가 개별 컬럼으로 확장된 DataFrame.

    Raises:
        TypeError: 입력 데이터가 Pandas Series가 아닐 경우.
    """
    # import pandas as pd
    # import numpy as np
    # import logging

    logging.info(f"✅[series_to_dataframe] 시작 - 입력 Series 길이: {len(series)}")

    if not isinstance(series, pd.Series):
        logging.error("❌입력 데이터는 Pandas Series이어야 합니다!")
        raise TypeError("❌입력 데이터는 Pandas Series이어야 합니다!")

    # 컬럼명 설정 (기본값: "feature" 사용)
    column_name = series.name if series.name else "feature"

    # DataFrame 변환
    expanded_df = pd.DataFrame(series.tolist())

    # 컬럼명 지정
    expanded_df.columns = [f"{column_name}_{i}" for i in range(expanded_df.shape[1])]

    logging.info(f"✅[series_to_dataframe] 완료 - 결과 shape: {expanded_df.shape}")

    return expanded_df

In [13]:
# ============================================================
#   그룹별 리스트 생성 함수: 지정 컬럼 그룹화 후 리스트화
# ============================================================
def group_to_list(df, group_col, value_col):
    """
    지정된 그룹 컬럼을 기준으로 데이터를 그룹화한 후, 해당 그룹의 값을 리스트로 집계합니다.

    Parameters:
        df (pd.DataFrame): 입력 데이터.
        group_col (str): 그룹화할 컬럼 이름 (예: '공고번호').
        value_col (str): 집계할 값이 있는 컬럼 이름 (예: '사정률(%)').

    Returns:
        pd.DataFrame: 그룹과 해당 값의 리스트를 포함한 DataFrame.

    Raises:
        TypeError: 입력 데이터가 Pandas DataFrame이 아닐 경우.
        KeyError: group_col 또는 value_col이 DataFrame에 존재하지 않을 경우.
    """
    # import pandas as pd
    # import logging

    logging.info(f"✅ [group_to_list] 시작 - 그룹 컬럼: {group_col}, 값 컬럼: {value_col}")

    # 입력 타입 검증
    if not isinstance(df, pd.DataFrame):
        logging.error("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")

    # group_col, value_col 존재 여부 확인
    if group_col not in df.columns or value_col not in df.columns:
        logging.error(f"❌ 컬럼 '{group_col}' 또는 '{value_col}'이(가) 데이터프레임에 존재하지 않습니다!")
        raise KeyError(f"❌ 컬럼 '{group_col}' 또는 '{value_col}'이(가) 존재하지 않습니다.")

    # 결측값 제거하여 그룹화 오류 방지
    df_filtered = df.dropna(subset=[value_col])

    # 그룹화 후 리스트 변환
    grouped_df = df_filtered.groupby(group_col)[value_col].agg(list).reset_index()

    logging.info(f"✅ [group_to_list] 완료 - 결과 shape: {grouped_df.shape}")

    return grouped_df

### taget 데이터 처리

In [14]:
# ============================================================
#   기초대비 사정률 파싱 함수: 문자열에서 소수점 숫자 추출
# ============================================================
def parse_sajeong_rate(series):
    """
    '기초대비 사정률(%)' 문자열에서 첫 번째 소수 형태의 숫자를 추출하여 float으로 변환합니다.

    Parameters:
        series (pd.Series): 변환할 문자열 데이터.

    Returns:
        pd.Series: 추출된 숫자를 float 형식으로 포함하는 Series.

    Raises:
        TypeError: 입력값이 Pandas Series가 아닐 경우.
    """
    # import pandas as pd
    # import logging

    logging.info(f"✅ [parse_sajeong_rate] 시작 - Series 길이: {len(series)}")

    # 입력 데이터 검증
    if not isinstance(series, pd.Series):
        logging.error("❌ 입력 데이터는 Pandas Series이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 Pandas Series이어야 합니다!")

    # 결측값 제거
    series = series.dropna()

    # 정규식으로 첫 번째 소수 형태 숫자 추출
    result_series = series.astype(str).str.extract(r'(-?\d+\.\d+)')[0]

    # 숫자로 변환 (변환 실패 시 NaN 처리)
    result_series = pd.to_numeric(result_series, errors="coerce")

    logging.info(f"✅ [parse_sajeong_rate] 완료 - 결과 샘플: {result_series.head(3).tolist()}")

    return result_series

In [15]:
# ============================================================
#    예가범위 값 매핑 함수: 첫 행의 예가범위로 범위 값 결정
# ============================================================
def parse_range_level(range_str):
    """
    '예가범위' 값을 읽어, 미리 정의된 매핑에 따라 정수 값을 반환합니다.

    Parameters:
        range_str (str): 예가범위 문자열.

    Returns:
        int: 매핑된 범위 값 (예: 3, 2 또는 기본값 0).

    Raises:
        TypeError: 입력값이 문자열이 아닐 경우.
    """
    # import logging

    logging.info(f"✅ [parse_range_level] 시작 - 입력값: {range_str}")

    # 입력 데이터 검증
    if not isinstance(range_str, str):
        logging.error("❌ 입력 데이터는 문자열이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 문자열이어야 합니다!")

    # 미리 정의된 매핑 딕셔너리: 예가범위 문자열과 대응되는 정수 값
    range_mapping = {"+3%~-3%": 3, "+2%~-2%": 2}

    # 공백 제거 후 매핑 검색
    range_level = range_mapping.get(range_str.strip(), 0)

    logging.info(f"✅ [parse_range_level] 완료 - 매핑된 값: {range_level}")

    return range_level

In [16]:
# ==================================================================
#   구간 하한값, 상한값 추출 함수: 예가 범위에서 시작점과 끝점 추출
# ==================================================================
def extract_bounds(range_str):
    """
    주어진 '예가범위' 문자열에서 두 개의 퍼센트 값을 추출하여,
    이들 중 최소값을 Lower, 최대값을 Upper로 반환합니다.

    Parameters:
        s (str): 예가범위 문자열 (예: "+3% ~ -3%").

    Returns:
        tuple(float, float): (Lower, Upper) - 추출된 숫자 중 최소값과 최대값.

    Raises:
        TypeError: 입력값이 문자열이 아닐 경우.
        ValueError: 입력값이 유효하지 않거나, 두 개의 퍼센트 값 추출에 실패한 경우.
    """
    # import pandas as pd
    # import re
    # import logging

    logging.info(f"✅ [extract_bounds] 시작 - 입력값: '{range_str}'")

    # 입력값 검증
    if not isinstance(range_str, str):
        logging.error("❌ 입력 데이터는 문자열이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 문자열이어야 합니다!")

    if pd.isna(range_str) or range_str.strip() == "":
        logging.error(f"❌ 유효하지 않은 예가범위 값입니다: '{range_str}'")
        raise ValueError(f"❌ 유효하지 않은 예가범위 값입니다: '{range_str}'")

    range_str = range_str.strip()

    # 정규식으로 숫자(퍼센트 기호 포함) 추출
    matches = re.findall(r"([-+]?\d+(?:\.\d+)?)%", range_str)

    if len(matches) != 2:
        logging.error(f"❌ 예가범위에서 시작점과 끝점을 찾을 수 없음: '{range_str}'")
        raise ValueError(f"❌ 예가범위에서 시작점과 끝점을 찾을 수 없음: '{range_str}'")

    start = float(matches[0])
    end = float(matches[1])

    lower_bound = min(start, end)
    upper_bound = max(start, end)

    logging.info(f"✅ [extract_bounds] 완료 - 결과 (Lower, Upper): ({lower_bound}, {upper_bound})")

    return lower_bound, upper_bound

In [17]:
# ============================================================
#   구간 경계값 생성 함수: 예가범위에 따라 bins 생성
# ============================================================
def generate_bins(lower_bound, upper_bound):
    """
    주어진 하한(lower_bound)과 상한(upper_bound)을 기준으로 여러 개의 구간(bin) 경계값을 생성합니다.

    Parameters:
        lower_bound (float): 구간의 최소값.
        upper_bound (float): 구간의 최대값.

    Returns:
        dict: 다양한 interval 값(10, 20, 50, 100)에 대해 생성된 bins 딕셔너리.

    Raises:
        TypeError: 입력값이 숫자가 아닐 경우.
        ValueError: 중복된 구간 경계값이 발생할 경우.
    """
    # import numpy as np
    # import logging
    from collections import Counter

    logging.info(f"✅ [generate_bins] 시작 - lower_bound: {lower_bound}, upper_bound: {upper_bound}")

    # 입력값 검증
    if not isinstance(lower_bound, (int, float)) or not isinstance(upper_bound, (int, float)):
        logging.error("❌ 입력값은 숫자(float, int)여야 합니다!")
        raise TypeError("❌ 입력값은 숫자(float, int)여야 합니다!")

    if lower_bound >= upper_bound:
        logging.error("❌ lower_bound 값은 upper_bound보다 작아야 합니다!")
        raise ValueError("❌ lower_bound 값은 upper_bound보다 작아야 합니다!")

    intervals = [10, 20, 50, 100]
    bins_dict = {}

    for interval in intervals:
        # np.linspace로 구간 경계값 생성 (구간 수 = interval + 1)
        bins = np.linspace(lower_bound, upper_bound, num=interval + 1).tolist()

        # 중복 검사 (set을 활용한 최적화)
        if len(set(bins)) < len(bins):
            duplicates = [item for item, count in Counter(bins).items() if count > 1]
            logging.error(f"⚠️ 중복된 구간 경계값 발생! 중복된 값: {set(duplicates)}")
            raise ValueError(f"⚠️ 중복된 구간 경계값이 발생했습니다! 중복된 값: {set(duplicates)}")

        # 첫 번째와 마지막 경계를 -∞, ∞로 설정
        bins[0] = -np.inf
        bins[-1] = np.inf
        bins_dict[interval] = bins

        logging.info(f"✅ [generate_bins] Interval {interval}: Bins 생성 완료")

    logging.info("✅ [generate_bins] 완료 - 모든 구간 생성 완료")

    return bins_dict

In [18]:
# ============================================================
#   CSV에서 구간 경계값 불러오기 함수
# ============================================================
def load_bins(range_level):
    """
    지정된 range_level (2 또는 3)에 해당하는 CSV 파일에서 구간 경계값을 불러와,
    이를 딕셔너리 형태로 반환합니다.

    Parameters:
        range_level (int): 범위 값 (허용값: 2 또는 3).

    Returns:
        dict: {구간 개수: 구간 경계값 리스트} 형태의 딕셔너리.

    Raises:
        TypeError: range_level이 정수가 아닐 경우.
        ValueError: range_level이 2 또는 3이 아닐 경우.
        FileNotFoundError: 해당 CSV 파일을 찾을 수 없는 경우.
    """
    # import os
    # import pandas as pd
    # import numpy as np
    # import logging
    from pathlib import Path

    logging.info(f"✅ [load_bins] 시작 - range_level: {range_level}")

    # 입력값 검증
    if not isinstance(range_level, int):
        logging.error("❌ range_level은 정수(int)여야 합니다!")
        raise TypeError("❌ range_level은 정수(int)여야 합니다!")

    if range_level not in {2, 3}:
        logging.error("❌ 입력값 'range_level'은 2 또는 3이어야 합니다!")
        raise ValueError("❌ 입력값 'range_level'은 2 또는 3이어야 합니다!")

    intervals = [10, 20, 50, 100]
    bins_dict = {}

    for interval in intervals:
        csv_filename = f"intervals_{interval}_{range_level}.csv"
        intervals_csv_path = Path("intervals") / csv_filename

        logging.info(f"🔍 [load_bins] CSV 파일 경로: {intervals_csv_path}")

        if not intervals_csv_path.exists():
            logging.error(f"❌ 구간 경계 파일이 존재하지 않습니다: {intervals_csv_path}")
            raise FileNotFoundError(f"❌ 구간 경계 파일이 존재하지 않습니다: {intervals_csv_path}")

        # CSV 파일 읽기
        interval_df = pd.read_csv(intervals_csv_path)

        if "상한값" not in interval_df.columns:
            logging.error(f"❌ CSV 파일에서 '상한값' 컬럼을 찾을 수 없습니다: {intervals_csv_path}")
            raise KeyError(f"❌ CSV 파일에서 '상한값' 컬럼을 찾을 수 없습니다.")

        # '상한값' 정렬 후 리스트 변환
        upper_bounds = interval_df["상한값"].iloc[:-1].sort_values().tolist()
        upper_bounds = [val * 100 for val in upper_bounds]  # 퍼센트 변환

        # 첫 번째와 마지막 경계를 -∞, ∞로 설정
        bins = [-np.inf] + upper_bounds + [np.inf]
        bins_dict[interval] = bins

        logging.info(f"✅ [load_bins] {interval} 구간: {bins}")

    logging.info("✅ [load_bins] 완료 - 모든 구간 로드 완료")

    return bins_dict

In [19]:
# ============================================================
#   데이터 구간화 및 비율 계산 함수
# ============================================================
def data_to_target(data, bins):
    """
    데이터를 구간(bins)에 따라 분할하고, 각 구간에 속하는 비율을 빠르게 계산하는 최적화 함수.

    Parameters:
        data (list, np.array, pd.Series): 구간화할 숫자 데이터.
        bins (list): 구간 경계값 리스트 (최소 2개 이상의 값 필요).

    Returns:
        dict: {구간 레이블: 해당 구간 비율} 형태의 딕셔너리.

    Raises:
        TypeError: data가 리스트, NumPy 배열, Pandas Series가 아닐 경우.
        ValueError: bins가 최소 2개 이상의 값을 가진 리스트가 아닐 경우.
    """
    # import numpy as np
    # import pandas as pd
    # import logging

    logging.info("✅ [data_to_target] 시작")

    # 입력값 검증
    if not isinstance(data, (list, np.ndarray, pd.Series)):
        logging.error("❌ 입력 데이터는 리스트, NumPy 배열 또는 Pandas Series여야 합니다!")
        raise TypeError("❌ 입력 데이터는 리스트, NumPy 배열 또는 Pandas Series여야 합니다!")

    if not isinstance(bins, list) or len(bins) < 2:
        logging.error("❌ 입력값 'bins'는 최소 2개 이상의 값을 가진 리스트여야 합니다!")
        raise ValueError("❌ 입력값 'bins'는 최소 2개 이상의 값을 가진 리스트여야 합니다!")

    if len(data) == 0:
        logging.warning("⚠️ [data_to_target] 빈 데이터 입력됨. 빈 딕셔너리 반환.")
        return {}

    # 데이터를 NumPy 배열로 변환하여 연산 속도 향상
    data = np.asarray(data, dtype=float)

    # numpy.digitize() 사용하여 구간화
    bin_indices = np.digitize(data, bins, right=False) - 1
    bin_indices = np.clip(bin_indices, 0, len(bins) - 2)  # 범위를 벗어나는 값 보정

    # numpy.bincount() 활용하여 개수 계산
    bin_counts = np.bincount(bin_indices, minlength=len(bins) - 1)

    # 비율 계산
    total_count = len(data)
    ratios = bin_counts / total_count if total_count > 0 else bin_counts

    # 구간 레이블 생성
    labels = [f"{i+1:03}" for i in range(len(bins) - 1)]

    result = dict(zip(labels, ratios))

    logging.info(f"✅ [data_to_target] 완료 - 결과: {result}")

    return result

In [20]:
# ============================================================
#   데이터 구간화 및 비율 계산 (여러 bins 사용)
# ============================================================
def process_row_fixed_bins(data, bins_dict):
    """
    여러 개의 bins를 사용하여 데이터를 구간화하고 비율을 계산하는 함수.

    Parameters:
        data (list, np.array, pd.Series): 구간화를 적용할 숫자 데이터.
        bins_dict (dict): 미리 정의된 구간 경계값 딕셔너리.

    Returns:
        dict: {구간_레이블: 비율} 형태의 딕셔너리.

    Raises:
        TypeError: data가 리스트, NumPy 배열, Pandas Series가 아닐 경우.
        TypeError: bins_dict가 딕셔너리가 아닐 경우.
    """
    # import numpy as np
    # import logging

    logging.info("✅ [process_row_fixed_bins] 시작")

    # 입력값 검증
    if not isinstance(data, (list, np.ndarray, pd.Series)):
        logging.error("❌ 입력 데이터는 리스트, NumPy 배열 또는 Pandas Series여야 합니다!")
        raise TypeError("❌ 입력 데이터는 리스트, NumPy 배열 또는 Pandas Series여야 합니다!")

    if not isinstance(bins_dict, dict):
        logging.error("❌ 입력값 'bins_dict'는 딕셔너리여야 합니다!")
        raise TypeError("❌ 입력값 'bins_dict'는 딕셔너리여야 합니다!")

    if not data:
        logging.warning("⚠️ [process_row_fixed_bins] 빈 데이터 입력됨. 빈 딕셔너리 반환.")
        return {}

    if not bins_dict:
        logging.warning("⚠️ [process_row_fixed_bins] 빈 bins_dict 입력됨. 빈 딕셔너리 반환.")
        return {}

    row_result = {}

    # data_to_target()을 반복 호출하지 않고, dict.update() 사용
    for bin_size, bins in bins_dict.items():
        bin_counts = data_to_target(data, bins)
        row_result.update({f"{bin_size:03}_{key}": value for key, value in bin_counts.items()})

    logging.info(f"✅ [process_row_fixed_bins] 완료 - 결과: {row_result}")

    return row_result

In [21]:
# def calculate_target(df):
#     # ✅ 예가범위 레벨 부여
#     range_level = parse_range_level(df["예가범위"].iloc[0])

#     df_bins = pd.DataFrame()  # ✅ 빈 DataFrame 생성

#     if range_level == 3 or range_level == 2:

#         bins_dict = load_bins(range_level)

#         for row in df.itertuples():
#             # ✅ row_result를 행(row) 형식으로 반환받아 바로 추가
#             row_result = process_row_fixed_bins(getattr(row, "사정률"), bins_dict)

#             # ✅ row_result를 DataFrame 한 줄(`DataFrame([row_result])`)로 변환 후 추가
#             df_bins = pd.concat([df_bins, pd.DataFrame([row_result])], ignore_index=True)

#     elif range_level == 0:
#         for row in df.itertuples():
#             # ✅ 구간 양 끝 값 가져오기
#             lower_bound, upper_bound = extract_bounds(row.예가범위)

#             # ✅ 구간 경계값 생성
#             bins_dict = generate_bins(lower_bound, upper_bound)

#             # ✅ row_result를 행(row) 형식으로 반환받아 바로 추가
#             row_result = process_row_fixed_bins(getattr(row, "사정률"), bins_dict)

#             # ✅ row_result를 DataFrame 한 줄(`DataFrame([row_result])`)로 변환 후 추가
#             df_bins = pd.concat([df_bins, pd.DataFrame([row_result])], ignore_index=True)

#     # ✅ 원본 Data_0과 `df_bins` 병합
#     new_data = pd.concat([df.reset_index(drop=True), df_bins.reset_index(drop=True)], axis=1)

#     return new_data

In [22]:
# ============================================================
#   데이터 구간화 및 목표값 계산
# ============================================================
def calculate_target(df):
    """
    입력 데이터프레임(df)의 '예가범위' 값을 기준으로 데이터를 구간화하고 비율을 계산하는 함수.

    Parameters:
        df (pd.DataFrame): 입력 데이터.

    Returns:
        pd.DataFrame: 구간화된 비율 정보를 포함한 새로운 데이터프레임.

    Raises:
        TypeError: df가 Pandas DataFrame이 아닐 경우.
    """
    # import pandas as pd
    # import logging

    logging.info("✅ [calculate_target] 시작")

    # 입력값 검증
    if not isinstance(df, pd.DataFrame):
        logging.error("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")

    # 예가범위 레벨 부여
    range_level = parse_range_level(df["예가범위"].iloc[0])

    # df_bins = pd.DataFrame()  # 빈 DataFrame 생성
    rows = []  # 성능 최적화를 위한 리스트 사용

    if range_level in {2, 3}:
        logging.info(f"✅ [calculate_target] 예가범위 레벨: {range_level}")

        bins_dict = load_bins(range_level)

        for row in df.itertuples():
            row_result = process_row_fixed_bins(getattr(row, "사정률"), bins_dict)
            rows.append(row_result)

    elif range_level == 0:
        logging.info("✅ [calculate_target] 예가범위 레벨: 0 ")

        for row in df.itertuples():
            lower_bound, upper_bound = extract_bounds(row.예가범위)
            bins_dict = generate_bins(lower_bound, upper_bound)
            row_result = process_row_fixed_bins(getattr(row, "사정률"), bins_dict)
            rows.append(row_result)

    # 리스트를 DataFrame으로 변환하여 df_bins 생성
    if rows:
        df_bins = pd.DataFrame(rows)

    # 원본 데이터와 df_bins 병합
    result_df = pd.concat([df.reset_index(drop=True), df_bins.reset_index(drop=True)], axis=1)

    logging.info("✅ [calculate_target] 완료 - 데이터 변환 완료")

    return result_df

### 데이터 변환

In [23]:
# ==================================================================================================
#   데이터 전처리 함수: 로그 변환, 정규화, 원-핫 인코딩, 텍스트 임베딩, 차원 축소, 구간 경쟁률 계산
# ==================================================================================================
def process_data(df):
    """
    데이터 전처리 함수: 로그 변환, 정규화, 원-핫 인코딩, 텍스트 임베딩, 차원 축소 수행.

    Parameters:
        df (pd.DataFrame): 원본 데이터프레임.

    Returns:
        pd.DataFrame: 전처리된 데이터프레임.

    Raises:
        TypeError: df가 Pandas DataFrame이 아닐 경우.
        ValueError: 필요한 컬럼이 누락된 경우.
    """
    # import pandas as pd
    # import logging

    logging.info("✅ [process_data] 시작")

    # 입력값 검증
    if not isinstance(df, pd.DataFrame):
        logging.error("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")

    required_columns = [
        '공고번호', '공고제목', '발주처(수요기관)', '지역제한', '기초금액', '예정가격',
        '예가범위', 'A값', '투찰률(%)', '참여업체수', '공고구분표시', "사정률"
    ]

    # 컬럼 존재 여부 확인
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.error(f"❌ 다음 컬럼이 데이터프레임에 없습니다: {missing_columns}")
        raise ValueError(f"❌ 다음 컬럼이 데이터프레임에 없습니다: {missing_columns}")

    # 데이터프레임 복사 (원본 보호)
    copy_df = df.copy()

    # 로그 변환 및 정규화 적용
    numeric_cols = ["기초금액", "예정가격", "투찰률(%)"]
    for col in numeric_cols:
        copy_df[col] = log_transforming(copy_df[col])
        copy_df[f"norm_log_{col}"] = normalizing(copy_df[col])

    # 원-핫 인코딩을 위한 컬럼
    categorical_cols = ["발주처(수요기관)", "지역제한", "공고구분표시"]
    encoded_dfs = [one_hot_encoding(copy_df[col]) for col in categorical_cols]

    # 원본 데이터와 원-핫 인코딩 결과 병합
    copy_df = pd.concat([copy_df] + encoded_dfs, axis=1)

    # 텍스트 임베딩 적용
    logging.info("✅ [process_data] 텍스트 임베딩 모델 로드 중...")
    tokenizer, model, device = load_bge_model(model_name="BAAI/bge-m3")

    logging.info("✅ [process_data] 공고제목 임베딩 수행...")
    copy_df["embedding_공고제목"] = embedding(copy_df["공고제목"].fillna(""), tokenizer, model, device)

    # 차원 축소 수행
    logging.info("✅ [process_data] 임베딩 벡터 차원 축소 중...")
    expanded_df = series_to_dataframe(copy_df["embedding_공고제목"])
    tmp_df = dimension_reducing_PCA(expanded_df, "공고제목", 100)
    reduced_df = dimension_reducing_UMAP(tmp_df, "공고제목", 20)

    # 병합
    copy_df = pd.concat([copy_df.reset_index(drop=True), reduced_df.reset_index(drop=True)], axis=1)

    print(f"🔍 병합 후 결측치가 포함된 행:\n{copy_df[copy_df.isna().any(axis=1)]}")

    # 구간 경쟁률 계산
    logging.info("✅ [process_data] 구간 경쟁률 계산 중...")
    copy_df = calculate_target(copy_df)

    # 최종 결과 반환
    logging.info("✅ [process_data] 완료 - 데이터 전처리 완료")
    return copy_df.copy()

In [24]:
# ============================================================
# 24. 전체 데이터셋 변환 함수: 데이터 클렌징부터 Feature 및 Target 처리까지
# ============================================================
def transform(df1, df2):
    """
    공고 데이터(df1)와 투찰 데이터(df2)를 입력받아, 데이터 클렌징, Feature 처리, Target 처리
    순으로 실행한 후, 최종적으로 세 개의 데이터셋(Dataset_3_df, Dataset_2_df, Dataset_etc_df)을 생성하여 반환합니다.

    Parameters:
        df1 (pd.DataFrame): 공고 데이터를 포함한 DataFrame.
        df2 (pd.DataFrame): 투찰 데이터를 포함한 DataFrame.

    Returns:
        dict: {
            "DataSet_3": DataFrame,  # '예가범위'가 +3%~-3%인 데이터셋
            "DataSet_2": DataFrame,  # '예가범위'가 +2%~-2%인 데이터셋
            "DataSet_etc": DataFrame # '예가범위'가 그 외인 데이터셋
        }

    Raises:
        TypeError: 입력 데이터가 Pandas DataFrame이 아닐 경우.
    """
    # import logging
    # import pandas as pd

    logging.info("✅ [transform] 시작")

    if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
        logging.error("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")
        raise TypeError("❌ 입력 데이터는 Pandas DataFrame이어야 합니다!")

    # 원본 데이터 복사 및 클렌징 수행
    notices_df = df1.copy()
    bids_df = df2.copy()

    # 공고 & 투찰 데이터 재구성
    merged_df = restructure_data(notices_df, bids_df)

    # 예가범위별 분리
    df_dict = separate_data(merged_df)
    df_3 = df_dict.get("range3")
    df_2 = df_dict.get("range2")
    df_etc = df_dict.get("others")

    # 데이터 전처리 (process_data)
    Dataset_3_df = process_data(df_3)
    Dataset_2_df = process_data(df_2)
    Dataset_etc_df = process_data(df_etc)

    Dataset_dict = {
        "DataSet_3": Dataset_3_df,
        "DataSet_2": Dataset_2_df,
        "DataSet_etc": Dataset_etc_df
    }

    logging.info("🎯 [transform] 완료 - 데이터 변환 완료")

    return Dataset_dict

In [25]:
def load(dataset_dict):
    copy_dict = dataset_dict.copy

    DataSet_3 = copy_dict.get("DataSet_3")
    DataSet_2 = copy_dict.get("DataSet_2")
    DataSet_etc = copy_dict.get("DataSet_etc")

    DataSet_3.to_csv("DataSet_3.csv", index=False)
    DataSet_2.to_csv("DataSet_2.csv", index=False)
    DataSet_etc.to_csv("DataSet_etc.csv", index=False)

# SETING

In [26]:
# 기본 라이브러리 로드
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import re
import logging

In [27]:
# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [28]:
# 공유 드라이브 이동
%cd "/content/drive/Shareddrives/BidPrice/DataWarehouse"
!ls # 파일 목록 확인

/content/drive/Shareddrives/BidPrice/DataWarehouse
20250302	    catboost_info			notices_processed.csv
Agg_2_output.csv    catprocessed_Modified_2_output.csv	PCA_output.csv
Agg_3_output.csv    catprocessed_Modified_3_output.csv	saved_model
AutogluonModels     DR_Modified_2_output.csv		test_Combined_Modified_X_Range2.csv
backup_traintest    DR_Modified_3_output.csv		test_Combined_Modified_Y_Range2.csv
Basic_2_output.csv  intervals				train_Combined_Modified_X_Range2.csv
Basic_3_output.csv  Modified_2_output.csv		train_Combined_Modified_Y_Range2.csv
bids_processed.csv  Modified_3_output.csv


# 데이터 파이프라인 실행


In [29]:
notices_df = pd.read_csv('notices_processed.csv') # 공고 데이터
bids_df = pd.read_csv('bids_processed.csv') # 투찰 데이터


# 데이터 확인
display(notices_df.head(3))
print("\n 공고 데이터 크기:", notices_df.shape)
print("notices_df.columns:", notices_df.columns.tolist())


display(bids_df.head(3))
print("\n투찰 데이터 크기:", bids_df.shape)
print("투찰 데이터 칼럼:", bids_df.columns.tolist())

Unnamed: 0,공고번호,입찰년도,공고제목,발주처(수요기관),지역제한,기초금액,예정가격,예가범위,A값,투찰률(%),참여업체수,공고구분표시,정답사정률(%)
0,20241209755-00,2024,고속철도 스마트SOC 시스템 감시화면 통신망 구축 공사,한국철도공사,서울/부산/대구/경기/충북/충남/경북,71830000.0,71308550.0,+2% ~ -2%,0.0,87.745,3787.0,기/수의,-0.72596
1,20231103166-00,2023,고속선 관제전화시스템 개량공사,한국철도공사,서울/광주/군위/울산/경기/충북/충남/경북/전북,90830000.0,90890675.0,+2% ~ -2%,0.0,87.745,3709.0,기/수의,0.0668
2,20201137688-00,2020,경부선 영등포역 등 59역 역무자동화설비 신설기타공사,한국철도공사,서울/인천/경기/강원/충남,175530000.0,175586225.0,+2% ~ -2%,0.0,87.745,3429.0,기/수의,0.03203



 공고 데이터 크기: (7160, 13)
notices_df.columns: ['공고번호', '입찰년도', '공고제목', '발주처(수요기관)', '지역제한', '기초금액', '예정가격', '예가범위', 'A값', '투찰률(%)', '참여업체수', '공고구분표시', '정답사정률(%)']


Unnamed: 0,입찰년도,공고번호,순위,사업자 등록번호,업체명,대표,경쟁사 분석,투찰금액,가격점수,예가대비 투찰률(%),기초대비 투찰률(%),기초대비 사정률(%),추첨번호,투찰일시,비고
0,2014,20141010049-00,1,2030130565,이수정보통신,이한수,분석,52431622.0,,87.751,86.98,-0.87170 (99.1283),13 4,2014/10/20 09:56:58,
1,2014,20141010049-00,2,2068111894,(주)아파트피아,곽복동,분석,52433380.0,,87.754,86.983,-0.86838 (99.13162),7 15,2014/10/20 18:15:15,
2,2014,20141010049-00,3,1058132562,주식회사 신보기전,정종욱,분석,52434953.0,,87.756,86.985,-0.86540 (99.1346),5 8,2014/10/17 16:31:53,



투찰 데이터 크기: (5735258, 15)
투찰 데이터 칼럼: ['입찰년도', '공고번호', '순위', '사업자 등록번호', '업체명', '대표', '경쟁사 분석', '투찰금액', '가격점수', '예가대비 투찰률(%)', '기초대비 투찰률(%)', '기초대비 사정률(%)', '추첨번호', '투찰일시', '비고']


In [None]:
Data_dict = transform(notices_df,bids_df)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/444 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/964 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/687 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

임베딩 진행 중:   0%|          | 0/116 [00:00<?, ?it/s]

model.safetensors:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

임베딩 진행 중:   9%|▊         | 10/116 [02:22<22:51, 12.93s/it]

In [None]:
DataSet_3 = Data_dict.get("DataSet_3")
DataSet_2 = Data_dict.get("DataSet_2")
DataSet_etc = Data_dict.get("DataSet_etc")

In [None]:
DataSet_3.head()

Unnamed: 0,공고번호,공고제목,발주처(수요기관),지역제한,기초금액,예정가격,예가범위,A값,투찰률(%),참여업체수,...,100_091,100_092,100_093,100_094,100_095,100_096,100_097,100_098,100_099,100_100
0,20230217762-00,정문경비실 신축(통신) 공사,서울특별시,서울/경기,18.40268,18.416526,+3%~-3%,0.0,4.485767,2941.0,...,0.008841,0.011221,0.011221,0.011561,0.010201,0.00782,0.0068,0.00816,0.00544,0.00646
1,20230635737-00,2023년 전산네트워크(업무망) 신설공사,서울교통공사,서울/경기,17.96116,17.953698,+3%~-3%,0.0,4.485767,2765.0,...,0.011212,0.008318,0.009042,0.011212,0.011935,0.005787,0.005787,0.007233,0.006872,0.007233
2,20221110036-00,5호선 미사 외 3역 지하역사 초미세먼지 표출장치 통신공사,서울교통공사,서울/경기,18.120615,18.116143,+3%~-3%,0.0,4.485767,2749.0,...,0.010186,0.009822,0.010186,0.019644,0.013459,0.011277,0.009094,0.008003,0.007639,0.009458
3,20221001152-00,배출수 처리시설 개선 계측제어공사,서울특별시,서울/경기,18.790968,18.804015,+3%~-3%,0.0,4.485767,2717.0,...,0.006993,0.009569,0.015826,0.01325,0.010674,0.006625,0.008097,0.007729,0.010674,0.012146
4,20200504427-00,잠실(2)역 등 4개역 전기실 노후설비 개량에 따른 정보통신공사,서울교통공사,서울/경기,18.141465,18.150785,+3%~-3%,0.0,4.485767,2594.0,...,0.008096,0.010409,0.008867,0.011565,0.013107,0.010409,0.009252,0.005012,0.008096,0.011565


In [None]:
DataSet_etc.isna().sum()

Unnamed: 0,0
공고번호,0
공고제목,0
발주처(수요기관),0
지역제한,0
기초금액,0
...,...
100_096,0
100_097,0
100_098,0
100_099,0


In [None]:
DataSet_2.shape

(1417, 459)

In [None]:
DataSet_etc.iloc[:,-180:].sum(axis=1).value_counts()

Unnamed: 0,count
4.0,39
4.0,38
4.0,34
4.0,31
4.0,23
4.0,22
4.0,20
4.0,20
4.0,19
4.0,19


# 과거의 망령

In [None]:
# # ============================================================
# # 25. 데이터 클렌징 함수: 중복 제거, 결측치 처리 등 전처리 수행
# # ============================================================
# def data_cleansing(df1, df2):
#     """
#     공고 데이터(df1)와 투찰 데이터(df2)에 대해 중복 제거, 결측치 처리 및 기타 전처리를 수행합니다.

#     Parameters:
#         df1 (pd.DataFrame): 공고 데이터.
#         df2 (pd.DataFrame): 투찰 데이터.

#     Returns:
#         tuple(pd.DataFrame, pd.DataFrame): 전처리된 공고 데이터와 투찰 데이터.

#     Raises:
#         TypeError: 입력 데이터가 Pandas DataFrame이 아닐 경우.
#     """
#     #print("✅ [data_cleansing] 시작")
#     if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
#         raise TypeError("입력 데이터는 Pandas DataFrame이어야 합니다.")
#     # 원본 데이터 복사
#     notices_df = df1.copy()
#     bids_df = df2.copy()
#     #print("✅ [data_cleansing] 중복 제거 시작")
#     notices_df = notices_df.drop_duplicates(keep='first')
#     bids_df = bids_df.drop_duplicates(keep='first')
#     #print("✅ [data_cleansing] 결측치 처리 시작")
#     # 공고 데이터의 '예가범위'와 투찰 데이터의 '기초대비 사정률(%)' 결측치 제거
#     notices_df["예가범위"].dropna(inplace=True)
#     bids_df.dropna(subset=["기초대비 사정률(%)"], inplace=True)
#     # '투찰률(%)' 결측치는 평균값으로 대체
#     notices_df["투찰률(%)"] = notices_df["투찰률(%)"].fillna(notices_df["투찰률(%)"].mean(numeric_only=True))
#     #print("✅ [data_cleansing] 기타 처리 시작")
#     # '예가범위' 문자열에서 공백 제거
#     notices_df["예가범위"] = notices_df["예가범위"].astype(str).str.replace(r"\s+", "", regex=True)
#     # '기초대비 사정률(%)' 값을 파싱하여 '사정률(%)' 컬럼 생성
#     bids_df["사정률(%)"] = parse_sajeong_rate(bids_df['기초대비 사정률(%)'])
#     bids_df.drop("기초대비 사정률(%)", axis=1, inplace=True)
#     bids_df.dropna(subset=["사정률(%)"], inplace=True)
#     #print("✅ [data_cleansing] 완료")
#     return notices_df, bids_df

In [None]:
# # ============================================================
# #  Target 데이터 처리 함수: 구간화 및 최종 DataFrame 생성
# # ============================================================
# def target_processing(df):
#     """
#     Target 데이터를 처리하여, '예가범위'와 '사정률(%)' 데이터를 구간별 비율로 변환한 후,
#     원본 데이터를 해당 결과와 결합하여 반환합니다.

#     Parameters:
#         df (pd.DataFrame): Target 데이터. '예가범위'와 '사정률(%)' 컬럼이 포함되어 있어야 합니다.

#     Returns:
#         pd.DataFrame: 구간화된 타겟 데이터 (원본 컬럼에서 '예가범위'와 '사정률(%)' 제거됨).

#     Raises:
#         ValueError: 필수 컬럼이 존재하지 않을 경우.
#     """
#     #print(f"✅ [target_processing] 시작 - 입력 DataFrame shape: {df.shape}")
#     required_columns = {"예가범위", "사정률(%)"}
#     if not required_columns.issubset(df.columns):
#         raise ValueError(f"데이터프레임에 필수 컬럼이 없습니다: {required_columns - set(df.columns)}")
#     copy_df = df.copy()
#     # parse_range_value로 예가범위 매핑 값 결정 (예: 3, 2 등)
#     range_value = parse_range_value(copy_df)
#     result_list = []
#     if range_value in (2, 3):
#         bins_dict = get_bins_dict(range_value)
#         result_list = [process_row_fixed_bins(data, bins_dict) for data in copy_df["사정률(%)"]]
#     else:
#         result_list = [process_row_dynamic_bins(row["사정률(%)"], row["예가범위"]) for _, row in copy_df.iterrows()]
#     result_df = pd.DataFrame(result_list)
#     # 원본 DataFrame과 구간화 결과 결합 후, 원본 '예가범위'와 '사정률(%)' 컬럼 제거
#     combined_df = pd.concat([copy_df, result_df], axis=1)
#     combined_df.drop(columns=["예가범위", "사정률(%)"], inplace=True)
#     # print(f"✅ [target_processing] 완료 - 결과 DataFrame shape: {combined_df.shape}")
#     return combined_df

In [None]:
# # ============================================================
# #  Target 데이터 생성 함수: 공고와 입찰 데이터 병합
# # ============================================================
# def remake_target(df1, df2):
#     """
#     공고 데이터(df1)와 입찰 데이터(df2)를 공고번호를 기준으로 병합하여 Target 데이터를 생성합니다.

#     Parameters:
#         df1 (pd.DataFrame): 공고 데이터 (예: notices_df), '공고번호'와 '예가범위' 컬럼 필수.
#         df2 (pd.DataFrame): 입찰 데이터 (예: bids_df), '공고번호'와 '기초대비 사정률(%)' 컬럼 필수.

#     Returns:
#         pd.DataFrame: 공고번호 기준으로 병합된 Target 데이터 (공고번호, 사정률(%), 예가범위 포함).

#     Raises:
#         ValueError: 필수 컬럼이 존재하지 않을 경우.
#     """
#     #print("✅ [remake_target] 시작")
#     if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
#         raise TypeError("입력 데이터는 Pandas DataFrame이어야 합니다.")
#     required_columns_df1 = {"공고번호", "예가범위"}
#     required_columns_df2 = {"공고번호", "사정률(%)"}
#     if not required_columns_df1.issubset(df1.columns):
#         raise ValueError(f"df1에 필요한 컬럼이 없습니다: {required_columns_df1 - set(df1.columns)}")
#     if not required_columns_df2.issubset(df2.columns):
#         raise ValueError(f"df2에 필요한 컬럼이 없습니다: {required_columns_df2 - set(df2.columns)}")
#     notices_df = df1.copy()
#     bids_df = df2.copy()
#     grouped_df = group_to_list(bids_df[["공고번호", "사정률(%)"]], "공고번호", "사정률(%)")
#     range_df = notices_df[["공고번호", "예가범위"]]
#     merge_df = pd.merge(grouped_df, range_df, on="공고번호", how="inner")
#     result_df = merge_df[["공고번호", "사정률(%)", "예가범위"]]
#     #print(f"✅ [remake_target] 완료 - 결과 shape: {result_df.shape}")
#     return result_df

In [None]:
# def mapping_range_level(lower_bound, upper_bound):
#     """
#     하한(lower_bound)과 상한(upper_bound)에 따라 예가범위 레벨을 반환하는 함수.

#     Parameters:
#         lower_bound (int): 예가범위 하한값 (예: -3, -2)
#         upper_bound (int): 예가범위 상한값 (예: 3, 2)

#     Returns:
#         int: 매핑된 정수 값 (기본값: 0)
#     """

#     # 예가범위 매핑 딕셔너리
#     range_mapping = {
#         (-3, 3): 3,
#         (-2, 2): 2
#     }

#     # 딕셔너리에서 찾고, 없으면 기본값 0 반환
#     range_level = range_mapping.get((lower_bound, upper_bound), 0)

#     return range_level

In [None]:
# # ============================================================
# #  구간 경계값 생성 함수: 예가범위에 따라 bins 생성
# # ============================================================
# def make_bins(range_str):
#     """
#     주어진 예가범위 문자열을 기반으로, np.linspace를 사용해 일정 구간 수마다 구간 경계값을 생성합니다.
#     각 구간의 첫 번째 경계는 -∞, 마지막 경계는 ∞로 설정됩니다.

#     Parameters:
#         range_str (str): 예가범위 문자열 (예: "+0% ~ 6%").

#     Returns:
#         dict: {구간 개수: 구간 경계값 리스트} 형태의 딕셔너리.

#     Raises:
#         ValueError: 입력값이 유효하지 않거나, 구간 경계값 생성에 실패한 경우.
#     """
#     #print(f"✅ [make_bins] 예가범위 입력값: {range_str}")
#     if not isinstance(range_str, str) or range_str.strip() == "":
#         raise ValueError(f"❌ 유효하지 않은 예가범위 값: {range_str}")
#     try:
#         lower_bound, upper_bound = extract_bounds(range_str)
#     except Exception as e:
#         print(f"❌ [make_bins] extract_bounds 오류: {e}")
#         return {}
#     #print(f"🔍 [make_bins] 추출된 Lower: {lower_bound}, Upper: {upper_bound}")
#     intervals = [10, 20, 50, 100]
#     bins_dict = {}

#     for interval in intervals:
#         # np.linspace로 구간 경계값 생성 (구간 수 = interval + 1)
#         bins = np.linspace(lower_bound, upper_bound, num=interval + 1).tolist()
#         #print(f"🔍 [make_bins] Interval {interval}: Generated Bins (Before Deduplication): {bins}")
#          # ✅ 중복 검사 (중복된 값이 있으면 오류 발생)
#         if len(set(bins)) < len(bins):
#             duplicates = [x for x in bins if list(bins).count(x) > 1]
#             raise ValueError(f"⚠️ 중복된 구간 경계값이 발생했습니다! 중복된 값: {set(duplicates)}")
#         #print(f"✅ [make_bins] Interval {interval}: Unique Bins: {bins}\n")
#         # 첫 번째와 마지막 경계를 -∞, ∞로 설정
#         bins[0] = -np.inf
#         bins[-1] = np.inf
#         bins_dict[interval] = bins

#     return bins_dict

In [None]:
# # ============================================================
# #  매핑된 range_value에 따른 bins_dict 반환 함수
# # ============================================================
# def get_bins_dict(range_value, row_value=None):
#     """
#     매핑된 range_value에 따라, 저장된 CSV 파일에서 구간 경계값을 불러오거나
#     동적으로 구간(bins)을 생성하여 딕셔너리 형태로 반환합니다.

#     Parameters:
#         range_value (int): 미리 매핑된 범위 값 (예: 2 또는 3).
#         row_value (str, optional): 동적 구간 생성을 위한 예가범위 문자열.

#     Returns:
#         dict: {구간 개수: 구간 경계값 리스트} 형태의 딕셔너리.
#     """
#     #print(f"✅ [get_bins_dict] range_value: {range_value}, row_value: {row_value}")
#     if not isinstance(range_value, int):
#         raise TypeError("range_value는 정수(int)여야 합니다.")
#     if range_value in (2, 3):
#         return load_bins(range_value)
#     if isinstance(row_value, str):
#         return make_bins(row_value)
#     return {}

In [None]:
# # ============================================================
# #  벡터 데이터를 구간화하여 비율로 변환하는 함수
# # ============================================================
# def data_to_target(data, bins):
#     """
#     주어진 숫자 데이터 리스트를 구간(bins)에 따라 분할하고,
#     각 구간에 속하는 값들의 비율을 계산하여 딕셔너리로 반환합니다.

#     Parameters:
#         data (list, np.array, pd.Series): 구간화할 숫자 데이터.
#         bins (list): 구간 경계값 리스트 (최소 2개 이상의 값 필요).

#     Returns:
#         dict: {구간 레이블: 해당 구간 비율} 형태의 딕셔너리.

#     Raises:
#         ValueError: bins가 최소 2개 이상의 값을 가지지 않을 경우.
#     """
#     #print(f"✅ [data_to_target] 데이터 크기: {len(data)}, Bins 개수: {len(bins)}")
#     if not data:
#         print("⚠️ [data_to_target] 빈 데이터 입력됨.")
#         return {}
#     if not isinstance(bins, list) or len(bins) < 2:
#         raise ValueError("입력값 'bins'는 최소 2개 이상의 값을 가진 리스트여야 합니다.")
#     # 각 구간에 대해 라벨 생성 (예: '001', '002', ...)
#     labels = [f"{i+1:03}" for i in range(len(bins) - 1)]
#     # pd.cut을 이용해 데이터를 구간에 맞게 분류
#     categories = pd.cut(data, bins, labels=labels, include_lowest=True)
#     # 각 구간별 값의 개수를 세고 전체 개수에 대한 비율 계산
#     counts = pd.Series(categories).value_counts().reindex(labels, fill_value=0).sort_index()
#     total_count = len(data)
#     ratios = counts / total_count if total_count > 0 else counts
#     result_dict = dict(zip(labels, ratios.values))
#     #print(f"✅ [data_to_target] 결과: {result_dict}")
#     return result_dict

In [None]:
# # ============================================================
# #  동적으로 bins를 생성하여 데이터를 구간화하는 함수
# # ============================================================
# def process_row_dynamic_bins(data, rng):
#     """
#     주어진 예가범위(rng)를 기반으로 동적으로 구간(bins)을 생성하고,
#     입력 데이터의 각 값이 해당 구간에 속하는 비율을 계산하여 반환합니다.

#     Parameters:
#         data (list, np.array, pd.Series): 구간화를 적용할 숫자 데이터.
#         rng (str): 예가범위 문자열 (예: "+2% ~ -2%", "+3% ~ -3%").

#     Returns:
#         dict: {구간_레이블: 비율} 형태의 딕셔너리.
#     """
#     #print(f"✅ [process_row_dynamic_bins] 데이터 크기: {len(data)}, 예가범위: {rng}")
#     if not isinstance(data, (list, np.ndarray, pd.Series)):
#         raise TypeError("입력값 'data'는 list, np.array, pd.Series 중 하나여야 합니다.")
#     if not isinstance(rng, str) or pd.isna(rng) or rng.strip() == "":
#         print(f"⚠️ [process_row_dynamic_bins] 유효하지 않은 예가범위: '{rng}'")
#         return {}
#     # 데이터에서 NaN 제거 후 리스트로 변환
#     if isinstance(data, (pd.Series, np.ndarray)):
#         data = pd.Series(data).dropna().tolist()
#     elif isinstance(data, list):
#         data = [x for x in data if pd.notna(x)]
#     if not data:
#         print("⚠️ [process_row_dynamic_bins] 빈 데이터 입력됨.")
#         return {}
#     bins_dict = make_bins(rng)
#     if not bins_dict:
#         print(f"⚠️ [process_row_dynamic_bins] bins 생성 실패 for range '{rng}'")
#         return {}
#     try:
#         row_result = {
#             f"{bin_size:03}_{key}": value
#             for bin_size, bins in bins_dict.items()
#             for key, value in data_to_target(data, bins).items()
#         }
#         #print(f"✅ [process_row_dynamic_bins] 결과: {row_result}")
#         return row_result
#     except Exception as e:
#         print(f"⚠️ [process_row_dynamic_bins] 오류 발생: {e}")
#         return {}

In [None]:
# ============================================================
# #  동적으로 bins를 생성하여 데이터를 구간화하는 함수
# # ============================================================
# def process_row_dynamic_bins(data, bins_dict):
#     """
#     주어진 예가범위(rng)를 기반으로 동적으로 구간(bins)을 생성하고,
#     입력 데이터의 각 값이 해당 구간에 속하는 비율을 계산하여 반환합니다.

#     Parameters:
#         data (list, np.array, pd.Series): 구간화를 적용할 숫자 데이터.
#         rng (str): 예가범위 문자열 (예: "+2% ~ -2%", "+3% ~ -3%").

#     Returns:
#         dict: {구간_레이블: 비율} 형태의 딕셔너리.
#     """
#     #print(f"✅ [process_row_dynamic_bins] 데이터 크기: {len(data)}, 예가범위: {rng}")
#     if not isinstance(data, (list, np.ndarray, pd.Series)):
#         raise TypeError("입력값 'data'는 list, np.array, pd.Series 중 하나여야 합니다.")
#     if not isinstance(rng, str) or pd.isna(rng) or rng.strip() == "":
#         print(f"⚠️ [process_row_dynamic_bins] 유효하지 않은 예가범위: '{rng}'")
#         return {}
#     # 데이터에서 NaN 제거 후 리스트로 변환
#     if isinstance(data, (pd.Series, np.ndarray)):
#         data = pd.Series(data).dropna().tolist()
#     elif isinstance(data, list):
#         data = [x for x in data if pd.notna(x)]
#     if not data:
#         print("⚠️ [process_row_dynamic_bins] 빈 데이터 입력됨.")
#         return {}
#     bins_dict = make_bins(rng)
#     if not bins_dict:
#         print(f"⚠️ [process_row_dynamic_bins] bins 생성 실패 for range '{rng}'")
#         return {}
#     try:
#         row_result = {
#             f"{bin_size:03}_{key}": value
#             for bin_size, bins in bins_dict.items()
#             for key, value in data_to_target(data, bins).items()
#         }
#         #print(f"✅ [process_row_dynamic_bins] 결과: {row_result}")
#         return row_result
#     except Exception as e:
#         print(f"⚠️ [process_row_dynamic_bins] 오류 발생: {e}")
#         return {}