In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StringType

# MongoDB connection URI shared by the connector's input and output settings.
# Set the MONGO_URI environment variable to override it; the fallback carries the
# username and password from the compose file so the local stack works unchanged.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://root:example@mongo:27017/document_streaming.invoices?authSource=admin",
)

spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-mongo-streaming')
         # Add the Kafka package and the MongoDB package. Make sure to pass them
         # as ONE comma-separated string!
         # Versions need to match the Spark version (trial & error)
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")
         # Reads and writes both go through the same URI
         .config("spark.mongodb.input.uri", MONGO_URI)
         .config("spark.mongodb.output.uri", MONGO_URI)
         .getOrCreate())

In [2]:
# Subscribe to the Kafka topic and expose its messages as a streaming DataFrame
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "api-ingestion-topic")
    .load()
)

# key and value are binary; cast both to strings
df1 = df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [3]:
# Register the string-typed stream as the temporary view "message" for SparkSQL
df1.createOrReplaceTempView(name="message")

In [4]:
# Select everything from the "message" view and stream the rows to the console sink
res = spark.sql("SELECT * from message")
(
    res.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f0130abcf90>

In [5]:
# Forward the unconverted DataFrame (key/value not cast to strings) to another
# Kafka topic.
# The checkpoint lives in a directory dedicated to this query rather than the
# shared /tmp root, so its metadata does not mix with unrelated temp files or
# collide with another query's checkpoint.
ds = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "spark-output")
    .option("checkpointLocation", "/tmp/checkpoints/spark-output")
    .start()
)

In [6]:
def foreach_batch_function(batch_df, epoch_id):
    """
    Persist one micro-batch to MongoDB.

    The JSON string held in the ``value`` column is parsed into a
    string-to-string map, the invoice fields are pulled out of that map,
    and the resulting DataFrame is appended via the MongoDB Spark connector.

    Args:
        batch_df (pyspark.sql.DataFrame): Micro-batch whose ``value`` column holds JSON text
        epoch_id (int): Identifier of the micro-batch (not used here)

    Returns:
        None
    """
    # Keys extracted from the parsed JSON map, in the order they are selected
    invoice_fields = ("Quantity", "UnitPrice", "Country", "CustomerID",
                      "StockCode", "Description", "InvoiceDate", "InvoiceNo")

    # Every JSON value is read as a string
    json_schema = MapType(StringType(), StringType())
    parsed_df = batch_df.withColumn("value", from_json(batch_df.value, json_schema))

    # Address each field as "value.<key>" to pick it out of the map column
    invoice_df = parsed_df.select(["value." + field for field in invoice_fields])

    # Append the selected columns through the MongoDB connector
    mongo_writer = invoice_df.write.format("com.mongodb.spark.sql.DefaultSource")
    mongo_writer.mode("append").save()

In [None]:
# Launch the stream that hands each micro-batch to foreach_batch_function,
# then block until that query terminates
mongo_query = df1.writeStream.foreachBatch(foreach_batch_function).start()
mongo_query.awaitTermination()