# Criar SparkSession com suporte ao MinIO

In [1]:
import os

from pyspark.sql import SparkSession

# MinIO connection settings.
# Values are read from the environment when present so that secrets do not
# have to be committed with the notebook; the fallbacks are the local
# development values this notebook has always used.
# NOTE(review): the environment variable names below are assumed — adjust
# them to match the names used in your .env file.
MINIO_ENDPOINT  = os.environ.get("MINIO_ENDPOINT") or "http://minio:9000"
MINIO_ACCESS    = os.environ.get("MINIO_ROOT_USER") or "minio"
MINIO_SECRET    = os.environ.get("MINIO_ROOT_PASSWORD") or "minio123"

# Enable TLS only when the endpoint is explicitly https. The default local
# endpoint is plain http (no certificate), which keeps SSL disabled.
MINIO_USE_SSL   = MINIO_ENDPOINT.lower().startswith("https://")

spark = (
    SparkSession.builder
    .appName("moviepulse-bronze-reader")

    # --- JARs required to talk to MinIO through the S3 protocol ---
    # hadoop-aws: Hadoop integration with S3/MinIO
    # aws-java-sdk-bundle: AWS Java SDK (authentication, requests)
    # Version 3.3.4 is compatible with Spark 3.5.x
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262"
    )

    # --- S3A settings pointing at MinIO ---
    .config("spark.hadoop.fs.s3a.endpoint",               MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key",             MINIO_ACCESS)
    .config("spark.hadoop.fs.s3a.secret.key",             MINIO_SECRET)

    # path.style.access=true is REQUIRED for this MinIO setup
    # (buckets are addressed as http://host:port/bucket rather than the
    # virtual-hosted style used by real AWS)
    .config("spark.hadoop.fs.s3a.path.style.access",      "true")

    # Hadoop S3A filesystem implementation
    .config("spark.hadoop.fs.s3a.impl",                   "org.apache.hadoop.fs.s3a.S3AFileSystem")

    # SSL follows the endpoint scheme ("false" for the local http MinIO)
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", str(MINIO_USE_SSL).lower())

    .getOrCreate()
)

# Reduce the amount of log output shown in the notebook
spark.sparkContext.setLogLevel("WARN")

print(f" SparkSession criada! Spark version: {spark.version}")

✅ SparkSession criada! Spark version: 3.5.0


# Verificar conectividade com o MinIO
Lista os arquivos no bucket Bronze antes de lê-los.

In [2]:
# Connectivity check: list every object under the Bronze prefix using the
# Hadoop FileSystem API through the JVM gateway, with the Hadoop
# configuration of the active Spark session.
hadoop = spark._jvm.org.apache.hadoop
fs     = hadoop.fs.FileSystem.get(
    spark._jvm.java.net.URI.create("s3a://bronze"),
    spark._jsc.hadoopConfiguration()
)

path = hadoop.fs.Path("s3a://bronze/tmdb/trending_daily/")

try:
    files = fs.listFiles(path, True)  # True = recursive
    count = 0
    # listFiles returns a JVM remote iterator, so it is walked with
    # hasNext()/next() instead of a Python for-loop.
    while files.hasNext():
        f = files.next()
        print(f"📄 {f.getPath()} | {f.getLen()} bytes")
        count += 1
    print(f"\n Total de arquivos encontrados: {count}")
except Exception as e:
    # Deliberate catch-all: this cell is a diagnostic, so any failure is
    # reported together with troubleshooting hints instead of a raw traceback.
    print(f" Erro ao acessar o MinIO: {e}")
    print("\n Verifique se:")
    print("   - O container minio está rodando")
    print("   - O bucket 'bronze' existe")
    print("   - As credenciais batem com o seu .env")

📄 s3a://bronze/tmdb/trending_daily/dt=2025-12-15/events-20251215T041150Z.jsonl | 22721 bytes
📄 s3a://bronze/tmdb/trending_daily/dt=2025-12-15/events-20251215T041458Z.jsonl | 22578 bytes
📄 s3a://bronze/tmdb/trending_daily/dt=2025-12-15/events-20251215T044051Z.jsonl | 45182 bytes
📄 s3a://bronze/tmdb/trending_daily/dt=2026-01-11/events-20260111T011148Z.jsonl | 22377 bytes
📄 s3a://bronze/tmdb/trending_daily/dt=2026-01-12/events-20260112T010447Z.jsonl | 22876 bytes

✅ Total de arquivos encontrados: 5


# Ler os arquivos JSONL da camada Bronze

In [3]:
# Root prefix of the Bronze layer for the TMDB daily trending events
BRONZE_PATH = "s3a://bronze/tmdb/trending_daily/"

# Reader settings:
#   recursiveFileLookup="true" -> also reads the dt=YYYY-MM-DD subfolders
#   multiline="false"          -> JSONL, i.e. one JSON document per line
bronze_reader = spark.read.options(
    recursiveFileLookup="true",
    multiline="false",
)
df_bronze = bronze_reader.json(BRONZE_PATH)

# Summary of what was loaded (count() triggers a Spark job)
print(" DataFrame carregado!")
print(f"   Linhas  : {df_bronze.count()}")
print(f"   Colunas : {len(df_bronze.columns)}")

✅ DataFrame carregado!
   Linhas  : 121
   Colunas : 15


## Explorar schema e dados

In [4]:
# Print the schema that Spark inferred automatically from the JSON files
df_bronze.printSchema()

root
 |-- _kafka: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- partition: long (nullable = true)
 |    |-- topic: string (nullable = true)
 |    |-- ts_ms: long (nullable = true)
 |-- adult: boolean (nullable = true)
 |-- event_type: string (nullable = true)
 |-- extraction_ts: string (nullable = true)
 |-- movie_id: long (nullable = true)
 |-- original_language: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- raw: struct (nullable = true)
 |    |-- adult: boolean (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |    |-- genre_ids: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- id: long (nullable = true)
 |    |-- media_type: string (nullable = true)
 |    |-- original_language: string (nullable = true)
 |    |-- original_title: string (nullable = true)
 |    |-- overview: string (nullable = true)
 |    |-- popularity: double (nullable

In [8]:
# Preview of the first rows, restricted to the main columns
preview_columns = [
    "movie_id",
    "title",
    "source_date",
    "popularity",
    "vote_average",
    "vote_count",
    "original_language",
]
df_bronze.select(*preview_columns).show(20, truncate=False)

+--------+-------------------------------------+-----------+----------+------------+----------+-----------------+
|movie_id|title                                |source_date|popularity|vote_average|vote_count|original_language|
+--------+-------------------------------------+-----------+----------+------------+----------+-----------------+
|812583  |Vivo ou Morto: Um Mistério Knives Out|2025-12-15 |368.6188  |7.408       |482       |en               |
|812583  |Vivo ou Morto: Um Mistério Knives Out|2025-12-15 |368.6188  |7.408       |482       |en               |
|798645  |O Sobrevivente                       |2025-12-15 |429.9601  |6.776       |473       |en               |
|798645  |O Sobrevivente                       |2025-12-15 |429.9601  |6.776       |473       |en               |
|1084242 |Zootopia 2                           |2025-12-15 |431.2602  |7.669       |588       |en               |
|1084242 |Zootopia 2                           |2025-12-15 |431.2602  |7.669       |58

In [6]:
# Descriptive statistics for the numeric metrics
numeric_metrics = df_bronze.select("popularity", "vote_average", "vote_count")
numeric_metrics.describe().show()

+-------+------------------+-----------------+------------------+
|summary|        popularity|     vote_average|        vote_count|
+-------+------------------+-----------------+------------------+
|  count|               120|              120|               120|
|   mean|        151.303715|6.809258333333331|1287.3916666666667|
| stddev|147.91603799730422|1.438985896975602| 3218.911691171447|
|    min|            7.0898|              0.0|                 0|
|    max|          816.4153|            8.282|             33220|
+-------+------------------+-----------------+------------------+



In [7]:
# Records per collection date — confirms that the pipeline is running
rows_per_date = df_bronze.groupBy("source_date").count()
rows_per_date.orderBy("source_date").show()

+-----------+-----+
|source_date|count|
+-----------+-----+
|       NULL|    1|
| 2025-12-15|   80|
| 2026-01-11|   20|
| 2026-01-12|   20|
+-----------+-----+



# Checar qualidade dos dados (nulos)
Boa prática antes de avançar para a camada Silver.

In [9]:
from pyspark.sql import functions as F

# Columns whose completeness is checked
cols_to_check = ["movie_id", "title", "source_date", "popularity", "vote_average", "vote_count", "release_date"]

# One aggregate per column: F.when(...) yields a non-null value (the column
# name, used as a literal) only for rows where the column is NULL, and
# F.count counts non-null values — so each result is that column's NULL count.
null_counts = df_bronze.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in cols_to_check
])

print("📊 Contagem de nulos por coluna:")
null_counts.show()

📊 Contagem de nulos por coluna:
+--------+-----+-----------+----------+------------+----------+------------+
|movie_id|title|source_date|popularity|vote_average|vote_count|release_date|
+--------+-----+-----------+----------+------------+----------+------------+
|       1|    1|          1|         1|           1|         1|           1|
+--------+-----+-----------+----------+------------+----------+------------+

