In [3]:
import os

from pyspark.sql import SparkSession

# Spark session & context

# MongoDB connection string. Defaults to the credentials from the compose file,
# but can be overridden through the MONGO_URI environment variable so that real
# secrets do not have to be committed with the notebook.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin",
)

spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-mongo-streaming')
         # Add the Kafka and MongoDB packages. Both must go in ONE comma-separated
         # string: a second .config() call with the same key replaces the first.
         # The artifact versions (Scala suffix _2.11, Spark 2.4.5) must match the
         # installed Spark version (found by trial & error).
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")
         # Reads and writes use the same database/collection
         .config("spark.mongodb.input.uri", MONGO_URI)
         .config("spark.mongodb.output.uri", MONGO_URI)
         .getOrCreate())
sc = spark.sparkContext


In [4]:
# Subscribe to the Kafka topic as an unbounded streaming source
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ingestion-topic")
    .load()
)

# Kafka delivers key and value as binary; cast both to strings for SQL/JSON use
df1 = df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [5]:
# Register the string-typed stream as a temporary view named "message"
# so it can be queried with Spark SQL
df1.createOrReplaceTempView(name="message")

In [6]:
# Echo every incoming message to the console of the environment.
# Keep a handle on the streaming query so it can be inspected or stopped later
# (e.g. console_query.status, console_query.stop()).
res = spark.sql("SELECT * from message")
console_query = (res.writeStream
                 .format("console")
                 .outputMode("append")
                 .start())
console_query

<pyspark.sql.streaming.StreamingQuery at 0x7fef494bcb50>

In [7]:
# Write the unconverted (still binary) messages back into Kafka on another
# topic; listen to it with a local consumer.
# Only key and value are forwarded. The Kafka source also emits metadata
# columns (topic, partition, offset, ...), and the Kafka sink may interpret some
# of them (Spark 3.x reads an optional `partition` column).
# Each streaming query needs its own checkpoint directory. Using /tmp itself
# would mix Spark's offsets/commits with unrelated files and collide with any
# other query pointed at the same location.
ds = (df
      .select("key", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("topic", "spark-output")
      .option("checkpointLocation", "/tmp/checkpoints/spark-output")
      .start())

In [22]:
# Write the message into MongoDB

def f(row):
    """Append the JSON document held in ``row.value`` to MongoDB.

    ``row.value`` is expected to be a JSON *string* (TODO confirm for any new
    caller). ``spark.read.json`` treats a plain string argument as a file
    path, so the document is wrapped in a one-element RDD of strings. That RDD
    is parsed as JSON content instead.

    NOTE: this uses the driver-side ``spark`` session, so it can only be called
    on the driver (e.g. while iterating ``batch_df.collect()``). It cannot be
    used with ``DataFrame.foreach``, which runs the function on executors.

    :param row: a Row with a string ``value`` attribute.
    """
    json_rdd = spark.sparkContext.parallelize([row.value])
    dataframe = spark.read.json(json_rdd)
    dataframe.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

def foreach_batch_function(df, epoch_id):
    """Parse one micro-batch of Kafka messages and append the invoices to MongoDB.

    Passed to ``writeStream.foreachBatch``, which calls it once per micro-batch.

    :param df: batch DataFrame with a string ``value`` column. Each value is
        assumed to be one flat JSON invoice object (TODO confirm producer format).
    :param epoch_id: micro-batch id supplied by Spark (unused here).
    """
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import MapType, StringType

    invoice_fields = ("Quantity", "UnitPrice", "Country", "CustomerID",
                      "StockCode", "Description", "InvoiceDate", "InvoiceNo")

    # Parse the JSON string into a map<string,string>. Every JSON value,
    # including numbers, is kept as its string form.
    parsed = df.withColumn("value", from_json(df.value, MapType(StringType(), StringType())))

    # The invoice fields live inside the parsed map, not as top-level columns.
    # Extract each key into its own column (missing keys become null).
    invoices = parsed.select(*[col("value").getItem(name).alias(name) for name in invoice_fields])

    invoices.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

In [None]:
# Stream every micro-batch into MongoDB.
# A dedicated checkpoint directory lets the query resume from its last
# committed Kafka offsets after a restart.
# awaitTermination() blocks this cell until the query stops.
mongo_query = (df1.writeStream
               .foreachBatch(foreach_batch_function)
               .option("checkpointLocation", "/tmp/checkpoints/mongo")
               .start())
mongo_query.awaitTermination()