In [9]:
# LINHAS: Bronze -> Silver (Parquet, sem Delta)

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, explode, current_timestamp, to_date, lit

# --- Paths ---
today = datetime.now().strftime("%Y/%m/%d")
BRONZE_PATH = f"s3a://bronze/linhas/{today}/"   # ajuste se seu prefixo for outro (ex.: linhas_ref)
SILVER_PATH = "s3a://silver/dim_linhas/"

    

print(f"Lendo Bronze de: {BRONZE_PATH}")
print(f"Gravando Silver em: {SILVER_PATH}")

Lendo Bronze de: s3a://bronze/linhas/2025/11/07/
Gravando Silver em: s3a://silver/dim_linhas/


In [10]:
spark = (
    SparkSession.builder.appName("BronzeToSilver_Linhas_Parquet")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "admin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .getOrCreate()
)

In [11]:
# --- Schema do JSON de linhas (array na raiz) ---
schema_item = StructType([
    StructField("cl", IntegerType(), True),   # código interno da linha
    StructField("lc", BooleanType(), True),
    StructField("lt", StringType(), True),    # código visível (ex.: "1012")
    StructField("sl", IntegerType(), True),   # sentido
    StructField("tl", IntegerType(), True),   # tipo linha (qdo existir)
    StructField("tp", StringType(), True),    # terminal origem
    StructField("ts", StringType(), True),    # terminal destino
])
schema_array = ArrayType(schema_item)

In [12]:
# --- Leitura robusta (array na raiz) ---
# Se seus arquivos estão em múltiplas pastas, o /**/*.json ajuda.
raw = spark.sparkContext.wholeTextFiles(f"{BRONZE_PATH.rstrip('/')}/**/*.json").toDF(["path", "raw"])
df_arr = raw.selectExpr("path", "from_json(raw, CAST(NULL AS ARRAY<STRUCT<cl:INT,lc:BOOLEAN,lt:STRING,sl:INT,tl:INT,tp:STRING,ts:STRING>>)) as arr")
df_lin = df_arr.select("path", explode(col("arr")).alias("r"))

Py4JJavaError: An error occurred while calling o43.partitions.
: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern s3a://bronze/linhas/2025/11/07/**/*.json matches 0 files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:340)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:279)
	at org.apache.spark.input.WholeTextFileInputFormat.setMinPartitions(WholeTextFileInputFormat.scala:52)
	at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:54)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Input Pattern s3a://bronze/linhas/2025/11/07/**/*.json matches 0 files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:315)
	... 25 more


In [7]:
# --- Limpeza/seleção (snapshot do dia) ---
df_clean = (
    df_lin
    .select(
        col("r.cl").alias("line_id"),
        col("r.lt").alias("line_code"),
        col("r.sl").alias("sentido"),
        col("r.tl").alias("tipo_linha"),
        col("r.tp").alias("terminal_origem"),
        col("r.ts").alias("terminal_destino"),
    )
    .dropDuplicates(["line_id", "line_code", "sentido"])       # evita duplicatas do mesmo snapshot
    .withColumn("dt", to_date(current_timestamp()))            # partição de snapshot diário
    .withColumn("ingest_ts", current_timestamp())
)


NameError: name 'df_lin' is not defined

In [6]:
# Enriquecimento
df_clean = (
    df_exploded.select(
        col("linha.c").alias("codigo_linha_texto"),
        col("linha.cl").cast("int").alias("codigo_linha"),
        col("tipo_linha"),
        col("linha.sl").alias("sentido"),
        col("linha.lt0").alias("terminal_inicial"),
        col("linha.lt1").alias("terminal_final"),
        col("veiculo.p").alias("codigo_veiculo"),
        col("veiculo.a").alias("acessibilidade"),
        to_timestamp(col("veiculo.ta")).alias("ultima_atualizacao"),
        col("veiculo.py").alias("latitude"),
        col("veiculo.px").alias("longitude"),
        to_timestamp(col("hr")).alias("hora_referencia"),
    )
    .dropDuplicates(["codigo_veiculo", "hora_referencia"])
    .withColumn("data_ref", to_date(col("ultima_atualizacao")))
    .withColumn("data_coleta", to_date(col("hora_referencia")))
    .withColumn("data_ingestao", to_date(current_timestamp()))
    .withColumn("ingest_timestamp", current_timestamp())
)

In [7]:
# --- Escrita em Parquet (snapshot diário) ---
# Para snapshot do dia, normalmente usamos overwrite NA PARTIÇÃO do dia.
# Se preferir append puro, troque para .mode("append").
(
    df_clean
    .write
    .mode("overwrite")
    .partitionBy("dt")
    .parquet(SILVER_PATH)
)

print("✅ Linhas: transformação concluída e salva em Parquet na camada Silver.")


✅ Transformação concluída e salva em Parquet na camada Silver.


In [8]:
# --- Validação rápida ---
df_result = spark.read.parquet(SILVER_PATH)
df_result.orderBy(col("dt").desc(), col("line_id")).show(10, truncate=False)

+------------+----------+-------------------+--------------+
|codigo_linha|data_ref  |hora_referencia    |codigo_veiculo|
+------------+----------+-------------------+--------------+
|32975       |2025-11-03|2025-11-03 00:08:00|31111         |
|32975       |2025-11-03|2025-11-03 00:16:00|31111         |
|32975       |2025-11-03|2025-11-03 23:54:00|31111         |
|32975       |2025-11-03|2025-11-03 00:08:00|31117         |
|32975       |2025-11-03|2025-11-03 00:16:00|31117         |
|32975       |2025-11-03|2025-11-03 23:54:00|31117         |
|32975       |2025-11-03|2025-11-03 00:08:00|31171         |
|32975       |2025-11-03|2025-11-03 00:16:00|31171         |
|32975       |2025-11-03|2025-11-03 23:54:00|31171         |
|32975       |2025-11-03|2025-11-03 00:08:00|31186         |
+------------+----------+-------------------+--------------+
only showing top 10 rows

