<img src="https://github.com/mousastech/medallion/blob/fd1da67c7e3e3829e0ea84fc51c6c79a02e408da/imgs/Medallion.png?raw=true">

# Medallion Architecture
An implementation using Unity Catalog.

<img src="https://github.com/mousastech/medallion/blob/92d8750f657288477d48ba7e07ac8c8340d49cf3/imgs/architecture.png?raw=true">

[Reference](https://www.databricks.com/glossary/medallion-architecture)

In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

In [0]:
%run ./setup 

In [0]:
# Storage roots for each medallion layer. These are the external locations
# registered beforehand; Unity Catalog governs all access permissions to them.
# NOTE(review): pathRaw and pathBronze resolve to the same prefix — confirm
# whether a dedicated raw prefix was intended.
pathRaw, pathBronze, pathSilver, pathGold = (
    "s3a://dlt-ecommerce/bronze",
    "s3a://dlt-ecommerce/bronze",
    "s3a://dlt-ecommerce/silver",
    "s3a://dlt-ecommerce/gold",
)

# Unity Catalog catalog that holds the bronze tables written below.
catalog = "ecommerce"

In [0]:
# Bronze load: order payments.
# Read the raw order-payments CSV (first row is the header, column types are
# inferred by Spark) and persist it as a Delta table in the bronze schema.
orderPaymentsRaw = f"{pathRaw}/order_payments/olist_order_payments_dataset.csv"
table_name = f"{catalog}.bronze.order_payments_bronze"

dfOrderPayments = spark.read.csv(orderPaymentsRaw, header=True, inferSchema=True)

(
    dfOrderPayments.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(table_name)
)


# Reading from raw
Read the raw CSV files and create a Delta table for each one in the bronze layer.

In [0]:
from pyspark.sql.functions import current_timestamp

# Bronze load: customers.
# Read the raw customers CSV, stamp every row with the load time in `date_load`,
# and persist the result as a Delta table in the bronze schema.
customersRaw = f"{pathRaw}/customers/olist_customers_dataset.csv"

df_customers = (
    spark.read.csv(customersRaw, header=True, inferSchema=True)
    .withColumn("date_load", current_timestamp())
)

# Schema is `bronze` (not `bbronze`) — the schema the other bronze tables in
# this catalog are written to.
table_name = f"{catalog}.bronze.customers_bronze"
(
    df_customers.write
    .format("delta")
    .mode("overwrite")
    # mergeSchema lets the extra `date_load` column be added to an existing table.
    .option("mergeSchema", "true")
    .saveAsTable(table_name)
)

In [0]:
# Bronze load: geolocation.
# Read the raw geolocation CSV (header row, Spark-inferred column types) and
# persist it as a Delta table in the bronze schema.
geolocationRaw = f"{pathRaw}/geolocation/olist_geolocation_dataset.csv"

df_geolocation = spark.read.csv(geolocationRaw, header=True, inferSchema=True)

# Schema is `bronze` (not `bbronze`) — the schema the other bronze tables in
# this catalog are written to.
table_name = f"{catalog}.bronze.geolocation_bronze"
df_geolocation.write.format("delta").mode("overwrite").saveAsTable(table_name)


In [0]:
# Bronze load: order items.
# Read the raw order-items CSV (header row, Spark-inferred column types) and
# persist it as a Delta table in the bronze schema.
orderItemsRaw = f"{pathRaw}/order_items/olist_order_items_dataset.csv"

df_order_items = spark.read.csv(orderItemsRaw, header=True, inferSchema=True)

# Schema is `bronze` (not `bbronze`) — the schema the other bronze tables in
# this catalog are written to.
table_name = f"{catalog}.bronze.order_items_bronze"
df_order_items.write.format("delta").mode("overwrite").saveAsTable(table_name)

In [0]:
# Bronze load: order payments.
# Read the order-payments CSV (not the order-items file) and persist it as the
# bronze payments Delta table.
# NOTE(review): this repeats the earlier order-payments bronze load; one of the
# two cells can probably be removed.
orderPaymentsRaw = f"{pathRaw}/order_payments/olist_order_payments_dataset.csv"

df_order_payments = spark.read.csv(orderPaymentsRaw, header=True, inferSchema=True)

# Write to the bronze payments table. Writing this raw CSV into
# tutorial.original.order_payments would overwrite the table that the Silver and
# Gold steps read, and that table has a different (camelCase + DataRawLoad) schema.
table_name = f"{catalog}.bronze.order_payments_bronze"

(
    df_order_payments.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(table_name)
)

In [0]:
# Bronze load: order reviews.
# Read the raw order-reviews CSV (header row, Spark-inferred column types) and
# persist it as a Delta table in the bronze schema.
orderReviewsRaw = f"{pathRaw}/order_reviews/olist_order_reviews_dataset.csv"

df_order_reviews = spark.read.csv(orderReviewsRaw, header=True, inferSchema=True)

# Schema is `bronze` (not `bbronze`) — the schema the other bronze tables in
# this catalog are written to.
table_name = f"{catalog}.bronze.order_reviews_bronze"
df_order_reviews.write.format("delta").mode("overwrite").saveAsTable(table_name)

In [0]:
# Bronze load: orders.
# Read the raw orders CSV (header row, Spark-inferred column types) and persist
# it as a Delta table in the bronze schema.
ordersRaw = f"{pathRaw}/orders/olist_orders_dataset.csv"

df_orders = spark.read.csv(ordersRaw, header=True, inferSchema=True)

# Schema is `bronze` (not `bbronze`) — the schema the other bronze tables in
# this catalog are written to.
table_name = f"{catalog}.bronze.orders_bronze"
df_orders.write.format("delta").mode("overwrite").saveAsTable(table_name)

In [0]:
# Bronze load: products.
# Read the raw products CSV (header row, Spark-inferred column types) and
# persist it as a Delta table in the bronze schema.
productsRaw = f"{pathRaw}/products/olist_products_dataset.csv"

df_products = spark.read.csv(productsRaw, header=True, inferSchema=True)

# Schema is `bronze` (not `bbronze`) — the schema the other bronze tables in
# this catalog are written to.
table_name = f"{catalog}.bronze.products_bronze"
df_products.write.format("delta").mode("overwrite").saveAsTable(table_name)

In [0]:
# Bronze load: sellers.
# Read the raw sellers CSV (header row, Spark-inferred column types) and
# persist it as a Delta table in the bronze schema.
sellersRaw = f"{pathRaw}/sellers/olist_sellers_dataset.csv"

df_sellers = spark.read.csv(sellersRaw, header=True, inferSchema=True)

# Schema is `bronze` (not `bbronze`) — the schema the other bronze tables in
# this catalog are written to.
table_name = f"{catalog}.bronze.sellers_bronze"
df_sellers.write.format("delta").mode("overwrite").saveAsTable(table_name)

### Optional: how to read a file directly from GitHub

In [0]:
# Fetch the product-category translation CSV straight from GitHub with pandas,
# convert it to a Spark DataFrame, and store it as CSV under the raw/bronze prefix.
spark = SparkSession.builder.appName("olist-analysis").getOrCreate()

link_csv_category = "https://raw.githubusercontent.com/mousastech/dlt_ingestion/refs/heads/main/data_olist/product_category_name_translation.csv"

# NOTE(review): same value as pathRaw; kept as its own name for this optional cell.
pathRawcategory = "s3a://dlt-ecommerce/bronze"

df_category = spark.createDataFrame(pd.read_csv(link_csv_category))

# Spark's CSV writer omits the header row by default. Write it explicitly so the
# column names survive, because the notebook reads raw CSVs with header=True.
# Spark writes `category.csv` as a directory of part files, not a single file.
(
    df_category.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save(f'{pathRawcategory}/category/category.csv')
)

# Bronze
Create a Delta table and move the data to the bronze bucket.

In [0]:
# Bronze load: customers, written straight to the bronze Delta table.
# NOTE(review): this reads the same customers CSV as the earlier customers cell
# but adds no `date_load` column — confirm both loads are needed.
customersRaw = f"{pathRaw}/customers/olist_customers_dataset.csv"

df_customers = spark.read.csv(customersRaw, header=True, inferSchema=True)

(
    df_customers.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("ecommerce.bronze.customers_bronze")
)

In [0]:
# Load data into the Silver layer.
#
# Each source table in `tutorial.original` is reduced to the rows of its most
# recent load. DENSE_RANK over DataRawLoad (descending) gives rank 1 to every
# row that shares the latest DataRawLoad value. The chosen columns keep their
# order, DataRawLoad is appended cast to TIMESTAMP, and the result overwrites
# the target Delta table.
#
# NOTE(review): the window has no PARTITION BY, so Spark evaluates it in a single
# partition; acceptable for small tables, worth confirming for large ones.
# NOTE(review): except customers and geolocation (written to *_silver tables),
# every target below is the SAME table it is read from, so each run replaces the
# source with its latest-load snapshot. The Gold query in this notebook reads
# tutorial.original.orders / order_payments / order_reviews / customers (not
# customers_silver), so any change to these targets must be coordinated with it.


def _latest_load_sql(source_table, columns):
    """Build a Spark SQL query returning the latest-load rows of a table.

    Args:
        source_table: Fully qualified name of the table to read.
        columns: Column names to project, in output order. ``DataRawLoad``
            (cast to TIMESTAMP) is always appended as the last column.

    Returns:
        The SQL text.
    """
    select_list = ",\n    ".join(
        [*columns, "CAST(DataRawLoad AS TIMESTAMP) AS DataRawLoad"]
    )
    return (
        f"SELECT\n    {select_list}\n"
        "FROM (\n"
        "    SELECT DENSE_RANK() OVER (ORDER BY DataRawLoad DESC) AS rank, *\n"
        f"    FROM {source_table}\n"
        ") AS latest\n"
        "WHERE latest.rank = 1"
    )


def _write_latest_load(source_table, columns, target_table):
    """Overwrite ``target_table`` (Delta) with the latest load of ``source_table``.

    Args:
        source_table: Fully qualified name of the table to read.
        columns: Column names to keep (see ``_latest_load_sql``).
        target_table: Fully qualified name of the Delta table to overwrite.

    Returns:
        The (lazy) DataFrame that was written.
    """
    latest = spark.sql(_latest_load_sql(source_table, columns))
    latest.write.format("delta").mode("overwrite").saveAsTable(target_table)
    return latest


# Customers
customersSilver = _write_latest_load(
    "tutorial.original.customers",
    ["customerId", "customerUniqueId", "customerZipCodePrefix",
     "customerCity", "customerState"],
    "tutorial.original.customers_silver",
)

# Geolocation
geolocationSilver = _write_latest_load(
    "tutorial.original.geolocation",
    ["geolocationCodePrefix", "geolocationLat", "geolocationLng",
     "geolocationCity", "geolocationState"],
    "tutorial.original.geolocation_silver",
)

# Order items
order_itemsSilver = _write_latest_load(
    "tutorial.original.order_items",
    ["orderId", "orderItemId", "productId", "sellerId",
     "shippingLimitDate", "price", "freightValue"],
    "tutorial.original.order_items",
)

# Order payments
order_paymentsSilver = _write_latest_load(
    "tutorial.original.order_payments",
    ["orderId", "paymentSequential", "paymentType",
     "paymentInstallments", "paymentValue"],
    "tutorial.original.order_payments",
)

# Order reviews
order_reviewsSilver = _write_latest_load(
    "tutorial.original.order_reviews",
    ["reviewId", "orderId", "reviewScore", "reviewCommentTitle",
     "reviewCommentMessage", "reviewCreationDate", "reviewAnswerTimestamp"],
    "tutorial.original.order_reviews",
)

# Orders
ordersSilver = _write_latest_load(
    "tutorial.original.orders",
    ["orderId", "customerId", "orderStatus", "orderPurchaseTimestamp",
     "orderApprovedAt", "orderDeliveredCarrierDate",
     "orderDeliveredCustomerDate", "orderEstimatedDeliveryDate"],
    "tutorial.original.orders",
)

# Products
productsSilver = _write_latest_load(
    "tutorial.original.products",
    ["productId", "productCategoryName", "productNameLenght",
     "productDescriptionLenght", "productPhotosQty", "productWeight_g",
     "productLength_cm", "productHeight_cm", "productWidth_cm"],
    "tutorial.original.products",
)

# Sellers
sellersSilver = _write_latest_load(
    "tutorial.original.sellers",
    ["sellerId", "sellerCodePrefix", "sellerCity", "sellerState"],
    "tutorial.original.sellers",
)

# Category
categorySilver = _write_latest_load(
    "tutorial.original.category",
    ["productCategory", "productCategoryNameEnglish"],
    "tutorial.original.category",
)

In [0]:
%sql

-- Build the Gold layer: one denormalized sales table that joins each order with
-- its payments, reviews and customer, and relabels order status and payment
-- type in Portuguese. The table is partitioned by the customer's state.
--
-- NOTE(review): an order can have several payment rows and several review rows,
-- so these LEFT JOINs can emit more than one row per order. Confirm this grain
-- before treating COUNT(*) as a number of orders.
-- NOTE(review): dataEntregaEmDias is the gap between approval and the ESTIMATED
-- delivery date, not the actual delivery date.

CREATE OR REPLACE TABLE tutorial.original.sales
USING DELTA
PARTITIONED BY (estadoCliente)
AS
SELECT
  CASE o.orderStatus
    WHEN 'shipped'     THEN 'enviado'
    WHEN 'canceled'    THEN 'cancelado'
    WHEN 'invoiced'    THEN 'faturado'
    WHEN 'created'     THEN 'criado'
    WHEN 'delivered'   THEN 'entregue'
    WHEN 'unavailable' THEN 'indisponível'
    WHEN 'processing'  THEN 'em processamento'
    WHEN 'approved'    THEN 'aprovado'
  END AS statusDoPedido,
  o.orderPurchaseTimestamp     AS horaCompraPedido,
  o.orderApprovedAt            AS horaPedidoAprovado,
  o.orderEstimatedDeliveryDate AS dataEstimadaEntrega,
  DATEDIFF(o.orderEstimatedDeliveryDate, o.orderApprovedAt) AS dataEntregaEmDias,
  r.reviewScore                AS notaProduto,
  r.reviewAnswerTimestamp      AS dataComentarioSobreProduto,
  CASE p.paymentType
    WHEN 'credit_card' THEN 'cartao_de_credito'
    WHEN 'boleto'      THEN 'boleto'
    WHEN 'not_defined' THEN 'não_definido'
    WHEN 'voucher'     THEN 'voucher'
    WHEN 'debit_card'  THEN 'cartao_de_debito'
  END AS meioDePagamento,
  p.paymentInstallments        AS parcelamento,
  p.paymentValue               AS valorPago,
  c.customerCity               AS cidadeCliente,
  c.customerState              AS estadoCliente
FROM tutorial.original.orders AS o
LEFT JOIN tutorial.original.order_payments AS p ON p.orderId = o.orderId
LEFT JOIN tutorial.original.order_reviews  AS r ON r.orderId = o.orderId
LEFT JOIN tutorial.original.customers      AS c ON c.customerId = o.customerId

In [0]:
%sql
-- Preview a sample of the Gold sales table.
SELECT *
FROM tutorial.original.sales
LIMIT 10

# Visualizations

In [0]:
%sql

-- Delivered orders with a known payment method and approval year, counted per
-- customer state and payment method.
-- NOTE(review): `percentual` holds a raw row count, not a percentage; the chart
-- presumably normalizes it — confirm.
SELECT
  estadoCliente   AS estados,
  meioDePagamento AS `meio de pagamento`,
  COUNT(*)        AS `percentual`
FROM tutorial.original.sales
WHERE statusDoPedido = 'entregue'
  AND meioDePagamento IS NOT NULL
  AND YEAR(horaPedidoAprovado) IS NOT NULL
GROUP BY estadoCliente, meioDePagamento

Databricks visualization. Run in Databricks to view.

In [0]:
%sql 

-- Average (rounded to whole days) of dataEntregaEmDias per customer state, for
-- non-cancelled orders with a known payment method and approval year.
SELECT
  estadoCliente                       AS estados,
  ROUND(AVG(dataEntregaEmDias), 0)    AS `média de dias para entrega de produto`
FROM tutorial.original.sales
WHERE meioDePagamento IS NOT NULL
  AND YEAR(horaPedidoAprovado) IS NOT NULL
  AND statusDoPedido <> 'cancelado'
GROUP BY estadoCliente

Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Number of delivered-order rows per customer state for orders approved in 2018.
SELECT
  estadoCliente            AS `Estado`,
  YEAR(horaPedidoAprovado) AS `Ano`,
  COUNT(*)                 AS `Numero de Vendas`
FROM tutorial.original.sales
WHERE statusDoPedido = 'entregue'
  AND YEAR(horaPedidoAprovado) = 2018
GROUP BY estadoCliente, YEAR(horaPedidoAprovado)


Databricks visualization. Run in Databricks to view.

In [0]:
%sql

-- Average ticket (mean amount paid) for delivered orders, per approval month
-- of 2017.
-- AVG is used rather than SUM(valorPago) / COUNT(*): the Gold table LEFT JOINs
-- payments, so orders without a payment row carry a NULL valorPago. COUNT(*)
-- would still count those rows and understate the average.
-- NOTE(review): the Gold table has one row per payment x review combination, so
-- this is an average per row, not strictly per order.
SELECT
  ROUND(AVG(valorPago), 2) AS `Ticket Médio`,
  Month(horaPedidoAprovado)
FROM tutorial.original.sales
WHERE statusDoPedido = 'entregue'
  AND YEAR(horaPedidoAprovado) = 2017
GROUP BY Month(horaPedidoAprovado)

Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Ad-hoc check: customers rows from the most recent load (every row sharing the
-- highest DataRawLoad), with DataRawLoad cast to TIMESTAMP.
WITH ranked AS (
  SELECT
    DENSE_RANK() OVER (ORDER BY DataRawLoad DESC) AS rank,
    *
  FROM tutorial.original.customers
)
SELECT
  customerId,
  customerUniqueId,
  customerZipCodePrefix,
  customerCity,
  customerState,
  CAST(DataRawLoad AS TIMESTAMP) AS DataRawLoad
FROM ranked
WHERE rank = 1

In [0]:
%sql
-- Ad-hoc check: order_items rows from the most recent load (every row sharing
-- the highest DataRawLoad), with DataRawLoad cast to TIMESTAMP.
WITH ranked AS (
  SELECT
    DENSE_RANK() OVER (ORDER BY DataRawLoad DESC) AS rank,
    *
  FROM tutorial.original.order_items
)
SELECT
  orderId,
  orderItemId,
  productId,
  sellerId,
  shippingLimitDate,
  price,
  freightValue,
  CAST(DataRawLoad AS TIMESTAMP) AS DataRawLoad
FROM ranked
WHERE rank = 1;