In [1]:
# docker exec -it spark2 bash

# export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
# jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root

#http://127.0.0.1:9888/lab

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, row_number, when, window, current_timestamp, collect_set, round, concat_ws
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("Retailrocket-BatchLayer") \
    .master("spark://spark2:7077") \
    .config("spark.executor.cores", "1") \
    .config("spark.driver.cores", "1") \
    .config("spark.cores.max", "1") \
    .config("spark.cassandra.connection.host", "cassandra1") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,"
            "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b91c906f-fc1d-4dc3-a5d7-084b018b49e6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	foun

In [2]:
df = spark.read.parquet("hdfs://hadoop:8020/data/retailrocket_raw")
# df.createOrReplaceTempView("batch_view")

                                                                                

In [4]:
from cassandra.cluster import Cluster

cluster = Cluster(['cassandra1'], port=9042)
session = cluster.connect()

# Creating a keyspace named event_data_view, where data will be written for kappa
session.execute(
    """
    CREATE KEYSPACE IF NOT EXISTS event_data_view
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """
)

session.set_keyspace("event_data_view")


session.execute("""
    CREATE TABLE IF NOT EXISTS top10_items (
        event text,
        itemid text,
        total_count int,
        rank int,
        PRIMARY KEY (event, rank, itemid)
    )
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS top_abandoned_items (
        itemid text,
        addtocart int,
        transaction int,
        abandoned_count int,
        abandon_rate double,
        PRIMARY KEY (itemid)
    )
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS event_counts_30s (
        event text,
        window_start timestamp,
        window_end timestamp,
        count int,
        PRIMARY KEY (event, window_start)
    )
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS conversion_rate (
        batch_timestamp timestamp PRIMARY KEY,
        n_view bigint,
        addtocart bigint,
        transaction bigint,
        view_to_cart_rate double,
        cart_to_transaction_rate double,
        view_to_transaction_rate double
    )
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS frequent_itemsets (
        items text,
        freq int,
        PRIMARY KEY (items)
    )
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS association_rules (
        antecedent text,
        consequent text,
        confidence double,
        lift double,
        PRIMARY KEY (antecedent, consequent)
    )
""")

# CREATE TABLE IF NOT EXISTS association_rules (
#         antecedent text,   -- sản phẩm(s) ban đầu
#         consequent text,   -- sản phẩm(s) thường đi kèm
#         confidence double,       -- độ tin cậy
#         lift double,             -- độ nâng
#         PRIMARY KEY (antecedent, consequent)
#     )

session.shutdown()
cluster.shutdown()

In [5]:
# 3️⃣ Tính tổng số lượt cho mỗi (itemid, event)
agg = df.groupBy("itemid", "event").agg(count("*").alias("total_count"))

# 4️⃣ Tạo ranking theo từng event
window_spec = Window.partitionBy("event").orderBy(desc("total_count"))
ranked = agg.withColumn("rank", row_number().over(window_spec))

# 5️⃣ Lọc ra Top 10 sản phẩm cho mỗi loại event
top10 = ranked.filter(col("rank") <= 10)

top10.show(5)

# 6️⃣ Ghi vào Cassandra
top10.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .option("confirm.truncate", "true") \
    .options(table="top10_items", keyspace="event_data_view") \
    .save()

                                                                                

+------+---------+-----------+----+
|itemid|    event|total_count|rank|
+------+---------+-----------+----+
|431417|addtocart|          5|   1|
|244506|addtocart|          4|   2|
|   546|addtocart|          4|   3|
|355741|addtocart|          3|   4|
|269610|addtocart|          3|   5|
+------+---------+-----------+----+
only showing top 5 rows



                                                                                

In [6]:
# 3️⃣ Đếm số lượt addtocart và transaction cho từng itemid
agg = df.groupBy("itemid").pivot("event", ["addtocart", "transaction"]).agg(count("*"))

# 4️⃣ Thay null bằng 0
agg = agg.fillna(0, subset=["addtocart", "transaction"])

# 5️⃣ Tính số lượt bị bỏ giỏ và tỷ lệ bỏ giỏ
agg = agg.withColumn("abandoned_count", col("addtocart") - col("transaction")) \
         .withColumn("abandon_rate", 
                     when(col("addtocart") > 0, 
                          (col("addtocart") - col("transaction")) / col("addtocart"))
                     .otherwise(0))

# 6️⃣ Lấy top 10 sản phẩm có tỷ lệ bỏ giỏ cao nhất
top_abandoned = agg.filter(col("addtocart") > 0) \
                   .orderBy(desc("abandon_rate"), desc("abandoned_count")) \
                   .limit(10)

# 7️⃣ Ghi kết quả vào Cassandra
top_abandoned.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .option("confirm.truncate", "true") \
    .options(table="top_abandoned_items", keyspace="event_data_view") \
    .save()

                                                                                

In [7]:
# # 2️⃣ Chuyển timestamp (ms) -> timestamp type
# df = df.withColumn("event_ts", (col("timestamp") / 1000).cast("timestamp"))

# # 3️⃣ Gom nhóm theo cửa sổ 10 giây
# agg_10s = df.groupBy(
#     window(col("event_ts"), "30 seconds"),
#     col("event")
# ).agg(count("*").alias("count"))

# # 4️⃣ Tách cột window thành start / end để lưu
# agg_10s = agg_10s.select(
#     col("window.start").alias("window_start"),
#     col("window.end").alias("window_end"),
#     col("event"),
#     col("count")
# )

# # 5️⃣ Ghi vào Cassandra
# agg_10s.write \
#     .format("org.apache.spark.sql.cassandra") \
#     .mode("append") \
#     .options(table="event_counts_30s", keyspace="event_data_view") \
#     .save()


In [8]:
# 2️⃣ Đếm số lượng từng loại event
event_counts = df.groupBy("event").agg(count("*").alias("count"))

# 3️⃣ Chuyển thành bảng pivot để có cột view/addtocart/transaction
pivot = event_counts.groupBy().pivot("event", ["view", "addtocart", "transaction"]).sum("count")
pivot = pivot.na.fill(0, ["view", "addtocart", "transaction"]) 

# 4️⃣ Tính tỷ lệ chuyển đổi
conversion = pivot.withColumn(
        "view_to_cart_rate",
        round(when(col("view") > 0, col("addtocart") / col("view") * 100).otherwise(0), 2)
    ).withColumn(
        "cart_to_transaction_rate",
        round(when(col("addtocart") > 0, col("transaction") / col("addtocart") * 100).otherwise(0), 2)
    ).withColumn(
        "view_to_transaction_rate",
        round(when(col("view") > 0, col("transaction") / col("view") * 100).otherwise(0), 2)
    ).withColumn(
        "batch_timestamp", current_timestamp()
    ).withColumnRenamed("view", "n_view")

conversion.show()

conversion.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .option("confirm.truncate", "true") \
    .options(table="conversion_rate", keyspace="event_data_view") \
    .save()

                                                                                

+------+---------+-----------+-----------------+------------------------+------------------------+--------------------+
|n_view|addtocart|transaction|view_to_cart_rate|cart_to_transaction_rate|view_to_transaction_rate|     batch_timestamp|
+------+---------+-----------+-----------------+------------------------+------------------------+--------------------+
|   610|      296|        224|            48.52|                   75.68|                   36.72|2025-10-09 10:35:...|
+------+---------+-----------+-----------------+------------------------+------------------------+--------------------+



                                                                                

In [3]:
from pyspark.ml.fpm import FPGrowth


# 3️⃣ Lọc ra các sự kiện mua hàng (hoặc addtocart nếu bạn muốn mở rộng hành vi)
transactions = df.filter(df.event.isin("transaction")).limit(100)

# 4️⃣ Gom nhóm theo visitorid -> danh sách sản phẩm người đó đã mua/thêm giỏ
baskets = transactions.groupBy("visitorid") \
    .agg(collect_set("itemid").alias("items"))

# 5️⃣ Áp dụng FP-Growth để tìm liên kết sản phẩm
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.005, minConfidence=0.2)
model = fpGrowth.fit(baskets)

# 6️⃣ Các tập sản phẩm thường xuất hiện cùng nhau
freqItemsets = model.freqItemsets.limit(10)
freqItemsets.show(10)

# 7️⃣ Luật liên kết giữa các sản phẩm
rules = model.associationRules.drop("support").limit(10)
rules.show(10)

# 9️⃣ Ghi kết quả vào Cassandra
freqItemsets.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .option("confirm.truncate", "true") \
    .options(table="frequent_itemsets", keyspace="event_data_view") \
    .save()

rules.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .option("confirm.truncate", "true") \
    .options(table="association_rules", keyspace="event_data_view") \
    .save()


                                                                                

+--------------------+----+
|               items|freq|
+--------------------+----+
|            [463663]|   1|
|            [333140]|   1|
|    [333140, 275692]|   1|
|            [150215]|   1|
|            [192990]|   1|
|    [192990, 241716]|   1|
|            [210087]|   1|
|    [210087, 333140]|   1|
|[210087, 333140, ...|   1|
|    [210087, 275692]|   1|
+--------------------+----+

+--------------------+----------+----------+----+
|          antecedent|consequent|confidence|lift|
+--------------------+----------+----------+----+
|    [212006, 235880]|  [399192]|       1.0|58.0|
|    [212006, 235880]|   [41963]|       1.0|58.0|
|    [212006, 235880]|     [720]|       1.0|58.0|
|    [212006, 235880]|  [450641]|       1.0|58.0|
|    [212006, 235880]|  [124214]|       1.0|58.0|
|    [212006, 235880]|  [368403]|       1.0|58.0|
|     [248455, 89688]|   [22568]|       1.0|58.0|
|[399192, 41963, 3...|  [450641]|       1.0|58.0|
|[399192, 41963, 3...|     [720]|       1.0|58.0|
|    [1

                                                                                

In [4]:
spark.stop()

25/10/09 10:49:17 WARN ChannelPool: [s0|cassandra1/172.18.0.7:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=f1c9c688-5db3-4b64-bd62-61c825dc2733, APPLICATION_NAME=Spark-Cassandra-Connector-app-20251009104735-0007}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
25/10/09 10:49:17 WARN ControlConnection: [s0] Error connecting to Node(endPoint=cassandra1/172.18.0.7:9042, hostId=4f4074f1-c47c-4b1c-a48f-747120bea1ea, hashCode=44f0f50e), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=f1c9c688-5db3-4b64-bd62-61c825dc2733, APPLICATION_NAME=Spark-Cassandra-Connecto

In [None]:
top_items = spark.sql("""
    SELECT 
        itemid,
        COUNT(*) AS transaction_count
    FROM batch_view
    WHERE event = 'view'
    GROUP BY itemid
    ORDER BY transaction_count DESC
    LIMIT 10
""")

top_items.show()

In [11]:
# spark.stop()