In [1]:
import os

from pyspark.sql import SparkSession  #entry point to programming with the Dataset and DataFrame API
from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StringType

#Start Spark session
# MongoDB connection string shared by the connector's input and output settings.
# It can be supplied through the MONGO_URI environment variable so credentials do
# not have to live in the notebook; the fallback is the previously hard-coded value.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://root:example@mongo:27017/fmi.measurements?authSource=admin",
)

spark = (SparkSession
        .builder
        .master('local')
        .appName('kafka-mongo-streaming')
        # Kafka source and MongoDB connector packages requested for this session.
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")
        # The same URI is used for both reading and writing.
        .config("spark.mongodb.input.uri", MONGO_URI)
        .config("spark.mongodb.output.uri", MONGO_URI)
        .getOrCreate())
sc = spark.sparkContext

In [2]:
# Open the Kafka topic as a streaming source
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "fmi-ingestion-topic",
    })
    .load()
)
# Keep only key and value, both cast to strings
df1 = df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [3]:
# Register the key/value string DataFrame as the temporary view "message"
# so that it can be referenced by name from spark.sql queries.
df1.createOrReplaceTempView("message")

In [4]:
# Echo every row of the "message" view to the console sink
result = spark.sql("Select * from message")
(
    result.writeStream
    .format('console')
    .outputMode('append')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7ffaf118d110>

In [5]:
# Handle one micro-batch of incoming records
def process_batch(df, epoch_id):
    """Parse the JSON payload of a batch and append the selected fields to MongoDB.

    Args:
        df: Batch DataFrame with a ``value`` column holding a JSON string.
        epoch_id: Batch identifier supplied by the caller; not used here.

    Returns:
        None. The only effect is the append-mode write through the MongoDB
        data source.
    """
    measurement_fields = (
        'time_UTC', 'station_name', 'latitude', 'longitude',
        'temperature_2m', 'wind_speed_10min', 'wind_gust_10min',
        'wind_direction_10min', 'relative_humidity', 'dew_point_temperature',
        'precipitation_amount_1h', 'precipitation_intensity_10min', 'snow_depth',
        'pressure_at_sea_level', 'horizontal_visibility', 'cloud_amount',
        'present_weather',
    )

    # Decode the JSON text into a string-to-string map so that individual
    # entries can be addressed as "value.<field>".
    json_schema = MapType(StringType(), StringType())
    parsed = df.withColumn("value", from_json(df.value, json_schema))

    measurements = parsed.select(['value.' + field for field in measurement_fields])

    # save() receives no path or options here; the destination presumably comes
    # from the session-level MongoDB output configuration -- verify.
    mongo_writer = measurements.write.format("com.mongodb.spark.sql.DefaultSource").mode("append")
    mongo_writer.save()

In [None]:
# Stream each micro-batch through process_batch and block until the query ends
(
    df1.writeStream
    .foreachBatch(process_batch)
    .start()
    .awaitTermination()
)