# Notebook - Silver
Notebook responsável por gerar a camada Silver do processo ETL.

A camada Silver consiste em armazenar os dados de forma limpa, ou seja, com os tipos adequados e prontos para utilização. 

No caso:
- Os dados de pedidos foram divididos em duas tabelas: 
  - **`silver.order`** (dados gerais)
  - **`silver.order_item`** (dados dos itens dos pedidos, incluindo guarnições)
- Os dados de restaurantes estão disponíveis na tabela **`silver.restaurant`**
- Os dados de usuários estão disponíveis na tabela **`silver.consumer`**
- Os dados do teste A/B estão estão disponíveis na tabela **`silver.ab_test_ref`**

In [0]:
# Import das bibliotecas
from pyspark.sql.functions import to_date, col, when, from_json, explode, lit
from pyspark.sql.types import *

In [0]:
# Definição das variáveis
catalog = "workspace"
schema_bronze = "bronze"
schema_silver = "silver"
volume = "files"

path_bronze = f"/Volumes/{catalog}/{schema_bronze}/{volume}"
path_silver = f"{catalog}.{schema_silver}"

In [0]:
# Criação da tabela silver.restaurant
df_restaurant = spark.read.csv(f"{path_bronze}/restaurant.csv.gz", header=True, inferSchema=True)
df_restaurant.write.mode("overwrite").saveAsTable(f"{path_silver}.restaurant")

In [0]:
# Criação da tabela silver.consumer
df_consumer = spark.read.csv(f"{path_bronze}/consumer.csv.gz", header=True, inferSchema=True)
df_consumer.write.mode("overwrite").saveAsTable(f"{path_silver}.consumer")

In [0]:
# Criação da tabela silver.ab_test_ref
df_ab_test_ref = spark.read.csv(f"{path_bronze}/ab_test_ref.csv", header=True, inferSchema=True)
df_ab_test_ref.write.mode("overwrite").saveAsTable(f"{path_silver}.ab_test_ref")

In [0]:
# Leitura dos dados de pedidos (Bronze)
df_order = spark.read.json(f"{path_bronze}/order.json.gz")

# Tratamento dos dados de customer_id e criação da coluna created_date (partição)
df_order = df_order \
    .withColumn("customer_id", when(col("customer_id").isNull(), lit("null")).otherwise(col("customer_id"))) \
    .withColumn("created_date", to_date("order_created_at"))

In [0]:
# Criação da tabela silver.order somente com os dados gerais
df_order_info = df_order.drop("items")
df_order_info.write.mode("overwrite").partitionBy("created_date").saveAsTable(f"{path_silver}.order")

In [0]:
# Definição do schema do item do pedido (incluindo guarnições)
item_schema = ArrayType(StructType([
    StructField("name", StringType()),
    StructField("addition", StructType([
        StructField("value", StringType()),
        StructField("currency", StringType())
    ])),
    StructField("discount", StructType([
        StructField("value", StringType()),
        StructField("currency", StringType())
    ])),
    StructField("quantity", DoubleType()),
    StructField("sequence", IntegerType()),
    StructField("unitPrice", StructType([
        StructField("value", StringType()),
        StructField("currency", StringType())
    ])),
    StructField("externalId", StringType()),
    StructField("totalValue", StructType([
        StructField("value", StringType()),
        StructField("currency", StringType())
    ])),
    StructField("customerNote", StringType()),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType()),
        StructField("addition", StructType([
            StructField("value", StringType()),
            StructField("currency", StringType())
        ])),
        StructField("discount", StructType([
            StructField("value", StringType()),
            StructField("currency", StringType())
        ])),
        StructField("quantity", DoubleType()),
        StructField("sequence", IntegerType()),
        StructField("unitPrice", StructType([
            StructField("value", StringType()),
            StructField("currency", StringType())
        ])),
        StructField("categoryId", StringType()),
        StructField("externalId", StringType()),
        StructField("totalValue", StructType([
            StructField("value", StringType()),
            StructField("currency", StringType())
        ])),
        StructField("categoryName", StringType()),
        StructField("integrationId", StringType())
    ]))),
    StructField("integrationId", StringType()),
    StructField("totalAddition", StructType([
        StructField("value", StringType()),
        StructField("currency", StringType())
    ])),
    StructField("totalDiscount", StructType([
        StructField("value", StringType()),
        StructField("currency", StringType())
    ]))
]))

In [0]:
# Explode do campo items
df_order_explode_items = df_order.select(
    col("order_id"),
    col("created_date"),
    explode(from_json(col("items"), item_schema)).alias("item")
)

In [0]:
# Criação do dataframe com os dados gerais dos itens (item_type = "item")
df_order_item = df_order_explode_items.select(
    col("order_id"),
    col("created_date"),
    col("item.name").alias("name"),
    col("item.addition.value").alias("addition_value"),
    col("item.addition.currency").alias("addition_currency"),
    col("item.discount.value").alias("discount_value"),
    col("item.discount.currency").alias("discount_currency"),
    col("item.quantity").alias("quantity"),
    col("item.sequence").alias("sequence"),
    col("item.unitPrice.value").alias("unit_price_value"),
    col("item.unitPrice.currency").alias("unit_price_currency"),
    col("item.externalId").alias("external_id"),
    col("item.totalValue.value").alias("total_value_value"),
    col("item.totalValue.currency").alias("total_value_currency"),
    col("item.customerNote").alias("customer_note"),
    col("item.integrationId").alias("integration_id"),
    col("item.totalAddition.value").alias("total_addition_value"),
    col("item.totalAddition.currency").alias("total_addition_currency"),
    col("item.totalDiscount.value").alias("total_discount_value"),
    col("item.totalDiscount.currency").alias("total_discount_currency"),
    lit(None).cast("string").alias("category_id"),
    lit(None).cast("string").alias("category_name"),
    lit("item").alias("item_type")
)

In [0]:
# Explode do campo garnishItems
df_order_explode_garnish_items = df_order_explode_items.select(
    col("order_id"),
    col("created_date"),
    explode("item.garnishItems").alias("garnish")
)

In [0]:
# Criação do dataframe com os dados dos itens de guarnição (item_type = "garnish")
df_order_garnish_item = df_order_explode_garnish_items.select(
    col("order_id"),
    col("created_date"),
    col("garnish.name").alias("name"),
    col("garnish.addition.value").alias("addition_value"),
    col("garnish.addition.currency").alias("addition_currency"),
    col("garnish.discount.value").alias("discount_value"),
    col("garnish.discount.currency").alias("discount_currency"),
    col("garnish.quantity").alias("quantity"),
    col("garnish.sequence").alias("sequence"),
    col("garnish.unitPrice.value").alias("unit_price_value"),
    col("garnish.unitPrice.currency").alias("unit_price_currency"),
    col("garnish.externalId").alias("external_id"),
    col("garnish.totalValue.value").alias("total_value_value"),
    col("garnish.totalValue.currency").alias("total_value_currency"),
    lit(None).cast("string").alias("customer_note"),
    col("garnish.integrationId").alias("integration_id"),
    lit(None).cast("string").alias("total_addition_value"),
    lit(None).cast("string").alias("total_addition_currency"),
    lit(None).cast("string").alias("total_discount_value"),
    lit(None).cast("string").alias("total_discount_currency"),
    col("garnish.categoryId").alias("category_id"),
    col("garnish.categoryName").alias("category_name"),
    lit("garnish").alias("item_type")
)

In [0]:
# União dos dois dataframes "item" e "garnish" e criação da tabela silver.order_item
df_order_all_items = df_order_item.unionByName(df_order_garnish_item)
df_order_all_items.write.mode("overwrite").partitionBy("created_date").saveAsTable(f"{path_silver}.order_item")