# Silver Layer Transformation: Open Meteo
In this notebook we turn the raw data from the Bronze layer into clean, analysis-ready data, which forms the Silver layer of the Data Lake.


In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import col


In [2]:
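# Spark session with the Delta Lake SQL extension and catalog enabled
builder = SparkSession.builder \
    .appName("SilverTransformOpenMeteo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip adds the delta-core package matching the installed delta-spark version
spark = configure_spark_with_delta_pip(builder).getOrCreate()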


25/04/14 20:20:52 WARN Utils: Your hostname, obi-wan-kenote resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/14 20:20:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/kenote_ubuntu/projetos/Airflow/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/kenote_ubuntu/.ivy2/cache
The jars for the packages stored in: /home/kenote_ubuntu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7e266975-6a4f-4dfb-9e23-dc14a06ab2db;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 132ms :: artifacts dl 7ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]

25/04/14 20:20:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Absolute path of the Bronze layer
bronze_path = "/home/kenote_ubuntu/projetos/Airflow/data/bronze/open_meteo"

# Read the raw data (Delta)
df_bronze = spark.read.format("delta").load(bronze_path)

# Show the first rows
df_bronze.show(5)


25/04/14 20:21:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+---------+----------+--------+--------+
|     city|      date|temp_min|temp_max|
+---------+----------+--------+--------+
|São Paulo|2025-04-18|    17.3|    26.8|
|São Paulo|2025-04-19|    17.8|    25.5|
|São Paulo|2025-04-20|    16.0|    21.6|
|São Paulo|2025-04-21|    15.3|    20.8|
|São Paulo|2025-04-22|    14.9|    20.8|
+---------+----------+--------+--------+
only showing top 5 rows
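The Bronze and Silver paths in this notebook are hard-coded under one user's home directory. A minimal sketch of deriving both from a single root, assuming a hypothetical `DATA_LAKE_ROOT` environment variable and a hypothetical `layer_path` helper (neither is part of the original pipeline):

```python
import os
from pathlib import Path
from typing import Optional

# Hypothetical fallback: the data root used by the absolute paths in this notebook
DEFAULT_ROOT = "/home/kenote_ubuntu/projetos/Airflow/data"


def layer_path(layer: str, dataset: str, root: Optional[str] = None) -> str:
    """Return <root>/<layer>/<dataset>, e.g. .../silver/open_meteo.

    The root comes from the `root` argument, then from the hypothetical
    DATA_LAKE_ROOT environment variable, then from DEFAULT_ROOT.
    """
    base = Path(root or os.environ.get("DATA_LAKE_ROOT", DEFAULT_ROOT))
    return str(base / layer / dataset)


bronze_path = layer_path("bronze", "open_meteo")
silver_path = layer_path("silver", "open_meteo")
print(bronze_path)
print(silver_path)
```

With `DATA_LAKE_ROOT` unset, both calls reproduce the notebook's original paths; setting the variable in the environment of the process that runs the notebook points the same code at another location.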



In [4]:
# Rename the columns to descriptive snake_case names that carry the unit (°C)
df_silver = df_bronze.select(
    col("city"),
    col("date"),
    col("temp_min").alias("temperature_min_c"),
    col("temp_max").alias("temperature_max_c")
)

# Type conversion
# (Delta already stores the Bronze schema, so these casts are a safeguard that guarantees both temperatures are double)
df_silver = df_silver.withColumn("temperature_min_c", col("temperature_min_c").cast("double"))
df_silver = df_silver.withColumn("temperature_max_c", col("temperature_max_c").cast("double"))
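The rename and the casts can also be driven from one mapping, so the Bronze-to-Silver column contract lives in a single place and a missing Bronze column fails early with a clear message. A minimal sketch; `SILVER_COLUMNS` and `build_select_plan` are hypothetical names, and the Spark call in the final comment assumes the `col` import and `df_bronze` from the cells above:

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical contract: Bronze column -> (Silver column, Spark type to cast to)
SILVER_COLUMNS: Dict[str, Tuple[str, str]] = {
    "city": ("city", "string"),
    "date": ("date", "string"),
    "temp_min": ("temperature_min_c", "double"),
    "temp_max": ("temperature_max_c", "double"),
}


def build_select_plan(
    bronze_columns: Iterable[str],
    contract: Dict[str, Tuple[str, str]] = SILVER_COLUMNS,
) -> List[Tuple[str, str, str]]:
    """Return (source, target, type) triples, failing if Bronze lacks a column."""
    available = set(bronze_columns)
    missing = [name for name in contract if name not in available]
    if missing:
        raise ValueError(f"Bronze is missing expected columns: {missing}")
    return [(src, dst, dtype) for src, (dst, dtype) in contract.items()]


plan = build_select_plan(["city", "date", "temp_min", "temp_max"])
print(plan)

# In the notebook, the plan could replace the select + withColumn calls:
# df_silver = df_bronze.select(
#     [col(src).cast(dtype).alias(dst) for src, dst, dtype in plan]
# )
```

In the notebook the column list would come from `df_bronze.columns`. The contract keeps `date` as a string to mirror the cell above.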


In [5]:
# Preview the transformed data
df_silver.show(10)
df_silver.printSchema()


+-----------+----------+-----------------+-----------------+
|       city|      date|temperature_min_c|temperature_max_c|
+-----------+----------+-----------------+-----------------+
|   Cabedelo|2025-04-23|             25.8|             28.3|
|   Cabedelo|2025-04-24|             25.3|             28.2|
|   Cabedelo|2025-04-25|             25.0|             28.2|
|   Cabedelo|2025-04-26|             25.4|             28.4|
|   Cabedelo|2025-04-27|             25.8|             28.3|
|   Cabedelo|2025-04-28|             25.7|             28.3|
|   Cabedelo|2025-04-29|             26.4|             28.9|
|João Pessoa|2025-04-14|             24.3|             30.8|
|João Pessoa|2025-04-15|             25.7|             31.3|
|  São Paulo|2025-04-18|             17.3|             26.8|
+-----------+----------+-----------------+-----------------+
only showing top 10 rows

root
 |-- city: string (nullable = true)
 |-- date: string (nullable = true)
 |-- temperature_min_c: double (nullable = 
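The printed schema shows that `date` is still a `string`, even though its values are `yyyy-MM-dd` dates. One way to catch this kind of drift is to compare `df_silver.dtypes` (a list of `(name, type)` pairs in PySpark) against an expected contract. A minimal sketch; `EXPECTED_SILVER_DTYPES` and `schema_mismatches` are hypothetical, and the contract assumes `date` should become a Spark `date`, for example via `to_date(col("date"), "yyyy-MM-dd")` from `pyspark.sql.functions`:

```python
from typing import List, Sequence, Tuple

# Hypothetical target contract for the Silver table (date as a real date type)
EXPECTED_SILVER_DTYPES: List[Tuple[str, str]] = [
    ("city", "string"),
    ("date", "date"),
    ("temperature_min_c", "double"),
    ("temperature_max_c", "double"),
]


def schema_mismatches(
    actual: Sequence[Tuple[str, str]],
    expected: Sequence[Tuple[str, str]] = EXPECTED_SILVER_DTYPES,
) -> List[str]:
    """Describe every expected column that is missing or has another type."""
    actual_types = dict(actual)
    problems = []
    for name, dtype in expected:
        found = actual_types.get(name)
        if found is None:
            problems.append(f"missing column: {name}")
        elif found != dtype:
            problems.append(f"{name}: expected {dtype}, got {found}")
    return problems


# dtypes matching the Silver schema printed in this notebook
current = [
    ("city", "string"),
    ("date", "string"),
    ("temperature_min_c", "double"),
    ("temperature_max_c", "double"),
]
print(schema_mismatches(current))
```

In the notebook the call would be `schema_mismatches(df_silver.dtypes)`; PySpark reports a `DateType` column as `"date"` in `dtypes`, so the check passes once the conversion is applied.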

In [6]:
# Save the Silver layer as Delta

# Absolute path of the Silver layer
silver_path = "/home/kenote_ubuntu/projetos/Airflow/data/silver/open_meteo"

# Write the cleaned data in Delta format
# (mode "overwrite" replaces the table contents with a new Delta version on each run)
df_silver.write.format("delta") \
    .mode("overwrite") \
    .save(silver_path)


25/04/14 20:21:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/14 20:21:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/04/14 20:21:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/04/14 20:21:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/04/14 20:21:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/04/14 20:21:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/04/14 20:21:08 WARN MemoryManager: Total allocation exceeds 95.
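The MemoryManager warnings come from the Parquet writer underneath Delta: when the open Parquet writers together request more than the memory pool (95% of the heap, reported as 1,020,054,720 bytes), each writer's row group size is scaled by pool / (writers × row group size). Assuming the Parquet default row group size of 128 MiB (`parquet.block.size`), that ratio reproduces the logged percentages, from 95.00% for 8 writers down to 63.33% for 12. A small sketch of the arithmetic (`row_group_scale` is a hypothetical helper):

```python
# Values taken from the log, plus one assumption
POOL_BYTES = 1_020_054_720           # "95.00% (1,020,054,720 bytes) of heap memory"
ROW_GROUP_BYTES = 128 * 1024 * 1024  # assumed default parquet.block.size (128 MiB)


def row_group_scale(writers: int,
                    pool: int = POOL_BYTES,
                    row_group: int = ROW_GROUP_BYTES) -> float:
    """Fraction of the requested row group size each writer gets (1.0 = no scaling)."""
    requested = writers * row_group
    return min(1.0, pool / requested)


for writers in range(7, 13):
    print(f"{writers} writers -> {row_group_scale(writers) * 100:.2f}%")
```

For a dataset this small the warning is usually harmless, since the row groups simply get smaller. If it matters, fewer concurrent writers (for example `df_silver.coalesce(1)` before `.write`) or a larger driver heap (`spark.driver.memory`, which must be set before the session is created) would reduce or remove the scaling.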



In [7]:
# Check the saved data

# Read the table back to validate the Silver contents
df_check = spark.read.format("delta").load(silver_path)

# Display the data and schema to confirm the write succeeded
df_check.show(5)
df_check.printSchema()


+------+----------+-----------------+-----------------+
|  city|      date|temperature_min_c|temperature_max_c|
+------+----------+-----------------+-----------------+
|Bogotá|2025-04-17|             11.6|             20.6|
|Bogotá|2025-04-18|             13.4|             21.3|
|Bogotá|2025-04-19|             13.1|             21.6|
|Bogotá|2025-04-20|             13.3|             19.3|
|Bogotá|2025-04-21|             11.5|             19.0|
+------+----------+-----------------+-----------------+
only showing top 5 rows

root
 |-- city: string (nullable = true)
 |-- date: string (nullable = true)
 |-- temperature_min_c: double (nullable = true)
 |-- temperature_max_c: double (nullable = true)
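Besides the visual check with `show()`, a few row-level rules can be asserted on the Silver data. A minimal sketch in plain Python, assuming the rows are small enough to collect to the driver (for example `[row.asDict() for row in df_check.collect()]`); the rules and the `row_problems` helper are hypothetical, not part of the original pipeline, and the sample rows are the five printed by `df_check.show(5)`:

```python
from datetime import date
from typing import Any, Dict, List


def row_problems(row: Dict[str, Any]) -> List[str]:
    """Return the rule violations of one Silver row (empty list = valid)."""
    problems = []
    if not row.get("city"):
        problems.append("empty city")
    try:
        # Accepts "YYYY-MM-DD" strings and datetime.date values alike
        date.fromisoformat(str(row.get("date")))
    except ValueError:
        problems.append(f"invalid date: {row.get('date')!r}")
    t_min = row.get("temperature_min_c")
    t_max = row.get("temperature_max_c")
    if t_min is None or t_max is None:
        problems.append("missing temperature")
    elif t_min > t_max:
        problems.append(f"temperature_min_c {t_min} > temperature_max_c {t_max}")
    return problems


# The five rows printed by df_check.show(5)
sample = [
    {"city": "Bogotá", "date": "2025-04-17", "temperature_min_c": 11.6, "temperature_max_c": 20.6},
    {"city": "Bogotá", "date": "2025-04-18", "temperature_min_c": 13.4, "temperature_max_c": 21.3},
    {"city": "Bogotá", "date": "2025-04-19", "temperature_min_c": 13.1, "temperature_max_c": 21.6},
    {"city": "Bogotá", "date": "2025-04-20", "temperature_min_c": 13.3, "temperature_max_c": 19.3},
    {"city": "Bogotá", "date": "2025-04-21", "temperature_min_c": 11.5, "temperature_max_c": 19.0},
]

for row in sample:
    print(row["date"], row_problems(row) or "ok")
```

On larger tables the same rules are better expressed as Spark filters, such as counting rows where `col("temperature_min_c") > col("temperature_max_c")`, so that nothing has to be collected to the driver.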



# Conclusion

✅ Read the Bronze data (absolute path)
✅ Standardize column names and types
✅ Save the Silver result as Delta
✅ Clear, instructive comments
✅ Final check of the saved contents