In [0]:
# Root of the Bronze layer volume; list its contents and render the listing.
bronze_path = "/Volumes/ecommerce_cat/ecommerce_schema/bronze"
bronze_entries = dbutils.fs.ls(bronze_path)
display(bronze_entries)

In [0]:
# Print the name of every entry found inside Bronze folder "2"
bronze_folder2_path = "/Volumes/ecommerce_cat/ecommerce_schema/bronze/2/"
files = dbutils.fs.ls(bronze_folder2_path)
for file_info in files:
    file_name = file_info.name
    print(file_name)


In [0]:
from pyspark.sql import SparkSession
# Spark's `sum` is imported under an alias so it does not replace Python's
# built-in `sum` for every cell that runs afterwards.
from pyspark.sql.functions import col, sum as spark_sum

spark = SparkSession.builder.appName("EDA_Bronze_Folder2").getOrCreate()

# CSV files to explore
csv_files = [
    "olist_customers_dataset.csv",
    "olist_geolocation_dataset.csv",
    "olist_order_items_dataset.csv",
    "olist_order_payments_dataset.csv",
    "olist_order_reviews_dataset.csv",
    "olist_orders_dataset.csv",
    "olist_products_dataset.csv",
    "olist_sellers_dataset.csv",
    "product_category_name_translation.csv"
]

bronze_folder2_path = "/Volumes/ecommerce_cat/ecommerce_schema/bronze/2/"

# For each file: print the inferred schema, a 5-row sample, the row count,
# and the number of NULLs per column.
for f in csv_files:
    path = bronze_folder2_path + f
    print(f"\n### Explorando CSV: {f} ###\n")

    df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        # Parse quoted fields that span several lines and quotes escaped by
        # doubling (""); with the defaults such a record is split into several
        # bogus rows. NOTE(review): presumably needed for the free-text review
        # comments - confirm the row counts against the raw files.
        .option("multiLine", True)
        .option("escape", '"')
        .csv(path)
    )
    df.printSchema()   # inferred column types
    df.show(5)         # first 5 rows
    print(f"Total de linhas: {df.count()}")

    # NULL count per column: cast the isNull flag to 0/1 and add it up
    null_counts = [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
    df.select(null_counts).show()

In [0]:
from pyspark.sql.functions import col, when

# Bronze (source CSV) folder and Silver (Delta output) folder
bronze_path = "/Volumes/ecommerce_cat/ecommerce_schema/bronze/2/"
silver_path = "/Volumes/ecommerce_cat/ecommerce_schema/silver/"

# Mapping: source CSV file name -> Silver table name (insertion order is the load order)
csv_files = dict([
    ("olist_customers_dataset.csv", "customers"),
    ("olist_geolocation_dataset.csv", "geolocation"),
    ("olist_order_items_dataset.csv", "order_items"),
    ("olist_order_payments_dataset.csv", "order_payments"),
    ("olist_order_reviews_dataset.csv", "order_reviews"),
    ("olist_orders_dataset.csv", "orders"),
    ("olist_products_dataset.csv", "products"),
    ("olist_sellers_dataset.csv", "sellers"),
    ("product_category_name_translation.csv", "product_category_translation"),
])

# Minimal, table-specific cleaning
def clean_df(df_name, df):
    """Apply the minimal cleaning rules that belong to one table.

    Args:
        df_name: Logical table name. Only "order_reviews" and "products"
            trigger a transformation.
        df: DataFrame to clean.

    Returns:
        The cleaned DataFrame. For every other table name (including
        "orders", whose NULL dates are deliberately kept) `df` is returned
        unchanged.
    """
    if df_name == "order_reviews":
        # Missing review title/message are filled with the placeholder "sem_review"
        placeholders = dict.fromkeys(
            ("review_comment_title", "review_comment_message"), "sem_review"
        )
        return df.fillna(placeholders)

    if df_name == "products":
        # A zero in any of these numeric fields is replaced by NULL.
        # The "lenght" spelling is kept exactly as the original code has it
        # (presumably matching the CSV headers - verify before renaming).
        zero_as_null_cols = (
            "product_name_lenght",
            "product_description_lenght",
            "product_photos_qty",
            "product_weight_g",
            "product_length_cm",
            "product_height_cm",
            "product_width_cm",
        )
        cleaned = df
        for name in zero_as_null_cols:
            value = col(name)
            cleaned = cleaned.withColumn(name, when(value == 0, None).otherwise(value))
        return cleaned

    # Everything else passes through untouched; for "orders" this means the
    # NULL dates are kept as they are.
    return df

# Read every Bronze CSV, apply the minimal cleaning and persist it as a Delta table in Silver
for csv_file, table_name in csv_files.items():
    print(f"Lendo {csv_file} ...")
    raw_df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        # Parse quoted fields that span several lines and quotes escaped by
        # doubling (""); with the defaults such a record is split into several
        # bogus rows. NOTE(review): presumably needed for the free-text review
        # comments - confirm the row counts against the raw files.
        .option("multiLine", "true")
        .option("escape", '"')
        .csv(f"{bronze_path}{csv_file}")
    )

    # Minimal cleaning
    df = clean_df(table_name, raw_df)

    # Save as Delta in Silver. The schema is inferred on every run, so let the
    # overwrite replace the stored schema too; otherwise a re-run whose inferred
    # types differ from the existing table fails with a schema-mismatch error.
    silver_table_path = f"{silver_path}{table_name}"
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(silver_table_path)
    )

    print(f"Tabela {table_name} salva em Delta no Silver: {silver_table_path}\n")