In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    avg,
    coalesce,
    col,
    count,
    current_timestamp,
    datediff,
    expr,
    first,
    lit,
    max as spark_max,
    monotonically_increasing_id,
    row_number,
    sum as spark_sum,
    when,
)
from pyspark.sql.window import Window

# Spark session with the PostgreSQL JDBC driver jar on the driver classpath.
# NOTE(review): the default jar path matches the jupyter container layout; override
# it with POSTGRES_JDBC_JAR when running elsewhere.
spark = (
    SparkSession.builder
    .appName("DWH")
    .config(
        "spark.driver.extraClassPath",
        os.environ.get("POSTGRES_JDBC_JAR", "/home/jovyan/postgresql-42.7.4.jar"),
    )
    .getOrCreate()
)

# Connection settings are taken from the environment so credentials are not
# committed with the notebook. The fallbacks reproduce the previous local setup.
# TODO: remove the password fallback once POSTGRES_PASSWORD is always provided.
jdbc_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/postgres")
properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "12345"),
    "driver": "org.postgresql.Driver",
}

In [None]:
def _read_source(table_name):
    """Return a DataFrame over one upstream PostgreSQL table (read via JDBC)."""
    return spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)


# Source 1: a single table.
wide = _read_source("source1.craft_market_wide")

# Source 2: two tables linked through craftsman_id.
masters_products = _read_source("source2.craft_market_masters_products")
orders_customers = _read_source("source2.craft_market_orders_customers")
df2 = orders_customers.join(masters_products, on="craftsman_id", how="inner")

# Source 3: orders joined to craftsmen and customers by their ids.
craftsmans = _read_source("source3.craft_market_craftsmans")
customers = _read_source("source3.craft_market_customers")
orders = _read_source("source3.craft_market_orders")
df3 = (
    orders
    .join(craftsmans, on="craftsman_id", how="inner")
    .join(customers, on="customer_id", how="inner")
)

# Common schema every source is projected onto before the union. A source that
# lacks a column gets a NULL placeholder, so all three frames end up with the
# same column set in the same order.
columns = ['craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email',
           'product_name', 'product_description', 'product_type', 'product_price',
           'order_created_date', 'order_completion_date', 'order_status',
           'customer_name', 'customer_address', 'customer_birthday', 'customer_email']
dfs = [wide, df2, df3]
df = [
    frame.select([col(c) if c in frame.columns else lit(None).alias(c) for c in columns])
    for frame in dfs
]

# unionByName matches columns by name instead of position, so a future change to
# `columns` or to one projection cannot silently misalign fields.
final_df = df[0]
for frame in df[1:]:
    final_df = final_df.unionByName(frame)

def _build_dimension(source, attribute_columns, natural_key, id_column):
    """Project one dimension out of the unified source and number its rows.

    Args:
        source: unified source DataFrame.
        attribute_columns: columns kept in the dimension.
        natural_key: columns that identify one member; rows are de-duplicated on
            them and the surrogate id is assigned in this key's sort order.
        id_column: name of the 1-based surrogate id column to add.
    """
    # NOTE(review): dropDuplicates keeps an arbitrary row per key, so attributes
    # outside the key (address, email, description, ...) may vary between runs.
    # NOTE(review): an un-partitioned window moves all rows to one partition, and
    # ids restart at 1 on every run while the tables are written with
    # mode="append" -- the same id is reused across batches.
    window = Window.orderBy(*natural_key)
    return (
        source.select(*attribute_columns)
        .dropDuplicates(natural_key)
        .withColumn(id_column, row_number().over(window))
    )


# Bound as d_* because the fact build, the load_dttm stamping and the datamart
# cell all reference these names.
d_craftsmans = _build_dimension(
    final_df,
    ["craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"],
    ["craftsman_name", "craftsman_birthday"],
    "craftsman_id",
)
d_customers = _build_dimension(
    final_df,
    ["customer_name", "customer_address", "customer_birthday", "customer_email"],
    ["customer_name", "customer_birthday"],
    "customer_id",
)
d_products = _build_dimension(
    final_df,
    ["product_name", "product_description", "product_type", "product_price"],
    ["product_name", "product_price"],
    "product_id",
)

# Fact table: resolve each source row to the dimension surrogate ids via the
# natural keys, then number the orders. Left joins keep rows whose natural key
# has no match (their ids are NULL).
# Bound as f_orders because the load_dttm stamping and the datamart cell read it
# under that name.
f_orders = (
    final_df
    .join(d_products, ["product_name", "product_price"], "left")
    .join(d_craftsmans, ["craftsman_name", "craftsman_birthday"], "left")
    .join(d_customers, ["customer_name", "customer_birthday"], "left")
    .select(
        # The three ids alone do not identify an order (one customer can buy the
        # same product from the same craftsman twice), so the order attributes
        # are added as tiebreakers to make order_id assignment deterministic.
        row_number().over(
            Window.orderBy(
                "product_id", "craftsman_id", "customer_id",
                "order_created_date", "order_completion_date", "order_status",
            )
        ).alias("order_id"),
        "product_id",
        "craftsman_id",
        "customer_id",
        "order_created_date",
        "order_completion_date",
        "order_status",
    )
)

# Capture one timestamp for the whole batch. current_timestamp() is fixed per
# query, and each write below is a separate query, so using it directly would
# give the four tables slightly different load_dttm values for the same load.
# The datamart cell compares load_dttm against its last load time, so the batch
# must share a single value.
batch_load_dttm = spark.range(1).select(current_timestamp().alias("ts")).first()["ts"]

dwh_craftsmans = d_craftsmans.withColumn("load_dttm", lit(batch_load_dttm))
dwh_customers = d_customers.withColumn("load_dttm", lit(batch_load_dttm))
dwh_products = d_products.withColumn("load_dttm", lit(batch_load_dttm))
dwh_orders = f_orders.withColumn("load_dttm", lit(batch_load_dttm))

# Dimensions first, then the fact table (same order as before).
for target_table, frame in [
    ("dwh.d_craftsmans", dwh_craftsmans),
    ("dwh.d_customers", dwh_customers),
    ("dwh.d_products", dwh_products),
    ("dwh.f_orders", dwh_orders),
]:
    frame.write.jdbc(url=jdbc_url, table=target_table, mode="append", properties=properties)

In [None]:
# Watermark: the latest load_dttm already present in the datamart; None when the
# datamart is empty (first build).
last_load_date_df = spark.read.jdbc(url=jdbc_url, table="dwh.datamart", properties=properties)
last_load_date = last_load_date_df.agg(spark_max("load_dttm")).collect()[0][0]


def _read_dwh_table(table_name):
    """Return a DataFrame over one DWH table in PostgreSQL (read via JDBC)."""
    return spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)


def _latest_version_per_id(dimension, id_column):
    """Keep only the most recently loaded row for each surrogate id.

    The load cell appends every run with ids numbered from 1 again, so one id
    can appear once per batch. Joining on it without this step would multiply
    every order by the number of batches.
    """
    latest_first = Window.partitionBy(id_column).orderBy(col("load_dttm").desc())
    return (
        dimension
        .withColumn("_version_rank", row_number().over(latest_first))
        .filter(col("_version_rank") == 1)
        .drop("_version_rank")
    )


# Dimensions are never filtered by the watermark: a new order may reference a
# craftsman/customer/product loaded in an earlier batch, and the inner joins
# that follow would silently drop such orders.
filtered_customers_df = _latest_version_per_id(_read_dwh_table("dwh.d_customers"), "customer_id")
filtered_products_df = _latest_version_per_id(_read_dwh_table("dwh.d_products"), "product_id")
filtered_craftsmans_df = _latest_version_per_id(_read_dwh_table("dwh.d_craftsmans"), "craftsman_id")

# Only the fact rows are restricted to what arrived after the last datamart load.
# NOTE(review): on a first build every appended batch of f_orders is included;
# if the load cell re-appended the same orders several times they are counted
# repeatedly -- TODO make the DWH load idempotent.
dwh_orders_all = _read_dwh_table("dwh.f_orders")
if last_load_date is None:
    filtered_orders_df = dwh_orders_all
else:
    filtered_orders_df = dwh_orders_all.filter(col("load_dttm") > last_load_date)
    
# Attach dimension attributes to each order. Each dimension's own load_dttm is
# dropped, so the joined frame keeps a single unambiguous load_dttm (the
# order's). drop() is a no-op on frames without that column.
joined_df = (
    filtered_orders_df
    .join(filtered_products_df.drop("load_dttm"), "product_id")
    .join(filtered_craftsmans_df.drop("load_dttm"), "craftsman_id")
    .join(filtered_customers_df.drop("load_dttm"), "customer_id")
)

# Reporting month of the order, e.g. "2024-05".
joined_df = joined_df.withColumn("report_period", F.date_format(col("order_created_date"), "yyyy-MM"))

# Per-order completion time in days. The column keeps the "median_" name because
# the aggregation takes percentile_approx(median_time_order_completed, 0.5) over it.
joined_df = joined_df.withColumn(
    "median_time_order_completed",
    F.datediff(col("order_completion_date"), col("order_created_date")),
)

# Approximate customer age in (fractional) years as of today.
joined_df = joined_df.withColumn(
    "customer_age",
    F.datediff(F.current_date(), col("customer_birthday")) / 365.25,
)

# Split of each order's product_price between the craftsman and the platform.
_CRAFTSMAN_SHARE = 0.9
_PLATFORM_SHARE = 0.1

# One output row per craftsman per reporting month.
_craftsman_group_keys = [
    "craftsman_id", "craftsman_name", "craftsman_address",
    "craftsman_birthday", "craftsman_email", "report_period",
]

# (order_status value, output column) pairs counted individually.
_status_to_column = [
    ("created", "count_order_created"),
    ("in progress", "count_order_in_progress"),
    ("delivery", "count_order_delivery"),
    ("done", "count_order_done"),
]
_per_status_counts = [
    count(when(col("order_status") == status, True)).alias(column_name)
    for status, column_name in _status_to_column
]
# Every status except "done".
_not_done_count = count(
    when(col("order_status").isin("created", "in progress", "delivery"), True)
).alias("count_order_not_done")

agg_df = joined_df.groupBy(*_craftsman_group_keys).agg(
    spark_sum(col("product_price") * _CRAFTSMAN_SHARE).alias("craftsman_money"),
    spark_sum(col("product_price") * _PLATFORM_SHARE).alias("platform_money"),
    count("order_id").alias("count_order"),
    avg(col("product_price")).alias("avg_price_order"),
    avg(col("customer_age")).alias("avg_age_customer"),
    expr("percentile_approx(median_time_order_completed, 0.5)").alias("median_time_order_completed"),
    *_per_status_counts,
    _not_done_count,
)

# Number of ordered products per type for each craftsman and month.
product_category_count_df = (
    joined_df
    .groupBy("craftsman_id", "report_period", "product_type")
    .agg(count("product_id").alias("product_count"))
)

# Pick the most frequent product type. When several types tie on product_count,
# the alphabetically first one wins (NULL types last), so the result is
# deterministic instead of depending on partition order.
window_spec_category = (
    Window.partitionBy("craftsman_id", "report_period")
    .orderBy(col("product_count").desc(), col("product_type").asc_nulls_last())
)
top_product_category_df = (
    product_category_count_df
    .withColumn("rank", F.row_number().over(window_spec_category))
    .filter(col("rank") == 1)
    .select("craftsman_id", "report_period", col("product_type").alias("top_product_category"))
)

# Final datamart rows. A dedicated name avoids clobbering the unified-source
# `final_df` built by the load cell (hidden-state hazard on partial re-runs).
# load_dttm is stamped on every row because the incremental watermark at the top
# of this cell is MAX(load_dttm) over dwh.datamart.
# NOTE(review): rows are appended, so re-aggregating a (craftsman_id,
# report_period) pair that already exists produces a second row -- TODO upsert.
datamart_df = (
    agg_df
    .join(top_product_category_df, ["craftsman_id", "report_period"], "left")
    .withColumn("load_dttm", current_timestamp())
)
datamart_df.write.jdbc(url=jdbc_url, table="dwh.datamart", mode="append", properties=properties)

# Record a load marker row (id, load_dttm) in dwh.datamart.
spark.read.jdbc(url=jdbc_url, table="dwh.datamart", properties=properties) \
    .createOrReplaceTempView("datamart")

max_id_df = spark.sql("SELECT MAX(id) AS max_id FROM datamart")
max_id = max_id_df.collect()[0][0]

# NOTE(review): if dwh.datamart.id is backed by a sequence/identity, an explicit
# MAX(id) + 1 can later collide with the sequence -- confirm the table DDL.
next_id = 1 if max_id is None else max_id + 1

# load_dttm must be a timestamp, not CURRENT_DATE: the watermark compares it
# with timestamps, and a midnight value would make batches loaded earlier the
# same day look "newer" again and get re-aggregated.
# The marker is appended through the JDBC writer. The previous
# `INSERT INTO datamart (...)` targeted a temp view over a JDBC read, and whether
# Spark accepts that (including the column-list form) depends on the Spark version.
load_marker_df = spark.range(1).select(
    lit(next_id).cast("long").alias("id"),
    current_timestamp().alias("load_dttm"),
)
load_marker_df.write.jdbc(url=jdbc_url, table="dwh.datamart", mode="append", properties=properties)