In [11]:
from pyspark.sql import SparkSession,Window
from pyspark.sql.functions import*

spark = SparkSession.builder.appName("test").getOrCreate()

df = spark.read.csv("./data/part-10/2019-10-10.csv",header=True, inferSchema=True)
df.show(2)




+-------------------+----------+----------+-------------------+--------------------+---------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|    brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+---------+-------+---------+--------------------+
|2019-10-10 00:00:00|      view|   1801922|2053013554415534427|electronics.video.tv|  samsung|1544.16|513417348|05d19a8c-6e5e-460...|
|2019-10-10 00:00:00|      view|  35108572|2070005009382114061|   apparel.underwear|milavitsa|  12.85|557616497|4c08a587-46cc-492...|
+-------------------+----------+----------+-------------------+--------------------+---------+-------+---------+--------------------+
only showing top 2 rows



In [None]:
df_profile = df.filter(col("event_type") == "purchase") \
        .groupBy("user_id").agg(
            min("event_time").alias("first_purchase_date"),
            max("event_time").alias("last_purchase_date"),
            round(sum("price"), 2).alias("total_spent"),
            count("product_id").alias("total_orders")
        )
    # Phân loại khách hàng theo tổng chi tiêu
df_profile = df_profile.withColumn("customer_tier",
        when(col("total_spent") >= 10000000, "VIP")
        .when(col("total_spent") >= 3000000, "Medium")
        .otherwise("Low"))
    
# top category mua nhiều nhất
df_category = df.filter(col("event_type") == "purchase") \
        .groupBy("user_id", "category_id", "category_code") \
        .agg(count("*").alias("t"))
window_category = Window.partitionBy("user_id").orderBy(desc("t"))
df_preferred_category = df_category.withColumn("rank", rank().over(window_category)) \
        .filter(col("rank") <= 3).select("user_id","category_code")\
        .groupBy("user_id").agg(collect_list("category_code").alias("preferred_category"))


#top brand mua nhieu nhat
df_brand = df.filter(col("event_type") == "purchase") \
        .groupBy("user_id", "brand") \
        .agg(count("*").alias("t"))
    
window_brand = Window.partitionBy("user_id").orderBy(desc("t"))
df_preferred_brand = df_brand.withColumn("rank", rank().over(window_brand)) \
        .filter(col("rank") <= 3)\
        .groupBy("user_id").agg(collect_list("brand").alias("preferred_brands"))

df_churn = df.groupBy("user_id").agg(
        min(col("event_time")).alias("first_activity_date"),
        max(when(col("event_type") == "purchase", col("event_time"))).alias("last_purchase_date"),
        max(when(col("event_type").isin("view","cart"), col("event_time"))).alias("last_active_day")
    )
df_churn = df_churn.withColumn("day_since_last_purchase", datediff(col("last_purchase_date"),col("first_activity_date")))\
                .withColumn("day_since_first_activity", datediff(col("last_active_day"),col("first_active_date")))

df_churn = df_churn.withColumn("chunk_risk",
                                   when(col("day_since_last_purchase") > 30, "High")
                                   .when((col("day_since_last_purchase") > 15) & (col("day_since_first_activity") < 30), "Potential")
                                   .when(col("day_since_last_purchase" )> 15, "Medium")
                                   .otherwise("Low")
                                   )

df_profile = df_profile.join(df_preferred_category, "user_id", "left")\
                            .join(df_preferred_brand,"user_id","left")\
                            .join(df_churn,"user_id","left")
df_profile.show(5)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `first_active_date` cannot be resolved. Did you mean one of the following? [`first_activity_date`, `last_active_day`, `last_purchase_date`, `user_id`, `day_since_last_purchase`].;
'Project [user_id#1040, first_activity_date#1551, last_purchase_date#1553, last_active_day#1555, day_since_last_purchase#1560, datediff(last_active_day#1555, 'first_active_date) AS day_since_first_activity#1566]
+- Project [user_id#1040, first_activity_date#1551, last_purchase_date#1553, last_active_day#1555, datediff(cast(last_purchase_date#1553 as date), cast(first_activity_date#1551 as date)) AS day_since_last_purchase#1560]
   +- Aggregate [user_id#1040], [user_id#1040, min(event_time#1033) AS first_activity_date#1551, max(CASE WHEN (event_type#1034 = purchase) THEN event_time#1033 END) AS last_purchase_date#1553, max(CASE WHEN event_type#1034 IN (view,cart) THEN event_time#1033 END) AS last_active_day#1555]
      +- Relation [event_time#1033,event_type#1034,product_id#1035,category_id#1036L,category_code#1037,brand#1038,price#1039,user_id#1040,user_session#1041] csv


In [None]:
#top brand mua nhieu nhat
df_prefered_brand = df.filter(col("event_type") == "purchase").groupBy(col('user_id'),col('brand')).agg(count('*').alias('num_purchase_brand'))

window = Window.partitionBy("user_id").orderBy(desc('num_purchase_brand'))

df_prefered_brand = df_prefered_brand.withColumn("rank", rank().over(window)).filter(col('rank') <= 3)

df_top_3_brands = df_prefered_brand.groupBy("user_id").agg(collect_list("brand").alias("top_3_brands"))

df_top_3_brands.show(3)

+---------+------------+
|  user_id|top_3_brands|
+---------+------------+
|401646272|     [sonel]|
|469922078|  [babyliss]|
|482482539|     [elari]|
+---------+------------+
only showing top 3 rows



In [6]:
#static customer profile
df_scp = df.filter(col("event_type") == "purchase")\
    .groupBy(col("user_id")).agg(
        min("event_time").alias("first_purchase_date"), 
        max("event_time").alias("last_purchase_date"),
        round(sum("price"),2).alias("total_spend"),
        count("product_id").alias("total_orders")
        )
df_scp.show(2)


+---------+-------------------+-------------------+-----------+------------+
|  user_id|first_purchase_date| last_purchase_date|total_spend|total_orders|
+---------+-------------------+-------------------+-----------+------------+
|527958941|2019-10-10 00:02:16|2019-10-10 00:03:58|    1439.18|           2|
|516897905|2019-10-10 00:16:15|2019-10-10 00:16:15|       65.9|           1|
+---------+-------------------+-------------------+-----------+------------+
only showing top 2 rows



In [None]:
# top category mua nhiều nhất
df_preferred_category = df.filter(col("event_type")=="purchase").groupBy("user_id","category_id","category_code").agg(count("*").alias("t"))
window = Window.partitionBy("user_id").orderBy(desc("t"))
df_preferred_category = df_preferred_category.withColumn("rank", rank().over(window))\
                        .filter(col("rank") < 3)

df_preferred_category__ = df_preferred_category.groupBy(col("user_id")).agg(collect_list("category_code").alias("most_purchase_category"))
      
df_preferred_category__.show(5)

+---------+----------------------+
|  user_id|most_purchase_category|
+---------+----------------------+
|401646272|                    []|
|469922078|                    []|
|482482539|  [electronics.audi...|
|489320606|  [electronics.smar...|
|497976822|                    []|
+---------+----------------------+
only showing top 5 rows



In [89]:
window = Window.partitionBy("user_id").orderBy(desc("event_time"))
df_last_product = df.filter(col("event_type") == "purchase") \
    .withColumn("rank", rank().over(window)) \
    .filter(col("rank") < 3).select("user_id", "product_id")

df__=df_last_product.groupBy("user_id").agg((collect_list("product_id")).alias("recent_purchase"))
df__.show(10)

+---------+------------------+
|  user_id|   recent_purchase|
+---------+------------------+
|401646272|        [13300559]|
|469922078|         [5300259]|
|482482539|         [4804409]|
|489320606|         [1005107]|
|497976822|        [12702958]|
|511064318|        [22700866]|
|512363973|[1004838, 1004838]|
|512364293|[1004768, 1004768]|
|512364372|         [1002524]|
|512364709|         [1307445]|
+---------+------------------+
only showing top 10 rows



In [None]:
# product view fact
df_pvf = df.groupBy("product_id",).agg(
        count(when(col("event_type") == "view", True)).alias("total_views"),
        count(when(col("event_type") == "cart", True)).alias("total_carts"),
        count(when(col("event_type") == "purchase", True)).alias("total_purchases"),
        round(sum(when(col("event_type") == "purchase", col("price")).otherwise(0)),2).alias("total_revenue")
    )
df_pvf = df_pvf.withColumn("conversion_rate", round(col("total_purchases")/col("total_views"),2)) \
                .withColumn("cart_abandon_rate",
                            when(col("total_carts") > 0, round((col("total_carts")-col("total_purchases"))/col("total_carts"),2))
                            .otherwise(0))
                # .withColumn()
df_pvf.show(10)

+----------+-----------+-----------+---------------+-------------+---------------+-----------------+
|product_id|total_views|total_carts|total_purchases|total_revenue|conversion_rate|cart_abandon_rate|
+----------+-----------+-----------+---------------+-------------+---------------+-----------------+
|  48300076|          5|          0|              0|          0.0|            0.0|              0.0|
|   1004739|       4277|        349|            172|     32587.48|           0.04|             0.51|
|  29100052|         65|          0|              1|        32.33|           0.02|              0.0|
|   1004666|        175|          4|              4|      3559.72|           0.02|              0.0|
|  17100091|          3|          0|              0|          0.0|            0.0|              0.0|
|   2900536|        560|          0|             23|      1183.58|           0.04|              0.0|
|  21406776|         21|          0|              0|          0.0|            0.0|         

In [None]:
#phân loại khách hàng
