## Init Spark Session

In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Connection settings are read from environment variables so that real
# credentials never have to be committed with the notebook. Each variable
# falls back to the value this notebook originally hardcoded, so behaviour is
# unchanged when nothing is set:
#   JDBC_DRIVER_PATH     path to the PostgreSQL JDBC driver jar
#   STAGING_JDBC_URL     JDBC URL of the staging database
#   STAGING_DB_USER      database user
#   STAGING_DB_PASSWORD  database password
jdbc_driver_path = os.environ.get("JDBC_DRIVER_PATH", "/home/user/work/jars/postgresql-42.7.3.jar")

# Fail fast with a readable error instead of a driver-loading failure later,
# when the first JDBC read is attempted.
if not os.path.isfile(jdbc_driver_path):
    raise FileNotFoundError(f"The JDBC driver was not found at the specified path: {jdbc_driver_path}")

# Initialize the Spark session with the JDBC driver jar registered via spark.jars.
# NOTE(review): getOrCreate() hands back an already-running session if one
# exists in this kernel; confirm spark.jars is picked up in that case.
spark = (
    SparkSession.builder
    .appName("PostgreSQL to PySpark")
    .config("spark.jars", jdbc_driver_path)
    .getOrCreate()
)

# PostgreSQL connection properties passed to every spark.read.jdbc(...) call.
jdbc_url = os.environ.get("STAGING_JDBC_URL", "jdbc:postgresql://postgres-staging:5432/staging_db")
connection_properties = {
    "user": os.environ.get("STAGING_DB_USER", "admin"),
    "password": os.environ.get("STAGING_DB_PASSWORD", "admin"),
    "driver": "org.postgresql.Driver",
}

24/06/22 18:36:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/22 18:37:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/06/22 18:37:00 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/06/22 18:37:00 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


## Supplier Monthly Revenue

In [2]:
# Supplier monthly revenue: for every (month, supplier) pair, sum the
# discounted revenue of all order lines whose product belongs to that supplier.

# Load the four source tables from the JDBC source.
order_details = spark.read.jdbc(url=jdbc_url, table="order_details", properties=connection_properties)
orders = spark.read.jdbc(url=jdbc_url, table="orders", properties=connection_properties)
products = spark.read.jdbc(url=jdbc_url, table="products", properties=connection_properties)
suppliers = spark.read.jdbc(url=jdbc_url, table="suppliers", properties=connection_properties)

# Tag each DataFrame with a short alias. The aliases are not referenced by
# name in this cell; columns are always resolved through the DataFrame objects.
order_details = order_details.alias("od")
orders = orders.alias("o")
products = products.alias("p")
suppliers = suppliers.alias("s")

# Join order headers with their line items so each line carries its orderDate.
order_details_join = orders.join(order_details, orders["orderID"] == order_details["orderID"], "inner")

# Revenue of one order line after discount:
#   (unitPrice - unitPrice * discount) * quantity
discounted_line_revenue = (
    (order_details_join["unitPrice"] - (order_details_join["unitPrice"] * order_details_join["discount"]))
    * order_details_join["quantity"]
)

# Attach the product (and through it the supplier) to every order line and
# bucket the order date to the first instant of its month.
product_revenue = order_details_join.join(
    products, order_details_join["productID"] == products["productID"], "inner"
).select(
    F.date_trunc("month", order_details_join["orderDate"]).alias("orderMonth"),
    products["supplierID"],
    order_details_join["productID"],
    order_details_join["unitPrice"],
    order_details_join["quantity"],
    order_details_join["discount"],
    discounted_line_revenue.alias("gross_revenue"),
)

# Aggregate per month and supplier. orderMonth is already truncated to the
# month above, so it only needs formatting here, not a second date_trunc.
supplier_monthly_revenue = product_revenue.groupBy(
    F.date_format(product_revenue["orderMonth"], "yyyy-MM-dd").alias("month"),
    product_revenue["supplierID"],
).agg(
    F.sum(product_revenue["gross_revenue"]).alias("total_gross_revenue")
)

# Join with suppliers to replace the bare supplier ID with a readable name.
final_result = supplier_monthly_revenue.join(
    suppliers, supplier_monthly_revenue["supplierID"] == suppliers["supplierID"], "inner"
).select(
    supplier_monthly_revenue["month"],
    suppliers["supplierID"],
    suppliers["companyName"].alias("supplierName"),
    supplier_monthly_revenue["total_gross_revenue"],
)

# Display the first 50 rows: chronological by month, and within a month the
# highest-revenue suppliers first.
final_result.orderBy(F.asc("month"), F.desc("total_gross_revenue")).show(50)




+----------+----------+--------------------+-------------------+
|     month|supplierID|        supplierName|total_gross_revenue|
+----------+----------+--------------------+-------------------+
|1996-07-01|        28|        Gai pâturage|        5026.000000|
|1996-07-01|        24|          G'day Mate|        3350.400000|
|1996-07-01|         8|Specialty Biscuit...|        2702.400000|
|1996-07-01|         7|        Pavlova Ltd.|        2477.000000|
|1996-07-01|         2|New Orleans Cajun...|        1597.400000|
|1996-07-01|         1|      Exotic Liquids|        1444.000000|
|1996-07-01|        12|Plutzer Lebensmit...|        1270.800000|
|1996-07-01|        14|Formaggi Fortini ...|        1136.280000|
|1996-07-01|        19|New England Seafo...|        1068.025000|
|1996-07-01|        26|Pasta Buttini s.r.l.|        1063.100000|
|1996-07-01|        13|Nord-Ost-Fisch Ha...|         931.500000|
|1996-07-01|        11|Heli Süßwaren Gmb...|         877.500000|
|1996-07-01|        23|  

                                                                                