In [1]:
# Make the local Spark installation's `pyspark` package importable (findspark
# locates SPARK_HOME); this must run before any `pyspark` import below.
import findspark
findspark.init()

In [2]:
import os

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StringType

In [3]:
# Connection to the MySQL database that stores the live tweet stream.
# NOTE: writing to MySQL currently fails: there is a driver problem between the
# MySQL version running in Docker and the MySQL connector (JDBC) driver.
# NOTE(review): Connector/J 8.x names its driver class com.mysql.cj.jdbc.Driver;
# set MYSQL_DRIVER accordingly if that is the connector on the classpath.

def _mysql_setting(value, env_var, default):
    """Resolve one MySQL connection setting.

    An explicitly passed ``value`` wins (an empty string is respected, e.g. an
    empty password); otherwise the environment variable ``env_var`` is used,
    falling back to ``default``.
    """
    if value is not None:
        return value
    return os.environ.get(env_var, default)


def save_to_mysql(current_df, epoc_id, url=None, table=None, user=None,
                  password=None, driver=None, mode="append"):
    """Write one streaming micro-batch to MySQL over JDBC.

    Used as a ``DataStreamWriter.foreachBatch`` callback, which calls it with
    ``(batch_df, epoch_id)``. Connection settings are no longer hard-coded:
    each keyword argument overrides the matching environment variable, which
    in turn overrides the legacy local-Docker default.

    Args:
        current_df: The micro-batch DataFrame to write.
        epoc_id: The micro-batch (epoch) id; only used for the log lines.
        url: JDBC URL (env ``MYSQL_JDBC_URL``).
        table: Target table (env ``MYSQL_TABLE``).
        user: Database user (env ``MYSQL_USER``).
        password: Database password (env ``MYSQL_PASSWORD``).
        driver: JDBC driver class name (env ``MYSQL_DRIVER``).
        mode: Spark save mode passed to ``DataFrameWriter.jdbc``.
    """
    # TODO(review): the fallback credentials exist only for the local Docker
    # demo; require MYSQL_PASSWORD from the environment once it is provided.
    db_credentials = {
        "user": _mysql_setting(user, "MYSQL_USER", "root"),
        "password": _mysql_setting(password, "MYSQL_PASSWORD", "password"),
        "driver": _mysql_setting(driver, "MYSQL_DRIVER", "com.mysql.jdbc.Driver"),
    }
    jdbc_url = _mysql_setting(url, "MYSQL_JDBC_URL",
                              "jdbc:mysql://192.168.99.100:3306/twitterdb")
    target_table = _mysql_setting(table, "MYSQL_TABLE", "twitter")

    print("Printing epoc_id: ")
    print(epoc_id)
    print("Writing to Mysql : " + str(epoc_id))

    current_df.write.jdbc(url=jdbc_url, table=target_table, mode=mode,
                          properties=db_credentials)
    print("Saved to Mysql : " + str(epoc_id))
    

In [4]:
# Streaming job configuration; defaults point at the Docker host used during
# development and can be overridden through environment variables.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "192.168.99.100:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "tweet")
TRIGGER_INTERVAL = "10 seconds"
# Kafka source connector jars (spark-sql-kafka for Spark 2.4.5 / Scala 2.11).
KAFKA_JARS = (
    "spark-sql-kafka-0-10_2.11-2.4.5.jar",
    "kafka-clients-1.1.0.jar",
    "jsr166e-1.1.0.jar",
)


def join_classpath(jars):
    """Join jar paths into a JVM classpath string.

    ``spark.jars`` takes a comma-separated list, but the ``*.extraClassPath``
    settings are JVM classpaths whose entries are separated by ``os.pathsep``
    (``:`` on POSIX, ``;`` on Windows); a comma there is part of the path.

    Args:
        jars: Iterable of jar file paths.

    Returns:
        The paths joined with ``os.pathsep``.
    """
    return os.pathsep.join(jars)


def build_spark_session(jars=KAFKA_JARS):
    """Create (or reuse) the SparkSession with the Kafka connector jars loaded.

    Note: the former ``spark.executor.extraLibrary`` setting was dropped; it is
    not a Spark configuration key.
    """
    classpath = join_classpath(jars)
    return SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .config("spark.jars", ",".join(jars)) \
        .config("spark.executor.extraClassPath", classpath) \
        .config("spark.driver.extraClassPath", classpath) \
        .getOrCreate()


def parse_tweet_columns(raw_df):
    """Split the ``screen_name;location;text`` payload into separate columns.

    ``screen_name`` and ``location`` are null when ``value`` contains no
    ``;``; ``text`` is null when there are fewer than three fields. Everything
    after the second ``;`` is kept as ``text``, so semicolons inside the tweet
    body no longer truncate it (``split(value, ';')[2]`` did).

    Args:
        raw_df: DataFrame with a string ``value`` column.

    Returns:
        ``raw_df`` with ``screen_name``, ``location`` and ``text`` appended.
    """
    value = f.col('value')
    parts = f.split(value, ';')
    has_separator = f.instr(value, ';') > 0
    return raw_df \
        .withColumn('screen_name', f.when(has_separator, parts[0])) \
        .withColumn('location', f.when(has_separator, parts[1])) \
        .withColumn('text', f.when(f.size(parts) >= 3,
                                   f.regexp_replace(value, '^[^;]*;[^;]*;', '')))


if __name__ == '__main__':

    print('Welcome to Spark Streaming')
    print('Data processing Started')

    spark = build_spark_session()
    spark.sparkContext.setLogLevel("ERROR")
    print(" kafka Started ...")

    try:
        # Construct a streaming DataFrame that reads from the tweet topic.
        transaction_detail_df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
            .option("subscribe", KAFKA_TOPIC) \
            .option("startingOffsets", "latest") \
            .load()

        print("Printing Schema of kafka transaction_detail_df: ")
        transaction_detail_df.printSchema()

        twitter_transaction_df = transaction_detail_df.selectExpr("CAST(value AS STRING)", "timestamp")

        print("Printing Schema of twitter_transaction_df: ")
        twitter_transaction_df.printSchema()

        tweet_df = parse_tweet_columns(twitter_transaction_df)

        # View every micro-batch on the console (the container's stdout).
        trans_detail_write_stream = tweet_df \
            .writeStream \
            .trigger(processingTime=TRIGGER_INTERVAL) \
            .outputMode("update") \
            .option("truncate", "false") \
            .format("console") \
            .start()

        # Store every micro-batch in MySQL.
        # TODO(review): add .option("checkpointLocation", ...) so a restart
        # resumes from committed Kafka offsets instead of "latest".
        mysql_write_stream = tweet_df \
            .writeStream \
            .trigger(processingTime=TRIGGER_INTERVAL) \
            .outputMode("update") \
            .foreachBatch(save_to_mysql) \
            .start()

        # Wait on both queries; awaiting only the console query hid failures
        # of the MySQL sink.
        spark.streams.awaitAnyTermination()

        print("PySpark Structured Streaming with Kafka Demo Application Completed.")
    finally:
        # Also runs on errors and on KeyboardInterrupt ("Interrupt kernel"),
        # which would otherwise leave the queries running in the background.
        for query in spark.streams.active:
            query.stop()
        spark.stop()
    

Welcome to Spark Streaming
Data processing Started
 kafka Started ...
Printing Schema of kafka transaction_detail_df: 
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Printing Schema of twitter_transaction_df: 
root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Printing epoc_id: 
0
Writing to Mysql : 0


KeyboardInterrupt: 