In [0]:
from pyspark.sql import functions as F

In [0]:
# Read the silver-layer Delta table; the following cells reshape this DataFrame.
df = spark.read.load("/mnt/simple_etl_adls2/silver", format="delta")

In [0]:
# Pivot the `type` values into one summed-duration column per content category,
# keyed by (contract, log_date).
#
# The pivot values are listed explicitly so that Spark does not need an extra job to
# discover the distinct types, and so that all five category columns always exist:
# a category absent from the data would otherwise produce no column, and the later
# references to that column would fail. Any other `type` value is not pivoted; no
# later cell in this notebook reads such a column.
content_types = ["Giai Tri", "Phim Truyen", "The Thao", "Thieu Nhi", "Truyen Hinh"]
df = (df.groupBy("contract", "log_date")
        .pivot("type", content_types)
        .sum("total_duration"))
# A (contract, log_date) pair with no rows for a category pivots to null; treat as 0.
df = df.fillna(0)

In [0]:
# Give the pivoted category columns snake_case names so they are easy to reference.
snake_case_names = {
    "Giai Tri": "giai_tri",
    "Phim Truyen": "phim_truyen",
    "The Thao": "the_thao",
    "Thieu Nhi": "thieu_nhi",
    "Truyen Hinh": "truyen_hinh",
}
for pivoted_name, snake_name in snake_case_names.items():
    df = df.withColumnRenamed(pivoted_name, snake_name)

In [0]:
# Label each row with the category that has the largest duration (`most_watch`).

# Category columns paired with the display label written to `most_watch`.
# Order matters: when several categories tie for the maximum, the first pair wins.
category_labels = [
    ("giai_tri", "Giai Tri"),
    ("phim_truyen", "Phim Truyen"),
    ("the_thao", "The Thao"),
    ("thieu_nhi", "Thieu Nhi"),
    ("truyen_hinh", "Truyen Hinh"),
]
category = [column for column, _ in category_labels]

# Largest duration across the category columns of each row.
df = df.withColumn("max_duration", F.greatest(*[F.col(c) for c in category]))

# The durations were null-filled with 0, so a row with no watch time in any category
# has max_duration == 0 and would match the first comparison (0 == 0) and be labelled
# "Giai Tri". Such a row has no favourite category: leave the label null instead.
most_watch = F.when(F.col("max_duration") <= 0, F.lit(None).cast("string"))
for column, label in category_labels:
    most_watch = most_watch.when(F.col("max_duration") == F.col(column), label)

# Attach the label and drop the intermediate maximum.
df = df.withColumn("most_watch", most_watch.otherwise(None)).drop("max_duration")

In [0]:

# Build the `taste` column: the names of the categories that were actually watched,
# joined with "-".
category = ["giai_tri", "phim_truyen", "the_thao", "thieu_nhi", "truyen_hinh"]

# The durations were null-filled with 0 after the pivot, so an isNotNull() test is true
# for every category and every row would get all five names. Test for a positive
# duration instead; a null duration also fails this test, so it is skipped as before.
# F.when without otherwise yields null for unused categories, and concat_ws skips nulls.
taste_flags = [F.when(F.col(c) > 0, F.lit(c)) for c in category]
df = df.withColumn("taste", F.concat_ws("-", *taste_flags))

In [0]:
# Build one gold row per contract: per-category totals, favourite category, taste and
# activeness.
#
# The favourite category and the taste are derived from the contract-level totals.
# Taking F.first() of the per-day labels returns the value of whichever day Spark
# happens to read first, which is not deterministic and describes a single day only.
category = ["giai_tri", "phim_truyen", "the_thao", "thieu_nhi", "truyen_hinh"]
display_label = {
    "giai_tri": "Giai Tri",
    "phim_truyen": "Phim Truyen",
    "the_thao": "The Thao",
    "thieu_nhi": "Thieu Nhi",
    "truyen_hinh": "Truyen Hinh",
}

# Single aggregation: the distinct-day count is computed alongside the sums, so no
# separate day-count DataFrame and join back onto `df` are needed.
contract_totals = df.groupBy("contract").agg(
    *[F.sum(c).alias("total_" + c) for c in category],
    F.countDistinct("log_date").alias("day_count"),
)

total = {c: F.col("total_" + c) for c in category}
max_total = F.greatest(*total.values())

# No positive watch time means no favourite (null). Ties go to the first category
# in `category`.
most_watch = F.when(max_total <= 0, F.lit(None).cast("string"))
for c in category:
    most_watch = most_watch.when(max_total == total[c], display_label[c])

# Categories with a positive total, joined with "-" (concat_ws skips the nulls that
# F.when produces for unused categories).
taste = F.concat_ws("-", *[F.when(total[c] > 0, F.lit(c)) for c in category])

df = (contract_totals
        .withColumn("most_watch", most_watch.otherwise(None))
        .withColumn("taste", taste)
        # Active on more than 4 distinct days is "High", otherwise "Low".
        .withColumn("active", F.when(F.col("day_count") > 4, "High").otherwise("Low"))
        # Keep the existing gold columns and their order.
        # NOTE(review): total_thieu_nhi is used for the labels above but is not written,
        # matching the current output; this looks like an omission. Adding it changes
        # the gold schema, so confirm how the writer should handle that first.
        .select("contract",
                "total_giai_tri",
                "total_phim_truyen",
                "total_the_thao",
                "total_truyen_hinh",
                "most_watch",
                "taste",
                "active"))

In [0]:
# Persist the result to the gold layer as a Delta table, replacing the existing data.
df.write.save("/mnt/simple_etl_adls2/gold", format="delta", mode="overwrite")