In [None]:
from pyspark.sql import SparkSession

In [None]:
# Cria sessão Spark com configs para MinIO
spark = SparkSession.builder \
    .appName("SPTransPipeline") \
    .master("spark://spark:7077") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

In [None]:
# Força as configs no Hadoop (garante que executores também tenham acesso)
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")
hadoop_conf.set("fs.s3a.access.key", "minioadmin")
hadoop_conf.set("fs.s3a.secret.key", "minioadmin")
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")

In [None]:
# ✅ Leitura dos dados do MinIO
linhas_df = spark.read.parquet("s3a://sptrans-data/bronze/linhas")
posicao_df = spark.read.parquet("s3a://sptrans-data/bronze/posicao")


In [None]:
linhas_df.show(5)

In [None]:
posicao_df.show(5)

In [None]:
# ✅ Exemplo de join (ajuste conforme suas colunas)
# Supondo que ambas tenham uma coluna 'linha_id'
joined_df = posicao_df.join(linhas_df, on="linha_id", how="inner")

print("Resultado do join:")
joined_df.show(5)

# ✅ Salva no bucket MinIO (pasta silver)
joined_df.write.mode("overwrite").parquet("s3a://sptrans-data/silver/linhas_posicao")

print("✅ Dados salvos em s3a://sptrans-data/silver/linhas_posicao")

In [None]:
from pyspark.sql import functions as F

In [None]:
# ✅ 1. Selecionar colunas e renomear

linhas_df.select(
    F.col("cl").alias("codigo_linha"),
    F.col("tp").alias("terminal_partida"),
    F.col("ts").alias("terminal_saida")
).show(5)

In [None]:
# ✅ 2. Criar coluna derivada

linhas_df.withColumn(
    "rota",
    F.concat_ws(" -> ", F.col("tp"), F.col("ts"))
).show(5)

In [None]:
# ✅ 3. Filtrar dados
linhas_df.filter(F.col("tp") == "PÇA. DA SÉ").show(5)

In [None]:
# ✅ 4. Contar registros por grupo

posicao_df.groupBy("codigo_linha").agg(
    F.count("*").alias("total_posicoes")
).orderBy(F.desc("total_posicoes")).show(10)

In [None]:
## ✅ 5. Adicionar coluna com data formatada

posicao_df.withColumn(
    "hora",
    F.date_format(F.col("hr_atualizacao"), "HH:mm:ss")
).show(5)

In [None]:
# ✅ 6. Join com transformação

joined_df = posicao_df.join(linhas_df.withColumnRenamed("lt", "codigo_linha"), "codigo_linha", "inner")

joined_df.withColumn(
    "coord",
    F.concat_ws(", ", F.col("latitude"), F.col("longitude"))
).select("codigo_linha", "tp", "ts", "coord").show(10)