In [1]:
from pyspark.sql import SparkSession

# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-mongo-streaming')     
         # Add kafka package and mongodb package. Make sure to to this as one string!
         # Versions need to match the Spark version (trial & error)
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
         # Mongo config including the username and password from compose file
         .config("spark.mongodb.input.uri","mongodb://127.0.0.1/docstreaming.invoices")
         .config("spark.mongodb.output.uri","mongodb://127.0.0.1/docstreaming.invoices")
         .getOrCreate())
sc = spark.sparkContext


22/01/28 07:46:19 WARN Utils: Your hostname, stepwise resolves to a loopback address: 127.0.1.1; using 192.168.100.32 instead (on interface wlp3s0)
22/01/28 07:46:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/zalda/Desktop/Projects/Learn%20Data%20Engineering/document_streaming_project/venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/zalda/.ivy2/cache
The jars for the packages stored in: /home/zalda/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-56d477e5-5c0a-4187-9184-c782d83b5b37;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 i

In [2]:
# Read the message from the kafka stream
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "ingestion-topic") \
  .load()

# convert the binary values to string
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [3]:
#Create a temporary view for SparkSQL
df1.createOrReplaceTempView("message")

In [4]:
# Write out the message to the console of the environment
res = spark.sql("SELECT * from message")
res.writeStream.format("console") \
            .outputMode("append") \
            .start()

22/01/28 07:46:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-eb5b1336-0f82-4677-823f-195ddd06b6e2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/01/28 07:46:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f880f6aa9a0>

In [5]:
# Write the unvonverted dataframe (no strings)
# message back into Kafka in another topic#
# listen to it with a local consumer
ds = df \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "spark-output") \
  .option("checkpointLocation", "/tmp") \
  .start() 

22/01/28 07:46:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [6]:
 # Write the message into MongoDB
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF in this foreach

    # writes the dataframe with complete kafka message into mongodb
    df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
    
    #Transform the values of all rows in column value and create a dataframe out of it (will also only have one row)
    df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))    
   
    # Transform the dataframe so that it will have individual columns 
    df3= df2.select(["value.Quantity","value.UnitPrice","value.Country","value.CustomerID","value.StockCode","value.Description","value.InvoiceDate","value.InvoiceNo"])
    
    # Send the dataframe into MongoDB which will create a BSON document out of it
    # df3.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
    df3.write.format("mongo").mode("append").option("database","docstreaming").option("collection", "invoices").save()
    
    pass

In [7]:
# Start the MongoDB stream and wait for termination
df1.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()

22/01/28 07:46:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1132d95a-ff04-4074-9a95-7d21917b6bf6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/01/28 07:46:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
+---+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"InvoiceNo": 656...|
+----+--------------------+



                                                                                