In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("Check Data") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [2]:
schema = StructType([
    StructField("event_time", TimestampType(), nullable=False),
    StructField("event_type", StringType(), nullable=False),
    StructField("product_id", StringType(), nullable=False),
    StructField("category_id", StringType(), nullable=False),
    StructField("category_code", StringType(), nullable=True),
    StructField("brand", StringType(), nullable=True),
    StructField("price", DoubleType(), nullable=False),
    StructField("user_id", StringType(), nullable=False),
    StructField("user_session", StringType(), nullable=False),
])


In [7]:

path = "./data/2020-Jan.csv"

df = spark.read.format("csv").option("header", "true").schema(schema).load(path)

input_year = 2020
input_month = 1

df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [8]:
df = df.dropDuplicates() \
        .dropna(subset=["event_time", "brand", "product_id", "category_id", "price", "user_id", "user_session", "category_code"]) \
        .filter(month("event_time") == input_month)\
        .withColumn("category_level_1", split(col("category_code"), "\\.")[0]) \
        .withColumn("category_level_2", split(col("category_code"), "\\.")[1]) \
        .withColumn("category_level_3", split(col("category_code"), "\\.")[2]) \
        .withColumn("category_level_4", split(col("category_code"), "\\.")[3]) \
        .drop("category_code") \
        .withColumn("year", year("event_time")) \
        .withColumn("month", month("event_time")) \
        .withColumn("day", day("event_time"))

In [5]:
df.show(5)

+-------------------+----------+----------+-------------------+-------+-------+---------+--------------------+----------------+----------------+----------------+----------------+----+-----+---+
|         event_time|event_type|product_id|        category_id|  brand|  price|  user_id|        user_session|category_level_1|category_level_2|category_level_3|category_level_4|year|month|day|
+-------------------+----------+----------+-------------------+-------+-------+---------+--------------------+----------------+----------------+----------------+----------------+----+-----+---+
|2020-01-01 07:01:01|      view|   1003317|2232732093077520756|  apple|1055.37|519698804|69b5d72f-fd6e-4fe...|    construction|           tools|           light|            NULL|2020|    1|  1|
|2020-01-01 07:01:11|      view|  10301162|2232732104888681081|     rw|  25.71|595414577|f4d26c60-9753-474...|         apparel|           scarf|            NULL|            NULL|2020|    1|  1|
|2020-01-01 07:02:20|      vie

In [9]:
fact_events = df.withColumn("event_id", monotonically_increasing_id())\
    .withColumn("date_id", to_date(col("event_time"))) \
    .withColumn("hour", hour(col("event_time"))) \
    .withColumn("event_type_id",
                when(col("event_type") == "view", 1)
                .when(col("event_type") == "cart", 2)
                .when(col("event_type") == "purchase", 3)
                .otherwise(None)) \
    .withColumn("revenue", when(col("event_type") == "purchase", col("price")).otherwise(0)) \
    .withColumn("quantity", lit(1)) \
    .select("event_id", "date_id", "hour", "user_id", "user_session", "product_id", "event_type_id", "revenue", "quantity")

In [10]:
fact_events.groupBy("date_id").count().show(40)

+----------+-------+
|   date_id|  count|
+----------+-------+
|2020-01-21|1548408|
|2020-01-17|1650298|
|2020-01-25|1368864|
|2020-01-10|1339919|
|2020-01-24|1344563|
|2020-01-04|1506952|
|2020-01-11|1378439|
|2020-01-13|1390711|
|2020-01-28|1363209|
|2020-01-22|1425997|
|2020-01-03|1529268|
|2020-01-07|1451756|
|2020-01-05|1491489|
|2020-01-06|1457551|
|2020-01-27|1351769|
|2020-01-30|1391017|
|2020-01-12|1445568|
|2020-01-19|1843739|
|2020-01-23|1451858|
|2020-01-16|1490981|
|2020-01-02|1400641|
|2020-01-09|1346191|
|2020-01-31|1659988|
|2020-01-15|1338009|
|2020-01-26|1482827|
|2020-01-20|1617463|
|2020-01-18|1749128|
|2020-01-01| 934734|
|2020-01-14|1382614|
|2020-01-08|1400249|
|2020-01-29|1349780|
+----------+-------+



In [11]:
# === FORECAST NEXT MONTH DAILY REVENUE ===
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import calendar

# Tổng doanh thu theo ngày hiện tại
df_daily = fact_events.groupBy("date_id")\
    .agg(sum("revenue").alias("total_revenue"))\
    .withColumn("day_index", dayofmonth("date_id"))\
    .withColumn("day_of_week", dayofweek("date_id"))\
    .withColumn("is_weekend", col("day_of_week").isin(1, 7).cast("int"))

In [12]:
df_daily.printSchema()

root
 |-- date_id: date (nullable = true)
 |-- total_revenue: double (nullable = true)
 |-- day_index: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- is_weekend: integer (nullable = true)



In [None]:
df_daily.orderBy("day_index").show(35)

In [13]:

# Huấn luyện Linear Regression
feature_cols = ["day_index", "day_of_week", "is_weekend"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_features = assembler.transform(df_daily).select("features", "total_revenue")

lr = LinearRegression(featuresCol="features", labelCol="total_revenue")
model = lr.fit(df_features)

# Tính tháng kế tiếp và số ngày chính xác
next_month = input_month + 1 if input_month < 12 else 1
next_year = input_year if input_month < 12 else input_year + 1
days_in_next_month = calendar.monthrange(next_year, next_month)[1]

# Tạo dataframe day_index cho tháng kế tiếp
future_days = spark.range(1, days_in_next_month + 1).toDF("day_index")

future_days = future_days.withColumn(
    "forecast_date",
    to_date(concat_ws("-", lit(next_year), lpad(lit(next_month), 2, "0"), lpad(col("day_index"), 2, "0")))
)

future_days = future_days.withColumn("day_of_week", dayofweek("forecast_date"))\
    .withColumn("is_weekend", col("day_of_week").isin(1, 7).cast("int"))

df_future = assembler.transform(future_days)
predictions = model.transform(df_future)


In [14]:
predictions.printSchema()

root
 |-- day_index: long (nullable = false)
 |-- forecast_date: date (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- is_weekend: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [15]:
predictions.select("day_index", "forecast_date", "day_of_week", "is_weekend", col("prediction").alias("predicted_revenue")).show(32)

+---------+-------------+-----------+----------+--------------------+
|day_index|forecast_date|day_of_week|is_weekend|   predicted_revenue|
+---------+-------------+-----------+----------+--------------------+
|        1|   2020-02-01|          7|         1|   6766896.070771403|
|        2|   2020-02-02|          1|         1|   7136916.133644527|
|        3|   2020-02-03|          2|         0|   6340911.597130082|
|        4|   2020-02-04|          3|         0|   6415646.008590045|
|        5|   2020-02-05|          4|         0|   6490380.420050009|
|        6|   2020-02-06|          5|         0|   6565114.831509973|
|        7|   2020-02-07|          6|         0|   6639849.242969937|
|        8|   2020-02-08|          7|         1|   7585322.602404309|
|        9|   2020-02-09|          1|         1|   7955342.665277433|
|       10|   2020-02-10|          2|         0|   7159338.128762987|
|       11|   2020-02-11|          3|         0|   7234072.540222951|
|       12|   2020-0

In [None]:

# Gắn cột forecast_date
predictions = predictions.withColumn(
    "forecast_date",
    to_date(concat_ws("-", lit(next_year), lpad(lit(next_month), 2, "0"), lpad(col("day_index"), 2, "0")))
).select("forecast_date", col("prediction").alias("predicted_revenue"))

In [11]:
predictions.show(31)

+-------------+------------------+
|forecast_date| predicted_revenue|
+-------------+------------------+
|   2020-02-01| 5754166.431831241|
|   2020-02-02| 5894458.657382639|
|   2020-02-03| 6034750.882934038|
|   2020-02-04| 6175043.108485436|
|   2020-02-05| 6315335.334036835|
|   2020-02-06| 6455627.559588233|
|   2020-02-07|6595919.7851396315|
|   2020-02-08|  6736212.01069103|
|   2020-02-09| 6876504.236242428|
|   2020-02-10| 7016796.461793827|
|   2020-02-11| 7157088.687345225|
|   2020-02-12| 7297380.912896624|
|   2020-02-13| 7437673.138448022|
|   2020-02-14| 7577965.363999421|
|   2020-02-15| 7718257.589550819|
|   2020-02-16| 7858549.815102218|
|   2020-02-17| 7998842.040653616|
|   2020-02-18| 8139134.266205015|
|   2020-02-19| 8279426.491756413|
|   2020-02-20| 8419718.717307812|
|   2020-02-21|  8560010.94285921|
|   2020-02-22| 8700303.168410609|
|   2020-02-23| 8840595.393962007|
|   2020-02-24| 8980887.619513405|
|   2020-02-25| 9121179.845064804|
|   2020-02-26| 9261

In [7]:
df_hourly = fact_events.groupBy("hour")\
    .agg(sum("revenue").alias("total_revenue"))\
    .orderBy("hour")

In [12]:
df_hourly.show()

+----+--------------------+
|hour|       total_revenue|
+----+--------------------+
|   0|1.0185247299999999E7|
|   1|          9445669.47|
|   2|   7666661.540000001|
|   3|  5063888.1899999995|
|   4|  3350407.4499999997|
|   5|          2236155.63|
|   6|  1503758.6599999995|
|   7|  1157306.7999999998|
|   8|  1314360.9200000002|
|   9|  2888019.8400000003|
|  10|   6866585.619999999|
|  11|1.0618063220000003E7|
|  12|1.3455010950000007E7|
|  13|1.6174469229999999E7|
|  14|1.7776628759999998E7|
|  15|1.9247463419999994E7|
|  16|1.8946388580000006E7|
|  17|1.8527336190000013E7|
|  18|1.6825393369999994E7|
|  19|       1.530009473E7|
+----+--------------------+
only showing top 20 rows



In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

assembler = VectorAssembler(inputCols=["hour"], outputCol="features")
df_features = assembler.transform(df_hourly).select("features", "total_revenue")

lr = LinearRegression(featuresCol="features", labelCol="total_revenue")
model = lr.fit(df_features)


In [9]:
future_hours = spark.range(0, 24).toDF("hour")
df_future = assembler.transform(future_hours)
predictions = model.transform(df_future)

In [11]:
predictions.select("hour", col("prediction").alias("predicted_revenue")).show()

+----+--------------------+
|hour|   predicted_revenue|
+----+--------------------+
|   0|  3936860.3984666597|
|   1|   4498036.344614486|
|   2|   5059212.290762313|
|   3|    5620388.23691014|
|   4|   6181564.183057967|
|   5|   6742740.129205793|
|   6|   7303916.075353621|
|   7|   7865092.021501447|
|   8|   8426267.967649274|
|   9|     8987443.9137971|
|  10|   9548619.859944927|
|  11|1.0109795806092754E7|
|  12| 1.067097175224058E7|
|  13|1.1232147698388407E7|
|  14|1.1793323644536234E7|
|  15| 1.235449959068406E7|
|  16|1.2915675536831887E7|
|  17|1.3476851482979715E7|
|  18| 1.403802742912754E7|
|  19|1.4599203375275368E7|
+----+--------------------+
only showing top 20 rows



In [16]:
dim_time = df.select(
    hour(col("event_time")).alias("hour")
).distinct()\
.withColumn("hour_label", format_string("%02d:00", col("hour")))\
.withColumn("is_morning", col("hour").between(6, 11))\
.withColumn("is_afternoon", col("hour").between(12, 17))\
.withColumn("is_evening", col("hour").between(18, 21))\
.withColumn("is_night", (col("hour") >= 22) | (col("hour") <= 5))\
.withColumn("hour_group", when(col("hour").between(6,11), "Morning")
                         .when(col("hour").between(12,17), "Afternoon")
                         .when(col("hour").between(18,21), "Evening")
                         .otherwise("Night"))

In [17]:
dim_time.show()

+----+----------+----------+------------+----------+--------+----------+
|hour|hour_label|is_morning|is_afternoon|is_evening|is_night|hour_group|
+----+----------+----------+------------+----------+--------+----------+
|  12|     12:00|     false|        true|     false|   false| Afternoon|
|  22|     22:00|     false|       false|     false|    true|     Night|
|  13|     13:00|     false|        true|     false|   false| Afternoon|
|  16|     16:00|     false|        true|     false|   false| Afternoon|
|  20|     20:00|     false|       false|      true|   false|   Evening|
|  19|     19:00|     false|       false|      true|   false|   Evening|
|  15|     15:00|     false|        true|     false|   false| Afternoon|
|   9|     09:00|      true|       false|     false|   false|   Morning|
|  17|     17:00|     false|        true|     false|   false| Afternoon|
|   8|     08:00|      true|       false|     false|   false|   Morning|
|   7|     07:00|      true|       false|     false

In [21]:
df_cleaned.select("product_id", "category_id").distinct().groupBy("product_id").count().orderBy("count", ascending=False).show()

+----------+-----+
|product_id|count|
+----------+-----+
|   3701313|    1|
| 100038888|    1|
|  10800206|    1|
|   8700495|    1|
|   4802333|    1|
| 100003332|    1|
|  28401278|    1|
|  31700044|    1|
|  16700401|    1|
|   9200351|    1|
|   4200507|    1|
|  21404126|    1|
|  18500057|    1|
|   4803400|    1|
|  50300571|    1|
|  43500040|    1|
|   1005263|    1|
|   1801987|    1|
|   1801967|    1|
|  14300042|    1|
+----------+-----+
only showing top 20 rows



In [22]:
from pyspark.sql import Window
dim_product = df_cleaned.select(
    "product_id", "category_id", "brand", "price"
).distinct().groupBy("product_id", "category_id", "brand").agg(
    round(avg("price"), 2).alias("price")
).distinct()


window = Window.partitionBy("product_id", "category_id").orderBy("brand")
dim_product = dim_product.withColumn("row_number", row_number().over(window)) \
    .filter("row_number = 1")\
    .drop("row_number")\
    .distinct()

dim_product.show()

+----------+-------------------+---------+------+
|product_id|        category_id|    brand| price|
+----------+-------------------+---------+------+
| 100000020|2232732065311228801|  kroskaa| 20.12|
| 100000024|2232732105123562109|  ubisoft| 54.04|
| 100000025|2053013553375346967| spalenka| 16.49|
| 100000040|2053013553375346967|   adamas| 19.05|
| 100000050|2232732081585127530|    delta|119.44|
| 100000070|2232732081585127530|    delta|105.79|
| 100000071|2232732081585127530|    delta| 109.4|
| 100000097|2232732081585127530|    delta|158.31|
| 100000112|2053013563693335403|    delta| 96.01|
| 100000136|2053013556990837237|     emsa| 12.44|
| 100000150|2053013556990837237|  bohemia| 28.87|
| 100000151|2053013565782098913|  respect| 71.82|
| 100000158|2053013556990837237|  bohemia| 26.16|
| 100000210|2053013554415534427|      jvc|334.63|
| 100000215|2053013555573162395|milavitsa| 23.59|
| 100000216|2053013556462354899|      dam| 48.52|
| 100000221|2053013553375346967|belashoff| 23.56|


In [17]:
dim_product.groupBy("product_id").agg(
    count("brand").alias("count brand")
).orderBy("count brand", ascending=False).show()

+----------+-----------+
|product_id|count brand|
+----------+-----------+
| 100000002|          1|
| 100000000|          1|
| 100000009|          1|
| 100000001|          1|
| 100000003|          1|
| 100000010|          1|
| 100000024|          1|
| 100000013|          1|
| 100000019|          1|
| 100000020|          1|
| 100000046|          1|
| 100000022|          1|
| 100000025|          1|
| 100000026|          1|
| 100000056|          1|
| 100000027|          1|
| 100000038|          1|
| 100000028|          1|
| 100000078|          1|
| 100000031|          1|
+----------+-----------+
only showing top 20 rows



In [11]:
dim_product.filter(col("product_id") == "17600723").distinct().show()

+----------+---------+-----+
|product_id|    brand|price|
+----------+---------+-----+
|  17600723|   doliva|19.78|
|  17600723|christina|19.78|
+----------+---------+-----+



In [28]:
dim_date = df.select(
    to_date(col("event_time")).alias("date_id"),
    month("event_time").alias("month"),
    dayofmonth("event_time").alias("day"),
    quarter("event_time").alias("quarter"),
    dayofweek("event_time").alias("day_of_week"),
    date_format(col("date_id"), "EEEE").alias("day_name")
).distinct()

# dim_event_type
dim_event_type = spark.createDataFrame([
    ("view", 1),
    ("cart", 2),
    ("purchase", 3)
], ["event_type_name", "event_type_id"])

# dim_product
dim_product = df_cleaned.select(
    "product_id", "brand", "category_id", col("price").alias("price_current")
).distinct()

# dim_category
dim_category = df_cleaned.select(
    "category_id", "category_level_1", "category_level_2", "category_level_3", "category_level_4"
).distinct()

In [29]:
dim_date.show()

+----------+-----+---+-------+-----------+---------+
|   date_id|month|day|quarter|day_of_week| day_name|
+----------+-----+---+-------+-----------+---------+
|2020-01-01|    1|  1|      1|          4|Wednesday|
|2020-01-02|    1|  2|      1|          5| Thursday|
|2020-01-03|    1|  3|      1|          6|   Friday|
|2020-01-04|    1|  4|      1|          7| Saturday|
|2020-01-05|    1|  5|      1|          1|   Sunday|
|2020-01-06|    1|  6|      1|          2|   Monday|
|2020-01-07|    1|  7|      1|          3|  Tuesday|
|2020-01-08|    1|  8|      1|          4|Wednesday|
|2020-01-09|    1|  9|      1|          5| Thursday|
|2020-01-10|    1| 10|      1|          6|   Friday|
|2020-01-11|    1| 11|      1|          7| Saturday|
|2020-01-12|    1| 12|      1|          1|   Sunday|
|2020-01-13|    1| 13|      1|          2|   Monday|
|2020-01-14|    1| 14|      1|          3|  Tuesday|
|2020-01-15|    1| 15|      1|          4|Wednesday|
|2020-01-16|    1| 16|      1|          5| Thu

In [30]:
df_count_event_by_product = df_cleaned.groupBy("product_id", "event_type").count() \
        .groupBy("product_id") \
        .pivot("event_type", ["view", "cart", "purchase"]) \
        .sum("count") \
        .fillna(0)

df_count_event_by_product = df_count_event_by_product.withColumn("total_events", col("view") + col("cart") + col("purchase"))\
                                                     .withColumn("purchase_conversion", round((col("purchase") / (col("view") + col("cart"))) * 100, 2))\
                                                     .sort("total_events", ascending=False)

In [31]:
df_count_event_by_product.show()

+----------+------+-----+--------+------------+-------------------+
|product_id|  view| cart|purchase|total_events|purchase_conversion|
+----------+------+-----+--------+------------+-------------------+
|   1004767|768721|92406|   36854|      897981|               4.28|
|   1005115|522888|64899|   26401|      614188|               4.49|
|   1005160|454484|38689|   11894|      505067|               2.41|
|   1005100|389261|61243|   26996|      477500|               5.99|
|   4804056|256735|47650|   15442|      319827|               5.07|
|   1002544|257457|35616|   14992|      308065|               5.12|
|   1004873|268510|23133|    9795|      301438|               3.36|
|   1005212|239200|39409|   16553|      295162|               5.94|
|   1005105|250436|25733|   10629|      286798|               3.85|
|   1005174|240034|11977|    5475|      257486|               2.17|
|   1004856|217525|23721|    9008|      250254|               3.73|
|   4804718|218969|10470|    3386|      232825| 

In [None]:
# # === FACT TABLE: fact_summary ===
# fact = fact_events \
#     .join(dim_product, "product_id", "left") \
#     .join(dim_category, "category_id", "left") \
#     .join(dim_time.select("time_id", to_date("event_time").alias("date"), "hour"), "time_id", "left")

# fact_summary = fact.groupBy("date", "hour", "brand", "category_level_1").agg(
#     count(when(col("event_type_id") == 1, True)).alias("view_count"),
#     count(when(col("event_type_id") == 2, True)).alias("cart_count"),
#     count(when(col("event_type_id") == 3, True)).alias("purchase_count"),
#     sum(when(col("event_type_id") == 3, col("price"))).alias("revenue")
# ).withColumn("conversion_rate_view_to_purchase",
#              round(100.0 * col("purchase_count") / when(col("view_count") > 0, col("view_count")), 2)) \
#  .withColumn("conversion_rate_cart_to_purchase",
#              round(100.0 * col("purchase_count") / when(col("cart_count") > 0, col("cart_count")), 2))

In [None]:
df_1 = df.groupBy('category_id','event_type').agg(count('*').alias('counts'))

In [None]:
df_1.groupBy("category_id").pivot("event_type", ["view", "cart"]).sum("counts").show()

In [None]:
df.groupBy("brand").count().orderBy("count", ascending=False).show()

In [None]:
spark.stop()