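# Notebook da camada Silver

Lê as tabelas da camada Bronze (Parquet), padroniza textos, CEPs e tipos de dados, remove linhas duplicadas e grava o resultado em Parquet na camada Silver.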

In [18]:
import os

import findspark

# Make the local Spark installation importable; this must run before anything
# is imported from pyspark.
findspark.init()

from pyspark.sql import SparkSession

# Spark session used by every cell of this notebook (getOrCreate reuses a
# session already running in the kernel).
spark = (
    SparkSession.builder
    .appName("Notebook_Silver")
    .getOrCreate()
)

from pyspark.sql.functions import col, length, lpad, regexp_replace, translate, trim, upper, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

## Funções para tratamento de dados

In [19]:
def tratar_zip_code(df, column_name, largura=5):
    """
    Normalize a zip-code prefix column to a zero-padded string.

    The value is cast to string and trimmed. Values shorter than ``largura``
    characters are left-padded with "0" (e.g. 1037 -> "01037"). Values that
    already have ``largura`` or more characters are kept unchanged instead of
    being cut down, so malformed codes stay visible. Nulls stay null.

    Parameters:
        df (DataFrame): DataFrame containing the column.
        column_name (str): name of the zip-code column to normalize.
        largura (int): target width; defaults to 5, the previously hard-coded width.

    Returns:
        DataFrame: new DataFrame with ``column_name`` replaced by the string value.
    """
    valor = trim(col(column_name).cast(StringType()))
    return df.withColumn(
        column_name,
        # Spark's lpad truncates strings longer than the target length, which
        # would silently turn a malformed long value into a different,
        # valid-looking zip code. Only pad values that are actually too short.
        when(length(valor) < largura, lpad(valor, largura, "0")).otherwise(valor),
    )

In [20]:
from pyspark.sql.functions import col, upper, regexp_replace

def tratamento_dados(df, columns):
    """
    Normalize free-text columns of a PySpark DataFrame.

    Each column listed in ``columns`` goes through, in order:
      1. upper-casing;
      2. accent stripping: accented vowels and "ç" are replaced by their base
         letter ("SÃO PAULO" -> "SAO PAULO", "Ç" -> "C");
      3. underscores replaced by spaces;
      4. removal of every remaining character that is not an ASCII letter,
         digit or whitespace.

    Parameters:
        df (DataFrame): DataFrame holding the columns to normalize.
        columns (list): names of the columns to process. An empty list
            returns ``df`` unchanged.

    Returns:
        DataFrame: the DataFrame with the listed columns replaced by their
        normalized values.
    """
    # Base letter -> its accented lower-case variants. Both cases are mapped
    # so the result does not depend on upper() having run first.
    variantes = {
        "a": "áàâãä",
        "e": "éèêë",
        "i": "íìîï",
        "o": "óòôõö",
        "u": "úùûü",
        "c": "ç",
    }
    acentuados = "".join(v + v.upper() for v in variantes.values())
    sem_acento = "".join(
        base * len(v) + base.upper() * len(v) for base, v in variantes.items()
    )

    for column_name in columns:
        texto = upper(col(column_name))
        # translate() swaps each char of `acentuados` for the char at the same
        # position in `sem_acento`. The previous regexp_replace deleted accented
        # letters instead, and because it listed only lower-case ones while
        # running after upper(), the punctuation filter below removed them
        # anyway: "SÃO PAULO" ended up as "SO PAULO".
        texto = translate(texto, acentuados, sem_acento)
        texto = regexp_replace(texto, "_", " ")
        texto = regexp_replace(texto, "[^a-zA-Z0-9\\s]", "")
        # One projection per column instead of one per step.
        df = df.withColumn(column_name, texto)

    return df


### Camadas de Origem e Destino

In [21]:
# Source (read) and target (write) data layers, used as path prefixes below.
camada_origem = "bronze"
camada_destino = "silver"

# Make sure the *target* layer folder exists. The original created
# `camada_origem`, which this notebook only reads from; creating an empty
# source folder would only mask a missing upstream (bronze) run.
os.makedirs(camada_destino, exist_ok=True)

### Tratando tabela customers

In [22]:
# Read the raw table from the source layer (path built from the config cell)
customers = spark.read.parquet(f"{camada_origem}/customers/")

# Normalize the free-text columns
customers = tratamento_dados(customers, ["customer_city", "customer_state"])

# Zero-pad the zip-code prefix
customers = tratar_zip_code(customers, "customer_zip_code_prefix")

# Target column order and types. A single select builds one projection,
# instead of one projection per chained withColumn call.
colunas_customers = [
    ("customer_id", StringType()),
    ("customer_unique_id", StringType()),
    ("customer_zip_code_prefix", StringType()),
    ("customer_city", StringType()),
    ("customer_state", StringType()),
]
customers = customers.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_customers]
)

# Drop rows that are identical across all selected columns
customers = customers.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
customers.write.mode("overwrite").parquet(f"{camada_destino}/customers/")

25/02/24 11:35:26 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

### Tratando tabela geolocation

In [23]:
# Read the raw table from the source layer (path built from the config cell)
geolocation = spark.read.parquet(f"{camada_origem}/geolocation/")

# Normalize the free-text columns
geolocation = tratamento_dados(geolocation, ["geolocation_city", "geolocation_state"])

# Zero-pad the zip-code prefix
geolocation = tratar_zip_code(geolocation, "geolocation_zip_code_prefix")

# Target column order and types, applied in a single projection
colunas_geolocation = [
    ("geolocation_zip_code_prefix", StringType()),
    ("geolocation_lat", DoubleType()),
    ("geolocation_lng", DoubleType()),
    ("geolocation_city", StringType()),
    ("geolocation_state", StringType()),
]
geolocation = geolocation.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_geolocation]
)

# Drop rows that are identical across all selected columns
geolocation = geolocation.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
geolocation.write.mode("overwrite").parquet(f"{camada_destino}/geolocation/")


25/02/24 11:35:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/24 11:35:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/24 11:35:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/02/24 11:35:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/02/24 11:35:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/02/24 11:35:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/02/24 11:35:36 WARN MemoryManager: Total allocation exceeds 95.

### Tratando tabela Order Items

In [24]:
# Read the raw table from the source layer (path built from the config cell).
# This table has no free-text columns, so the former no-op call
# tratamento_dados(order_items, []) was dropped.
order_items = spark.read.parquet(f"{camada_origem}/order_items/")

# Target column order and types, applied in a single projection
colunas_order_items = [
    ("order_id", StringType()),
    ("order_item_id", IntegerType()),
    ("product_id", StringType()),
    ("seller_id", StringType()),
    ("shipping_limit_date", TimestampType()),
    ("price", DoubleType()),
    ("freight_value", DoubleType()),
]
order_items = order_items.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_order_items]
)

# Drop rows that are identical across all selected columns
order_items = order_items.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
order_items.write.mode("overwrite").parquet(f"{camada_destino}/order_items/")

25/02/24 11:35:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/24 11:35:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/24 11:35:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/02/24 11:35:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/02/24 11:35:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/02/24 11:35:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/02/24 11:35:38 WARN MemoryManager: Total allocation exceeds 95.

### Tratando tabela Order Payments

In [25]:
# Read the raw table from the source layer (path built from the config cell)
order_payments = spark.read.parquet(f"{camada_origem}/order_payments/")

# Normalize the free-text column
order_payments = tratamento_dados(order_payments, ["payment_type"])

# Target column order and types, applied in a single projection
colunas_order_payments = [
    ("order_id", StringType()),
    ("payment_sequential", IntegerType()),
    ("payment_type", StringType()),
    ("payment_installments", IntegerType()),
    ("payment_value", DoubleType()),
]
order_payments = order_payments.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_order_payments]
)

# Drop rows that are identical across all selected columns
order_payments = order_payments.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
order_payments.write.mode("overwrite").parquet(f"{camada_destino}/order_payments/")

                                                                                

### Tratando tabela Order Reviews

In [26]:
# Read the raw table from the source layer (path built from the config cell)
order_reviews = spark.read.parquet(f"{camada_origem}/order_reviews/")

# Normalize the review title and message text
order_reviews = tratamento_dados(order_reviews, ["review_comment_title", "review_comment_message"])

# Target column order and types, applied in a single projection
colunas_order_reviews = [
    ("order_id", StringType()),
    ("review_id", StringType()),
    ("review_score", IntegerType()),
    ("review_comment_title", StringType()),
    ("review_comment_message", StringType()),
    ("review_creation_date", TimestampType()),
    ("review_answer_timestamp", TimestampType()),
]
order_reviews = order_reviews.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_order_reviews]
)

# Drop rows that are identical across all selected columns
order_reviews = order_reviews.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
order_reviews.write.mode("overwrite").parquet(f"{camada_destino}/order_reviews/")

25/02/24 11:35:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/24 11:35:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/24 11:35:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/02/24 11:35:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/24 11:35:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

### Tratando tabela Orders

In [27]:
# Read the raw table from the source layer (path built from the config cell)
orders = spark.read.parquet(f"{camada_origem}/orders/")

# Normalize the free-text column
orders = tratamento_dados(orders, ["order_status"])

# Target column order and types, applied in a single projection
colunas_orders = [
    ("order_id", StringType()),
    ("customer_id", StringType()),
    ("order_status", StringType()),
    ("order_purchase_timestamp", TimestampType()),
    ("order_approved_at", TimestampType()),
    ("order_delivered_carrier_date", TimestampType()),
    ("order_delivered_customer_date", TimestampType()),
    ("order_estimated_delivery_date", TimestampType()),
]
orders = orders.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_orders]
)

# Drop rows that are identical across all selected columns
orders = orders.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
orders.write.mode("overwrite").parquet(f"{camada_destino}/orders/")

25/02/24 11:35:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/24 11:35:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/24 11:35:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/02/24 11:35:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/24 11:35:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

### Tratando tabela Product Category Name Translation

In [28]:
# Read the raw table from the source layer (path built from the config cell)
product_category_name_translation = spark.read.parquet(
    f"{camada_origem}/product_category_name_translation/"
)

# Normalize both name columns (same treatment as products.product_category_name,
# so the two tables stay joinable on that column)
product_category_name_translation = tratamento_dados(
    product_category_name_translation,
    ["product_category_name", "product_category_name_english"],
)

# Target column order and types, applied in a single projection
colunas_category_translation = [
    ("product_category_name", StringType()),
    ("product_category_name_english", StringType()),
]
product_category_name_translation = product_category_name_translation.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_category_translation]
)

# Drop rows that are identical across all selected columns
product_category_name_translation = product_category_name_translation.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
product_category_name_translation.write.mode("overwrite").parquet(
    f"{camada_destino}/product_category_name_translation/"
)

### Tratando tabela Products

In [29]:
# Read the raw table from the source layer (path built from the config cell)
products = spark.read.parquet(f"{camada_origem}/products/")

# Normalize the free-text column
products = tratamento_dados(products, ["product_category_name"])

# Target column order and types, applied in a single projection.
# The "*_lenght" spelling is kept as-is: it is the name of the source columns.
colunas_products = [
    ("product_id", StringType()),
    ("product_category_name", StringType()),
    ("product_name_lenght", IntegerType()),
    ("product_description_lenght", IntegerType()),
    ("product_photos_qty", IntegerType()),
    ("product_weight_g", IntegerType()),
    ("product_length_cm", IntegerType()),
    ("product_height_cm", IntegerType()),
    ("product_width_cm", IntegerType()),
]
products = products.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_products]
)

# Drop rows that are identical across all selected columns
products = products.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
products.write.mode("overwrite").parquet(f"{camada_destino}/products/")


                                                                                

### Tratando tabela sellers

In [30]:
# Read the raw table from the source layer (path built from the config cell)
sellers = spark.read.parquet(f"{camada_origem}/sellers/")

# Normalize the free-text columns
sellers = tratamento_dados(sellers, ["seller_city", "seller_state"])

# Zero-pad the zip-code prefix
sellers = tratar_zip_code(sellers, "seller_zip_code_prefix")

# Target column order and types, applied in a single projection
colunas_sellers = [
    ("seller_id", StringType()),
    ("seller_zip_code_prefix", StringType()),
    ("seller_city", StringType()),
    ("seller_state", StringType()),
]
sellers = sellers.select(
    [col(nome).cast(tipo).alias(nome) for nome, tipo in colunas_sellers]
)

# Drop rows that are identical across all selected columns
sellers = sellers.dropDuplicates()

# Write to the target layer, replacing any previous output at that path
sellers.write.mode("overwrite").parquet(f"{camada_destino}/sellers/")
