In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def _value_wrapper(field_name):
    """Return a nullable struct field named *field_name* wrapping one double `value`."""
    return StructField(
        field_name,
        StructType([StructField("value", DoubleType(), True)]),
        True,
    )


# Expected shape of each Kafka message payload (parsed with from_json below):
# a top-level `properties` object holding the station id, an ISO-like
# timestamp string, three measurements each wrapped as {"value": <double>},
# and a free-text description. Every field is nullable.
schema = StructType([
    StructField(
        "properties",
        StructType([
            StructField("stationId", StringType(), True),
            StructField("timestamp", StringType(), True),
            _value_wrapper("temperature"),
            _value_wrapper("relativeHumidity"),
            _value_wrapper("windSpeed"),
            StructField("textDescription", StringType(), True),
        ]),
        True,
    ),
])

# Maven coordinates pulled in at session start: the Kafka source for
# Structured Streaming and the MongoDB connector (used by the optional
# MongoDB sink further down).
_SPARK_PACKAGES = (
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5,"
    "org.mongodb.spark:mongo-spark-connector_2.12:10.5.0"
)

# Session-wide settings, applied in this order. The spark.mongodb.write.*
# keys give the MongoDB sink its default target database/collection.
_SESSION_CONFIG = (
    ("spark.jars.packages", _SPARK_PACKAGES),
    ("spark.mongodb.write.connection.uri", "mongodb://localhost:27017"),
    ("spark.mongodb.write.database", "weather"),
    ("spark.mongodb.write.collection", "analytics"),
)

_session_builder = SparkSession.builder.appName("WeatherStreamProcessor")
for _conf_key, _conf_value in _SESSION_CONFIG:
    _session_builder = _session_builder.config(_conf_key, _conf_value)
spark = _session_builder.getOrCreate()

# Debug helpers: the Kafka connector version above (3.5.5, _2.12 suffix)
# must be compatible with the running Spark build. Uncomment to inspect the
# running version and the jars actually loaded.
# print(spark.version)
# print(spark.sparkContext._jsc.sc().listJars())

# Kafka source settings: local broker, single topic, and (for a brand-new
# checkpoint only) start reading from the newest offsets.
_kafka_source_options = {
    'kafka.bootstrap.servers': 'localhost:9092',
    'subscribe': 'weather_obs',
    'startingOffsets': 'latest',
}

# Unbounded streaming DataFrame of raw Kafka records; `value` is binary.
raw_binary = (
    spark.readStream
    .format('kafka')
    .options(**_kafka_source_options)
    .load()
)

# Output columns pulled out of the parsed `data.properties` struct, in order.
# `obs_time` is converted with to_timestamp using Spark's default parsing
# (no explicit pattern is given).
_flattened_columns = (
    col("data.properties.stationId").alias("station_id"),
    to_timestamp(col("data.properties.timestamp")).alias("obs_time"),
    col("data.properties.temperature.value").alias("temperature"),
    col("data.properties.relativeHumidity.value").alias("humidity"),
    col("data.properties.windSpeed.value").alias("wind_speed"),
    col("data.properties.textDescription").alias("description"),
)

# Decode each Kafka record: cast the binary `value` to a JSON string, parse
# it against `schema`, then flatten to the columns above.
# NOTE(review): messages that fail to parse are not filtered here; they
# presumably come through as rows of nulls (from_json default mode) — confirm.
deserialized_data = (
    raw_binary
    .selectExpr("CAST(value AS STRING) as json")
    .select(from_json(col("json"), schema).alias("data"))
    .select(*_flattened_columns)
)

# Sink selection: `query` holds the stream writer that is started below.
# Exactly one of the alternatives should be active at a time.

# Console sink (debugging): print each 10-second micro-batch untruncated.
# query = (
#     deserialized_data.writeStream
#     .format('console')
#     .outputMode('append')
#     .trigger(processingTime='10 seconds')
#     .option('truncate', False)
# )

# Parquet sink (active): append each micro-batch as Parquet files under the
# relative path ./weather_data_parquet; stream progress is tracked in the
# checkpoint directory.
query = (
    deserialized_data.writeStream
    .format('parquet')
    .outputMode('append')
    .option('path', './weather_data_parquet')
    .option('checkpointLocation', '/tmp/checkpoints/weather')
)

# MongoDB sink (alternative). The session config above already sets the
# write URI, database and collection, so the options here repeat them.
# NOTE(review): this reuses the parquet sink's checkpoint directory; a
# checkpoint produced by one sink should not be reused for a different
# sink, so give it its own path before switching.
# NOTE(review): connector 10.x documents `connection.uri` as the write option
# key; `uri` may be ignored (falling back to the session config) — confirm.
# query = (
#     deserialized_data.writeStream
#     .format("mongodb")
#     .option("uri", "mongodb://localhost:27017")
#     .option("database", "weather")
#     .option("collection", "analytics")
#     .option("checkpointLocation", "/tmp/checkpoints/weather")
#     .outputMode("append")
# )

# Start the configured sink and block until the stream terminates.
# The try/finally ensures cleanup also happens when the wait is interrupted
# (e.g. a notebook kernel interrupt) or the query fails. Without it the
# streaming query keeps running in the background, and restarting from the
# same checkpoint is refused while that query is still active.
streaming_query = query.start()
try:
    streaming_query.awaitTermination()
finally:
    # Stop the query explicitly first so it shuts down cleanly, then
    # release the session.
    if streaming_query.isActive:
        streaming_query.stop()
    spark.stop()