# Data Engineering Zoomcamp 2024 - Homework 6
## Question 6. Parsing the data

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"
kafka_clients_jar = 'org.apache.kafka:kafka-clients:3.4.1'

In [3]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("test_hw06_question6") \
    .config("spark.jars.packages", f"{kafka_jar_package},{kafka_clients_jar}") \
    .getOrCreate()

:: loading settings :: url = jar:file:/Users/mhack/.pyenv/versions/3.10.3/envs/pyspark-kafka/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/mhack/.ivy2/cache
The jars for the packages stored in: /Users/mhack/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e71eb352-1dda-4f23-92c2-b7ba74d9140a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found com.github.luben#zstd-jni;1.5.5-4 in central
	found org.lz4#lz4-java;1.8.0 in local-m2-cache
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in local-m2-cache
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found com.google.code.findbugs#jsr305;3.0.0 in ce

In [4]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()


In [5]:
green_stream.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
from pyspark.sql import types
from pyspark.sql import functions as F

In [7]:
schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.IntegerType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())


In [8]:
# def parse_schema(df_stream, schema):
#     assert df_stream.isStreaming is True, 'DataFrame is not streaming'

#     return df_stream \
#     .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
#     .select("data.*")


In [9]:
parsed_df = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [10]:
parsed_df.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



In [11]:
# parsed_df.show()
# Queries with streaming sources must be executed with writeStream.start();

In [12]:
query = parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(once=True) \
    .start()
query.awaitTermination()
query.stop()


24/03/14 15:24:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/g0/707fw8gn4y1dd16yfrl8xpkm0000gn/T/temporary-d1f97f69-8009-4413-9d0b-5b12673c954b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 15:24:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/14 15:24:13 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
| 2019-10-01 00:26:02|  2019-10-01 00:39:58|         112|         196|              1|         5.88|       0.0|
| 2019-10-01 00:18:11|  2019-10-01 00:22:38|          43|         263|              1|          0.8|       0.0|
| 2019-10-01 00:09:31|  2019-10-01 00:24:47|         255|         228|              2|          7.5|       0.0|
| 2019-10-01 00:37:40|  2019-10-01 00:41:49|         181|         181|              1|          0.9|       0.0|
| 2019-10-01 00:08:13|  2019-10-01 00:17:56|          97|         188|              1|         2.52|      2.26|
| 2019-