## Write Streaming Data from Kafka to MongoDB Using Spark

In [1]:
#spark session and spark context
from pyspark.sql import SparkSession

# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json

# Maven coordinates of the Kafka source and the MongoDB connector.
# Spark expects them as ONE comma-separated string, and the versions have to
# match the Spark version in use (found by trial and error).
SPARK_JAR_PACKAGES = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

# MongoDB connection string; the username and password come from the compose file.
# NOTE(review): credentials are hardcoded here - consider reading them from the environment.
MONGO_URI = "mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin"

# Assemble the session step by step: local master, app name, packages, Mongo URIs
session_builder = SparkSession.builder.master("local").appName("kafka-mongo-streaming")
session_builder = session_builder.config("spark.jars.packages", SPARK_JAR_PACKAGES)
session_builder = session_builder.config("spark.mongodb.input.uri", MONGO_URI)
session_builder = session_builder.config("spark.mongodb.output.uri", MONGO_URI)
spark = session_builder.getOrCreate()

# Keep a handle on the underlying SparkContext as well
sc = spark.sparkContext

In [2]:
# Subscribe to the Kafka topic and expose it as a streaming DataFrame
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ingestion-topic")
    .load()
)

# Key and value arrive as binary; cast both columns to strings
df1 = df.selectExpr(
    "CAST (key AS STRING)",
    "CAST (value AS STRING)",
)

In [3]:
# Register the string-typed stream (df1) as a temporary view named "message"
# so that it can be queried with Spark SQL.
df1.createOrReplaceTempView("message")

In [4]:
# Query the "message" view and stream every new row to the environment's console
result = spark.sql("SELECT * from message")
(
    result.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f86d8112150>

In [5]:
# Forward the unconverted stream (binary key/value, no string cast) to a second
# Kafka topic; listen to that topic with a local consumer to verify the output.
# NOTE(review): the checkpoint is written directly into /tmp - confirm that no
# other query uses the same directory.
ds = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "spark-output")
    .option("checkpointLocation", "/tmp")
    .start()
)

In [6]:
# Write the message into MongoDB
def foreach_batch_function(df, epoch_id):
    """Parse one micro-batch of Kafka messages and append the invoices to MongoDB.

    The JSON string in the ``value`` column is parsed into a string-to-string
    map, the invoice fields are promoted to individual columns, and the
    resulting frame is appended through the MongoDB Spark connector (which
    turns each row into a BSON document). The target is whatever the session's
    ``spark.mongodb.output.uri`` setting points at.

    Args:
        df: Micro-batch DataFrame handed over by ``foreachBatch``. It must
            have a ``value`` column holding the JSON message as a string.
        epoch_id: Identifier of the micro-batch supplied by ``foreachBatch``;
            not used here.

    Returns:
        None. The only effect is the append to MongoDB.
    """
    # Parse the JSON string in `value` into a map of string keys to string values
    parsed_batch = df.withColumn(
        "value", from_json(df.value, MapType(StringType(), StringType()))
    )

    # Promote the invoice fields from the map to individual top-level columns
    invoice_fields = [
        "value.Quantity",
        "value.UnitPrice",
        "value.Country",
        "value.CustomerID",
        "value.StockCode",
        "value.Description",
        "value.InvoiceDate",
        "value.InvoiceNo",
    ]
    invoices = parsed_batch.select(invoice_fields)

    # Persist the flattened invoices. Writing the raw batch `df` here instead
    # would store the complete, unparsed Kafka message and silently discard
    # the transformation above.
    invoices.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

In [7]:
# Launch the MongoDB sink: every micro-batch goes through foreach_batch_function.
# The call blocks this cell until the streaming query terminates.
(
    df1.writeStream
    .foreachBatch(foreach_batch_function)
    .start()
    .awaitTermination()
)

StreamingQueryException: 'An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco\n    return f(*a, **kw)\n  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value\n    format(target_id, ".", name), value)\npy4j.protocol.Py4JJavaError: An error occurred while calling o61.withColumn.\n: org.apache.spark.sql.AnalysisException: Resolved attribute(s) value#69 missing from key#21,value#22 in operator !Project [key#21, jsontostructs(MapType(StringType,StringType,true), value#69, Some(Etc/UTC)) AS value#70]. Attribute(s) with the same name appear in the operation: value. Please check if the right attribute(s) are used.;;\n!Project [key#21, jsontostructs(MapType(StringType,StringType,true), value#69, Some(Etc/UTC)) AS value#70]\n+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]\n   +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@37333a8b, kafka, Map(subscribe -> ingestion-topic, kafka.bootstrap.servers -> kafka:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5cd93ee,kafka,List(),None,List(),None,Map(subscribe -> ingestion-topic, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:369)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:86)\n\tat 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:86)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)\n\tat org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)\n\tat org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3412)\n\tat org.apache.spark.sql.Dataset.select(Dataset.scala:1340)\n\tat org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2258)\n\tat org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2225)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat 
java.lang.Thread.run(Thread.java:748)\n\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy\n    return_value = getattr(self.pool[obj_id], method)(*params)\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 191, in call\n    raise e\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 188, in call\n    self.func(DataFrame(jdf, self.sql_ctx), batch_id)\n  File "<ipython-input-6-3e73884b8ef5>", line 9, in foreach_batch_function\n    df2 = df1.withColumn("value", from_json(df.value, MapType(StringType(), StringType())))\n  File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 1997, in withColumn\n    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)\n  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 69, in deco\n    raise AnalysisException(s.split(\': \', 1)[1], stackTrace)\npyspark.sql.utils.AnalysisException: \'Resolved attribute(s) value#69 missing from key#21,value#22 in operator !Project [key#21, jsontostructs(MapType(StringType,StringType,true), value#69, Some(Etc/UTC)) AS value#70]. Attribute(s) with the same name appear in the operation: value. 
Please check if the right attribute(s) are used.;;\\n!Project [key#21, jsontostructs(MapType(StringType,StringType,true), value#69, Some(Etc/UTC)) AS value#70]\\n+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]\\n   +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@37333a8b, kafka, Map(subscribe -> ingestion-topic, kafka.bootstrap.servers -> kafka:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5cd93ee,kafka,List(),None,List(),None,Map(subscribe -> ingestion-topic, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\\n\'\n\n=== Streaming Query ===\nIdentifier: [id = ec85182e-375d-46c4-ac0e-08b41ac3446f, runId = 21579509-55c2-4c8e-b7db-47f332d41215]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaV2[Subscribe[ingestion-topic]]: {"ingestion-topic":{"0":6}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]\n+- StreamingExecutionRelation KafkaV2[Subscribe[ingestion-topic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'