# Schema Bronze

Autor: Thiago Vilarinho Lemes\
Projeto: ETL no Databricks utilizando Catalog \
Data: 14/09/2025

In [62]:
from databricks.connect import DatabricksSession
from pyspark.sql import functions as F
import os
from dotenv import load_dotenv
import pandas as pd

In [63]:
# Carrega as variáveis de ambiente do arquivo .env

load_dotenv()

catalog_name = os.getenv("NAME_CATALOG")

schema_bronze = os.getenv("NAME_SCHEMA_BRONZE")
table_bronze = os.getenv("NAME_TABLE_BRONZE")

schema_silver = os.getenv("NAME_SCHEMA_SILVER")
table_silver = os.getenv("NAME_TABLE_SILVER")

schema_gold = os.getenv("NAME_SCHEMA_GOLD")
table_gold = os.getenv("NAME_TABLE_GOLD")

schema_raw = os.getenv("NAME_SCHEMA_RAW")
bucket_raw = os.getenv("NAME_STORAGE")

In [64]:
# Cria a sessão Spark usando Databricks Connect
spark = DatabricksSession.builder.getOrCreate()

In [65]:
# Seleciona o catalog e schema a serem utilizados
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {schema_bronze}")

In [66]:
try:
    # Carrega os datasets dos arquivos Parquet
    df_1 = pd.read_parquet("../../dataset/netflix_v01.parquet")
    df_2 = pd.read_parquet("../../dataset/netflix_v02.parquet")

    # Transforma os DataFrames do Pandas em DataFrames do Spark
    df_1 = spark.createDataFrame(df_1)
    df_2 = spark.createDataFrame(df_2)
    print("Arquivos Parquet carregados com sucesso.")
except Exception as e:
    print("Erro ao carregar os arquivos Parquet:", e)
    raise

Arquivos Parquet carregados com sucesso.


In [57]:
# Verifica o schema e o tamanho dos DataFrames
print("DataFrame 1:")
print("Tamanho:", df_1.count(), "linhas")
print("*" * 20)
print(df_1.show(5))
print("\n")
print("DataFrame 2:")
print("Tamanho:", df_2.count(), "linhas")
print("*" * 20)
print(df_2.show(5))

DataFrame 1:
Tamanho: 8807 linhas
********************
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                NULL|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           NULL|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Ju

Analise Exploratória

In [70]:
# Quantidade de linhas e colunas dos DataFrames
print("Dataframe 1")
print(f"Quantidade de linha: {df_1.count()} linhas")
print(f"Quantidade de colunas: {len(df_1.columns)} colunas")
print("*" * 50)
print("Dataframe 2")
print(f"Quantidade de linha: {df_2.count()} linhas")
print(f"Quantidade de colunas: {len(df_2.columns)} colunas")

Dataframe 1
Quantidade de linha: 8807 linhas
Quantidade de colunas: 12 colunas
**************************************************
Dataframe 2
Quantidade de linha: 8702 linhas
Quantidade de colunas: 12 colunas


In [None]:
# Nome das colunas
print("Colunas dos DataFrames 1")
print(df_1.columns)

print("*" * 139)

print("Colunas dos DataFrames 2")
print(df_2.columns)

print("*" * 139)

# Verifica se os DataFrames possuem as mesmas colunas para realizar o merge
if set(df_1.columns) != set(df_2.columns):
    raise ValueError("Os DataFrames possuem colunas diferentes.")
else:
    print("Os DataFrames possuem as mesmas colunas.")

Colunas dos DataFrames 1
['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration', 'listed_in', 'description']
*******************************************************************************************************************************************
Colunas dos DataFrames 2
['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration', 'listed_in', 'description']
*******************************************************************************************************************************************
Os DataFrames possuem as mesmas colunas.


In [None]:
# 1° Forma - Realiza o merge dos datasets
# df_combined = df_1.join(df_2, on="id", how="outer")  # outer join mantém todas as linhas

In [None]:
# 2° Forma - Realiza o merge dos datasets
df = df_1.unionByName(df_2)

In [59]:
print("DataFrame:")
print("Tamanho:", df.count(), "linhas")
print("*" * 21)
df.show(5, truncate=False)

DataFrame:
Tamanho: 17509 linhas
*********************
+-------+-------+---------------------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+------------+------+---------+-------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|show_id|type   |title                |director       |cast                                                                                                                                                                                                                                                        

In [60]:
try:
    df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_raw}.{bucket_raw}")
    print("DataFrames carregados com sucesso!")
except Exception as e:
    print(f"Erro ao carregar DataFrames: {e}")

DataFrames carregados com sucesso!
