In [1]:
import os
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType  # Importação adicionada
from pyspark.sql.functions import col, sum, array, when
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# Load variables from a local .env file (if there is one) before the S3/MinIO
# credentials are read below. load_dotenv was imported but never invoked, so
# a .env file would otherwise be silently ignored.
load_dotenv()

# Fail early, naming the missing variables, instead of handing an empty
# credential to the Spark configuration.
# NOTE(review): how Spark reacts to a None credential depends on the PySpark
# version; presumably the failure would only show up on first S3 access.
_missing_credentials = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not os.getenv(name)
]
if _missing_credentials:
    raise RuntimeError(
        "Variáveis de ambiente ausentes: " + ", ".join(_missing_credentials)
    )

# Spark session connected to the standalone master, with the S3A connector
# pointed at the MinIO endpoint (path-style access, SSL disabled).
spark = (
    SparkSession.builder
    .appName("Movies_70_26")
    .master("spark://spark:7077")
    .config("spark.executor.memory", "6g")
    .config("spark.executor.cores", "1")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.11.901")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-eee3714a-6da3-4754-a68e-1cad88c15089;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 224ms :: artifacts dl 8ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.1 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|d

In [3]:
def log_null_counts(df, df_name):
    """Log the number of null values found in each column of a DataFrame.

    A single aggregation sums, per column, a 0/1 flag marking null values;
    the result is then written to the log, one INFO line per column.

    Args:
        df: Spark DataFrame to inspect.
        df_name: Label used in the log header to identify the DataFrame.
    """
    logger.info(f"Verificando nulos em {df_name}...")
    null_flags = [sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
    null_counts = df.select(null_flags).collect()[0]
    for column in df.columns:
        # SUM over zero rows yields NULL (None on the Python side), so an
        # empty DataFrame is reported as 0 nulls rather than "None nulos".
        logger.info(f"Coluna {column}: {null_counts[column] or 0} nulos")

def handle_nulls(df):
    """Remove or replace null values in the movies DataFrame.

    Rows whose ``id`` or ``title`` is null are dropped. Nulls in ``overview``,
    ``release_date`` and ``vote_average`` are replaced by fixed defaults, and
    a null ``genre_ids`` becomes an empty integer array. The number of rows
    dropped at each filtering step is logged.

    Args:
        df: Spark DataFrame containing the columns named above.

    Returns:
        The cleaned DataFrame.
    """
    logger.info("Iniciando tratamento de nulos...")

    # Drop rows without an id
    initial_count = df.count()
    df = df.filter(col("id").isNotNull())
    count_after_id = df.count()
    logger.info(f"Removidos {initial_count - count_after_id} registros com ID nulo")

    # Drop rows without a title. The count taken after the id filter is the
    # baseline here, which avoids running one more full count job.
    df = df.filter(col("title").isNotNull())
    logger.info(f"Removidos {count_after_id - df.count()} registros com titulo nulo")

    # Default replacement for each nullable column
    # NOTE(review): the vote_average replacement is a string; confirm it
    # matches the column's actual type in the bronze data.
    fill_values = {
        'overview': 'Sem informação.',
        'release_date': '1900-01-01',
        'vote_average': '0.0'
    }

    df = df.na.fill(fill_values)

    # A null genre_ids becomes an empty integer array
    empty_genres = array().cast(ArrayType(IntegerType()))
    df = df.withColumn(
        "genre_ids",
        when(col("genre_ids").isNull(), empty_genres).otherwise(col("genre_ids")),
    )

    logger.info("Tratamento de nulos concluído")
    return df


In [4]:
# Bronze -> Silver ingestion: read the raw parquet files, clean nulls,
# deduplicate by id, write the result to the silver bucket and verify it.
# The Spark session is stopped when the cell finishes, whether or not it
# succeeded; any error is logged and re-raised.
try:
    logger.info("Iniciando processo de ingestão Bronze -> Silver...")

    # Read every bronze object matching the movie_* pattern
    logger.info("Lendo arquivos da camada bronze...")
    df = spark.read.parquet("s3a://bronze/movie_*")

    # Baseline statistics for the raw data
    logger.info(f"Total de registros brutos: {df.count()}")
    log_null_counts(df, "Dataset Bruto")

    # Null handling
    df_clean = handle_nulls(df)

    # Deduplication
    logger.info("Removendo duplicatas...")
    initial_count = df_clean.count()
    # The deduplicated data feeds three actions (count, null report, write).
    # Caching keeps them from recomputing dropDuplicates, which may retain a
    # different arbitrary row per id on each evaluation, so the logged
    # statistics normally describe the same rows that get written.
    df_deduplicado = df_clean.dropDuplicates(["id"]).cache()
    final_count = df_deduplicado.count()

    logger.info(f"Registros removidos por duplicidade: {initial_count - final_count}")
    log_null_counts(df_deduplicado, "Dataset Processado")

    # Write to the silver layer, replacing any previous output
    logger.info("Escrevendo na camada silver...")
    (
        df_deduplicado.write
        .format("parquet")
        .mode("overwrite")
        .save("s3a://silver/movies_70_26/")
    )
    # The cached copy is no longer needed once the write has finished
    df_deduplicado.unpersist()

    # Final check: read the output back and compare with the expected count
    logger.info("Verificando escrita...")
    df_read = spark.read.parquet("s3a://silver/movies_70_26/")
    written_count = df_read.count()
    logger.info(f"Total de registros escritos: {written_count}")
    if written_count != final_count:
        # Without this comparison the verification step could never fail
        raise RuntimeError(
            f"Contagem divergente na camada silver: esperado {final_count}, "
            f"escrito {written_count}"
        )
    df_read.show(5, truncate=False)

except Exception as e:
    logger.error(f"Erro durante o processamento: {str(e)}")
    raise

finally:
    logger.info("Encerrando sessão Spark...")
    spark.stop()

# Only reached when no exception was raised above
logger.info("Ingestão concluída com sucesso!")

INFO:__main__:Iniciando processo de ingestão Bronze -> Silver...
INFO:__main__:Lendo arquivos da camada bronze...
25/02/28 17:06:13 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
INFO:__main__:Total de registros brutos: 748896                                 
INFO:__main__:Verificando nulos em Dataset Bruto...
INFO:__main__:Coluna id: 0 nulos                                                
INFO:__main__:Coluna title: 0 nulos
INFO:__main__:Coluna overview: 0 nulos
INFO:__main__:Coluna release_date: 0 nulos
INFO:__main__:Coluna vote_average: 0 nulos
INFO:__main__:Coluna genre_ids: 0 nulos
INFO:__main__:Iniciando tratamento de nulos...
INFO:__main__:Removidos 0 registros com ID nulo                                 
INFO:__main__:Removidos 0 registros com titulo nulo                             
INFO:__main__:Tratamento de nulos concluído
INFO:__main__:Removendo duplicatas...
INFO:__main__:Registros removidos por

+---+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------+---------------+
|id |title              |overview                                                                                                                                                                                                                                                                                                                                     |release_date|vote_average|genre_ids      |
+---+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

INFO:__main__:Ingestão concluída com sucesso!
