In [1]:
import json

from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Spark session for this consumer. The Kafka source connector and the Avro
# functions are pulled in as Maven packages via `spark.jars.packages`
# (a single comma-separated string of coordinates).
# NOTE(review): the coordinates pin Spark 3.5.3 / Scala 2.12; presumably they
# must match the installed PySpark build — confirm when upgrading.
spark = (
    SparkSession.builder
    .appName("KafkaAvroConsumer")
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
            "org.apache.spark:spark-avro_2.12:3.5.3",
        ]),
    )
    .getOrCreate()
)

In [3]:
# Connection settings for the local Kafka stack.
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "green-taxi"
# NOTE(review): schema_registry_url is not used anywhere below — the Avro schema is
# hard-coded instead, so it must be kept in sync by hand with the writer schema the
# producer registered for this topic.
schema_registry_url = "http://localhost:8081"

# (field name, Avro primitive type), in the order the producer writes them.
# Avro binary encoding is positional (fields are encoded in schema order, without
# names), so this order must match the writer schema exactly.
green_taxi_fields = [
    ("VendorID", "long"),
    ("lpep_pickup_datetime", "string"),
    ("lpep_dropoff_datetime", "string"),
    ("store_and_fwd_flag", "string"),
    ("RatecodeID", "long"),
    ("PULocationID", "long"),
    ("DOLocationID", "long"),
    ("passenger_count", "long"),
    ("trip_distance", "double"),
    ("fare_amount", "double"),
    ("extra", "double"),
    ("mta_tax", "double"),
    ("tip_amount", "double"),
    ("tolls_amount", "double"),
    ("ehail_fee", "double"),
    ("improvement_surcharge", "double"),
    ("total_amount", "double"),
    ("payment_type", "long"),
    ("trip_type", "long"),
    ("congestion_surcharge", "double"),
]

# Avro record schema as a JSON string (the form from_avro's jsonFormatSchema takes).
# Every field is an optional union ["null", <type>] with a null default.
# Generated from the list above instead of hand-written JSON to avoid copy-paste
# mistakes; json.dumps also guarantees the string is valid JSON.
schema_str = json.dumps(
    {
        "type": "record",
        "name": "GreenTaxiSchema",
        "fields": [
            {"name": field_name, "type": ["null", avro_type], "default": None}
            for field_name, avro_type in green_taxi_fields
        ],
    },
    indent=2,
)

In [4]:
# Subscribe to the Kafka topic as a streaming source, starting from the earliest
# available offset. Each row is a raw Kafka record: binary key/value plus topic,
# partition, offset and timestamp metadata (see the printed schema).
kafka_source_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
}

df_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

df_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
# Decode the Avro payload of each Kafka value into typed columns.
# Confluent wire format frames every value as
#   [magic byte (1 byte)] [schema id (4 bytes)] [Avro binary payload]
# from_avro expects only the bare payload, so the 5-byte header is stripped.
# SQL substring() is 1-indexed, so the payload starts at position 6.
# Null values (e.g. Kafka tombstones) are dropped first; otherwise they would pass
# through substring/from_avro as null and surface as rows with every field null.
# NOTE(review): the magic byte is not validated, so a value that is not
# Confluent-framed is decoded as if it were. Confirm that the from_avro parse mode
# (default FAILFAST) is the desired behaviour for such records.
# NOTE(review): to_timestamp is called without a format; confirm the producer's
# datetime strings are in a form Spark parses by default (e.g. "yyyy-MM-dd HH:mm:ss").
df_value = df_stream \
    .where(col("value").isNotNull()) \
    .selectExpr("substring(value, 6, length(value) - 5) as value_binary") \
    .select(from_avro(col("value_binary").cast("binary"), jsonFormatSchema=schema_str).alias("data")) \
    .select("data.*") \
    .withColumn("lpep_pickup_datetime", to_timestamp("lpep_pickup_datetime")) \
    .withColumn("lpep_dropoff_datetime", to_timestamp("lpep_dropoff_datetime"))

df_value.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [None]:
# Write the decoded stream to JSON files, one micro-batch every 10 seconds.
# Per the original author: a topic with n partitions produces n output files per
# batch. repartition() reduces the number of files written (here to one per batch),
# but it costs performance (an extra shuffle and no write parallelism).
# NOTE(review): "no_arvo" in the paths looks like a typo for "avro"; it is left
# unchanged because renaming it would orphan the existing checkpoint directory.
streaming_query = df_value.repartition(1).writeStream \
    .format("json") \
    .option("path", "./tmp/no_arvo/output") \
    .option("checkpointLocation", "./tmp/no_arvo/checkpoints") \
    .outputMode("append") \
    .trigger(processingTime="10 seconds") \
    .start()

# Keep the StreamingQuery handle. Interrupting the kernel only breaks out of the
# Python-side wait; the query itself is not stopped by that. Stopping it explicitly
# lets the cell be re-run without conflicting with a still-active query on the
# same checkpoint location.
try:
    streaming_query.awaitTermination()
except KeyboardInterrupt:
    streaming_query.stop()