CREATED FACT ORDERS

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Build (or reuse) the Spark session used by every cell below.
# The three options are applied in order and configure the Hadoop S3A
# connector: filesystem implementation class, AWS credentials provider class,
# and path-style access.
session_builder = SparkSession.builder.appName("S3Test")
for option_name, option_value in (
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    (
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    ),
    ("spark.hadoop.fs.s3a.path.style.access", "true"),
):
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()



In [2]:
# Root prefix of the silver layer; every source table lives in its own folder.
SILVER_ROOT = "s3a://pedro-datalake-project/silver"


def read_silver(table):
    """Load one silver-layer table from S3 without its ``audit_timestamp`` column.

    Args:
        table: Folder name of the table under ``SILVER_ROOT`` (e.g. ``"orders"``).

    Returns:
        DataFrame read from the Parquet files at ``<SILVER_ROOT>/<table>/`` with
        the ``audit_timestamp`` column dropped.
    """
    return spark.read.parquet(f"{SILVER_ROOT}/{table}/").drop("audit_timestamp")


orders = read_silver("orders")
order_items = read_silver("items")  # the silver folder is named "items"
payments = read_silver("payments")
sellers = read_silver("sellers")


In [3]:
# Customer dimension from the gold layer; a later cell selects customer_id and
# customer_sk from it to attach the surrogate key to the fact rows.
dim_customer_path = "s3a://pedro-datalake-project/gold/dim_customer/"
customers_dim = spark.read.parquet(dim_customer_path)


In [4]:
# Collapse payments to one row per order.
#
# total_paid is the sum of every payment row of the order.
#
# payment_type / payment_installments are taken from the order's largest
# payment row. F.first() was used here before, but inside a grouped
# aggregation its result depends on row order after the shuffle, so orders
# with several payment rows could get a different value on every run.
# max() over a struct compares field by field (payment_value first, then
# payment_type, then payment_installments), which makes the pick deterministic
# and guarantees both attributes come from the same payment row.
payments_agg = (
    payments
    .groupBy("order_id")
    .agg(
        F.sum("payment_value").alias("total_paid"),
        F.max(
            F.struct("payment_value", "payment_type", "payment_installments")
        ).alias("main_payment"),
    )
    # Unpack the struct so the output keeps the original four columns, in order.
    .select(
        "order_id",
        "total_paid",
        F.col("main_payment.payment_type").alias("payment_type"),
        F.col("main_payment.payment_installments").alias("payment_installments"),
    )
)
payments_agg.show(10)

+--------------------+----------+------------+--------------------+
|            order_id|total_paid|payment_type|payment_installments|
+--------------------+----------+------------+--------------------+
|00018f77f2f0320c5...|    259.83| credit_card|                   3|
|00042b26cf59d7ce6...|    218.04| credit_card|                   3|
|00054e8431b9d7675...|     31.75| credit_card|                   1|
|0006ec9db01a64e59...|     97.32| credit_card|                   4|
|000aed2e25dbad2f9...|    152.77| credit_card|                   1|
|000f25f4d72195062...|    164.39| credit_card|                   1|
|00143d0f86d6fbd9f...|    109.29| credit_card|                   1|
|0014ae671de39511f...|      30.6|      boleto|                   1|
|0015ebb40fb17286b...|      37.0| credit_card|                   1|
|001ac194d4a326a6f...|     62.54|      boleto|                   1|
+--------------------+----------+------------+--------------------+
only showing top 10 rows



In [5]:
# Start the fact table from orders and left-join the item rows, so orders
# without a matching item row are kept.
orders_with_items = orders.alias("o").join(
    order_items.alias("i"), on="order_id", how="left"
)

# Attach the per-order payment summary.
# NOTE(review): payments_agg has one row per order_id, so if an order has
# several item rows its total_paid is repeated on each of them - summing
# total_paid over this frame would presumably double count; confirm the
# intended grain.
fact_orders = orders_with_items.join(
    payments_agg.alias("p"), on="order_id", how="left"
)
fact_orders.show(10)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----------+------------+--------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|total_paid|payment_type|payment_installments|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----------+------------+--------------------+
|

In [6]:
# Enrich each row with the seller attributes (left join on seller_id, so rows
# without a matching seller are kept).
# This cell rebinds fact_orders from itself: re-running it on its own joins
# sellers a second time, so re-run from the cell that first builds fact_orders.
sellers_aliased = sellers.alias("s")
fact_orders = fact_orders.join(sellers_aliased, on="seller_id", how="left")
fact_orders.show(10)

+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+-------------------+------+-------------+----------+------------+--------------------+----------------------+--------------------+------------+
|           seller_id|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|shipping_limit_date| price|freight_value|total_paid|payment_type|payment_installments|seller_zip_code_prefix|         seller_city|seller_state|
+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+---------

In [7]:
# Bring in the customer surrogate key: only customer_id (the join key) and
# customer_sk are taken from the dimension.
# NOTE(review): assumes dim_customer has at most one row per customer_id;
# otherwise this left join would multiply fact rows - confirm.
customer_keys = customers_dim.select("customer_id", "customer_sk")
fact_orders = fact_orders.join(customer_keys, on="customer_id", how="left")
fact_orders.show(10)

+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+-------------------+------+-------------+----------+------------+--------------------+----------------------+--------------------+------------+-----------+
|         customer_id|           seller_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|shipping_limit_date| price|freight_value|total_paid|payment_type|payment_installments|seller_zip_code_prefix|         seller_city|seller_state|customer_sk|
+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+---------------

In [8]:
# Derived delivery metrics, defined as column expressions first so the
# withColumn chain below reads as a plain list of additions.

# datediff(end, start): delivery date compared against the purchase timestamp.
delivery_days_expr = F.datediff(
    "order_delivered_customer_date", "order_purchase_timestamp"
)

# True when the customer received the order after the estimated delivery date.
# NOTE(review): rows with a missing delivery date presumably get null here,
# not False - confirm that is the intended meaning.
is_delayed_expr = (
    F.col("order_delivered_customer_date") > F.col("order_estimated_delivery_date")
)

fact_orders = (
    fact_orders
    .withColumn("delivery_days", delivery_days_expr)
    .withColumn("is_delayed", is_delayed_expr)
    # Load timestamp of this run.
    .withColumn("created_at", F.current_timestamp())
)


In [9]:
# Final inspection of the assembled fact table.
fact_orders.show(n=10, truncate=False)  # sample rows, values not truncated
fact_orders.printSchema()               # column names and types
fact_orders.count()                     # last expression: rendered as the cell result


+--------------------------------+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------------------+-------------------+------+-------------+----------+------------+--------------------+----------------------+---------------------+------------+-----------+-------------+----------+--------------------------+
|customer_id                     |seller_id                       |order_id                        |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|product_id                      |shipping_limit_date|price |freight_value|total_paid|payment_type|payment_installments|seller_zip_code_prefix|seller_city          |seller_state|customer_sk|delivery_days|is_delayed|created_at            

113425