<h1>1- Data Preprocessing</h1>

In [1]:
# Dependencies and Setup
from google.cloud import bigquery
from package.helpers import *
from package.plots import *
from package.classes import *

The helpers file is imported ☑
The plots file is imported ☑
The classes file is imported ☑


<h3>1-1- Data Collection</h3>

In [2]:
# A class to handle data loading from BigQuery and retrieval in Spark
class DataToSpark():
    """Load BigQuery tables of one dataset into Spark DataFrames.

    Reading goes through the notebook-level name ``spark``; this class does
    not create that session itself.
    """

    def __init__(self, project_id, dataset_id):
        """Remember which BigQuery project and dataset the tables live in.

        Args:
            project_id: BigQuery project identifier.
            dataset_id: Dataset identifier inside that project.
        """
        self.project_id = project_id
        self.dataset_id = dataset_id
        # Start empty so get_tables() is safe to call before load_tables()
        self.df_tables = {}

    def load_tables(self, tables=None):
        """Read each named table and store it under its table name.

        Every call replaces the previously loaded set of tables.

        Args:
            tables: Iterable of table names; ``None`` (the default) loads nothing.
        """
        self.df_tables = {}  # Dictionary: table name -> DataFrame
        for table in tables or ():
            self.df_tables[table] = (
                spark.read.format("bigquery")
                .option("table", f"{self.project_id}:{self.dataset_id}.{table}")
                .load()
            )

    def get_tables(self):
        """Return the dictionary mapping each loaded table name to its DataFrame."""
        return self.df_tables

In [3]:
# Bind a loader to the "kkbox" dataset of the churn project
kkbox = DataToSpark("customer-churn-391917", "kkbox")

# Names of the BigQuery tables this notebook works with
tables_to_load = ["member", "user", "transactions", "train"]
kkbox.load_tables(tables_to_load)

# Dictionary: table name -> DataFrame
tables = kkbox.get_tables()

# Preview a single row of every loaded table
for table_name in tables:
    table_df = tables[table_name]
    print(table_name)
    table_df.show(1)

member


                                                                                

+--------------------+----+---+------+--------------+----------------------+
|                msno|city| bd|gender|registered_via|registration_init_time|
+--------------------+----+---+------+--------------+----------------------+
|icRXKGc8YLAIcS6nu...|   1|  0|  null|             7|              20120320|
+--------------------+----+---+------+--------------+----------------------+
only showing top 1 row

user
+--------------------+--------+------+------+------+-------+-------+-------+----------+
|                msno|    date|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
+--------------------+--------+------+------+------+-------+-------+-------+----------+
|FnqNUBvN8mysLeKba...|20160119|    34|     1|     1|      1|     88|    108| 24172.471|
+--------------------+--------+------+------+------+-------+-------+-------+----------+
only showing top 1 row

transactions
+--------------------+-----------------+-----------------+---------------+------------------+-------------+--

<h3>2-1- Exploratory Data Analysis (EDA)</h3>

<h4>1-2-1- Members Dataset</h4>

In [4]:
# Attach the member columns to every row of the train table; the left join
# keeps train rows that have no matching member record
merge_df = tables["train"].join(other=tables["member"], on=['msno'], how='left')

# Report the shape, then the column types
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=merge_df.count(),
        columns=len(merge_df.columns),
    )
)
merge_df.printSchema()

                                                                                

shape
 |-- rows: 992931
 |-- columns: 7
root
 |-- msno: string (nullable = true)
 |-- is_churn: long (nullable = true)
 |-- city: long (nullable = true)
 |-- bd: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: long (nullable = true)
 |-- registration_init_time: long (nullable = true)



In [5]:
# NOTE(review): every statement below reassigns 'merge_df', so re-running this
# cell applies the conversions again to already-converted columns.

# Parse "registration_init_time" with the yyyyMMdd pattern; rows where it is
# null receive the "NAN" marker instead
merge_df = merge_df.withColumn(
    "registration_init_time",
    when(
        col("registration_init_time").isNotNull(),
        to_date(col("registration_init_time").cast("string"), "yyyyMMdd"),
    ).otherwise("NAN"),
)

# Null genders receive the same "NAN" marker
merge_df = merge_df.fillna("NAN", subset=["gender"])

# Remaining member columns: "NAN" where null, otherwise the value cast to double
columns_to_replace = ["city", "bd", "registered_via"]
for column in columns_to_replace:
    merge_df = merge_df.withColumn(
        column,
        when(col(column).isNull(), "NAN").otherwise(col(column).cast("double")),
    )

# Display the updated DataFrame
# merge_df.show()

# # Print the schema of the updated DataFrame
# merge_df.printSchema()

<h4>1-1-2-1- Data and Model Noise</h4>

<p align="justify"><b>a)</b> Some rows have no data in any of the member columns (city, bd, gender, registered_via, and registration_init_time); every one of these columns is marked as NAN.</p>
<p align="justify"><b>b)</b> Instances of duplicated data are identified by considering attributes such as city, bd, gender, and registered_via, with specific attention placed on scenarios where bd equals 0 and gender is labeled as NAN.</p>

<b><ul><li>Noise a evaluation</li></ul></b>

In [6]:
# Keep only the rows where every member column carries the "NAN" marker
NAN_filtered_df = merge_df.filter((col("city") == "NAN") &
                                  (col("bd") == "NAN") &
                                  (col("gender") == "NAN") &
                                  (col("registered_via") == "NAN") &
                                  (col("registration_init_time") == "NAN"))

# Each count() is a separate Spark action, so evaluate every count exactly once
nan_row_count = NAN_filtered_df.count()
total_row_count = merge_df.count()

# Round after scaling to percent so the printed value keeps two decimals
# (rounding first and multiplying afterwards can print e.g. 7.000000000000001)
nan_row_share = round(nan_row_count / total_row_count * 100, 2)

# Print the filtered result count and percentage
print(f"result\n |-- count: {nan_row_count}\n |-- out of total data: {nan_row_share} %")



result
 |-- count: 115770
 |-- out of total data: 11.66 %


                                                                                

<b><ul><li>Noise b evaluation</li></ul></b>

In [7]:
# Count how many rows share each (city, bd, gender, registered_via) combination
unit_count = merge_df.groupBy("city", "bd", "gender", "registered_via").count()

# show() only prints and returns None, so keep the ten most frequent
# combinations as a DataFrame first and then display it
top_10_unit_count = unit_count.orderBy(col("count").desc()).limit(10)
top_10_unit_count.show(10)



+----+---+------+--------------+------+
|city| bd|gender|registered_via| count|
+----+---+------+--------------+------+
| 1.0|0.0|   NAN|           7.0|399071|
| NAN|NAN|   NAN|           NAN|115770|
| 1.0|0.0|   NAN|           4.0| 16453|
| 1.0|0.0|   NAN|           9.0| 15651|
| 1.0|0.0|   NAN|           3.0|  6786|
|13.0|0.0|   NAN|           9.0|  4072|
| 1.0|0.0|   NAN|          13.0|  2804|
| 5.0|0.0|   NAN|           9.0|  2660|
|13.0|0.0|   NAN|           3.0|  2399|
|15.0|0.0|   NAN|           9.0|  1635|
+----+---+------+--------------+------+
only showing top 10 rows



                                                                                

In [8]:
# city=1 assessment
city_1 = merge_df.filter(col("city") == 1)

# Each count() is a separate Spark action, so evaluate every count exactly once
city_1_count = city_1.count()
total_row_count = merge_df.count()

# Round after scaling to percent so the printed value keeps two decimals
city_1_share = round(city_1_count / total_row_count * 100, 2)

# Print the filtered result count and percentage
print(f"result\n |-- count: {city_1_count}\n |-- out of total data: {city_1_share} %")



result
 |-- count: 455389
 |-- out of total data: 45.86 %


                                                                                

<h4>2-1-2-1- Observation</h4>

<p align="justify">The member dataset exhibits a presence of 11.66% null values that appear to be attributable to inherent noise. To ensure data quality, it is recommended to undertake the removal of rows with complete null values. Further scrutiny reveals that 45.86% of the dataset originates from a specific city (city 1), yet lacks essential demographic details such as age and gender. This scenario gives rise to an imbalanced dataset, characterized by a substantial disparity in the representation of different cities. This imbalance can potentially introduce bias in machine learning model performance, leading it to disproportionately emphasize the predominant class while marginalizing the less prevalent classes. To mitigate this challenge, a judicious approach involves the development of a separate model dedicated to the analysis of city 1's data, thus fostering a more nuanced and equitable learning process.</p>

In [9]:
# 'member_model_df' = 'merge_df' without the rows of 'NAN_filtered_df' and
# without the rows of 'city_1'.
# NOTE(review): subtract() is documented as EXCEPT DISTINCT, so it would also
# collapse fully identical rows — confirm that is acceptable here.
member_model_df = (
    merge_df
    .subtract(NAN_filtered_df)
    .subtract(city_1)
)

# Dataframe shape
# print(f"shape\n |-- rows: {member_model_df.count()}\n |-- columns: {len(member_model_df.columns)}")
# member_model_df.printSchema()

# # Display the resulting 'member_model_df'.
# member_model_df.show()

<h4>2-2-1- Transaction Data</h4>

In [None]:
# Attach the transaction rows to the retained members; the left join keeps
# members that have no transaction at all
transactions_merge_df = member_model_df.join(other=tables["transactions"], on=['msno'], how='left')

# Report the shape, then the column types
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=transactions_merge_df.count(),
        columns=len(transactions_merge_df.columns),
    )
)
transactions_merge_df.printSchema()

In [None]:
# Within each member, order the transactions from newest to oldest
window_spec = Window.partitionBy("msno").orderBy(F.desc("transaction_date"))

# NOTE(review): rank() assigns the same rank to equal transaction dates, so a
# member may have several rank-1 rows and no rank-2 row — confirm this is intended
ranked_df = transactions_merge_df.withColumn("rank", F.rank().over(window_spec))

# Rank 1: the last transaction of each member
last_transaction_df = ranked_df.where(F.col("rank") == 1).drop("rank")

# Rank 2: the transaction before the last one
transaction_before_last_df = ranked_df.where(F.col("rank") == 2).drop("rank")

# From the previous transaction keep only its expiry date, exposed as "exp_last"
selected_columns_df = transaction_before_last_df.select(
    "msno",
    F.col("membership_expire_date").alias("exp_last"),
)

# Last transaction of each member plus the expiry date of the one before it
new_tran_df = last_transaction_df.join(other=selected_columns_df, on=['msno'], how='left')

# Report the shape, then the column types
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=new_tran_df.count(),
        columns=len(new_tran_df.columns),
    )
)
new_tran_df.printSchema()

# Show the result
# new_tran_df.show()

<h4>1-2-2-1- Data and Model Noise</h4>

<p align="justify"><b>a)</b> A single 'msno' has several rows with the same 'transaction_date' and 'membership_expire_date' that differ in other columns; in the example below the rows disagree on 'is_cancel'. In this case the 'exp_last' value is null.</p>

In [None]:
# Example member illustrating noise (a): keep only that member's rows
noise_a = new_tran_df.where(
    F.col("msno") == "1M+OGzETqoIR33bo2mzrTP3p8jOFyVdUX5vJ3KLe2ms="
)

# Displaying the filtered results
# noise_a.show()

<p align="justify"><b>b)</b> The "msno" shares identical "transaction_date" and "membership_expire_date," but has errors in the "exp_last" column. </p>

In [None]:
# Example member illustrating noise (b): keep only that member's rows
noise_b = new_tran_df.where(
    col("msno") == "ya/pbMnE1Bc9NXQIQ3r9avpXJet0hiNQEgy8QMG98ZI="
)

# Displaying the filtered results
# noise_b.show(50)

<p align="justify"><b>c)</b> The "msno" has one "transaction_date" and multiple distinct "membership_expire_date" values.</p>

In [None]:
# Example member illustrating noise (c): keep only that member's rows
noise_c = new_tran_df.where(
    col("msno") == "pI0cMv4wwhvLTBJpJSoHQrG6pdazfXo77JmRfKoyA6U="
)

# Displaying the filtered results
# noise_c.show(50)

<b><ul><li>Noise evaluation</li></ul></b>

In [None]:
def noise_finder(df, param=None):
    """
    Find the members whose rows repeat on a given combination of columns.

    Rows of ``df`` are grouped by ``param``; every combination that occurs more
    than once is treated as noise, and all rows of the members owning such a
    combination are returned (they are selected, not removed).

    Args:
        df (DataFrame): The input DataFrame; must contain an "msno" column.
        param (list): Column names to group by. Must include "msno".

    Returns:
        DataFrame: Rows of ``df`` that belong to the flagged members.
        DataFrame: The repeated combinations with their "count", largest first.
        DataFrame: Distinct "msno" keys of the flagged members.

    Raises:
        ValueError: If "msno" is not among the grouping columns.
    """
    # Normalise the grouping columns without relying on a shared mutable default
    if param is None:
        group_columns = []
    elif isinstance(param, str):
        group_columns = [param]
    else:
        group_columns = list(param)

    # The flagged members are identified through "msno", so it must survive the groupBy
    if "msno" not in group_columns:
        raise ValueError('param must include "msno" so the noisy members can be identified')

    # Combinations occurring more than once, most frequent first
    noise_duplicate_rows = (
        df.groupBy(group_columns)
        .count()
        .filter(F.col("count") > 1)
        .orderBy(F.desc("count"))
    )

    # One member can own several repeated combinations; without distinct() the
    # inner join below would repeat that member's rows once per combination
    noise_msno_keys = noise_duplicate_rows.select("msno").distinct()

    # All rows of the original DataFrame that belong to a flagged member
    noise_filtered_df = df.join(noise_msno_keys, on=["msno"], how="inner")

    return noise_filtered_df, noise_duplicate_rows, noise_msno_keys

<b><ul><li>Noise a and b evaluation</li></ul></b>

In [None]:
# Noise (a)/(b): members with a repeated
# (msno, transaction_date, membership_expire_date) combination
noise_a_filtered_df, noise_a_duplicate_rows, noise_a_msno_keys = noise_finder(
    df=new_tran_df,
    param=["msno", "transaction_date", "membership_expire_date"],
)

In [None]:
# Occurrences of each (msno, transaction_date, membership_expire_date)
# combination, largest first, keeping only those seen more than once
noise_a_duplicate_rows = (
    new_tran_df
    .groupBy("msno", "transaction_date", "membership_expire_date")
    .count()
    .orderBy(F.desc("count"))
    .where(F.col("count") > 1)
)

# Dataframe shape
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=noise_a_duplicate_rows.count(),
        columns=len(noise_a_duplicate_rows.columns),
    )
)

# Displaying the results with full column content
# noise_a_duplicate_rows.show(truncate=False)

# "msno" of every repeated combination
noise_a_msno_keys = noise_a_duplicate_rows.select("msno")

In [None]:
# Rows of 'new_tran_df' whose "msno" appears among the noise (a) keys
noise_a_filtered_df = new_tran_df.join(other=noise_a_msno_keys, on=["msno"], how="inner")

# Dataframe shape
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=noise_a_filtered_df.count(),
        columns=len(noise_a_filtered_df.columns),
    )
)

# Displaying the resulting DataFrame after filtering
# noise_a_filtered_df.show()

<b><ul><li>Noise c evaluation</li></ul></b>

In [None]:
# Noise (c): members with a repeated (msno, transaction_date) combination
noise_c_filtered_df, noise_c_duplicate_rows, noise_c_msno_keys = noise_finder(
    df=new_tran_df,
    param=["msno", "transaction_date"],
)

In [None]:
# Grouping and counting duplicate rows based on "msno" and "transaction_date" columns
noise_c_duplicate_rows = new_tran_df.groupBy("msno", "transaction_date").count().orderBy(F.desc("count"))

# Filtering rows with a count greater than 1 (i.e., duplicate rows)
noise_c_duplicate_rows = noise_c_duplicate_rows.filter(F.col("count") > 1)

# Dataframe shape of the noise (c) groups; the original printed the shape of
# 'noise_a_duplicate_rows' here, a copy-paste slip from the noise (a) cell
print(f"shape\n |-- rows: {noise_c_duplicate_rows.count()}\n |-- columns: {len(noise_c_duplicate_rows.columns)}")

# Displaying the results with full column content
# noise_c_duplicate_rows.show(truncate=False)

# Selecting the "msno" column from the DataFrame with duplicate rows
noise_c_msno_keys = noise_c_duplicate_rows.select("msno")

In [None]:
# Rows of 'new_tran_df' whose "msno" appears among the noise (c) keys
noise_c_filtered_df = new_tran_df.join(other=noise_c_msno_keys, on=["msno"], how="inner")

# Dataframe shape
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=noise_c_filtered_df.count(),
        columns=len(noise_c_filtered_df.columns),
    )
)

# Displaying the resulting DataFrame after filtering
# noise_c_filtered_df.show()

<h4>2-2-2-1- Observation</h4>

In [None]:
# 'trans_model_df' = 'new_tran_df' without the rows of 'noise_a_filtered_df'
# and without the rows of 'noise_c_filtered_df'.
trans_model_df = (
    new_tran_df
    .subtract(noise_a_filtered_df)
    .subtract(noise_c_filtered_df)
)

# Report the shape, then the column types
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=trans_model_df.count(),
        columns=len(trans_model_df.columns),
    )
)
trans_model_df.printSchema()

# Display the resulting 'trans_model_df'.
# trans_model_df.show()

<h4>3-2-1- User Data</h4>

In [None]:
# Keep only the activity rows dated within January 2017 (both bounds inclusive)
user_filter_df = tables["user"].where(col("date").between("20170101", "20170131"))

# "msno" of every row that survived the transaction cleaning
trans_msno_keys = trans_model_df.select("msno")

# Activity rows restricted to those members
user_merge_df = user_filter_df.join(other=trans_msno_keys, on=["msno"], how="inner")

# Report the shape, then the column types
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=user_merge_df.count(),
        columns=len(user_merge_df.columns),
    )
)
user_merge_df.printSchema()

In [None]:
# One row per member: number of activity rows plus the totals of each counter.
# NOTE(review): format_number() is documented to return a string, so 'activity'
# is presumably a text column rather than a number — confirm before modelling.
user_sum_df = user_merge_df.groupBy("msno").agg(
    F.format_number(F.count('date'), 0).alias('activity'),
    *[
        F.sum(counter).alias(f"sum_{counter}")
        for counter in ("num_25", "num_50", "num_75", "num_985", "num_100", "num_unq")
    ],
    F.sum("total_secs").alias("total_secs"),
)

# Report the shape, then the column types
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=user_sum_df.count(),
        columns=len(user_sum_df.columns),
    )
)
user_sum_df.printSchema()

# user_sum_df.show()

In [None]:
# Final modelling table: cleaned member/transaction rows joined with the
# per-member activity totals; members without activity rows drop out (inner join)
model_df = trans_model_df.join(other=user_sum_df, on=["msno"], how="inner")

# Report the shape, then the column types
print(
    "shape\n |-- rows: {rows}\n |-- columns: {columns}".format(
        rows=model_df.count(),
        columns=len(model_df.columns),
    )
)
model_df.printSchema()

In [None]:
import json
from google.cloud import storage
from google.oauth2 import service_account

In [None]:
import os

# Point the Google client libraries at the service-account key file
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './customer-churn-391917-150cab3a2647.json'

# Obtain the Spark session (getOrCreate() reuses one if it already exists)
spark = (
    SparkSession.builder
    .appName("Save to GCS")
    .getOrCreate()
)

# 'model_df' is expected to exist already at this point

# Destination in Google Cloud Storage: <bucket>/<file name>
gcs_bucket = "gs://kkbox_data_churn"
file_name = "model_main"
gcs_path = "/".join([gcs_bucket, file_name])

# Write the DataFrame as parquet, replacing anything already stored at that path
model_df.write.mode('overwrite').parquet(gcs_path)