In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create (or reuse) the Spark session for this notebook. The Kafka source
# connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("KafkaStreamingAnalysis")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

# Read data from Kafka: subscribe to the "transactions" topic on the local
# broker, starting from the earliest available offsets.
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "transactions",
    "startingOffsets": "earliest",
}
df_kafka = spark.readStream.format("kafka").options(**kafka_options).load()

# Cast the Kafka message value to a string; each message carries one CSV record.
lines = df_kafka.selectExpr("CAST(value AS STRING) as csv")

# Column names of the CSV record, in positional order.
columns = ["User", "Card", "Year", "Month", "Day", "Time", "Amount", "Use Chip",
           "Merchant Name", "Merchant City", "Merchant State", "Zip", "MCC", "Errors?", "Is Fraud?"]

# Every field is read as a string. Names are back-quoted because several of
# them contain spaces or "?".
csv_schema = ", ".join(f"`{name}` STRING" for name in columns)

# Parse the record with from_csv instead of split(csv, ','): a plain split cuts
# quoted fields that contain commas into several pieces and shifts every column
# after them. Note that from_csv yields NULL for an empty field (CSV reader
# default) where split produced ''; the filter below treats both as missing.
df_parsed = (
    lines
    .select(from_csv(col("csv"), csv_schema).alias("data"))
    .select("data.*")
    # Keep only rows that have all the parts needed to build the event timestamp.
    .filter(
        (col("Year").isNotNull()) & (length(col("Year")) > 0) &
        (col("Month").isNotNull()) & (length(col("Month")) > 0) &
        (col("Day").isNotNull()) & (length(col("Day")) > 0) &
        (col("Time").isNotNull()) & (length(col("Time")) > 0)
    )
    # Strip the "$" sign from Amount and cast the remainder to a number.
    .withColumn("Amount_casted", regexp_replace("Amount", "[$]", "").cast("float"))
    # Assemble "yyyy-MM-dd HH:mm:ss" from the date parts; ":00" is appended as
    # the seconds component after the Time field.
    .withColumn(
        "event_time",
        to_timestamp(
            concat(
                col("Year"), lit("-"),
                lpad(col("Month"), 2, '0'), lit("-"),
                lpad(col("Day"), 2, '0'), lit(" "),
                col("Time"), lit(":00")
            ),
            "yyyy-MM-dd HH:mm:ss"
        )
    )
)


# Add a 10-minute watermark on event_time and count the transactions that fall
# into each 1-hour window.
hourly_window = window("event_time", "1 hour")
agg_by_window = (
    df_parsed
    .withWatermark("event_time", "10 minutes")
    .groupBy(hourly_window)
    .agg(count("*").alias("num_transactions"))
)

# Split the window column into two separate columns so the result can be
# written as CSV.
agg_result = agg_by_window.select(
    "num_transactions",
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
)

# Write the hourly counts to HDFS as CSV files.
#
# The StreamingQuery handle is kept so that we block on *this* query only.
# spark.streams.awaitAnyTermination() reacts to any query that has terminated
# on the session since it was created (or since resetTerminated()), so after an
# earlier failed run in the same kernel it would re-raise that stale error
# immediately instead of waiting for the query started here.
#
# NOTE(review): the recorded FileNotFoundException for
# <output path>/_spark_metadata suggests the output directory was removed while
# a query/checkpoint still referenced it - confirm. If so, clear the output
# directory and the checkpoint together.
hourly_query = (
    agg_result.writeStream
    .outputMode("append")
    .format("csv")
    .option("path", "hdfs://localhost:9000/transactions_by_hour")
    .option("checkpointLocation", "hdfs://localhost:9000/checkpoints_DE")
    .start()
)

# Block until this query stops; raises if the query terminated with an error.
hourly_query.awaitTermination()


:: loading settings :: url = jar:file:/opt/spark-3.5.5/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/panda/.ivy2/cache
The jars for the packages stored in: /home/panda/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-581c12e5-c5c4-475b-991e-7777d3e57a93;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 456ms :: artifacts dl 15ms
	:: m

Py4JError: An error occurred while calling o164.awaitAnyTermination

25/06/26 12:52:49 ERROR MicroBatchExecution: Query [id = c16d1a80-dbfc-459b-8f8a-2e93081cb193, runId = 1306a057-ffaf-4eff-b563-60556009fce0] terminated with error
java.io.FileNotFoundException: File hdfs://localhost:9000/transactions_by_hour/_spark_metadata does not exist.
	at org.apache.hadoop.fs.Hdfs.listStatus(Hdfs.java:316)
	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1915)
	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1911)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1917)
	at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1876)
	at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1835)
	at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:315)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.listBatches(HDFSMetadataLog.scala:32

# Version 2 for Power BI

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create (or reuse) the Spark session for this notebook. The Kafka source
# connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("KafkaStreamingAnalysis")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

# Read data from Kafka: subscribe to the "transactions" topic on the local
# broker, starting from the earliest available offsets.
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "transactions",
    "startingOffsets": "earliest",
}
df_kafka = spark.readStream.format("kafka").options(**kafka_options).load()

# Cast the Kafka message value to a string; each message carries one CSV record.
lines = df_kafka.selectExpr("CAST(value AS STRING) as csv")

# Column names of the CSV record, in positional order.
columns = ["User", "Card", "Year", "Month", "Day", "Time", "Amount", "Use Chip",
           "Merchant Name", "Merchant City", "Merchant State", "Zip", "MCC", "Errors?", "Is Fraud?"]

# Every field is read as a string. Names are back-quoted because several of
# them contain spaces or "?".
csv_schema = ", ".join(f"`{name}` STRING" for name in columns)

# Parse the record with from_csv instead of split(csv, ','): a plain split cuts
# quoted fields that contain commas into several pieces and shifts every column
# after them, which would corrupt the exported "Errors" / "Is Fraud" values.
# Note that from_csv yields NULL for an empty field (CSV reader default) where
# split produced ''; the filter below treats both as missing.
df_parsed = (
    lines
    .select(from_csv(col("csv"), csv_schema).alias("data"))
    .select("data.*")
    # Keep only rows that have all the parts needed to build the event timestamp.
    .filter(
        (col("Year").isNotNull()) & (length(col("Year")) > 0) &
        (col("Month").isNotNull()) & (length(col("Month")) > 0) &
        (col("Day").isNotNull()) & (length(col("Day")) > 0) &
        (col("Time").isNotNull()) & (length(col("Time")) > 0)
    )
    # Strip the "$" sign from Amount and cast the remainder to a number.
    .withColumn("Amount_casted", regexp_replace("Amount", "[$]", "").cast("float"))
    # Assemble "yyyy-MM-dd HH:mm:ss" from the date parts; ":00" is appended as
    # the seconds component after the Time field.
    .withColumn(
        "event_time",
        to_timestamp(
            concat(
                col("Year"), lit("-"),
                lpad(col("Month"), 2, '0'), lit("-"),
                lpad(col("Day"), 2, '0'), lit(" "),
                col("Time"), lit(":00")
            ),
            "yyyy-MM-dd HH:mm:ss"
        )
    )
    # Drop the "?" suffix from the two flag columns before export.
    .withColumnRenamed("Errors?", "Errors")
    .withColumnRenamed("Is Fraud?", "Is Fraud")
)


# Columns to export, in output order: the raw Year/Month/Day/Time/Amount fields
# are left out in favour of the derived event_time and Amount_casted columns.
output_columns = [
    "User", "Card", "event_time", "Amount_casted", "Use Chip",
    "Merchant Name", "Merchant City", "Merchant State", "Zip", "MCC",
    "Errors", "Is Fraud",
]
df_parsed_selected = df_parsed.select(*output_columns)

# Write the parsed transactions to HDFS as CSV files with a header row.
#
# The StreamingQuery handle is kept so that we block on *this* query only.
# spark.streams.awaitAnyTermination() reacts to any query that has terminated
# on the session since it was created (or since resetTerminated()), so after an
# earlier failed query in the same kernel it would re-raise that stale error
# immediately instead of waiting for the query started here.
#
# NOTE(review): this cell uses the same output path and checkpoint location as
# the hourly-aggregation cell earlier in the notebook. A checkpoint records the
# progress of one specific query, so confirm that the old checkpoint and output
# directory were cleared (or switch to dedicated locations) before running it.
transactions_query = (
    df_parsed_selected.writeStream
    .outputMode("append")
    .format("csv")
    .option("header", "true")
    .option("path", "hdfs://localhost:9000/transactions_by_hour")
    .option("checkpointLocation", "hdfs://localhost:9000/checkpoints_DE")
    .start()
)

# Block until this query stops; raises if the query terminated with an error.
transactions_query.awaitTermination()


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Errors?` cannot be resolved. Did you mean one of the following? [`Errors`, `Amount`, `Card`, `Month`, `User`].;
'Project [User#1259, Card#1260, event_time#1306, Amount_casted#1289, Use Chip#1266, Merchant Name#1267, Merchant City#1268, Merchant State#1269, Zip#1270, MCC#1271, 'Errors?, 'Is Fraud?]
+- Project [User#1259, Card#1260, Year#1261, Month#1262, Day#1263, Time#1264, Amount#1265, Use Chip#1266, Merchant Name#1267, Merchant City#1268, Merchant State#1269, Zip#1270, MCC#1271, Errors#1324, Is Fraud?#1273 AS Is_Fraud#1342, Amount_casted#1289, event_time#1306]
   +- Project [User#1259, Card#1260, Year#1261, Month#1262, Day#1263, Time#1264, Amount#1265, Use Chip#1266, Merchant Name#1267, Merchant City#1268, Merchant State#1269, Zip#1270, MCC#1271, Errors?#1272 AS Errors#1324, Is Fraud?#1273, Amount_casted#1289, event_time#1306]
      +- Project [User#1259, Card#1260, Year#1261, Month#1262, Day#1263, Time#1264, Amount#1265, Use Chip#1266, Merchant Name#1267, Merchant City#1268, Merchant State#1269, Zip#1270, MCC#1271, Errors?#1272, Is Fraud?#1273, Amount_casted#1289, to_timestamp(concat(Year#1261, -, lpad(Month#1262, 2, 0), -, lpad(Day#1263, 2, 0),  , Time#1264, :00), Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Asia/Ho_Chi_Minh), false) AS event_time#1306]
         +- Project [User#1259, Card#1260, Year#1261, Month#1262, Day#1263, Time#1264, Amount#1265, Use Chip#1266, Merchant Name#1267, Merchant City#1268, Merchant State#1269, Zip#1270, MCC#1271, Errors?#1272, Is Fraud?#1273, cast(regexp_replace(Amount#1265, [$], , 1) as float) AS Amount_casted#1289]
            +- Filter (((((((isnotnull(Year#1261) AND (length(Year#1261) > 0)) AND isnotnull(Month#1262)) AND (length(Month#1262) > 0)) AND isnotnull(Day#1263)) AND (length(Day#1263) > 0)) AND isnotnull(Time#1264)) AND (length(Time#1264) > 0))
               +- Project [data#1257[0] AS User#1259, data#1257[1] AS Card#1260, data#1257[2] AS Year#1261, data#1257[3] AS Month#1262, data#1257[4] AS Day#1263, data#1257[5] AS Time#1264, data#1257[6] AS Amount#1265, data#1257[7] AS Use Chip#1266, data#1257[8] AS Merchant Name#1267, data#1257[9] AS Merchant City#1268, data#1257[10] AS Merchant State#1269, data#1257[11] AS Zip#1270, data#1257[12] AS MCC#1271, data#1257[13] AS Errors?#1272, data#1257[14] AS Is Fraud?#1273]
                  +- Project [split(csv#1255, ,, -1) AS data#1257]
                     +- Project [cast(value#1242 as string) AS csv#1255]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@5740ccb6, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@7aa83a94, [startingOffsets=earliest, kafka.bootstrap.servers=localhost:9092, subscribe=transactions], [key#1241, value#1242, topic#1243, partition#1244, offset#1245L, timestamp#1246, timestampType#1247], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6aa46774,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> transactions, startingOffsets -> earliest),None), kafka, [key#1234, value#1235, topic#1236, partition#1237, offset#1238L, timestamp#1239, timestampType#1240]
