In [6]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session shared by every cell of this pipeline.
# The configuration registers the hadoop-aws and aws-java-sdk-bundle jars, maps
# the s3a:// scheme to S3AFileSystem and sets the S3A connection maximum to 100.
# NOTE(review): getOrCreate() hands back an existing session when there is one;
# presumably the jar setting is only honoured when a new session is started, which
# may explain the S3AFileSystem ClassNotFoundException recorded in this notebook's
# output -- confirm (restart the kernel before running this cell).
spark = (
    SparkSession.builder
    .appName("ETL Pipeline - Bronze to Silver")
    .config(
        "spark.jars",
        "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar",
    )
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.maximum", "100")
    .getOrCreate()
)

print(spark.version)

3.5.0


In [7]:
s3_path = "s3a://my-bucket-ry-01/raw-data/ipca/kafka/"

# Connectivity check: confirm the raw-data prefix is reachable and report how many
# files back it.
try:
    # inputFiles() returns a best-effort list of the files that compose the
    # DataFrame, so this counts files (not text lines) and does not collect the
    # file contents onto the driver.
    files = spark.read.text(s3_path).inputFiles()
    print(f"Arquivos no S3: {len(files)} arquivos encontrados.")
except Exception as e:
    # Best-effort check: report the failure and let the notebook continue.
    print(f"Erro ao acessar o S3: {e}")


Erro ao acessar o S3: An error occurred while calling o59.text.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNeces

In [41]:
# S3 location of the raw data (Bronze layer).
# NOTE(review): same value as s3_path used by the connectivity check; the Silver and
# Gold outputs are written to a different bucket -- confirm that is intended.
bronze_path = "s3a://my-bucket-ry-01/raw-data/ipca/kafka/"

In [36]:
# Load the raw JSON records from S3 into the Bronze DataFrame.
df_bronze = spark.read.format("json").load(bronze_path)


In [37]:
# Print a preview of the Bronze DataFrame as read from S3.
df_bronze.show()

+-----------+-------------+---------------+-----------+-------------+------------+----+----------+-------------+---------+
|CompraManha|    Data_Base|Data_Vencimento|PUBaseManha|PUCompraManha|PUVendaManha|Tipo|VendaManha|    dt_update|partition|
+-----------+-------------+---------------+-----------+-------------+------------+----+----------+-------------+---------+
|       5.87|1653868800000|  1723680000000|    3486.89|      3497.07|     3486.89|IPCA|      5.99|1743256494149|        0|
|       5.69|1653868800000|  2062800000000|    1911.52|      1940.53|     1911.52|IPCA|      5.81|1743256494150|        0|
|       5.37|1648425600000|  2062800000000|    1922.78|      1952.84|     1922.78|IPCA|      5.49|1743256494158|        0|
|       5.44|1648166400000|  2378419200000|    1108.45|      1139.19|     1108.45|IPCA|      5.56|1743256494159|        0|
|       5.63|1648080000000|  2378419200000|    1062.86|      1091.63|     1062.86|IPCA|      5.75|1743256494166|        0|
|       5.62|164

In [17]:
# Transformação para a Camada Silver (Limpeza e Transformação)

In [24]:
from pyspark.sql.functions import avg, count, from_unixtime, col

In [25]:
# Drop rows that are exact duplicates across every column.
# NOTE(review): dt_update and partition take part in the comparison, so a record
# ingested twice with a different dt_update is presumably kept twice -- confirm.
df_silver = df_bronze.distinct()

In [26]:
# Convert the epoch-millisecond columns into human-readable date strings.
# from_unixtime works in seconds, hence the division by 1000.
# NOTE(review): from_unixtime presumably renders in the Spark session time zone,
# so calendar dates could shift on a non-UTC session -- confirm.
_epoch_ms_formats = {
    "Data_Vencimento": "yyyy-MM-dd",
    "Data_Base": "yyyy-MM-dd",
    "dt_update": "yyyy-MM-dd HH:mm:ss",
}
_current_types = dict(df_silver.dtypes)
for _column, _pattern in _epoch_ms_formats.items():
    # This cell rebinds df_silver from itself, so running it twice would divide the
    # already formatted strings by 1000 and produce nulls. Skip columns that are
    # already strings to keep the cell safe to re-run.
    # Assumes the raw epoch columns are read as numeric types -- TODO confirm.
    if _current_types.get(_column) != "string":
        df_silver = df_silver.withColumn(
            _column, from_unixtime(col(_column) / 1000, _pattern)
        )

print("Fim - Tratar timestamps")

Fim - Tratar timestamps


In [27]:
# Replace nulls with 0 in the three PU* columns (example strategy).
# NOTE(review): zeros introduced here are included in any later avg() over these
# columns -- confirm that is the intended treatment.
_pu_columns = ["PUCompraManha", "PUVendaManha", "PUBaseManha"]
df_silver = df_silver.na.fill(dict.fromkeys(_pu_columns, 0))

print("Fim - Tratar valores nulos")

Fim - Tratar valores nulos


In [28]:
# Print a preview of the Silver-layer DataFrame.
df_silver.show()

+-----------+----------+---------------+-----------+-------------+------------+----+----------+-------------------+---------+
|CompraManha| Data_Base|Data_Vencimento|PUBaseManha|PUCompraManha|PUVendaManha|Tipo|VendaManha|          dt_update|partition|
+-----------+----------+---------------+-----------+-------------+------------+----+----------+-------------------+---------+
|       4.21|2021-05-21|     2045-05-15|    1281.33|      1318.18|     1282.41|IPCA|      4.33|2025-03-29 13:54:54|        0|
|       1.91|2021-02-03|     2024-08-15|    3214.85|      3228.73|     3215.42|IPCA|      2.03|2025-03-29 13:54:54|        0|
|       5.87|2022-05-30|     2024-08-15|    3486.89|      3497.07|     3486.89|IPCA|      5.99|2025-03-29 13:54:54|        0|
|       5.32|2021-11-08|     2035-05-15|    1817.75|      1846.84|     1817.75|IPCA|      5.44|2025-03-29 13:54:54|        0|
|       5.44|2022-03-25|     2045-05-15|    1108.45|      1139.19|     1108.45|IPCA|      5.56|2025-03-29 13:54:54|   

In [29]:
# Persist the Silver layer to S3 as Parquet, overwriting any existing output.
# NOTE(review): this bucket differs from the one the Bronze data is read from -- confirm.
silver_path = "s3a://my-bucket-desafio-edu-01/processed-data/ipca/silver/"
df_silver.write.parquet(silver_path, mode="overwrite")

print("Fim - Salvar camada Silver no S3")

Fim - Salvar camada Silver no S3


In [31]:
# Transformação para a Camada Gold (Agregação e Enriquecimento)

In [32]:
from pyspark.sql.functions import avg, count

In [33]:
# Aggregate the Silver data per "Tipo": mean of PUCompraManha, mean of
# PUVendaManha, and the number of rows in each group.
_gold_metrics = [
    avg("PUCompraManha").alias("Media_PUCompraManha"),
    avg("PUVendaManha").alias("Media_PUVendaManha"),
    count("*").alias("Total_Registros"),
]
df_gold = df_silver.groupBy("Tipo").agg(*_gold_metrics)

In [34]:
# Print the aggregated (Gold) DataFrame.
df_gold.show()

+----+-------------------+------------------+---------------+
|Tipo|Media_PUCompraManha|Media_PUVendaManha|Total_Registros|
+----+-------------------+------------------+---------------+
|IPCA|  2362.883066666666|2339.0757333333336|            150|
+----+-------------------+------------------+---------------+



In [35]:
# Persist the Gold layer to S3 as Parquet, overwriting any existing output.
gold_path = "s3a://my-bucket-desafio-edu-01/analytics/ipca/gold/"
df_gold.write.parquet(gold_path, mode="overwrite")