In [1]:
import os

from pyspark.sql import SparkSession

# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json

# Spark session & context

# One MongoDB connection string for both reads and writes. The default carries
# the username and password from the compose file; export DOCSTREAMING_MONGO_URI
# to point at another instance or to keep real credentials out of the notebook.
mongo_uri = os.environ.get(
    "DOCSTREAMING_MONGO_URI",
    "mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin",
)

spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-mongo-streaming')
         # Add the Kafka package and the MongoDB package. Make sure to do this as one string!
         # Versions need to match the Spark version (trial & error)
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")
         # Mongo config: the same URI is used for input and output
         .config("spark.mongodb.input.uri", mongo_uri)
         .config("spark.mongodb.output.uri", mongo_uri)
         .getOrCreate())
sc = spark.sparkContext


In [2]:
# Subscribe to the Kafka topic and expose it as a streaming dataframe
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ingestion-topic")
    .load()
)

# Kafka delivers key and value as binary; cast both columns to strings
df1 = df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [3]:
# Register the string-typed stream as the temporary view "message" so that
# it can be queried with SparkSQL
df1.createOrReplaceTempView(name="message")

In [4]:
# Select everything from the "message" view and stream the rows to the
# console of the environment (append mode)
res = spark.sql("SELECT * from message")
(
    res.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7faf3329c150>

In [5]:
# Write the unconverted dataframe (no string casts) back into Kafka
# in another topic; listen to it with a local consumer.
ds = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "spark-output")
    # Use a directory dedicated to this query instead of the bare, shared
    # /tmp: the checkpoint state must not be mixed with other files or with
    # the checkpoints of another query.
    # NOTE: a checkpoint previously written directly to /tmp is not picked up
    # from the new location.
    .option("checkpointLocation", "/tmp/checkpoints/spark-output")
    .start()
)

In [6]:
 

# Write the message into MongoDB
def foreach_batch_function(df, epoch_id):
    """Append one micro-batch to MongoDB.

    df       -- the dataframe holding the rows of the current batch
    epoch_id -- identifier of the batch; not used by this function

    No target is named in the write call itself; presumably the connector
    falls back to the session's spark.mongodb.output.uri setting.
    """
    # Persist the complete Kafka message exactly as it arrived
    mongo_writer = df.write.format("com.mongodb.spark.sql.DefaultSource")
    mongo_writer.mode("append").save()

    # Disabled alternative: store the parsed invoice fields instead of the raw message.
    #
    # Transform the values of all rows in column value and create a dataframe out of it (will also only have one row)
    #df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
    #
    # Transform the dataframe so that it will have individual columns
    #df3= df2.select(["value.Quantity","value.UnitPrice","value.Country","value.CustomerID","value.StockCode","value.Description","value.InvoiceDate","value.InvoiceNo"])
    #
    # Send the dataframe into MongoDB which will create a BSON document out of it
    #df3.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

In [None]:
# Hand every micro-batch of the Kafka stream to foreach_batch_function,
# start the query and block this cell until the query terminates
(
    df.writeStream
    .foreachBatch(foreach_batch_function)
    .start()
    .awaitTermination()
)