## 1. 환경 설정

In [2]:
import pandas as pd
import urllib.parse
from sqlalchemy import create_engine, text
from datetime import datetime
import os
from dotenv import load_dotenv
import warnings
warnings.filterwarnings("ignore")

# Display settings: raise both the row and the column display limit to 100.
for display_option in ('display.max_rows', 'display.max_columns'):
    pd.set_option(display_option, 100)

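# MySQL connection setup.
# NOTE: the setup code for this cell is not present in this export, but the
# cells below use `engine`, so these steps have to run here first:
#   1. Load environment variables.
#   2. Read the MySQL connection settings.
#   3. URL-encode the password.
#   4. Create the SQLAlchemy engine and bind it to `engine`.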


## 2. 데이터 추출 함수

In [3]:
def fetch_data(query, engine):
    """Run a SQL query through ``engine`` and return the rows as a DataFrame.

    Args:
        query: SQL text to execute.
        engine: SQLAlchemy engine used to open the connection.

    Returns:
        pandas.DataFrame: the query result.
    """
    statement = text(query)
    # The connection is closed as soon as the result has been read.
    with engine.connect() as connection:
        frame = pd.read_sql(statement, connection)
    return frame

## 3. 사용자(User) 데이터 전처리

In [4]:
# Pull the selected columns of the user table.
user_query = """
SELECT id, name, mail, is_admin, c_time, m_time, entered_competition_cnt, 
       code_share_cnt, talk_board_cnt, country_code, rank_cpt, new_rank_cpt, 
       pre_ranking_cpt, best_rank_cpt, best_rank_cpt_totaluser, new_rank_cs, 
       new_rank_tb, is_bronze, bronze_update, bronze_rank_cs, bronze_rank_tb, 
       best_rank_tb, one_percent_ranker, best_rank_cs_totaluser, 
       best_rank_tb_totaluser, avatar_tier 
FROM user
"""
# Expose the primary key as `user_id`, the name the other tables use.
user = fetch_data(user_query, engine).rename(columns={'id': 'user_id'})

# Persist the result for downstream steps.
user.to_csv('../data/processed/user.csv', index=False)

# Identify Dacon-internal members.
def get_dacon_user_ids(user_df):
    """Return the set of user_ids that belong to Dacon-internal members.

    A user counts as internal when ``is_admin == 1`` or when ``mail``
    contains ``@dacon`` (case-insensitive). Two accounts are always added and
    one account is always removed, regardless of those rules.

    Args:
        user_df: DataFrame with ``user_id``, ``is_admin`` and ``mail`` columns.

    Returns:
        set: user_ids to treat as internal members.
    """
    # Manual overrides. They are integers, like the excluded id and like the
    # ids compared elsewhere in this notebook; string ids would never match
    # integer user_ids in later `isin` filters.
    always_internal = {400025, 465065}
    never_internal = {457780}

    admin_users = set(user_df.loc[user_df['is_admin'] == 1, 'user_id'])

    # na=False: a missing mail address would otherwise put NaN into the mask,
    # and boolean indexing with a NaN-containing mask raises.
    has_dacon_mail = user_df['mail'].str.contains(
        r'@dacon', case=False, regex=True, na=False
    )
    dacon_email_users = set(user_df.loc[has_dacon_mail, 'user_id'])

    return (admin_users | dacon_email_users | always_internal) - never_internal

# Materialize the internal-member ids as a one-column table and save it.
dacon_user_id = pd.DataFrame({'user_id': list(get_dacon_user_ids(user))})
dacon_user_id.to_csv('../data/processed/dacon_user_id.csv', index=False)

## 4. 구독(subscriptions) 데이터 전처리

In [5]:
# Load the full subscriptions table.
subs_query = "SELECT * FROM subscriptions"
subscriptions = fetch_data(subs_query, engine)

# Columns that the rest of the preprocessing does not use.
unused_columns = [
    'is_deleted', 'IS_PARTIAL_CANCELED', 'period', 'product_price', 'expiration_date',
    'subs_id', 'is_oneoff', 'next_plan_type', 'next_billing_date', 'billing_id',
    'created_at', 'updated_at', 'auto_billing', 'product_code', 'auto_billing_status',
]
subscriptions = subscriptions.drop(columns=unused_columns)

# Collapse status into two labels: '환불' for canceled rows, '사용' for
# every other value (including missing ones).
is_refunded = subscriptions['status'].isin(['CANCELED', '환불'])
subscriptions['status'] = is_refunded.map({True: '환불', False: '사용'})

# Shorten the plan type codes; values without a mapping stay unchanged.
plan_codes = {'MONTHLY': 'M', 'YEARLY': 'Y', 'PROMOTION': 'P'}
subscriptions['plan_type'] = subscriptions['plan_type'].replace(plan_codes)

# Remove the product-line prefix from the product name and trim whitespace.
subscriptions['product_name'] = subscriptions['product_name'].map(
    lambda name: name.replace('데이스쿨 프로(Pro) ', '').strip()
)

# Keep only non-refunded subscriptions of users who are not internal members.
is_active = subscriptions['status'] == '사용'
is_internal = subscriptions['user_id'].isin(dacon_user_id['user_id'].values)
subscriptions = subscriptions[is_active & ~is_internal]

# Manual correction of the plan type for one specific user.
subscriptions.loc[subscriptions['user_id'] == 471893, 'plan_type'] = "Y"

In [6]:
# These two columns are not part of the output written at the end of this cell.
subscriptions = subscriptions.drop(columns=['product_name', 'status'])

# Parse the date columns; unparseable unsubscribe dates become NaT.
subscriptions['start_date'] = pd.to_datetime(subscriptions['start_date'])
subscriptions['end_date'] = pd.to_datetime(subscriptions['end_date'])
subscriptions['unsubscribe_date'] = pd.to_datetime(subscriptions['unsubscribe_date'], errors='coerce')

# Order each user's subscriptions chronologically; the per-user counters and
# the "next subscription" lookup below both rely on this order.
subscriptions = subscriptions.sort_values(['user_id', 'start_date'])
subscriptions['구독_횟수'] = subscriptions.groupby('user_id').cumcount() + 1
subscriptions['구독_일수'] = (subscriptions['end_date'] - subscriptions['start_date']).dt.days
subscriptions['해지까지_일수'] = (subscriptions['unsubscribe_date'] - subscriptions['start_date']).dt.days

# Subscription status per row:
#   '연장' when the same user's next subscription starts no later than one
#          day after this one ends (overlapping periods count as well),
#   '해지' otherwise, i.e. also for a user's last or only subscription.
# The shift is computed per user on the already aligned index, so no
# positional write-back between differently ordered frames is needed, and
# there is no per-row membership scan.
next_start = subscriptions.groupby('user_id')['start_date'].shift(-1)
gap_to_next = next_start - subscriptions['end_date']
# A NaT gap (no following subscription) compares as False and maps to '해지'.
is_renewed = gap_to_next <= pd.Timedelta(days=1)
subscriptions['subscription_status'] = is_renewed.map({True: '연장', False: '해지'})

# Per-user summary: number of subscriptions and total subscribed days.
user_summary = subscriptions.groupby('user_id').agg(
    총_구독_횟수=('구독_횟수', 'max'),
    총_구독_일수=('구독_일수', 'sum')
).reset_index()
subscriptions = subscriptions.merge(user_summary, on='user_id', how='left')

# Rename to the output column names and fix the output column order.
subscriptions = subscriptions.rename(columns={
    'is_now': '현재_구독_상태', 'start_date': '구독_시작일', 'end_date': '구독_종료일',
    'plan_type': '구독_타입', 'unsubscribe_date': '해지_신청일', 'subscription_status': '구독_상태'})

subscriptions = subscriptions[[
    'user_id', '현재_구독_상태', '구독_시작일', '구독_종료일', '구독_상태', '구독_타입',
    '구독_일수', '총_구독_일수', '구독_횟수', '총_구독_횟수', '해지_신청일', '해지까지_일수'
]]

# Save.
subscriptions.to_csv('../data/processed/subscriptions.csv', index=False)

## 5. 트랙 사용자(track_users) 데이터 전처리

In [7]:
# Load the full track_users table.
track_query = "SELECT * FROM track_users"
track_users = fetch_data(track_query, engine)

# Single source of truth for the track ids: for every track, the cohort
# number of its first listed id and the ids in consecutive cohort order.
# Both lookup tables below are derived from it so they cannot drift apart.
track_catalog = {
    '파이썬': (1, [236269, 236301, 236329, 236362, 236386, 236405]),
    '입문': (1, [236267, 236300, 236330, 236363, 236387, 236406]),
    '기초': (1, [236262, 236299, 236331, 236364, 236388, 236407]),
    '심화': (1, [236268, 236302, 236332, 236365, 236389, 236408]),
    '시계열': (4, [236366, 236390, 236409]),  # time-series track
    '이미지': (5, [236371, 236410]),  # image track
}

# track_id -> cohort number
track_generation_mapping = {
    track_id: generation
    for first_generation, track_ids in track_catalog.values()
    for generation, track_id in enumerate(track_ids, start=first_generation)
}

# track_id -> track name
track_mapping = {
    track_id: track_name
    for track_name, (_, track_ids) in track_catalog.items()
    for track_id in track_ids
}

track_users['기수'] = track_users['track_id'].map(track_generation_mapping)
# Drop rows whose track_id is not in the mapping (per the original note:
# participants of the 2025 cohorts). `.copy()` makes the result an
# independent frame, so the column assignments below do not write to a slice
# of the unfiltered table.
track_users = track_users[track_users['기수'].notna()].copy()
track_users['트랙_난이도'] = track_users['track_id'].map(track_mapping)
track_users['progress_rate'] = track_users['progress_rate'].fillna(0)

track_users = track_users.drop(columns=[
    'completion_date', 'badge_date',
    'hackathon_score', 'is_completed', 'ipynb_uploaded', 'pdf_uploaded',
    'id', 'play_time', 'total_steps',
])

# Per-cohort schedule. The cohort numbers are floats because the mapped
# '기수' column is created with NaN for unmapped ids, which makes it float.
generation_schedule = pd.DataFrame({
    '기수': [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
    '오픈_날짜': ['2024-06-03', '2024-07-01', '2024-08-01', '2024-09-02', '2024-10-02', '2024-11-01'],
    '마감_날짜': ['2024-06-27', '2024-07-25', '2024-08-26', '2024-09-26', '2024-11-25', '2024-12-25']
})

generation_schedule['오픈_날짜'] = pd.to_datetime(generation_schedule['오픈_날짜'])
generation_schedule['마감_날짜'] = pd.to_datetime(generation_schedule['마감_날짜'])
generation_schedule['트랙_진행_일수'] = (generation_schedule['마감_날짜'] - generation_schedule['오픈_날짜']).dt.days

track_users = track_users.merge(generation_schedule, on='기수', how='left')

# Parse timestamps (unparseable values become NaT) and derive the number of
# days between creation and last update, floored at 0.
track_users['created_at'] = pd.to_datetime(track_users['created_at'], errors='coerce')
track_users['updated_at'] = pd.to_datetime(track_users['updated_at'], errors='coerce')
track_users['트랙_학습_기간_일수'] = (track_users['updated_at'] - track_users['created_at']).dt.days.clip(lower=0)
# Rows without a valid update timestamp are dropped.
track_users = track_users[track_users['updated_at'].notna()]

# Exclude internal members.
track_users = track_users[~track_users['user_id'].isin(dacon_user_id['user_id'].values)]

# Save.
track_users.to_csv('../data/processed/track_users.csv', index=False)

## 6. 프로젝트 및 로그(project_progress_details) 데이터 전처리

In [8]:
# Load the full project_progress_details table.
project_progress_query = "SELECT * FROM project_progress_details"
raw_project_progress = fetch_data(project_progress_query, engine)

# Exclude internal members and drop the surrogate key column.
from_internal_member = raw_project_progress['user_id'].isin(dacon_user_id['user_id'].values)
project_progress_details = raw_project_progress.loc[~from_internal_member].drop(columns=['id'])

project_progress_details.to_csv('../data/processed/project_progress_details.csv', index=False)
# Last expression of the cell: shows (rows, columns) as the cell output.
project_progress_details.shape

(123096, 8)

## 7. track_user_step_check_logs 데이터 전처리

In [9]:
import pandas as pd
import urllib.parse
from sqlalchemy import create_engine, text

# MySQL connection settings.
# Credentials are read from the process environment instead of being written
# into the notebook (e.g. populate them from a .env file with load_dotenv()
# before this cell runs).
#   Required: MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE
#   Optional: MYSQL_PORT (defaults to 3306, the MySQL default port)
required_settings = ('MYSQL_HOST', 'MYSQL_USER', 'MYSQL_PASSWORD', 'MYSQL_DATABASE')
missing_settings = [name for name in required_settings if not os.environ.get(name)]
if missing_settings:
    # Fail early with a clear message rather than with a connection error.
    raise RuntimeError(
        "Missing MySQL settings in the environment: " + ", ".join(missing_settings)
    )

CONFIG = {
    'host': os.environ['MYSQL_HOST'],
    'username': os.environ['MYSQL_USER'],
    'password': os.environ['MYSQL_PASSWORD'],
    'database': os.environ['MYSQL_DATABASE'],
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
}

# URL-encode the password so special characters cannot break the URL.
encoded_password = urllib.parse.quote_plus(CONFIG["password"])

# Create the SQLAlchemy engine.
engine = create_engine(
    f"mysql+pymysql://{CONFIG['username']}:{encoded_password}@{CONFIG['host']}:{CONFIG['port']}/{CONFIG['database']}"
)

# Load the full track_user_step_check_logs table.
step_logs_query = "SELECT * FROM track_user_step_check_logs"
track_user_step_check_logs = fetch_data(step_logs_query, engine)

# Exclude internal members and drop the identifier columns.
track_user_step_check_logs = track_user_step_check_logs[
    ~track_user_step_check_logs['user_id'].isin(dacon_user_id['user_id'].values)
]
track_user_step_check_logs = track_user_step_check_logs.drop(columns=['id', 'uuid'])

track_user_step_check_logs.to_csv('../data/processed/track_user_step_check_logs.csv', index=False)
