In [6]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType, IntegerType, DoubleType, DateType

In [7]:
# Create the Spark session with the Kafka source and MongoDB sink connectors on the classpath.
#
# The MongoDB URI embeds a username/password, so it is read from the environment
# instead of being hard-coded. The fallback preserves the previous value.
# TODO: drop the fallback once every deployment sets MONGO_OUTPUT_URI.
MONGO_OUTPUT_URI = os.environ.get(
    'MONGO_OUTPUT_URI',
    'mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin',
)

# Connector versions must match the Spark (2.4.x) / Scala (2.11) build in use.
SPARK_PACKAGES = ','.join([
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5',
    'org.mongodb.spark:mongo-spark-connector_2.11:2.4.0',
])

spark = (SparkSession
         .builder
         .master('local')  # local mode, since no cluster is available
         .appName('kafka-mongo-streaming')
         .config('spark.jars.packages', SPARK_PACKAGES)
         # Default write target for the MongoDB connector when save() is called without options.
         .config('spark.mongodb.output.uri', MONGO_OUTPUT_URI)
         .getOrCreate()
        )

In [8]:
# Streaming source: subscribe to the Kafka topic 'ingestion-topic'.
# Each row is a raw Kafka record; `key` and `value` arrive as bytes and are
# decoded per micro-batch in write_data_to_mongodb.
msgs = (
    spark.readStream
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': 'kafka:9092',
        'subscribe': 'ingestion-topic',
    })
    .load()
)

In [9]:
def _parse_invoices(raw_msgs):
    """Decode raw Kafka rows into a typed invoice DataFrame.

    The Kafka ``value`` (bytes) is cast to a string and parsed as a JSON object
    into a string-to-string map. The invoice fields are then extracted into
    individual columns, and the numeric/date fields are cast to proper types.

    Args:
        raw_msgs: DataFrame with the Kafka source schema (needs a ``value`` column).

    Returns:
        DataFrame with columns InvoiceNo, InvoiceDate, CustomerID, Country,
        StockCode, Description, Quantity, UnitPrice. Keys missing from a
        message, and values that fail to cast, come back as null.
    """
    fields = ['InvoiceNo', 'InvoiceDate', 'CustomerID', 'Country',
              'StockCode', 'Description', 'Quantity', 'UnitPrice']

    # Only the payload is needed; the Kafka key is never used downstream.
    payload = F.from_json(F.col('value').cast(StringType()),
                          MapType(StringType(), StringType()))
    parsed = raw_msgs.select(payload.alias('json_value'))

    # One column per map key, named after the key.
    invoices = parsed.select([F.col('json_value').getItem(name).alias(name) for name in fields])

    # NOTE(review): a string -> DateType cast only understands ISO-style
    # 'yyyy-MM-dd...' values; any other format becomes null. Confirm the
    # producer's InvoiceDate format (F.to_date with a pattern may be needed).
    return (invoices
            .withColumn('Quantity', F.col('Quantity').cast(IntegerType()))
            .withColumn('UnitPrice', F.col('UnitPrice').cast(DoubleType()))
            .withColumn('InvoiceDate', F.col('InvoiceDate').cast(DateType())))


def write_data_to_mongodb(msgs, epoch_id):
    """Transform one streaming micro-batch and append it to MongoDB.

    Intended as the ``foreachBatch`` callback of the Kafka stream.

    Args:
        msgs: micro-batch DataFrame read from the Kafka source.
        epoch_id: micro-batch id supplied by foreachBatch. It is unused, so a
            retried batch may be written twice (at-least-once delivery).
    """
    invoices = _parse_invoices(msgs)
    # No explicit URI: the connector writes to the session's spark.mongodb.output.uri.
    invoices.write.format('com.mongodb.spark.sql.DefaultSource').mode('append').save()

In [None]:
# Launch the streaming query: each micro-batch is passed to write_data_to_mongodb.
# NOTE(review): no checkpointLocation is configured, so stream progress is not
# persisted and cannot be recovered after a restart. Consider adding one.
invoice_query = (msgs.writeStream
                 .foreachBatch(write_data_to_mongodb)
                 .start())

# Block this cell until the query stops or fails.
invoice_query.awaitTermination()