# Install the MySQL connector and Spark dependencies
* Note: installing packages from inside the notebook is not recommended. It is better to use Docker to build an image that already contains these dependencies.

In [1]:
!pip install mysql-connector-python



In [2]:
import os
import mysql.connector

def connect(host=None, user=None, password=None, database=None):
    """Open a new connection to the MySQL database used by this notebook.

    Every argument is optional. An argument left as ``None`` is read from the
    matching environment variable (``MYSQL_HOST``, ``MYSQL_USER``,
    ``MYSQL_PASSWORD``, ``MYSQL_DATABASE``). If that variable is unset, the
    notebook's placeholder value is used. This lets real credentials stay out
    of the notebook.

    Returns:
        The connection object returned by ``mysql.connector.connect``. The
        caller is responsible for closing it.
    """
    def _setting(value, env_var, default):
        # Precedence: explicit argument > environment variable > placeholder.
        return value if value is not None else os.environ.get(env_var, default)

    return mysql.connector.connect(
        host=_setting(host, "MYSQL_HOST", "jdbc"),
        user=_setting(user, "MYSQL_USER", "username"),
        password=_setting(password, "MYSQL_PASSWORD", "password"),
        database=_setting(database, "MYSQL_DATABASE", "topics"),
    )

In [3]:
# Extra Maven packages for spark-submit to fetch. PySpark reads this variable
# when it launches its JVM, so this cell has to run before the SparkSession
# is built.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--packages",
    ",".join([
        "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4",  # DStream Kafka support (KafkaUtils)
        "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4",       # Structured Streaming Kafka source/sink
        "mysql:mysql-connector-java:5.1.48",                      # MySQL JDBC driver
    ]),
    "pyspark-shell",
])

In [4]:
#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json
import pyspark
from pyspark.sql.functions import unbase64, count, window, col
from pyspark.sql import SparkSession

# Create the Spark and Spark Streaming sessions

In [5]:
# Create normal spark session
# Build (or reuse) the SparkSession that runs the Structured Streaming queries.
spark = SparkSession.builder.appName("test").getOrCreate()

# Share its SparkContext and only show WARN-level (and more severe) log output.
sc = spark.sparkContext
sc.setLogLevel("WARN")

# DStream-based StreamingContext on the same SparkContext; batchDuration is
# the micro-batch interval in seconds.
ssc = StreamingContext(sc, batchDuration=5)

# Try stream-to-stream processing (Kafka topic to Kafka topic)

In [6]:
# Read topic
# Read mytopic1 and count how often each distinct message value has been seen.
# Both columns are cast to strings and named `key`/`value`, the columns the
# Kafka sink writes out.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ksrv1:19092")
    .option("subscribe", "mytopic1")
    .load()
    .groupBy("value")
    .agg(count("timestamp").alias("count"))
    .select(
        col("value").cast("string").alias("key"),
        col("count").cast("string").alias("value"),
    )
)

# Publish the running counts to mytopic2. "update" mode emits a key again
# whenever its count changes.
stream_query = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ksrv1:19092")
    .option("checkpointLocation", "checkpoint_mytopic2_1")
    .option("topic", "mytopic2")
    .outputMode("update")
    .start()
)

try:
    # Let the demo run for up to 20 seconds.
    stream_query.awaitTermination(20)
finally:
    # Stop the query rather than leaving it running in the background. While
    # it is active it holds this checkpoint, so re-running the cell could not
    # start it again.
    stream_query.stop()

False

# Try stream-to-database processing (Kafka topic to MySQL)

In [7]:
# Read topic
# Read mytopic1, count the occurrences of each distinct message value, and
# expose the result as string `key`/`value` columns. Built as a single chain
# so the raw stream is never overwritten in place.
df_db = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ksrv1:19092")
    .option("subscribe", "mytopic1")
    .load()
    .groupBy("value")
    .agg(count("timestamp").alias("count"))
    .select(
        col("value").cast("string").alias("key"),
        col("count").cast("string").alias("value"),
    )
)

# Write to DB
def process_batch(df_db, epoch_id):
    """Upsert one micro-batch of (key, value) rows into the MySQL ``mytopic`` table.

    Used as the ``foreachBatch`` callback of the stream-to-DB query, so Spark
    calls it once per micro-batch.

    Args:
        df_db: The micro-batch DataFrame; its ``key`` and ``value`` columns
            are what get written.
        epoch_id: Micro-batch id supplied by Spark; used only in the error log.

    Write failures, including a failed connection, are logged with their
    traceback instead of being silently discarded. They are not re-raised, so
    a single bad batch does not stop the streaming query.
    """
    import logging

    pd_df = df_db.toPandas()
    if pd_df.empty:
        # Nothing to write, so skip the database round trip.
        return

    # Parameterized statement: the connector escapes the Kafka-supplied text,
    # so quotes inside a message can neither break nor inject into the SQL.
    sql = "REPLACE INTO mytopic (`key`, `value`) VALUES (%s, %s)"
    # Plain Python strings for the driver, in column order.
    rows = [(str(k), str(v)) for k, v in zip(pd_df["key"], pd_df["value"])]

    con = None
    try:
        con = connect()
        cur = con.cursor()
        try:
            cur.executemany(sql, rows)
        finally:
            cur.close()
        con.commit()
    except Exception:
        logging.getLogger(__name__).exception(
            "Failed to write micro-batch %s to MySQL", epoch_id)
    finally:
        # If connect() itself failed there is no connection to close.
        if con is not None:
            con.close()
    
# Start the stream-to-DB query. Each micro-batch is handed to process_batch,
# and "update" mode passes only the keys whose count changed.
# `query` holds the StreamingQuery handle (not the bool that
# awaitTermination returns) so the query can be stopped.
query = (
    df_db.writeStream
    .outputMode("update")
    .option("checkpointLocation", "checkpoint_mytopic2_2")
    .foreachBatch(process_batch)
    .start()
)

try:
    # Let the demo run for up to 20 seconds.
    query.awaitTermination(20)
finally:
    # Stop the query so it does not keep running in the background with its
    # checkpoint still in use.
    query.stop()


In [None]:
# Shut down the DStream context. stopSparkContext=True (the default) also
# stops the underlying SparkContext that the SparkSession above runs on.
ssc.stop(stopSparkContext=True, stopGraceFully=False)