In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from dotenv import load_dotenv
import os
import logging
import traceback
import json
load_dotenv()

True

In [None]:
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
warnings.filterwarnings('ignore')

In [7]:
###############################################
# Parameters & Arguments
###############################################
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
BUCKET_NAME = os.getenv("BUCKET_NAME")
MINIO_ENDPOINT, MINIO_ACCESS_KEY, BUCKET_NAME


('localhost:9000', 'minio_secret_key', 'raw')

In [8]:
###############################################
# PySpark
###############################################
def create_spark_session():
    """
        Create the Spark Session with suitable configs
    """
    from pyspark.sql import SparkSession

    try: 
        spark = (SparkSession.builder.config("spark.executor.memory", "4g") \
                        .config(
                            "spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.hadoop:hadoop-aws:2.8.2"
                        )
                        .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
                        .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
                        .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
                        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
                        .config("spark.hadoop.fs.s3a.path.style.access", "true")
                        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
                        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
                        .appName("Streaming Processing Application")
                        .getOrCreate()
        )
        
        logging.info('Spark session successfully created!')

    except Exception as e:
        traceback.print_exc(file=sys.stderr)
        logging.error(f"Couldn't create the spark session due to exception: {e}")

    return spark

In [None]:
spark = create_spark_session()

In [None]:
def create_initial_dataframe(spark_session):
    """
        Reads the streaming data and creates the initial dataframe accordingly
    """
    try: 
        df = (spark_session
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "device.iot.taxi_nyc_time_series")
            # .option("startingOffsets", "earliest")
            .option("failOnDataLoss", "false")
            .load())
        logging.info("Initial dataframe created successfully!")
    except Exception as e:
        logging.warning(f"Initial dataframe could not be created due to exception: {e}")

    return df

In [None]:
BOOTSTRAP_SERVERS = ['localhost:9092', 'localhost:9093', 'localhost:9094']

In [13]:
BOOTSTRAP_SERVERS = ['localhost:9092', 'localhost:9093', 'localhost:9094']

In [15]:
",".join(BOOTSTRAP_SERVERS)

'localhost:9092,localhost:9093,localhost:9094'

In [None]:
df_static = (spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", ",".join(BOOTSTRAP_SERVERS))
        .option("subscribe", "streaming.public.green_trip_raw")
        # .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .option("startingOffsets", "earliest")
        .load())

In [30]:
df = (spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", ",".join(BOOTSTRAP_SERVERS))
        .option("subscribe", "streaming.public.green_trip_raw")
        # .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .option("startingOffsets", "earliest")
        .load())

In [18]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampNTZType, DoubleType, LongType
from pyspark.sql.functions import col, from_json, udf


In [20]:
with open('./schema_config.json', 'r') as f:
    config = json.load(f)

# Define a mapping from type names to PySpark types
type_mapping = {
    "IntegerType": IntegerType(),
    "StringType": StringType(),
    "TimestampNTZType": TimestampNTZType(),
    "DoubleType": DoubleType(),
    "LongType": LongType()
}

In [21]:
# Create the schema based on the configuration file
payload_after_schema = StructType([
    StructField(field["name"], type_mapping[field["type"]], field["nullable"])
    for field in config["fields"]
])

In [22]:
payload_after_schema

StructType([StructField('dolocationid', IntegerType(), True), StructField('pulocationid', IntegerType(), True), StructField('ratecodeid', DoubleType(), True), StructField('vendorid', IntegerType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('passenger_count', DoubleType(), True), StructField('payment_type', IntegerType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('tpep_dropoff_datetime', LongType(), True), StructField('tpep_pickup_datetime', LongType(), True), StructField('trip_distance', DoubleType(), True)])

In [23]:
data_schema = StructType([
    StructField("payload", StructType([
        StructField("after", payload_after_schema, True)
    ]), True)
])

In [24]:
data_schema

StructType([StructField('payload', StructType([StructField('after', StructType([StructField('dolocationid', IntegerType(), True), StructField('pulocationid', IntegerType(), True), StructField('ratecodeid', DoubleType(), True), StructField('vendorid', IntegerType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('passenger_count', DoubleType(), True), StructField('payment_type', IntegerType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('tpep_dropoff_datetime', LongType(), True), StructField('tpep_pickup_datetime', LongType(), True), StructField('trip_distance', DoubleType(), True)]), True)]), True)])

In [25]:
parsed_df = df.selectExpr("CAST(value AS STRING) as json") \
                .select(from_json(col("json"), data_schema).alias("data")) \
                .select("data.payload.after.*")

In [28]:
print(df)

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]


In [32]:
query = df \
    .writeStream \
    .format("console") \
    .option("truncate", False) \
    .start()

In [33]:
query.awaitTermination()

StreamingQueryException: [STREAM_FAILED] Query [id = a06088e9-4517-40f5-86cd-dd728c7af118, runId = f3e53613-ecf5-46fe-b45a-2faa4f91c8dc] terminated with exception: Job aborted due to stage failure: Task serialization failed: java.nio.file.NoSuchFileException: C:\Users\viett\AppData\Local\Temp\blockmgr-93b84c72-1b8a-4d23-bffc-8c6962c89754\0e
java.nio.file.NoSuchFileException: C:\Users\viett\AppData\Local\Temp\blockmgr-93b84c72-1b8a-4d23-bffc-8c6962c89754\0e
	at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
	at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
	at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
	at java.base/sun.nio.fs.WindowsFileSystemProvider.createDirectory(WindowsFileSystemProvider.java:509)
	at java.base/java.nio.file.Files.createDirectory(Files.java:690)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:108)
	at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:133)
	at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:2076)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1551)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
	at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:1425)
	at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1924)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:154)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
	at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1657)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1585)
	at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1402)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1337)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3003)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


In [26]:
parsed_df

DataFrame[dolocationid: int, pulocationid: int, ratecodeid: double, vendorid: int, congestion_surcharge: double, extra: double, fare_amount: double, improvement_surcharge: double, mta_tax: double, passenger_count: double, payment_type: int, tip_amount: double, tolls_amount: double, total_amount: double, tpep_dropoff_datetime: bigint, tpep_pickup_datetime: bigint, trip_distance: double]

In [None]:
parsed_df = parsed_df \
    .withColumn("tpep_pickup_datetime", (col("tpep_pickup_datetime") / 1000000).cast("timestamp")) \
    .withColumn("tpep_dropoff_datetime", (col("tpep_dropoff_datetime") / 1000000).cast("timestamp"))

parsed_df.createOrReplaceTempView("nyc_taxi_view")

df_final = spark.sql("""
    SELECT
        * 
    FROM public.green_trip_raw
""")