In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Read HDFS CSV")
    .getOrCreate()
)

In [None]:
# All three raw files are read the same way: the first row holds the column
# names and Spark infers each column's type.
csv_options = {"header": True, "inferSchema": True}

customers = spark.read.csv(
    "hdfs://namenode:9000/raw/telco_customer_churn.csv", **csv_options
)
complaints = spark.read.csv(
    "hdfs://namenode:9000/raw/complaints.csv", **csv_options
)
payments = spark.read.csv(
    "hdfs://namenode:9000/raw/payments.csv", **csv_options
)

In [None]:
from pyspark.sql.functions import col, sum, when

# -----------------------------
def count_nulls(df, df_name):
    """Print a one-row table holding the number of nulls in each column of ``df``.

    Args:
        df: DataFrame whose columns are scanned.
        df_name: Label printed in the heading above the table.
    """
    print(f"\nNulls count in {df_name}:")
    # `sum` is the pyspark.sql.functions aggregate imported in this cell,
    # not Python's builtin.
    per_column_nulls = []
    for column_name in df.columns:
        null_flag = when(col(column_name).isNull(), 1).otherwise(0)
        per_column_nulls.append(sum(null_flag).alias(column_name))
    df.select(per_column_nulls).show()

# Report nulls for the three frames loaded from HDFS: customers (the churn
# data), payments, and complaints (the tickets).
count_nulls(customers, "Churn")
count_nulls(payments, "Payments")
count_nulls(complaints, "Tickets")

In [None]:
from pyspark.sql.functions import col,sum,when

def count_nulls(df,df_name):
    """Show, for every column of ``df``, how many of its values are null.

    Args:
        df: DataFrame to inspect.
        df_name: Name printed in the heading line before the table.
    """
    def null_total(name):
        # 1 for a null value, 0 otherwise; summed over all rows
        # (`sum` is the Spark aggregate imported in this cell).
        return sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)

    print(f"\n Nulls count in {df_name}:")
    df.select([null_total(name) for name in df.columns]).show()

# Null report for each of the three loaded frames, in the same order as before.
for frame, label in (
    (customers, "customer churn"),
    (payments, "payments"),
    (complaints, "complaints"),
):
    count_nulls(frame, label)

In [None]:
# Per-column null counts for the customers frame only.
customer_null_counts = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in customers.columns
]
customers.select(customer_null_counts).show()

In [None]:
# "Churn Category" and "Churn Reason" are treated as null only for customers
# who did not churn, so nulls in both columns are replaced with "Not Churned".
# ------------ Fill -------------
for churn_column in ("Churn Category", "Churn Reason"):
    customers = customers.withColumn(
        churn_column,
        when(col(churn_column).isNull(), "Not Churned").otherwise(col(churn_column)),
    )

In [None]:
# A null "Internet Type" presumably means the customer has no internet
# subscription, so nulls are replaced with "No Internet".
# ----------- Fill ----------
internet_type = col("Internet Type")
customers = customers.withColumn(
    "Internet Type",
    when(internet_type.isNull(), "No Internet").otherwise(internet_type),
)

In [None]:
# A null "Offer" presumably means the customer was given no offer,
# so nulls are replaced with "No Offer".
offer = col("Offer")
customers = customers.withColumn(
    "Offer",
    when(offer.isNull(), "No Offer").otherwise(offer),
)

In [None]:
# Confirm the four filled columns no longer contain nulls.
filled_columns = ["Churn Category", "Churn Reason", "Internet Type", "Offer"]
customers.select(
    [sum(when(col(name).isNull(), 1).otherwise(0)).alias(name) for name in filled_columns]
).show()

In [None]:
# Number of customers per value of "Offer".
offer_counts = customers.groupBy("Offer").count()
offer_counts.show()

In [None]:
# Sample of customers who have no offer and zero referrals. Many customers
# without an offer have no referrals, which may be why they received no offer.
has_no_offer = col("Offer") == "No Offer"
has_no_referrals = col("Number of Referrals") == 0
(
    customers
    .filter(has_no_offer & has_no_referrals)
    .select("Customer ID", "Number of Referrals", "Offer")
    .show(10)
)

In [None]:
# Number of customers per value of "Internet Type".
internet_type_counts = customers.groupBy("Internet Type").count()
internet_type_counts.show()

In [None]:
# Cross-check the "No Internet" fill: for the rows labelled "No Internet",
# show which "Internet Service" values they carry.
no_internet_df = customers.where(col("Internet Type") == "No Internet")
internet_service_counts = no_internet_df.groupBy("Internet Service").count()
internet_service_counts.show()

In [None]:
def check_duplicates(df, df_name):
    """Print how many rows of ``df`` are exact duplicates of another row.

    The figure is the total row count minus the row count after
    ``dropDuplicates()``.

    Args:
        df: DataFrame to inspect.
        df_name: Label used in the printed message.
    """
    row_total = df.count()
    unique_total = df.dropDuplicates().count()
    print(f"{df_name} has {row_total - unique_total} duplicate rows.")

# Duplicate-row report for each of the three frames.
for frame, label in (
    (customers, "Customers"),
    (payments, "Payments"),
    (complaints, "Complaints"),
):
    check_duplicates(frame, label)

In [None]:
# Spot-check a few categorical columns for consistency by listing their
# distinct values.
for frame, column_name in (
    (customers, "Churn"),
    (payments, "PaymentStatus"),
    (complaints, "Status"),
):
    frame.select(column_name).distinct().show()

In [None]:
#Create DWH
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, year, month, dayofmonth

# Session handle for the warehouse-building cells. getOrCreate() returns the
# session that is already active, if there is one.
spark = (
    SparkSession.builder
    .appName("Churn_DWH")
    .getOrCreate()
)

In [None]:
# Customer dimension: the descriptive attributes kept for each customer, with
# the natural key renamed to "CustomerID".
customer_attributes = [
    "Customer ID", "Age", "Gender", "City", "State", "Country",
    "Married", "Dependents", "Device Protection Plan", "Internet Service",
    "Internet Type", "Contract", "Payment Method", "Churn", "Churn Category",
]
dim_customers = (
    customers
    .select(*customer_attributes)
    .withColumnRenamed("Customer ID", "CustomerID")
)

dim_customers.show(5)

In [None]:
# Every date referenced by a fact table: payment invoice dates plus complaint
# open dates, de-duplicated on their raw values.
all_dates = payments.select(col("InvoiceDate").alias("date")).union(
    complaints.select(col("DateOpened").alias("date"))
).distinct()

# Time dimension: one row per calendar date.
# The second distinct() runs AFTER to_date(): raw values that differ but map to
# the same date (or to null when they cannot be parsed) would otherwise leave
# several rows per date, and each extra row multiplies the fact rows that join
# to it.
dim_time = all_dates.withColumn("date", to_date("date", "yyyy-MM-dd")) \
    .distinct() \
    .withColumn("year", year("date")) \
    .withColumn("month", month("date")) \
    .withColumn("day", dayofmonth("date"))

dim_time.show(5)

In [None]:
# Payments fact: keys, the invoice date (exposed as "date"), and the measures
# cast to explicit numeric types.
payment_columns = [
    col("PaymentID").alias("PaymentID"),
    col("customerID").alias("CustomerID"),
    col("InvoiceDate").alias("date"),
    col("AmountDue").cast("double"),
    col("AmountPaid").cast("double"),
    col("PaymentStatus"),
    col("DelayDays").cast("int"),
]
fact_payments = payments.select(*payment_columns)

fact_payments.show(5)

In [None]:
# Complaints fact: keys, the open date (exposed as "date"), the category,
# resolution time cast to int, and the ticket status.
complaint_columns = [
    col("TicketID").alias("TicketID"),
    col("customerID").alias("CustomerID"),
    col("DateOpened").alias("date"),
    col("Category"),
    col("ResolutionTime_Days").cast("int"),
    col("Status"),
]
fact_complaints = complaints.select(*complaint_columns)

fact_complaints.show(5)

In [None]:
# SK --> surrogate keys for the dimensions
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# row_number() over an explicit ordering replaces monotonically_increasing_id().
# The latter is non-deterministic (its value depends on partition ids), and
# these DataFrames are re-evaluated for each later join and write, so the keys
# stored in the dimension files could differ from the keys joined into the
# facts. Ordering by the natural key gives the same key on every evaluation
# (assumes one row per CustomerID / per date -- TODO confirm).
# Note: a window without partitionBy processes the frame in a single partition,
# which is acceptable for small dimension tables.
# monotonically_increasing_id stays imported in case other cells still use it.
dim_customers = dim_customers.withColumn(
    "customer_sk",
    row_number().over(Window.orderBy("CustomerID")).cast("long"),
)

dim_time = dim_time.withColumn(
    "time_sk",
    row_number().over(Window.orderBy("date")).cast("long"),
)

In [None]:
# Attach the customer surrogate key to each payment. The dimension's natural
# key is aliased before the join and dropped afterwards so the result keeps a
# single, unambiguous CustomerID column.
fact_payments_dw = fact_payments.join(
    dim_customers.select("customer_sk", col("CustomerID").alias("dim_CustomerID")),
    fact_payments.CustomerID == col("dim_CustomerID"),
    how="left"
).drop("dim_CustomerID")

# Attach the time dimension columns by matching the payment date. `col` is used
# here because the `F` alias is only imported in a later cell.
fact_payments_dw = fact_payments_dw.join(
    dim_time.withColumnRenamed("date", "date_sk_date"),
    fact_payments_dw.date == col("date_sk_date"),
    how="left"
).drop("date_sk_date")

In [None]:
# Rename the dimension's date column so it cannot clash with fact columns.
dim_time_renamed = dim_time.withColumnRenamed("date", "time_date")

# Complaints fact with surrogate keys: left-join the customer key on
# CustomerID, then the time dimension on the complaint's open date. The helper
# date column and the natural customer key are dropped from the result.
# `col` is used for the date match because the `F` alias is only imported in a
# later cell.
fact_complaints_dw = complaints.join(
    dim_customers.select("customer_sk", "CustomerID"),
    on="CustomerID",
    how="left"
).join(
    dim_time_renamed,
    complaints["DateOpened"] == col("time_date"),
    how="left"
).drop("time_date", "CustomerID")

In [None]:
from pyspark.sql import functions as F

# Lookup frames with their join columns renamed so they cannot clash with the
# fact's own CustomerID / date columns.
customer_lookup = dim_customers.select(
    "customer_sk", F.col("CustomerID").alias("dim_CustomerID")
)
time_lookup = dim_time.withColumnRenamed("date", "date_sk_date")

# Left-join the customer key, then the time dimension, and finally drop the
# two helper join columns.
payments_with_customer = fact_payments.join(
    customer_lookup,
    fact_payments.CustomerID == F.col("dim_CustomerID"),
    how="left",
)
payments_with_time = payments_with_customer.join(
    time_lookup,
    fact_payments.date == F.col("date_sk_date"),
    how="left",
)
fact_payments_dw = payments_with_time.drop("dim_CustomerID", "date_sk_date")

In [None]:
local_path = "/home/jovyan/local_dw"

# Each table is written as parquet to its own sub-directory of local_path,
# replacing any previous output. Dimensions are written first, then facts.
warehouse_tables = [
    ("dim_customers", dim_customers),
    ("dim_time", dim_time),
    ("fact_payments_dw", fact_payments_dw),
    ("fact_complaints_dw", fact_complaints_dw),
]
for table_name, table_df in warehouse_tables:
    table_df.write.mode("overwrite").parquet(f"{local_path}/{table_name}")