In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,com.mysql:mysql-connector-j:9.1.0 pyspark-shell'


In [3]:
session = SparkSession \
    .builder \
    .appName('SparkDatamarts') \
    .master("spark://spark-master:7077") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/conda/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.mysql#mysql-connector-j added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d94f9ea2-30db-4d89-b683-eab997712482;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incuba

In [48]:
users = session.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://mysql:3306/mysql_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "users") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .load() \
    .alias("users")

In [49]:
products = session.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://mysql:3306/mysql_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "products") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .load() \
    .alias("products")

In [50]:
product_categories = session.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://mysql:3306/mysql_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "product_categories") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .load() \
    .alias("product_categories")

In [44]:
orders = session.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://mysql:3306/mysql_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "orders") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .load() \
    .alias("orders")

In [45]:
order_details = session.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://mysql:3306/mysql_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "order_details") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .load() \
    .alias("order_details")

In [35]:
user_sales = orders \
    .join(users, orders.user_id == users.user_id, "inner") \
    .withColumn("order_month", f.date_trunc("MM", orders.order_date)) \
    .withColumn("delivery_period", f.date_diff("delivery_date", "order_date")) \
    .select(["order_month", "order_id", orders.user_id, "loyalty_status", "total_amount",
             "status", "delivery_period"]) \
    .groupBy(['order_month', 'loyalty_status']) \
    .agg(f.count("order_id").alias("orders_cnt"),
         f.count(f.when(f.col("status") == "Paid", f.col("order_id"))).alias("paid_orders"),
         f.count(f.when(f.col("status") == "Completed", f.col("order_id"))).alias("completed_orders"),
         f.count(f.when(f.col("status") == "Pending", f.col("order_id"))).alias("pending_orders"),
         f.count(f.when(f.col("status") == "Aborted", f.col("order_id"))).alias("aborted_orders"),
         f.countDistinct(orders.user_id).alias("unique_users"),
         f.sum("total_amount").alias("total_order_amount"),
         f.avg("total_amount").alias("avg_order_amount"),
         f.sum(f.when(f.col("status") != "Aborted", f.col("total_amount"))).alias("total_revenue"),
         f.avg("delivery_period").alias("avg_delivery_period")) \
    .orderBy(["order_month", "loyalty_status"])

In [36]:
user_sales.show()

+-------------------+--------------+----------+-----------+----------------+--------------+--------------+------------+------------------+----------------+-------------+-------------------+
|        order_month|loyalty_status|orders_cnt|paid_orders|completed_orders|pending_orders|aborted_orders|unique_users|total_order_amount|avg_order_amount|total_revenue|avg_delivery_period|
+-------------------+--------------+----------+-----------+----------------+--------------+--------------+------------+------------------+----------------+-------------+-------------------+
|2020-01-01 00:00:00|         Basic|         2|          0|               2|             0|             0|           2|             69.19|       34.595000|        69.19|                5.0|
|2020-01-01 00:00:00|          Gold|         1|          0|               0|             1|             0|           1|             46.60|       46.600000|        46.60|                5.0|
|2020-01-01 00:00:00|        Silver|         2|   

In [61]:
user_sales.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", "jdbc:mysql://mysql:3306/mysql_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "user_sales") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .save()

                                                                                

In [58]:
product_sales = orders \
    .join(order_details, orders.order_id == order_details.order_id, "inner") \
    .join(products, order_details.product_id == products.product_id, "inner") \
    .join(product_categories, products.category_id == product_categories.category_id, "inner") \
    .withColumn("order_month", f.date_trunc("MM", orders.order_date)) \
    .select([f.col("orders.order_id").alias("order_id"), "order_month", "status",
             f.col("product_categories.name").alias("product_category"), f.col("products.name").alias("product_name"),
             f.col("order_details.total_amount").alias("total_amount"), "quantity"]) \
    .groupBy(["order_month", "product_category", "product_name"]) \
    .agg(f.countDistinct("order_id").alias("order_cnt"),
         f.sum("quantity").alias("ordered_quantity"),
         f.sum(f.when(f.col("status") == "Aborted", f.col("quantity"))).alias("aborted_quantity"),
         f.sum(f.when(f.col("status") == "Paid", f.col("quantity"))).alias("paid_quantity"),
         f.sum(f.when(f.col("status") == "Pending", f.col("quantity"))).alias("pending_quantity"),
         f.sum(f.when(f.col("status") == "Completed", f.col("quantity"))).alias("completed_quantity"),
         f.sum("total_amount").alias("total_amount"),
         f.sum(f.when(f.col("status") != "Aborted", f.col("total_amount"))).alias("total_revenue"),
        ) \
    .orderBy(["order_month", "product_category", "product_name"])

In [62]:
product_sales.show(20)

                                                                                

+-------------------+--------------------+--------------------+---------+----------------+----------------+-------------+----------------+------------------+------------+-------------+
|        order_month|    product_category|        product_name|order_cnt|ordered_quantity|aborted_quantity|paid_quantity|pending_quantity|completed_quantity|total_amount|total_revenue|
+-------------------+--------------------+--------------------+---------+----------------+----------------+-------------+----------------+------------------+------------+-------------+
|2020-01-01 00:00:00|      Action Figures| Licensed Rubber Car|        1|               1|            NULL|         NULL|            NULL|                 1|        1.61|         1.61|
|2020-01-01 00:00:00|      Action Figures|  Refined Cotton Car|        1|               2|            NULL|         NULL|            NULL|                 2|       64.74|        64.74|
|2020-01-01 00:00:00|  Food and Beverages|Practical Metal Soap|        1|  

In [63]:
product_sales.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", "jdbc:mysql://mysql:3306/mysql_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "product_sales") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .save()

                                                                                