In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum as spark_sum, avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

In [2]:
# Tạo SparkSession
spark = SparkSession.builder \
    .appName("Taxi Trip Analysis") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2") \
    .getOrCreate()

In [3]:
schema = StructType([
    StructField("VendorID", IntegerType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True),
    StructField("Airport_fee", DoubleType(), True)
])

In [4]:
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "trip_data") \
    .option("startingOffsets", "earliest") \
    .load()

In [5]:
# Giải mã dữ liệu từ định dạng JSON
trip_data = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

In [6]:
# Thêm watermark dựa trên tpep_pickup_datetime
trip_data_with_watermark = trip_data.withWatermark("tpep_pickup_datetime", "15 minutes")

In [7]:
# Tính tổng số chuyến đi, tổng doanh thu, và khoảng cách trung bình
trip_analysis = trip_data_with_watermark.groupBy("VendorID").agg(
    spark_sum("total_amount").alias("Total_Revenue"),
    spark_sum("trip_distance").alias("Total_Distance"),
    avg("trip_distance").alias("Average_Distance"),
    spark_sum("passenger_count").alias("Total_Passengers")
)

In [8]:
def process_batch(batch_df, batch_id):
    print(f"Processing batch id: {batch_id}")
    batch_df.show()  # Hiển thị kết quả của batch

# test các output mode khác: append, complete

query = trip_analysis.writeStream \
    .outputMode("update") \
    .foreachBatch(process_batch) \
    .start()

# Chờ tiến trình kết thúc
query.awaitTermination(timeout=60)


Processing batch id: 0
+--------+------------------+-----------------+------------------+----------------+
|VendorID|     Total_Revenue|   Total_Distance|  Average_Distance|Total_Passengers|
+--------+------------------+-----------------+------------------+----------------+
|       1|10560.399999999996|           1620.2| 6.137121212121213|            NULL|
|       2| 51305.01999999999|8741.189999999995|7.1766748768472866|            NULL|
+--------+------------------+-----------------+------------------+----------------+



False

Processing batch id: 1
+--------+-----------------+-----------------+-----------------+----------------+
|VendorID|    Total_Revenue|   Total_Distance| Average_Distance|Total_Passengers|
+--------+-----------------+-----------------+-----------------+----------------+
|       2|51575.48999999999|8779.019999999995|7.160701468189229|            NULL|
+--------+-----------------+-----------------+-----------------+----------------+

Processing batch id: 2
+--------+-----------------+-----------------+------------------+----------------+
|VendorID|    Total_Revenue|   Total_Distance|  Average_Distance|Total_Passengers|
+--------+-----------------+-----------------+------------------+----------------+
|       2|51838.32999999999|8823.809999999996|7.1621834415584384|            NULL|
+--------+-----------------+-----------------+------------------+----------------+

Processing batch id: 3
+--------+-----------------+-----------------+-----------------+----------------+
|VendorID|    Total_Re