In [1]:
import boto3
import os
from dotenv import load_dotenv
from datetime import timedelta, timezone

# Load the environment variables from the .env file
load_dotenv()

# Provide the access key from AWS SSM
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
session = boto3.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
ssm_client = session.client('ssm', region_name='ap-southeast-2')
s3_client = session.client('s3', region_name='ap-southeast-2')
s3_resource = session.resource('s3')
utc_plus_8 = timezone(timedelta(hours=8))

In [2]:
# # Run this block to download data for local development

# # Bucket and prefixes for each folder
# bucket_name = 'tug-dinlr'
# prefixes = [
#     'test/root',
#     'test/root_items',
#     'test/root_discounts',
#     'test/root_vouchers',
#     'test/root_payments',
#     'test/root_refunds',
#     'test/root_items_discounts',
#     'test/root_items_modifier_options',
#     'test/root_payments_payment_inputs',
#     'test/root_refunds_refund_payments'
# ]

# # Get data from S3 and save locally in data/test folder
# for prefix in prefixes:
#     exact_match_prefix = prefix + '/'
#     response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=exact_match_prefix)
#     for obj in response.get('Contents', []):
#         key = obj['Key']
#         # Ensure we only process the files in the exact folder
#         if key.endswith('.csv'):
#             file_name = key.split('/')[-1]
#             file_path = f'data/{prefix}.csv'
#             s3_client.download_file(bucket_name, key, file_path)
#             print(f"Downloaded {file_name} from {prefix}.")

In [3]:
import pyspark as ps
import sys
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col, explode, lit
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType, \
ArrayType, MapType, BooleanType, LongType, DateType, TimestampType, DecimalType, BinaryType, \
ShortType, ByteType, FloatType 

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TUG-DINLR").getOrCreate()

# Use pyspark to read all csv files named "combined.csv" from data/test folders, and name them dyf_{folder_name}
dyf_root = spark.read.csv('data/test/root.csv', header=True, inferSchema=True)
dyf_root_items = spark.read.csv('data/test/root_items.csv', header=True, inferSchema=True)
dyf_root_discounts = spark.read.csv('data/test/root_discounts.csv', header=True, inferSchema=True)
dyf_root_vouchers = spark.read.csv('data/test/root_vouchers.csv', header=True, inferSchema=True)
dyf_root_payments = spark.read.csv('data/test/root_payments.csv', header=True, inferSchema=True)
dyf_root_refunds = spark.read.csv('data/test/root_refunds.csv', header=True, inferSchema=True)
dyf_root_items_discounts = spark.read.csv('data/test/root_items_discounts.csv', header=True, inferSchema=True)
dyf_root_items_modifier_options = spark.read.csv('data/test/root_items_modifier_options.csv', header=True, inferSchema=True)
dyf_root_payments_payment_inputs = spark.read.csv('data/test/root_payments_payment_inputs.csv', header=True, inferSchema=True)
dyf_root_refunds_refund_payments = spark.read.csv('data/test/root_refunds_refund_payments.csv', header=True, inferSchema=True)


24/06/11 03:38:39 WARN Utils: Your hostname, codespaces-5971cb resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/06/11 03:38:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/11 03:38:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [5]:
dyf_root_items.show()

+---+-----+--------------------+--------------------+--------------------+-----------------------+-------------+-------------------+---------------+--------------------+----------------------+---------------------------+--------------+--------------------------+-------------------+--------------------------+----------------------+------------------------------+
| id|index|        items.val.id|      items.val.item|      items.val.name|items.val.price_per.int|items.val.qty|items.val.price.int|items.val.notes|   items.val.variant|items.val.variant_name|items.val.variant_price.int|items.val.sort|items.val.modifier_options|items.val.discounts|items.val.price_per.double|items.val.price.double|items.val.variant_price.double|
+---+-----+--------------------+--------------------+--------------------+-----------------------+-------------+-------------------+---------------+--------------------+----------------------+---------------------------+--------------+--------------------------+----------

24/06/11 03:38:52 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
from pyspark.sql import DataFrame

def cleanColumnNames(df: DataFrame) -> DataFrame:
    """
    Rename all columns in a DataFrame by replacing periods with underscores.
    
    Args:
    df (DataFrame): Input DataFrame with columns to rename.
    
    Returns:
    DataFrame: DataFrame with renamed columns.
    """
    for col_name in df.columns:
        new_col_name = col_name.replace('.val.', '_')
        new_col_name = new_col_name.replace('.', '_')
        df = df.withColumnRenamed(col_name, new_col_name)
    return df

def createDimDate(df: DataFrame, date_col: str) -> DataFrame:
    """
    Create a dimension date DataFrame from a specified date column in the input DataFrame.
    
    Args:
    df (DataFrame): Input DataFrame with date column.
    date_col (str): Name of the date column to extract date components from.
    
    Returns:
    DataFrame: Dimension date DataFrame.
    """
    return df.withColumn("date_key", F.to_timestamp(F.col(date_col), "yyyy-MM-dd'T'HH:mm:ssXXX").cast(TimestampType())) \
             .withColumn("year", F.year(F.col(date_col))) \
             .withColumn("quarter", F.quarter(F.col(date_col))) \
             .withColumn("month", F.month(F.col(date_col))) \
             .withColumn("day", F.dayofmonth(F.col(date_col))) \
             .withColumn("day_of_week", F.dayofweek(F.col(date_col))) \
             .withColumn("week_of_year", F.weekofyear(F.col(date_col))) \
             .withColumn("hour", F.hour(F.col(date_col))) \
             .withColumn("minute", F.minute(F.col(date_col))) \
             .withColumn("second", F.second(F.col(date_col))) \
             .select("date_key", "year", "quarter", "month", "day", "day_of_week", "week_of_year", "hour", "minute", "second") \
             .distinct()

def pivotDiscounts(df: DataFrame) -> DataFrame:
    
    df = df.withColumnRenamed("discounts_discount", "discounts_id") \
        .withColumnRenamed("discounts_promotion", "promotion_id") \
        .withColumnRenamed("discounts_voucher", "voucher_id")

    # Add discounts_type field
    df = df.withColumn("discounts_type", 
        when(F.col("promotion_id").isNotNull(), "promotion")
        .when(F.col("voucher_id").isNotNull(), "voucher")
        .otherwise("discount"))

    # Replace nulls with values from promotion_id or voucher_id
    df = df.withColumn("discounts_id", 
        when(F.col("discounts_id").isNull(), F.col("promotion_id"))
        .when(F.col("discounts_id").isNull(), F.col("voucher_id"))
        .otherwise(F.col("discounts_id")))

    # Drop intermediate columns
    df = df.drop("promotion_id", "voucher_id")
    
    df = df.withColumn("discounts_amount", when(F.col("discounts_amount_int").isNotNull(), F.col("discounts_amount_int").cast(DoubleType()))
                                     .otherwise(F.col("discounts_amount_double")))
    
    # Drop the intermediate columns used for resolving choices
    df = df.drop("discounts_amount_double", "discounts_amount_int")
    
    return df

In [7]:
dyf_root = cleanColumnNames(dyf_root)
dyf_root_items = cleanColumnNames(dyf_root_items)
dyf_root_discounts = cleanColumnNames(dyf_root_discounts)
dyf_root_vouchers = cleanColumnNames(dyf_root_vouchers)
dyf_root_payments = cleanColumnNames(dyf_root_payments)
dyf_root_refunds = cleanColumnNames(dyf_root_refunds)
dyf_root_items_discounts = cleanColumnNames(dyf_root_items_discounts)
dyf_root_items_modifier_options = cleanColumnNames(dyf_root_items_modifier_options)
dyf_root_payments_payment_inputs = cleanColumnNames(dyf_root_payments_payment_inputs)
dyf_root_refunds_refund_payments = cleanColumnNames(dyf_root_refunds_refund_payments)

dyf_root_items = dyf_root_items.filter(
    (col("items_name").isNotNull()) & (col("items_name") != "") &
    (col("items_variant_name").isNotNull()) & (col("items_variant_name") != "") &
    (col("items_variant").isNotNull()) & (col("items_variant") != "")
)

# Resolve choices for price_per, price, and variant_price columns
dyf_root_items = dyf_root_items.withColumn("items_price_per", F.when(F.col("items_price_per_int").isNotNull(), F.col("items_price_per_int").cast(DoubleType()))
                                     .otherwise(F.col("items_price_per_double"))) \
                        .withColumn("items_price", F.when(F.col("items_price_int").isNotNull(), F.col("items_price_int").cast(DoubleType()))
                                     .otherwise(F.col("items_price_double"))) \
                        .withColumn("items_variant_price", F.when(F.col("items_variant_price_int").isNotNull(), F.col("items_variant_price_int").cast(DoubleType()))
                                          .otherwise(F.col("items_variant_price_double")))

# Rename "id" column in dyf_root_items to "join_id"
dyf_root_items = dyf_root_items.withColumnRenamed("id", "root_items_id")
dyf_root_discounts = dyf_root_discounts.withColumnRenamed("id", "root_discounts_id")
dyf_root_vouchers = dyf_root_vouchers.withColumnRenamed("id", "root_vouchers_id")
dyf_root_payments = dyf_root_payments.withColumnRenamed("id", "root_payments_id")
dyf_root_refunds = dyf_root_refunds.withColumnRenamed("id", "root_refunds_id")
dyf_root_items_discounts = dyf_root_items_discounts.withColumnRenamed("id", "root_items_discounts_id")
dyf_root_items_modifier_options = dyf_root_items_modifier_options.withColumnRenamed("id", "root_items_modifer_options_id")
dyf_root_payments_payment_inputs = dyf_root_payments_payment_inputs.withColumnRenamed("id", "root_payments_payment_inputs_id")
dyf_root_refunds_refund_payments = dyf_root_refunds_refund_payments.withColumnRenamed("id", "root_refunds_refund_payments_id")

# Drop the intermediate columns used for resolving choices
dyf_root_items = dyf_root_items.drop("items_price_per_int", "items_price_per_double", "items_price_int", "items_price_double", "items_variant_price_int", "items_variant_price_double", "items_notes")
dyf_root_items.show()

+-------------+-----+--------------------+--------------------+--------------------+---------+--------------------+------------------+----------+----------------------+---------------+---------------+-----------+-------------------+
|root_items_id|index|            items_id|          items_item|          items_name|items_qty|       items_variant|items_variant_name|items_sort|items_modifier_options|items_discounts|items_price_per|items_price|items_variant_price|
+-------------+-----+--------------------+--------------------+--------------------+---------+--------------------+------------------+----------+----------------------+---------------+---------------+-----------+-------------------+
|            1|    0|6EA3532D-E162-456...|480548f7-459e-49d...|   Oatside Genmaicha|        1|e46ce3f5-e25e-424...|     Regular Price|         0|                     1|              1|           12.0|       12.0|               11.0|
|            2|    0|A3F2ECAB-10F6-46D...|d6d90e1f-c097-412...|Choco

In [8]:
# items_fact = dyf_root_items.join(dyf_root_items_discounts, dyf_root_items["items_discounts"] == dyf_root_items_discounts["id"], "left") \
#                             .join(dyf_root_items_modifier_options, dyf_root_items["items_modifier_options"] == dyf_root_items_modifier_options["id"], "left") \
#                             .join(dyf_root, dyf_root_items["id"] == dyf_root["items"], "right")

items_fact = dyf_root_items.join(dyf_root, dyf_root_items["root_items_id"] == dyf_root["items"], "right")

In [9]:
items_fact.show()

24/06/11 03:38:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------------+-----+--------------------+--------------------+--------------------+---------+--------------------+------------------+----------+----------------------+---------------+---------------+-----------+-------------------+--------+----------+--------------------+------------------+-----+-----------+-------------------+-------------------+-----+---------+--------+--------+-------+--------+---+-----+
|root_items_id|index|            items_id|          items_item|          items_name|items_qty|       items_variant|items_variant_name|items_sort|items_modifier_options|items_discounts|items_price_per|items_price|items_variant_price|customer|  order_no|       dining_option|dining_option_name|notes|void_reason|         updated_at|         created_at|items|discounts|vouchers|payments|refunds|location|pax|total|
+-------------+-----+--------------------+--------------------+--------------------+---------+--------------------+------------------+----------+----------------------+--------

In [10]:
from pyspark.sql.functions import first, last

# from pyspark.sql.window import Window
# items_fact = items_fact.withColumn("items_price_min_date", F.min("created_at").over(Window.partitionBy("items_name", "items_variant_name", "items_variant"))) \
#                        .withColumn("items_price_max_date", F.max("created_at").over(Window.partitionBy("items_name", "items_variant_name", "items_variant")))


first_occurrence = items_fact.groupBy("items_variant", "items_name", "items_variant_name", "items_variant_price").agg(first("created_at").alias("first_occurrence"))
last_occurrence = items_fact.groupBy("items_variant", "items_name", "items_variant_name", "items_variant_price").agg(last("created_at").alias("last_occurrence"))

first_occurrence = first_occurrence.orderBy("items_name")
last_occurrence = last_occurrence.orderBy("items_name")

first_occurrence.show(n=500, truncate=False)
last_occurrence.show(n=500, truncate=False)



                                                                                

+------------------------------------+-----------------------------------------------------+-----------------------+-------------------+-------------------+
|items_variant                       |items_name                                           |items_variant_name     |items_variant_price|first_occurrence   |
+------------------------------------+-----------------------------------------------------+-----------------------+-------------------+-------------------+
|NULL                                |NULL                                                 |NULL                   |NULL               |2023-10-24 10:47:27|
|5b2b6da2-ae1b-4615-b543-054764390baa|100% Chocolate Sorbet                                |Regular Price          |38.0               |2024-05-22 11:12:03|
|09ea54f3-21ad-485c-a774-0b332b9763ff|100% Chocolate Sorbet                                |Regular Price          |15.0               |2024-05-22 03:38:22|
|389ae33c-d32e-4620-a6f8-6547ce1387df|100% Chocolate Sorbe

In [11]:
# Select the min and max date for each item and variant combination and show the result
items_fact.select("items_name", "items_variant_name", "items_variant", "items_variant_price").distinct().orderBy("items_name").show(n=500, truncate=False)

+-----------------------------------------------------+-----------------------+------------------------------------+-------------------+
|items_name                                           |items_variant_name     |items_variant                       |items_variant_price|
+-----------------------------------------------------+-----------------------+------------------------------------+-------------------+
|NULL                                                 |NULL                   |NULL                                |NULL               |
|100% Chocolate Sorbet                                |Regular Price          |389ae33c-d32e-4620-a6f8-6547ce1387df|15.0               |
|100% Chocolate Sorbet                                |Regular Price          |5b2b6da2-ae1b-4615-b543-054764390baa|38.0               |
|100% Chocolate Sorbet                                |Regular Price          |09ea54f3-21ad-485c-a774-0b332b9763ff|15.0               |
|72% Dark Chocolate                      

In [12]:
# select `items_name` = 'Avocado Pistachio' and `created_at` is 2023-10-20 12:53:06
items_fact.filter((col("items_name") == "Avocado Pistachio") & (col("items_variant") == 'ba029a57-6a5a-421b-8087-846ff72b332b')).show()

+-------------+-----+--------------------+--------------------+-----------------+---------+--------------------+------------------+----------+----------------------+---------------+---------------+-----------+-------------------+--------+----------+--------------------+------------------+-----+-----------+-------------------+-------------------+-----+---------+--------+--------+-------+--------+---+-----+
|root_items_id|index|            items_id|          items_item|       items_name|items_qty|       items_variant|items_variant_name|items_sort|items_modifier_options|items_discounts|items_price_per|items_price|items_variant_price|customer|  order_no|       dining_option|dining_option_name|notes|void_reason|         updated_at|         created_at|items|discounts|vouchers|payments|refunds|location|pax|total|
+-------------+-----+--------------------+--------------------+-----------------+---------+--------------------+------------------+----------+----------------------+---------------+-

In [13]:
# Resolve columns for items_discounts_amount
dyf_root_items_discounts = dyf_root_items_discounts.withColumn("items_discounts_amount", when(F.col("items_discounts_amount_int").isNotNull(), F.col("items_discounts_amount_int").cast(DoubleType()))
                                     .otherwise(F.col("items_discounts_amount_double")))
dyf_root_items_discounts = dyf_root_items_discounts.drop("items_discounts_amount_int", "items_discounts_amount_double")

# Rename "index" column to "items_index"
dyf_root_items_discounts = dyf_root_items_discounts.withColumnRenamed("index", "items_index")

# Consolidate columns "items_discounts_discount" and "items_discounts_promotion" into "discounts_id" in dyf_root_items_discounts
dyf_root_items_discounts = dyf_root_items_discounts.withColumn("items_discounts_id", 
        when(F.col("items_discounts_discount").isNull(), F.col("items_discounts_promotion"))
        .otherwise(F.col("items_discounts_discount")))

# Drop intermediate columns
dyf_root_items_discounts = dyf_root_items_discounts.drop("items_discounts_discount", "items_discounts_promotion")
dyf_root_discounts = pivotDiscounts(dyf_root_discounts)
dyf_root_discounts = dyf_root_discounts.drop("discounts_type")
dyf_root_items_discounts.show(5)

+-----------------------+--------------------+-----------+----------------------+------------------+
|root_items_discounts_id|items_discounts_name|items_index|items_discounts_amount|items_discounts_id|
+-----------------------+--------------------+-----------+----------------------+------------------+
|                      1|                NULL|       NULL|                  NULL|              NULL|
|                      2|                NULL|       NULL|                  NULL|              NULL|
|                      3|                NULL|       NULL|                  NULL|              NULL|
|                      4|                NULL|       NULL|                  NULL|              NULL|
|                      5|                NULL|       NULL|                  NULL|              NULL|
+-----------------------+--------------------+-----------+----------------------+------------------+
only showing top 5 rows



In [14]:
# Create a new column "discount_level" in dyf_root_items_discounts and fill with value = "item"
dyf_root_items_discounts = dyf_root_items_discounts.withColumn("items_discount_level", lit("item"))
dyf_root_discounts = dyf_root_discounts.withColumn("discount_level", lit("root"))
dyf_root_items_discounts.show()
dyf_root_discounts.show()

+-----------------------+--------------------+-----------+----------------------+------------------+--------------------+
|root_items_discounts_id|items_discounts_name|items_index|items_discounts_amount|items_discounts_id|items_discount_level|
+-----------------------+--------------------+-----------+----------------------+------------------+--------------------+
|                      1|                NULL|       NULL|                  NULL|              NULL|                item|
|                      2|                NULL|       NULL|                  NULL|              NULL|                item|
|                      3|                NULL|       NULL|                  NULL|              NULL|                item|
|                      4|                NULL|       NULL|                  NULL|              NULL|                item|
|                      5|                NULL|       NULL|                  NULL|              NULL|                item|
|                      6

In [15]:
dyf_root_items.show(5)
dyf_root_items_discounts.show(5)

+-------------+-----+--------------------+--------------------+--------------------+---------+--------------------+------------------+----------+----------------------+---------------+---------------+-----------+-------------------+
|root_items_id|index|            items_id|          items_item|          items_name|items_qty|       items_variant|items_variant_name|items_sort|items_modifier_options|items_discounts|items_price_per|items_price|items_variant_price|
+-------------+-----+--------------------+--------------------+--------------------+---------+--------------------+------------------+----------+----------------------+---------------+---------------+-----------+-------------------+
|            1|    0|6EA3532D-E162-456...|480548f7-459e-49d...|   Oatside Genmaicha|        1|e46ce3f5-e25e-424...|     Regular Price|         0|                     1|              1|           12.0|       12.0|               11.0|
|            2|    0|A3F2ECAB-10F6-46D...|d6d90e1f-c097-412...|Choco

In [19]:
fact_discounts = dyf_root.alias('d').join(dyf_root_discounts, dyf_root["discounts"] == dyf_root_discounts["discounts_id"], "left") \
    .select('d.order_no', 'd.created_at', 'd.items', 'd.total', 'discounts_id', 'discounts_name', 'index', 'discounts_amount', 'discount_level')

fact_discounts = fact_discounts.alias('d').join(dyf_root_items, fact_discounts["items"] == dyf_root_items["root_items_id"]) \
                .select('d.*', 'items_id', 'items_discounts')

fact_discounts.orderBy('created_at').show()

fact_discounts = fact_discounts.join(dyf_root_items_discounts, fact_discounts["items_discounts"] == dyf_root_items_discounts["root_items_discounts_id"], "left") \
    .select('order_no', 'created_at', 'total', 'items_id', 'discounts_id', 'items_discounts_id', 'discounts_name', 'items_discounts_name',
            'index', 'items_index', 'discounts_amount', 'items_discounts_amount',
             'discount_level', 'items_discount_level')
fact_discounts.orderBy('created_at').show()

+----------+-------------------+-----+-----+------------+--------------+-----+----------------+--------------+--------------------+---------------+
|  order_no|         created_at|items|total|discounts_id|discounts_name|index|discounts_amount|discount_level|            items_id|items_discounts|
+----------+-------------------+-----+-----+------------+--------------+-----+----------------+--------------+--------------------+---------------+
|230915P9WJ|2023-09-15 07:13:14| 7447| 18.0|        NULL|          NULL| NULL|            NULL|          NULL|46F01D33-B126-4BE...|          12645|
|230915P9WJ|2023-09-15 07:13:14| 7447| 18.0|        NULL|          NULL| NULL|            NULL|          NULL|5EA51A4C-C361-476...|          12644|
|2309159UE8|2023-09-15 07:15:41| 7448| 19.0|        NULL|          NULL| NULL|            NULL|          NULL|2D97BBA4-A1FD-4F5...|          12646|
|230915579N|2023-09-15 07:16:16| 7449| 22.0|        NULL|          NULL| NULL|            NULL|          NULL|1C

In [20]:
fact_discounts = fact_discounts.withColumn("discount_amount", when(F.col("discounts_amount").isNotNull(), F.col("discounts_amount").cast(DoubleType())) \
                                           .when(F.col("items_discounts_amount").isNotNull(), F.col("items_discounts_amount").cast(DoubleType()))
                                            .otherwise(None)) \
                                        .withColumn("discount_name", when(F.col("discounts_name").isNotNull(), F.col("discounts_name")) \
                                            .when(F.col("items_discounts_name").isNotNull(), F.col("items_discounts_name")) \
                                            .otherwise(None)) \
                                        .withColumn("index", when(F.col("items_index").isNotNull(), F.col("items_index").cast(IntegerType())) \
                                            .when(F.col("index").isNotNull(), F.col("index").cast(IntegerType())) \
                                            .otherwise(F.col("index"))) \
                                        .withColumn("discount_level", when(F.col("discount_level").isNotNull() & F.col("discount_amount").isNotNull(), F.col("discount_level")) \
                                            .when(F.col("items_discount_level").isNotNull() & F.col("discount_amount").isNotNull(), F.col("items_discount_level")) \
                                            .otherwise(F.col("discount_amount"))) \
                                        .withColumn("discounts_id", when(F.col("discounts_id").isNotNull(), F.col("discounts_id")) \
                                            .when(F.col("items_discounts_id").isNotNull(), F.col("items_discounts_id")) \
                                            .otherwise(None)) \
                                        .withColumn("items_id", when(F.col("discount_level") == "item", F.col("items_id")) \
                                            .otherwise(None))

In [22]:
fact_discounts = fact_discounts.drop("discounts_amount", "discounts_name", "items_discounts_amount", "items_discounts_name", "items_index", 
                    "items_discount_level", "items_discounts_id", "root_discounts_id")

# fact_discounts drop rows with null values in discount_amount and order by created_at
fact_discounts = fact_discounts.filter(col("discount_amount").isNotNull()).dropDuplicates().orderBy('created_at')
fact_discounts.show()

+----------+-------------------+-----+--------------------+--------------------+-----+--------------+---------------+-----------------+
|  order_no|         created_at|total|            items_id|        discounts_id|index|discount_level|discount_amount|    discount_name|
+----------+-------------------+-----+--------------------+--------------------+-----+--------------+---------------+-----------------+
|230915J5LX|2023-09-15 07:20:32| 18.0|6D1636DA-D599-48F...|3c9fc489-e7a4-4bb...|    0|          item|            3.0|           DOUBLE|
|230915SIGE|2023-09-15 07:23:10| 48.8|DF90061D-0B01-448...|80389f24-b26c-468...|    0|          item|            2.2|Manual Discount %|
|230915SIGE|2023-09-15 07:23:10| 48.8|0F3C6FED-FEEB-4B7...|3c9fc489-e7a4-4bb...|    0|          item|            3.0|           DOUBLE|
|230915P8RT|2023-09-15 07:25:00| 17.0|686BD60B-F2B4-46B...|3c9fc489-e7a4-4bb...|    0|          item|            3.0|           DOUBLE|
|230915VQ43|2023-09-15 07:28:52| 62.0|512C3A20-D