In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Instalar e configurar PySpark no Google Colab
!pip install pyspark
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, date_add, to_date, expr



In [3]:
folder_bronze = '/content/drive/Shareddrives/Real_Project_Churn_DSA30/Dataset/Bronze'
folder_prata = '/content/drive/Shareddrives/Real_Project_Churn_DSA30/Dataset/Prata'

In [6]:
# Criar a sessão do Spark
spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

# Carregar os dados diretamente no Spark DataFrame
df_userlog_spark = spark.read.csv(os.path.join(folder_bronze, 'df_userlogs_1.csv'), header=True, inferSchema=True)
df_userlog_spark_2 = spark.read.csv(os.path.join(folder_bronze, 'df_userlogs_2.csv'), header=True, inferSchema=True)

df_userlog_spark = df_userlog_spark.union(df_userlog_spark_2)

In [7]:
df_transactions_churn_spark = spark.read.csv(os.path.join(folder_bronze, 'df_transactions_churn_mes.csv'), header=True, inferSchema=True)

df_transactions_churn_spark_vencto = spark.read.csv(os.path.join(folder_bronze, 'df_transactions_churn.csv'), header=True, inferSchema=True)

In [8]:
df_transactions_churn_spark_vencto.show()

+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+-----+
|                msno|payment_method_id|payment_plan_days|plan_list_price|actual_amount_paid|is_auto_renew|transaction_date|membership_expire_date|is_cancel|churn|
+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+-----+
|5Hv+qb4Lbb1XgcAxY...|               31|              180|            799|               799|            0|      2015-03-02|            2015-09-01|        0|false|
|5Hv+qb4Lbb1XgcAxY...|               31|              180|            799|               799|            0|      2015-09-02|            2016-02-28|        0| true|
|l+zc52ZSzwqy/pjxR...|               39|               31|            149|               149|            1|      2015-01-31|            2015-03-28|        0|false|
|l+zc52ZSzwqy/pj

In [9]:
# Converter coluna de data para formato adequado
df_userlog_spark = df_userlog_spark.withColumn("date", to_date(col("date").cast("string"), "yyyyMMdd"))

# Criar coluna de ano-mês e referência futura
df_userlog_spark = df_userlog_spark.withColumn("ano_mes", expr("date_format(date, 'yyyy-MM')"))
df_userlog_spark = df_userlog_spark.withColumn("dt_ref", date_add(col("date"), 30))

In [10]:
# Criar tabela temporária para consultas SQL

df_userlog_spark.createOrReplaceTempView("userlog")
df_transactions_churn_spark.createOrReplaceTempView("transactions_churn")
df_transactions_churn_spark_vencto.createOrReplaceTempView("transactions_churn_vencto")

In [32]:
# Criar tabela agregada de métricas com PySpark SQL
df_features_spark = spark.sql('''
    SELECT msno, dt_ref,
           SUM(num_25) AS num_25,
           SUM(num_50) AS num_50,
           SUM(num_75) AS num_75,
           SUM(num_985) AS num_985,
           SUM(num_100) AS num_100,
           SUM(num_unq) AS num_unq,
           SUM(total_secs) AS total_secs
    FROM userlog
    GROUP BY msno, dt_ref
''')

# Criar tabela de churn apenas com msno e dt_ref
df_churn_spark = spark.sql('''
    SELECT msno, dt_ref, churn, is_auto_renew FROM transactions_churn
''')

df_churn_spark_vecnto = spark.sql('''
    SELECT msno, membership_expire_date, churn, is_auto_renew, is_cancel, payment_method_id, payment_plan_days, plan_list_price, actual_amount_paid FROM transactions_churn
''')

df_features_spark.createOrReplaceTempView("features")
df_churn_spark.createOrReplaceTempView("churn")
df_churn_spark_vecnto.createOrReplaceTempView("churn_vecnto")

In [13]:
# Realizar o JOIN entre features e churn
intervalo_dias = 91  # Alterar conforme necessário

df_final = spark.sql(f'''
    SELECT c.msno, c.dt_ref, c.churn, c.is_auto_renew,
           SUM(f.num_25) AS sum_25_91_dias,
           SUM(f.num_50) AS sum_50_91_dias,
           SUM(f.num_75) AS sum_75_91_dias,
           SUM(f.num_985) AS sum_985_91_dias,
           SUM(f.num_100) AS sum_100_91_dias,
           SUM(f.num_unq) AS sum_unq_91_dias,
           SUM(f.total_secs) AS sum_total_secs_91_dias,

           AVG(f.num_25) AS AVG_num_25_91_dias,
           AVG(f.num_50) AS AVG_num_50_91_dias,
           AVG(f.num_75) AS AVG_num_75_91_dias,
           AVG(f.num_985) AS AVG_num_985_91_dias,
           AVG(f.num_100) AS AVG_num_100_91_dias,
           AVG(f.num_unq) AS AVG_num_unq_91_dias,
           AVG(f.total_secs) AS AVG_total_secs_91_dias

    FROM churn c
    LEFT JOIN features f
    ON c.msno = f.msno
    AND f.dt_ref BETWEEN date_sub(c.dt_ref, {intervalo_dias}) AND c.dt_ref
    GROUP BY c.msno, c.dt_ref, c.churn, c.is_auto_renew
''')

# Criar uma tabela temporária para consultas posteriores
df_final.createOrReplaceTempView("final_table")

# Exibir algumas linhas do resultado
# df_final.show(10)

In [39]:
# Realizar o JOIN entre features e churn
intervalo_dias = 91  # Alterar conforme necessário

df_final_vecnto = spark.sql(f'''
    SELECT c.msno, c.membership_expire_date, c.churn, c.is_auto_renew, c.is_cancel, c.payment_method_id, c.payment_plan_days, c.plan_list_price, c.actual_amount_paid,
           SUM(f.num_25) AS sum_25_91_dias,
           SUM(f.num_50) AS sum_50_91_dias,
           SUM(f.num_75) AS sum_75_91_dias,
           SUM(f.num_985) AS sum_985_91_dias,
           SUM(f.num_100) AS sum_100_91_dias,
           SUM(f.num_unq) AS sum_unq_91_dias,
           SUM(f.total_secs) AS sum_total_secs_91_dias,

           AVG(f.num_25) AS AVG_num_25_91_dias,
           AVG(f.num_50) AS AVG_num_50_91_dias,
           AVG(f.num_75) AS AVG_num_75_91_dias,
           AVG(f.num_985) AS AVG_num_985_91_dias,
           AVG(f.num_100) AS AVG_num_100_91_dias,
           AVG(f.num_unq) AS AVG_num_unq_91_dias,
           AVG(f.total_secs) AS AVG_total_secs_91_dias

    FROM churn_vecnto c
    LEFT JOIN features f
    ON c.msno = f.msno
    AND f.dt_ref BETWEEN date_sub(c.membership_expire_date, {intervalo_dias}) AND c.membership_expire_date
    GROUP BY c.msno, c.membership_expire_date, c.churn, c.is_auto_renew, c.is_cancel, c.payment_method_id, c.payment_plan_days, c.plan_list_price, c.actual_amount_paid
''')

# Criar uma tabela temporária para consultas posteriores
df_final_vecnto.createOrReplaceTempView("final_table")

# Exibir algumas linhas do resultado
# df_final.show(10)

In [35]:
# Salvar o resultado final em um arquivo CSV
# df_final.write.csv(os.path.join(os.path.join(folder_prata, 'df_abt.csv')), header=True, mode='overwrite')
df_final_vecnto.write.csv(os.path.join(os.path.join(folder_prata, 'df_abt_vecnto.csv')), header=True, mode='overwrite')

In [None]:
df_abt = pd.DataFrame()

for arquivo in os.listdir(os.path.join(folder_prata, 'df_abt.csv')):
    if arquivo.endswith('.csv'):
        caminho_arquivo = os.path.join(folder_prata, 'df_abt.csv', arquivo)
        df = pd.read_csv(caminho_arquivo)
        df_abt = pd.concat([df_abt, df], ignore_index=True)

df_abt_tratado = df_abt.fillna(0)
df_abt_tratado.to_csv(os.path.join(folder_prata, 'df_abt_tratado.csv'), index=False)

In [36]:
df_abt_vecnto = pd.DataFrame()

for arquivo in os.listdir(os.path.join(folder_prata, 'df_abt_vecnto.csv')):
    if arquivo.endswith('.csv'):
        caminho_arquivo = os.path.join(folder_prata, 'df_abt_vecnto.csv', arquivo)
        df = pd.read_csv(caminho_arquivo)
        df_abt_vecnto = pd.concat([df_abt_vecnto, df], ignore_index=True)

df_abt_vecnto_tratado = df_abt_vecnto.fillna(0)
df_abt_vecnto_tratado.to_csv(os.path.join(folder_prata, 'df_abt_vecnto_tratado.csv'), index=False)

In [37]:
df_abt = pd.read_csv(os.path.join(folder_prata, 'df_abt_vecnto_tratado.csv'))

In [38]:
df_abt.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1685178 entries, 0 to 1685177
Data columns (total 23 columns):
 #   Column                  Non-Null Count    Dtype  
---  ------                  --------------    -----  
 0   msno                    1685178 non-null  object 
 1   membership_expire_date  1685178 non-null  object 
 2   churn                   1685178 non-null  bool   
 3   is_auto_renew           1685178 non-null  int64  
 4   is_cancel               1685178 non-null  int64  
 5   payment_method_id       1685178 non-null  int64  
 6   payment_plan_days       1685178 non-null  int64  
 7   plan_list_price         1685178 non-null  int64  
 8   actual_amount_paid      1685178 non-null  int64  
 9   sum_25_91_dias          1685178 non-null  float64
 10  sum_50_91_dias          1685178 non-null  float64
 11  sum_75_91_dias          1685178 non-null  float64
 12  sum_985_91_dias         1685178 non-null  float64
 13  sum_100_91_dias         1685178 non-null  float64
 14  su

In [25]:
df_abt.head()

Unnamed: 0,msno,membership_expire_date,churn,is_auto_renew,sum_25_91_dias,sum_50_91_dias,sum_75_91_dias,sum_985_91_dias,sum_100_91_dias,sum_unq_91_dias,sum_total_secs_91_dias,AVG_num_25_91_dias,AVG_num_50_91_dias,AVG_num_75_91_dias,AVG_num_985_91_dias,AVG_num_100_91_dias,AVG_num_unq_91_dias,AVG_total_secs_91_dias
0,++7jKYbuIJPXry8Oh1NcEh9fCsqcQgUaaxXsgG15kMg=,2016-03-22,True,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,+/mUb99nNFMD2bAk3UHs+2ArwioKZTZ+e9l3AhWcLh0=,2015-03-04,False,1,19.0,3.0,1.0,1.0,478.0,492.0,120384.461,1.357143,0.214286,0.071429,0.071429,34.142857,35.142857,8598.890071
2,+/mUb99nNFMD2bAk3UHs+2ArwioKZTZ+e9l3AhWcLh0=,2015-04-04,False,1,68.0,15.0,10.0,6.0,2429.0,2393.0,613926.339,1.74359,0.384615,0.25641,0.153846,62.282051,61.358974,15741.701
3,+/mUb99nNFMD2bAk3UHs+2ArwioKZTZ+e9l3AhWcLh0=,2015-05-04,False,1,124.0,28.0,18.0,18.0,4442.0,4379.0,1118792.383,2.296296,0.518519,0.333333,0.333333,82.259259,81.092593,20718.377463
4,+/mUb99nNFMD2bAk3UHs+2ArwioKZTZ+e9l3AhWcLh0=,2015-06-04,False,1,200.0,49.0,27.0,29.0,5111.0,4913.0,1271484.46,3.333333,0.816667,0.45,0.483333,85.183333,81.883333,21191.407667
