# Streaming data from kafka into MongoDB

This notebook consumes messages from a Kafka topic, and then inserts them into a MongoDB Collection

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import from_json

#### Create Spark Session and Spark Context

In [11]:
spark = (SparkSession
        .builder.master('local')
        .appName("kafka-mongo-streaming")
        # Add kafka package and mongodb package as one string
        # Versions need to match the Spark version
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")
        # Mongo config including the username and password
        .config("spark.mongodb.input.uri","mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin")
        .config("spark.mongodb.output.uri","mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin")
        .getOrCreate())

sc = spark.sparkContext

#### Read the message from the kafka stream, convert binary values to string

In [12]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ingestiontopic") \
    .option("failOnDataLoss", "false") \
    .load()

df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

#### Create Temporary view for SparkSQL

In [13]:
df1.createOrReplaceTempView("message")

#### Write message to the console of the environment (terminal)

In [14]:
res = spark.sql("SELECT * FROM message")
res.writeStream.format("console") \
        .outputMode("append") \
        .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f7c2c345910>

#### Now, write the unconverted dataframe message back to another topic in Kafka and listen to it with a local consumer

In [15]:
ds = df \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "sparkoutput") \
    .option("checkpointLocation", "/tmp") \
    .start()

#### Write message to MongoDB

The function transforms and writes the batch dataframe, with complete kafka message into mongodb.

In [16]:
def foreach_batch_function(df, epoch_id):
    # Transform the values of all rows in the column value and create a dataframe out of it (one row)
    df2 = df.withColumn("value", from_json(df.value, MapType(StringType(), StringType())))
    # Transform the dataframe to have individual columns
    df3 = df2.select(["value.Quantity", "value.UnitPrice", "value.Country", "value.CustomerID",\
                      "value.StockCode", "value.Description", "value.InvoiceDate", "value.InvoiceNo"])
    # Send the dataframe into MongoDB which will create a BSON?? document out of it
    df3.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
    
    pass

#### Start the mongodb stream and wait for completion

In [17]:
df1.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination

<bound method StreamingQuery.awaitTermination of <pyspark.sql.streaming.StreamingQuery object at 0x7f7c2c1d9a90>>