# Stream dos dados da camada Raw para Trusted

In [1]:
import pytz
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import explode, col

In [2]:
# Inicializa a sessão Spark com suporte para streaming
spark = SparkSession.builder.appName("RawToTrustedStreaming").getOrCreate()

# Configura o acesso ao MinIO
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "datalake")
hadoop_conf.set("fs.s3a.secret.key", "datalake")
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

In [3]:
# Define o caminho de entrada e saída com base na data
fuso_horario_brasilia = pytz.timezone('America/Sao_Paulo')
brasilia_time = datetime.now(fuso_horario_brasilia)
today = brasilia_time.strftime('%Y-%m-%d')

raw_path = f"s3a://raw/busdata/{today}"
routes_trusted_path = f"s3a://trusted/busroutes/{today}/"
positions_trusted_path = f"s3a://trusted/buspositions/{today}/"

In [4]:
# Define o schema dos dados para evitar inferência em streaming
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType

schema = StructType([
    StructField("l", ArrayType(StructType([
        StructField("cl", StringType()),
        StructField("sl", StringType()),
        StructField("c", StringType()),
        StructField("lt0", StringType()),
        StructField("lt1", StringType()),
        StructField("qv", StringType()),
        StructField("vs", ArrayType(StructType([
            StructField("p", StringType()),
            StructField("ta", StringType()),
            StructField("py", DoubleType()),
            StructField("px", DoubleType())
        ])))
    ])))
])

In [5]:
# Leitura inicial do primeiro arquivo para gravar o df_bus_lines apenas uma vez
initial_file = spark.read.schema(schema).json(raw_path).limit(1)
df_lines = initial_file.select(explode(col("l")).alias("linha"))
df_bus_lines = df_lines.select(
    col("linha.cl").alias("codigo_trajeto"),
    col("linha.sl").alias("sentido"),
    col("linha.c").alias("letreiro"),
    col("linha.lt0").alias("terminal_primario"),
    col("linha.lt1").alias("terminal_secundario"),
    col("linha.qv").alias("qnt_veiculos")
)

# Salva o df_bus_lines apenas uma vez no MinIO
df_bus_lines.write.mode("overwrite").json(routes_trusted_path)

In [6]:
# Leitura dos arquivos no modo de streaming apenas para o processamento das posições dos veículos
df_raw_stream = spark.readStream.schema(schema).json(raw_path)

# Processamento das posições dos veículos em modo de streaming
df_lines_stream = df_raw_stream.select(explode(col("l")).alias("linha"))
df_vehicles = df_lines_stream.select(
    col("linha.cl").alias("codigo_trajeto"),
    col("linha.sl").alias("sentido"),
    explode(col("linha.vs")).alias("vehicle")
)

df_vehicles_position = df_vehicles.select(
    col("codigo_trajeto"),
    col("sentido"),
    col("vehicle.p").alias("prefixo_veiculo"),
    col("vehicle.py").alias("latitude"),
    col("vehicle.px").alias("longitude"),
    col("vehicle.ta").alias("horario_posicao"),
)

# Grava as posições dos veículos no caminho de saída no modo de streaming
df_vehicles_position.writeStream \
    .format("json") \
    .option("path", positions_trusted_path) \
    .option("checkpointLocation", positions_trusted_path + "/_checkpoint") \
    .outputMode("append") \
    .start()

# Mantém o streaming ativo
print('Stream em Execução')
spark.streams.awaitAnyTermination()


Stream em Execução


StreamingQueryException: Query [id = 1cc83bd6-a993-4909-a672-2d6184a75fe7, runId = ec10596b-7935-46a5-8b29-62fa4f1fdb62] terminated with exception: No such file or directory: s3a://raw/busdata/2024-11-04/Busdata_1328.json

In [7]:
spark.stop()