### Read from Website_stats and publish to Kafka

In [1]:
import mariadb
import datetime
from pyspark.sql import SparkSession

#Connect to website_stats database
#Connection settings for the local MariaDB instance; autocommit is switched on
summary_db_settings = {
    "user": "spark",
    "password": "spark",
    "host": "127.0.0.1",
    "port": 3306,
    "database": "website_stats",
    "autocommit": True,
}
summary_conn = mariadb.connect(**summary_db_settings)

#Cursor used for the bounds query below
summary_cursor = summary_conn.cursor()

#This assumes that the pipeline is executed on the same day as when
#the website_stats db is populated
#Capture "today" exactly once so both bounds derive from the same instant;
#two separate today() calls could straddle midnight and yield a two-day window
run_date=datetime.datetime.today()
start_date=run_date.strftime("%Y-%m-%d")
end_date=(run_date+datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print("Querying bounds for date ranges :", start_date,end_date)


#find min and max bounds for the parallel DB query
#The ID range of today's rows is used later as the JDBC partitioning bounds
summary_cursor.execute(f"""
        SELECT min(`ID`) as MIN_ID, max(`ID`) as MAX_ID 
        FROM `website_stats`.`visit_stats`
        WHERE `INTERVAL_TIMESTAMP` BETWEEN 
            '{start_date}' AND '{end_date}'
    """)

#Defaults apply when no rows fall inside the date range
min_bounds=0
max_bounds=0
for min_id, max_id in summary_cursor:
    #min()/max() over zero matching rows still return one row holding NULLs
    #(None in Python); keep the numeric defaults in that case so the later
    #"max_bounds + 1" arithmetic does not fail with a TypeError
    if min_id is not None:
        min_bounds=min_id
    if max_id is not None:
        max_bounds=max_id
#Report the bounds that will actually be used, not the raw loop variables
print("Query bounds are : ", min_bounds, max_bounds)

#Get all last_actions where duration > 15 seconds
#SQL text selecting ID, LAST_ACTION and DURATION from visit_stats for rows whose
#INTERVAL_TIMESTAMP lies between start_date and end_date and whose DURATION
#exceeds 15. The string is not executed here; it is wrapped as a subquery in the
#JDBC "dbtable" option of the Spark read further down.
#NOTE(review): BETWEEN is inclusive on both ends, so a row stamped exactly at
#end_date 00:00:00 would presumably also match - confirm this is intended
last_action_query= f"""
            SELECT ID, LAST_ACTION, DURATION
            FROM `website_stats`.`visit_stats`
             WHERE `INTERVAL_TIMESTAMP` BETWEEN 
                '{start_date}' AND '{end_date}'     
            AND DURATION > 15
        """

#create spark session
#Comma-separated jar list handed to Spark: the MySQL JDBC driver plus the
#Kafka-related jars shipped in the local jars/ directory
spark_jar_paths = ",".join([
    "jars/mysql-connector-j-8.4.0.jar",
    "jars/commons-pool2-2.12.0.jar",
    "jars/kafka-clients-3.6.0.jar",
    "jars/spark-sql-kafka-0-10_2.12-3.5.1.jar",
    "jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar",
    "jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar",
])

#Local two-thread session; shuffle partitions and default parallelism are
#both set to 2 to match
website_spark = (
    SparkSession.builder
    .appName("LongLastActionsJob")
    .config("spark.sql.shuffle.partitions", 2)
    .config("spark.default.parallelism", 2)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .config("spark.jars", spark_jar_paths)
    .config("spark.driver.extraClassPath", "jars/*")
    .master("local[2]")
    .getOrCreate()
)

#Read long last actions with distributed processing
#JDBC options, applied in this order; the query is wrapped as an aliased
#subquery and the read is split into 2 partitions on the ID column
jdbc_options = {
    "url": "jdbc:mysql://localhost:3306/website_stats",
    "dbtable": "( " + last_action_query + " ) as tmpLastAction",
    "user": "spark",
    "password": "spark",
    "partitionColumn": "ID",
    "lowerBound": min_bounds,
    "upperBound": max_bounds + 1,
    "numPartitions": 2,
}

jdbc_reader = website_spark.read.format("jdbc")
for option_name, option_value in jdbc_options.items():
    jdbc_reader = jdbc_reader.option(option_name, option_value)
last_action_df = jdbc_reader.load()

#Preview the first 10 rows
last_action_df.show(10)

#Send the last actions to the Kafka topic
#Each record carries LAST_ACTION as both its Kafka key and its value
kafka_records = last_action_df.selectExpr("LAST_ACTION as key", "LAST_ACTION as value")

#NOTE(review): checkpointLocation is normally a streaming option; presumably
#it has no effect on this batch write - confirm before removing
kafka_writer = (
    kafka_records.write
    .format("kafka")
    .option("checkpointLocation", "/tmp/cp-lastaction")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "spark.exercise.lastaction.long")
)
kafka_writer.save()


Querying bounds for date ranges : 2025-04-03 2025-04-04
Query bounds are :  1 38


25/04/03 21:06:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/03 21:06:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/04/03 21:06:48 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


+---+------------+--------+
| ID| LAST_ACTION|DURATION|
+---+------------+--------+
|  3|ShoppingCart|      17|
|  5|ShoppingCart|      20|
|  6|         FAQ|      22|
|  7|ShoppingCart|      19|
| 12|         FAQ|      26|
| 21|ShoppingCart|      16|
| 23|     Catalog|      20|
| 28|ShoppingCart|      19|
| 30|       Order|      23|
| 32|       Order|      16|
+---+------------+--------+
only showing top 10 rows

