In [1]:
import polars as pl
import pandas as pd

In [2]:

# 보유상품수 추가 함수
def add_product_count(df: pl.DataFrame) -> pl.DataFrame:
    """
    '수신_요구불예금', '수신_거치식예금', '수신_적립식예금', '수신_펀드',
    '수신_외화예금', '대출금액', '카드_승인금액' 컬럼 값을 바탕으로 
    '보유상품수' 컬럼을 추가한다.
    
    각 컬럼 값이 0이 아니면 해당 상품을 보유했다고 간주한다.
    """
    
    # (컬럼 != 0) AND is_not_null() -> 보유 여부 계산
    df = df.with_columns([
        (pl.col("수신_요구불예금").is_not_null() & (pl.col("수신_요구불예금") != 0))
            .cast(pl.Int64).alias("수신_요구불예금_보유"),
        (pl.col("수신_거치식예금").is_not_null() & (pl.col("수신_거치식예금") != 0))
            .cast(pl.Int64).alias("수신_거치식예금_보유"),
        (pl.col("수신_적립식예금").is_not_null() & (pl.col("수신_적립식예금") != 0))
            .cast(pl.Int64).alias("수신_적립식예금_보유"),
        (pl.col("수신_펀드").is_not_null() & (pl.col("수신_펀드") != 0))
            .cast(pl.Int64).alias("수신_펀드_보유"),
        (pl.col("수신_외화예금").is_not_null() & (pl.col("수신_외화예금") != 0))
            .cast(pl.Int64).alias("수신_외화예금_보유"),
        (pl.col("대출금액").is_not_null() & (pl.col("대출금액") != 0))
            .cast(pl.Int64).alias("대출금액_보유"),
        (pl.col("카드_승인금액").is_not_null() & (pl.col("카드_승인금액") != 0))
            .cast(pl.Int64).alias("카드_승인금액_보유"),
    ])

    # 보유상품수 계산 (각 "_보유" 컬럼 합산)
    df = df.with_columns(
        (
            pl.col("수신_요구불예금_보유") + 
            pl.col("수신_거치식예금_보유") + 
            pl.col("수신_적립식예금_보유") + 
            pl.col("수신_펀드_보유") + 
            pl.col("수신_외화예금_보유") + 
            pl.col("대출금액_보유") + 
            pl.col("카드_승인금액_보유")
        ).alias("보유상품수")
    )

    # 임시 보유 여부 컬럼들 제거
    df = df.drop([
        "수신_요구불예금_보유", "수신_거치식예금_보유", "수신_적립식예금_보유",
        "수신_펀드_보유", "수신_외화예금_보유", "대출금액_보유", "카드_승인금액_보유"
    ])

    return df


# 데이터 전처리 함수
def preprocess_customer(df: pl.DataFrame) -> pl.DataFrame:
    """
    CUSTOMER 데이터프레임을 전처리하는 함수.
    1. 불필요 컬럼 삭제
    2. 특정 컬럼명 변경
    3. '입출금예금', '기타예금' 컬럼 생성 및 기존 컬럼 삭제
    4. '연령대' 컬럼 값 매핑 후 int 형 변환
    5. 컬럼 순서 정리

    Args:
        df (pl.DataFrame): 원본 CUSTOMER 데이터프레임
    
    Returns:
        pl.DataFrame: 전처리 완료된 CUSTOMER 데이터프레임
    """

    # 1. 불필요한 컬럼 삭제
    df = df.drop(['자택_시도', '자택_시군구', '카드_사용횟수']) #'카드_사용여부'
    
    # 2. 컬럼명 변경
    df = df.rename({
        "수신_펀드": "비이자상품",
        "대출금액": "대출",
        "카드_승인금액": "카드"
    })
    
    # 3. '입출금예금', '기타예금' 컬럼 생성 후 기존 컬럼 삭제
    df = (
        df.with_columns(
            (pl.col('수신_요구불예금') + pl.col('수신_외화예금')).alias('입출금예금')
        )
        .drop(['수신_요구불예금', '수신_외화예금'])
        .with_columns(
            (pl.col('수신_거치식예금') + pl.col('수신_적립식예금')).alias('기타예금')
        )
        .drop(['수신_거치식예금', '수신_적립식예금'])
    )
    
    # 4. '연령대' 컬럼 변환 (매핑 후 int32로 변환)
    age_mapping = {"10대미만": 10, "20대": 20, "30대": 30, "40대": 40, "50대": 50, "60대이상": 60}
    df = df.with_columns(
        df['연령대']
        .replace(age_mapping)
        .cast(pl.Int32)
        .alias('연령대')
    )
    
    # 5. 컬럼 순서 정리
    #    - 새로 만든 컬럼들("입출금예금", "기타예금", "비이자상품", "카드", "대출", "보유상품수")을 맨 뒤로
    main_cols = ["입출금예금", "기타예금", "비이자상품", "카드", "대출", "보유상품수"]
    desired_order = [col for col in df.columns if col not in main_cols] + main_cols
    df = df.select(desired_order)

    return df


# 고객 점수 환산 함수
def calculate_financial_scores(df):
    """
    금융 데이터에 대해 3개월 평균, 3개월 누적, 6개월 평균 등을 기반으로 점수를 계산하는 함수.
    특히 '대출' 컬럼은 다음과 같은 로직으로 계산함:
      - 각 달의 기여액 = 대출금액 * (10 + 기준금리) / 100
      - 6개월 동안의 기여액 평균을 구한 뒤, 이를 25/10000로 곱해서 점수를 산출

    Args:
        df (pl.DataFrame): 고객 금융 데이터.
    
    Returns:
        pl.DataFrame: 계산된 금융 점수가 추가된 데이터프레임.
    """

    # 단위 점수 기준 (대출은 별도 로직이므로 제거/무시)
    unit_scores = {
        "입출금예금": 8 / 100000,
        "기타예금": 4 / 100000,
        "비이자상품": 6 / 100000,
        "카드": 6 / 100000,
        "대출": 25 / 10000
    }

    # (1) 대출 기여액 컬럼 생성: 각 달의 대출금액 × (10 + 기준금리) / 100
    df = df.with_columns(
        (pl.col("대출") * (10 + pl.col("기준금리")) / 100).alias("대출_기여액")
    )

    # (2) 고객ID별 rolling 연산: 입출금예금, 기타예금, 비이자상품은 3개월 평균
    #    카드 사용액은 3개월 누적
    df = df.with_columns([
        (pl.col("입출금예금").rolling_sum(window_size=3, min_periods=1).over("고객ID") / 3).alias("입출금예금_3개월평균"),
        (pl.col("기타예금").rolling_sum(window_size=3, min_periods=1).over("고객ID") / 3).alias("기타예금_3개월평균"),
        (pl.col("비이자상품").rolling_sum(window_size=3, min_periods=1).over("고객ID") / 3).alias("비이자상품_3개월평균"),
        (pl.col("카드").rolling_sum(window_size=3, min_periods=1).over("고객ID")).alias("카드_3개월누적"),
        (pl.col("대출_기여액").rolling_sum(window_size=6, min_periods=1).over("고객ID") / 6).alias("대출_6개월평균_기여액")
    ])


    # (3) 점수 계산
    df = df.with_columns([
        (pl.col("입출금예금_3개월평균") * unit_scores["입출금예금"]).alias("입출금예금_점수"),
        (pl.col("기타예금_3개월평균") * unit_scores["기타예금"]).alias("기타예금_점수"),
        (pl.col("비이자상품_3개월평균") * unit_scores["비이자상품"]).alias("비이자상품_점수"),
        (pl.col("카드_3개월누적") * unit_scores["카드"]).alias("카드_점수"),
        (pl.col("대출_6개월평균_기여액") * unit_scores["대출"]).alias("대출_점수")
    ])

    # (5) 보유상품수에 따른 점수(상품_점수)
    df = df.with_columns(
        pl.when(pl.col("보유상품수") <= 4)
          .then(pl.col("보유상품수") * 10)
          .otherwise(40 + (pl.col("보유상품수") - 4) * 20)
          .alias("상품_점수")
    )

    # (6) 총점수 = 각 상품 점수 + 상품_점수
    df = df.with_columns(
        (pl.col("입출금예금_점수") +
         pl.col("기타예금_점수") +
         pl.col("비이자상품_점수") +
         pl.col("카드_점수") +
         pl.col("대출_점수") +
         pl.col("상품_점수")
        ).alias("총점수")
    )

    # (7) 중간 계산 컬럼 제거
    df = df.drop([
        "입출금예금_3개월평균", "기타예금_3개월평균", "비이자상품_3개월평균", "대출_기여액", "대출_6개월평균_기여액",
        "카드_3개월누적", "입출금예금_점수", "기타예금_점수", "비이자상품_점수","카드_점수", "대출_점수", "상품_점수"
    ])

    return df


    # 이용기간 계산 함수
def add_usage_period_column_for_products(df: pl.DataFrame) -> pl.DataFrame:
    """
    같은 고객ID 내에서, 보유상품수가 0이 아닌 달에만 이용기간을 +1로 누적.
    (보유상품수가 0인 달은 이전 달 이용기간을 그대로 유지)
    결과를 '이용기간' 컬럼에 기록해 주는 함수.
    """
    # (1) (고객ID, 기준년월)로 정렬
    df_sorted = df.sort(["고객ID", "기준년월"])

    out = []
    # (2) 고객ID별로 partition
    for gdf in df_sorted.partition_by("고객ID", maintain_order=True):
        usage_period_list = []
        accumulator = 0
        
        # (3) 0이 아닌 경우에만 accumulator+=1
        #     0이면 accumulator 그대로
        for val in gdf["보유상품수"]:
            if val != 0:
                accumulator += 1
            usage_period_list.append(accumulator)
        
        # (4) 계산 결과를 새로운 '이용기간' 컬럼으로 추가
        gdf_with_col = gdf.with_columns(
            pl.Series("이용기간", usage_period_list)
        )
        out.append(gdf_with_col)

    # (5) 모든 고객 그룹을 다시 하나로 concat
    df_with_usage = pl.concat(out)
    return df_with_usage

In [None]:
import polars as pl

def load_and_process_data(data_path: str, data_path_2: str) -> pl.DataFrame:
    # 카드, 고객 데이터 불러오기
    CARD =  pl.read_csv(f"{data_path}", encoding="euc-kr")
    CUSTOMER =  pl.read_csv(f"{data_path_2}", encoding="euc-kr")

    # 필요한 컬럼 선택
    card_select = CARD.select(["고객ID", "승인금액"])

    # 고객ID별 카드 승인금액 합계 계산
    card_summary = (
        card_select
        .group_by("고객ID")
        .agg(
            pl.sum("승인금액").alias("카드_승인금액")  # 승인금액 합계
        )
    )

    # 고객 데이터와 카드 데이터 병합 (left join)
    CUSTOMER_N = CUSTOMER.join(card_summary, on="고객ID", how="left")

    # Null값을 0으로 반환
    CUSTOMER_N = CUSTOMER_N.with_columns([
        pl.col("카드_승인금액").fill_null(0)
    ])

    CUSTOMER_N = add_product_count(CUSTOMER_N)
    CUSTOMER_N = preprocess_customer(CUSTOMER_N)

    return CUSTOMER_N

#CUSTOMER_21_N = load_and_process_data("C:/Users/campus3S031/Desktop/iM뱅크 프로젝트용 데이터/raw_data/iMBANK_CARD_DATA_2021(K-DigitalTraining).csv",
#                                      "C:/Users/campus3S031/Desktop/iM뱅크 프로젝트용 데이터/raw_data/iMBANK_CUSTOMER_DATA_2021(K-DigitalTraining).csv")

#CUSTOMER_22_N = load_and_process_data("C:/Users/campus3S031/Desktop/iM뱅크 프로젝트용 데이터/raw_data/iMBANK_CARD_DATA_2022(K-DigitalTraining).csv",
#                                      "C:/Users/campus3S031/Desktop/iM뱅크 프로젝트용 데이터/raw_data/iMBANK_CUSTOMER_DATA_2022(K-DigitalTraining).csv")

#CUSTOMER_23_N = load_and_process_data("C:/Users/campus3S031/Desktop/iM뱅크 프로젝트용 데이터/raw_data/iMBANK_CARD_DATA_2023(K-DigitalTraining).csv",
#                                      "C:/Users/campus3S031/Desktop/iM뱅크 프로젝트용 데이터/raw_data/iMBANK_CUSTOMER_DATA_2023(K-DigitalTraining).csv")

'''
-------------------------------------------

22, 23년 CUSTOMER 데이터도 같은 방식으로 진행

-------------------------------------------
'''

# 기준년월순으로 정렬 후, 21, 22, 23년 데이터 병합
#CUSTOMER_21_N = CUSTOMER_21_N.sort('기준년월')
#CUSTOMER_22_N = CUSTOMER_22_N.sort('기준년월')
#CUSTOMER_23_N = CUSTOMER_23_N.sort('기준년월')
#CUSTOMER_ALL = pl.concat([CUSTOMER_21_N, CUSTOMER_22_N, CUSTOMER_23_N])

In [None]:
'''

중간에 데이터가 누락된 고객ID 삭제,
대출금리 데이터 추가,
고객 점수 환산

'''

# 고객별 최초 기록된 달과 마지막 기록된 달 확인
customer_range = CUSTOMER_ALL.group_by("고객ID").agg(
    pl.min("기준년월").cast(pl.Utf8).alias("start_month"),  # 고객별 최초 기록된 달을 문자열로 변환
    pl.max("기준년월").cast(pl.Utf8).alias("end_month"),    # 고객별 마지막 기록된 달을 문자열로 변환
    pl.n_unique("기준년월").alias("recorded_months")  # 고객별 실제 기록된 월 개수
)

# 연도와 월을 숫자로 변환하여 개월 수 차이 계산
customer_range = customer_range.with_columns([
    pl.col("start_month").str.slice(0, 4).cast(pl.Int32).alias("start_year"),  # 연도 추출
    pl.col("start_month").str.slice(5, 2).cast(pl.Int32).alias("start_month_num"),  # 월 추출
    pl.col("end_month").str.slice(0, 4).cast(pl.Int32).alias("end_year"),  # 연도 추출
    pl.col("end_month").str.slice(5, 2).cast(pl.Int32).alias("end_month_num")  # 월 추출
])

# 이론적으로 있어야 할 전체 월 개수 계산
customer_range = customer_range.with_columns(
    ((pl.col("end_year") - pl.col("start_year")) * 12 + (pl.col("end_month_num") - pl.col("start_month_num")) + 1)
    .alias("expected_months")  # 이론적으로 있어야 할 전체 월 개수
)

# 누락된 달 개수 계산 (이론적 개수 - 실제 존재 개수)
customer_range = customer_range.with_columns(
    (pl.col("expected_months") - pl.col("recorded_months")).alias("missing_months")
)

# 한 번이라도 데이터가 누락된 고객 ID 추출
missing_customer_ids = customer_range.filter(pl.col("missing_months") >= 1).select("고객ID")

# 한 번도 누락되지 않은 고객의 데이터만 추출
CUSTOMER_ALL = CUSTOMER_ALL.filter(~pl.col("고객ID").is_in(missing_customer_ids["고객ID"]))

# 금리 데이터 추가
금리 =  pl.read_csv('금리.csv')

# 1. '일반신용대출평균금리'를 f64로 변환
금리 = 금리.with_columns(
    pl.col("일반신용대출평균금리")
    .str.strip_chars()  # 공백 제거
    .cast(pl.Float64)   # f64로 변환
    .alias("일반신용대출평균금리")
)

# 2. 금리 데이터와 고객 데이터 기준으로 조인
CUSTOMER = CUSTOMER_ALL.join(
    금리, 
    on="기준년월", 
    how="left"  # left join을 사용하여 CUSTOMER 데이터를 기준으로 합침
)

# 3. 기준금리 컬럼 삭제
CUSTOMER = CUSTOMER.drop("기준금리")  # 기준금리 컬럼 삭제

# 4. 컬럼 이름 변경
CUSTOMER = CUSTOMER.rename({"일반신용대출평균금리": "기준금리"})

# 고객 점수 환산 함수 실행
result_df = calculate_financial_scores(CUSTOMER)

In [None]:
'''

거래기간, 이용기간 column 추가
21.06 이전 데이터 삭제

'''

# 1. 고객별 최초 기준년월 계산
df_min = result_df.group_by("고객ID").agg(
    pl.col("기준년월").min().alias("최초기준년월")
)

# 2. 원본 df와 고객별 최초 기준년월 병합
df = result_df.join(df_min, on="고객ID", how="left")

# 3. 각 행마다 거래기간(개월) 계산: 
#    (현재 기준년월 - 최초 기준년월)을 (연, 월) 단위로 계산한 후 +1 (첫 달 포함)
df = df.with_columns(
    (
        ((pl.col("기준년월") // 100) - (pl.col("최초기준년월") // 100)) * 12 +
        ((pl.col("기준년월") % 100) - (pl.col("최초기준년월") % 100)) +
        1
    ).alias("거래기간(개월)")
)

# 4. 각 행마다 거래기간 점수 계산: 12개월마다 10점 부여 (미만이면 0점)
df = df.with_columns(
    (pl.col("거래기간(개월)") // 12 * 10).alias("거래기간 점수")
)

# 5. 기존 총점수와 거래기간 점수를 합산하여 총점수 업데이트
df = df.with_columns(
    (pl.col("총점수") + pl.col("거래기간 점수")).alias("총점수")
)

# 6. 불필요한 거래기간 점수와 최초기준년월 컬럼 삭제
df = df.drop(["거래기간 점수", "최초기준년월"])

# 7. 이용기간 column 추가
df = add_usage_period_column_for_products(df)

# 8. 최종 컬럼 순서 재정렬
df = df.select("고객ID", "기준년월", "연령대", "성별", "고객등급", 
    "입출금예금", "기타예금", "비이자상품", "카드", "대출", 
    "기준금리", "보유상품수", "이용기간", "거래기간(개월)", "총점수")

# 9. 21.01~05 데이터 삭제
#df = df.filter(~pl.col("기준년월").is_in([202101, 202102, 202103, 202104, 202105]))

# 10. 데이터 저장
#df.write_csv('CUSTOMER_SCORE.csv') 