<a href="https://colab.research.google.com/github/jihosuperman/predict_churn_rate/blob/main/sampling_module.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m21.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=cbd6e573f5869e596f4a8051fc1cfb8ea7695dd403edbaac380f446fa8e036a8
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
# Open the Spark session used by every later cell.
# NOTE: the star import of pyspark.sql.functions rebinds names such as
# `round`, `min`, `max` and `sum` to their Spark column versions in this
# notebook's namespace.

from pyspark.sql import *
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .appName('ds_study')
    .getOrCreate()
)
spark

In [None]:
# Load the four input tables from Parquet.

import os

# Root folder of the id-encoded data. The default is the original local
# location; set the KKBOX_DATA_DIR environment variable to run the notebook
# on another machine without editing this cell.
path = os.environ.get(
    'KKBOX_DATA_DIR',
    'D:/kkbox-churn-prediction-challenge/data/churn_comp_refresh/id_encoding/data',
)

# Parquet files carry their own schema, so the CSV-only 'inferSchema' and
# 'header' reader options (one of which was misspelled as 'tdrue') had no
# effect and have been removed.
members = spark.read.parquet(path + '/new_members/new_members.parquet')
train = spark.read.parquet(path + '/new_train/new_train.parquet')
transactions = spark.read.parquet(path + '/new_transactions/new_transactions.parquet')
user_logs = spark.read.parquet(path + '/new_user_logs/new_user_logs-001.parquet')

In [None]:
# Convert the date-bearing columns from their stored yyyymmdd form to Date.
#   members      - registration_init_time
#   transactions - transaction_date, membership_expire_date
#   user_logs    - date


def _yyyymmdd_columns_to_date(df, date_columns):
    """Return `df` with each column in `date_columns` converted to a Date.

    Each value is cast to a string, re-assembled as 'yyyy-MM-dd' from its
    first eight characters and passed to `to_date`.

    Columns that already have the Spark 'date' type are left untouched.
    Without this guard, re-running the cell would cast the Date back to
    'yyyy-MM-dd' text, slice it at the wrong offsets and turn every value
    into null.
    """
    current_types = dict(df.dtypes)
    for d_col in date_columns:
        if current_types[d_col] == 'date':
            continue
        as_text = col(d_col).cast('string')
        df = df.withColumn(
            d_col,
            to_date(concat(as_text.substr(1, 4),
                           lit('-'),
                           as_text.substr(5, 2),
                           lit('-'),
                           as_text.substr(7, 2))),
        )
    return df


members = _yyyymmdd_columns_to_date(members, ['registration_init_time'])
transactions = _yyyymmdd_columns_to_date(transactions, ['transaction_date', 'membership_expire_date'])
user_logs = _yyyymmdd_columns_to_date(user_logs, ['date'])

In [None]:
# Build the pool of msno_num values that can be sampled: ids that appear in
# all four tables.
#
# Only the distinct keys are needed, so each table is reduced with
# select().distinct() instead of groupBy().count(). This avoids computing
# counts that were immediately dropped and avoids four same-named 'count'
# columns in the joined result. The output is a single 'msno_num' column.

sample_target_msno_all = (
    members.select('msno_num').distinct()
    .join(train.select('msno_num').distinct(), on='msno_num', how='inner')
    .join(transactions.select('msno_num').distinct(), on='msno_num', how='inner')
    .join(user_logs.select('msno_num').distinct(), on='msno_num', how='inner')
)

total_pool = sample_target_msno_all.count()
print('샘플링이 가능한(4개 데이터셋에서 모두 존재하는) 이용자는 총 {}명입니다.'.format(total_pool))

In [None]:
# Split the eligible users into churned (minority) and retained (majority)
# groups and compute the majority/minority size ratio.

import builtins

# sample_target_msno_all holds only 'msno_num', so the label has to be joined
# in before filtering on it.
# NOTE(review): assumes `train` carries the 'is_churn' label with one row per
# msno_num - TODO confirm.
labeled_pool = sample_target_msno_all.join(
    train.select('msno_num', 'is_churn'), on='msno_num', how='inner'
)

minor_df = labeled_pool.filter(col("is_churn") == 1)
major_df = labeled_pool.filter(col("is_churn") == 0)

minor_count = minor_df.count()
if minor_count == 0:
    raise ValueError('No rows with is_churn == 1; cannot compute the class ratio.')

# `round` was rebound to the Spark column function by
# `from pyspark.sql.functions import *`, and that version does not accept a
# plain Python float. Call the built-in explicitly.
ratio = builtins.round(major_df.count() / minor_count, 4)

In [None]:
# Undersample the majority class to roughly the size of the minority class,
# then stack both classes into one frame.

# Sampling without replacement needs a fraction in [0, 1]. If the "majority"
# is not actually larger (ratio <= 1), keep all of it.
majority_fraction = 1 / ratio if ratio > 1 else 1.0

# A fixed seed makes the draw repeatable for the same input and partitioning.
sampled_majority_df = major_df.sample(fraction=majority_fraction, seed=42)

# union() is the non-deprecated name for unionAll(); both keep duplicates.
combined_df_2 = sampled_majority_df.union(minor_df)

In [None]:
# Choose the sampling ratio and draw the users that form the train/test sample.
sample_ratio = 0.107

# A fixed seed keeps the drawn sample repeatable across runs (for the same
# input and partitioning). The sampled ids are collected into a pandas
# DataFrame on the driver.
sample_target_msno = sample_target_msno_all.sample(fraction=sample_ratio, seed=42).toPandas()
print('{}명 중 {}%를 샘플링하여 총 {}명을 train 및 test 데이터로 확정합니다.'.format(total_pool, sample_ratio*100, len(sample_target_msno['msno_num'])))

In [None]:
# Save the sampled rows of every table as Parquet.

# Build the id list once instead of on every loop iteration.
sampled_ids = sample_target_msno['msno_num'].tolist()

# The folder name follows `sample_ratio`, so the label on disk cannot drift
# from the ratio that was actually used. For sample_ratio = 0.107 this is the
# same path as before: './data/sample (ratio = 0.107)/'.
output_dir = './data/sample (ratio = {})/'.format(sample_ratio)

# (table, output file name) pairs. A plain list replaces the dict that used
# DataFrames as keys.
sample_outputs = [
    (train, 'sample_train.parquet'),
    (members, 'sample_members.parquet'),
    (transactions, 'sample_transactions.parquet'),
    (user_logs, 'sample_user_logs.parquet'),
]

for table, file_name in sample_outputs:
    # coalesce(1) writes each table as a single part file. The former
    # 'header' option is CSV-only and had no effect on Parquet output.
    (table.filter(col('msno_num').isin(sampled_ids))
          .coalesce(1)
          .write.mode("overwrite")
          .parquet(output_dir + file_name))