---
# Algorithms for Big Data – Project
## Part 5: Streaming Delay Prediction with Kafka

**Dataset:** Simulated real-time predictions

**Authors:**
- Henrique Niza (131898)
- Paulo Francisco Pinto (128962)
- Rute Roque (128919)

In [29]:
# 1. Spark Setup
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType, IntegerType

# The Kafka connector coordinate (Scala 2.12, version 3.5.1) must match the installed Spark build
spark = SparkSession.builder \
    .appName("FlightDelayStreamingKafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
    .getOrCreate()
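The `spark.jars.packages` coordinate carries two versions: the Scala binary version in the artifact name (`_2.12`) and the Spark version at the end (`3.5.1`). Both should agree with the local Spark installation. A minimal sketch of building the coordinate from those two values, where `kafka_connector_package` is a hypothetical helper and the versions are supplied by hand:

```python
def kafka_connector_package(spark_version: str, scala_binary: str = "2.12") -> str:
    """Build the Maven coordinate for Spark's Kafka source/sink (hypothetical helper).

    spark_version -- should equal the installed Spark version, e.g. "3.5.1"
    scala_binary  -- Scala binary version of that Spark build, e.g. "2.12" or "2.13"
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary}:{spark_version}"


print(kafka_connector_package("3.5.1"))  # org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
```

With PySpark installed, `pyspark.__version__` provides the Spark version string to pass in.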


In [30]:
# 2. Load the trained model
model_path = "../models/flight_delay_rf_model"
model = PipelineModel.load(model_path)

print(f"Model loaded from: {model_path}")

Model loaded from: ../models/flight_delay_rf_model
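Because `model_path` is relative, whether it resolves depends on where the notebook kernel was started. A quick local pre-check can make a wrong path obvious before calling `PipelineModel.load`. This sketch assumes the directory layout Spark ML writes for saved pipelines (a `metadata` and a `stages` subdirectory); `looks_like_saved_pipeline` is a hypothetical name and only inspects the local filesystem.

```python
import os


def looks_like_saved_pipeline(path: str) -> bool:
    """Heuristic check for a saved Spark ML pipeline directory.

    Assumption: the save layout contains <path>/metadata and <path>/stages.
    Only the local filesystem is inspected; the contents are not validated.
    """
    return all(os.path.isdir(os.path.join(path, sub)) for sub in ("metadata", "stages"))


model_path = "../models/flight_delay_rf_model"
if not looks_like_saved_pipeline(model_path):
    print(f"No saved pipeline found at {os.path.abspath(model_path)}")
```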


In [32]:
# 3. Schema of the JSON flight messages
schema = StructType() \
    .add("Airline", StringType()) \
    .add("Origin", StringType()) \
    .add("Dest", StringType()) \
    .add("CRSDepTime", IntegerType()) \
    .add("Distance", DoubleType()) \
    .add("DepDelay", DoubleType()) \
    .add("Month", IntegerType()) \
    .add("DepTimeBlk", StringType()) \
    .add("DepDel15", IntegerType()) \
    .add("DayofMonth", IntegerType()) \
    .add("DayOfWeek", IntegerType())
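`from_json` also accepts the schema as a DDL-formatted string. Keeping the field list in one place lets the consumer schema and any producer code share it. `FLIGHT_FIELDS` and `to_ddl` are illustrative names, not part of the original notebook:

```python
# Illustrative (name, Spark SQL type) pairs matching the StructType above
FLIGHT_FIELDS = [
    ("Airline", "STRING"),
    ("Origin", "STRING"),
    ("Dest", "STRING"),
    ("CRSDepTime", "INT"),
    ("Distance", "DOUBLE"),
    ("DepDelay", "DOUBLE"),
    ("Month", "INT"),
    ("DepTimeBlk", "STRING"),
    ("DepDel15", "INT"),
    ("DayofMonth", "INT"),
    ("DayOfWeek", "INT"),
]


def to_ddl(fields) -> str:
    """Join (name, type) pairs into a DDL schema string: "name TYPE, name TYPE, ..."."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in fields)


DDL_SCHEMA = to_ddl(FLIGHT_FIELDS)
print(DDL_SCHEMA)
```

The resulting string could be passed as `from_json(col("json"), DDL_SCHEMA)` in place of the `StructType`.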


In [33]:
# 4. Read the Kafka stream
# With startingOffsets="latest", a new query only reads records produced after it starts
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "flights") \
    .option("startingOffsets", "latest") \
    .load()
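The `startingOffsets` option only applies the first time a query starts (a restarted query resumes from its checkpoint). With `"latest"`, records already in the topic are skipped. The toy model below is plain Python, not Spark code, and shows the effect on a single partition; `records_seen` is hypothetical and ignores retention (a real `"earliest"` starts at the oldest retained offset, not necessarily 0).

```python
from typing import List


def records_seen(log: List[str], log_end_at_start: int, mode: str) -> List[str]:
    """Records a brand-new streaming query would read from one Kafka partition.

    log              -- all records appended so far, in offset order (toy model)
    log_end_at_start -- the partition's end offset at the moment the query started
    mode             -- value of startingOffsets: "earliest" or "latest"
    Retention is ignored, so "earliest" is treated as offset 0.
    """
    if mode == "earliest":
        start = 0
    elif mode == "latest":
        start = log_end_at_start
    else:
        raise ValueError(f"unsupported startingOffsets mode: {mode!r}")
    return log[start:]


# The sample record is produced first, then the query starts (log end offset = 1).
print(records_seen(["sample"], 1, "latest"))    # []
print(records_seen(["sample"], 1, "earliest"))  # ['sample']
```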

In [34]:
# 5. Parse the JSON messages
# Kafka delivers the value column as binary, so it is cast to STRING before from_json
flights_df = kafka_stream.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")
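As a rough plain-Python analogue of this step (not Spark's exact parser semantics), the sketch decodes the bytes, parses the JSON and projects it onto the schema fields, leaving `None` wherever a field is missing or has the wrong type. In Spark, a malformed message likewise ends up as nulls in the flattened `data.*` columns. The int-to-double widening and the names `FIELDS`, `_coerce` and `parse_value` are assumptions made for this example.

```python
import json
from typing import Dict, Optional

# Field names and Python types mirroring the StructType `schema` above
FIELDS = [
    ("Airline", str), ("Origin", str), ("Dest", str),
    ("CRSDepTime", int), ("Distance", float), ("DepDelay", float),
    ("Month", int), ("DepTimeBlk", str), ("DepDel15", int),
    ("DayofMonth", int), ("DayOfWeek", int),
]


def _coerce(value, py_type):
    """Return value if it fits py_type, else None (ints widen to float)."""
    if isinstance(value, bool):  # JSON true/false never fits these fields
        return None
    if py_type is float and isinstance(value, (int, float)):
        return float(value)
    return value if isinstance(value, py_type) else None


def parse_value(value: bytes) -> Dict[str, Optional[object]]:
    """Approximate CAST(value AS STRING) + from_json + select("data.*")."""
    try:
        obj = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        obj = None
    if not isinstance(obj, dict):
        return {name: None for name, _ in FIELDS}
    return {name: _coerce(obj.get(name), py_type) for name, py_type in FIELDS}


row = parse_value(b'{"Airline": "Delta Air Lines Inc.", "Distance": 760}')
print(row["Airline"], row["Distance"], row["Origin"])  # Delta Air Lines Inc. 760.0 None
```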

In [None]:
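# Send one sample flight record to the "flights" topic (kafka-python).
# In this cell order the record is produced before the query in step 7 starts, so with
# startingOffsets="latest" the query does not read it; re-send it while the query is running.
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

msg = {
    "Airline": "Delta Air Lines Inc.",
    "Origin": "ATL",
    "Dest": "JFK",
    "CRSDepTime": 830,
    "Distance": 760.0,
    "DepDelay": 5.0,
    "Month": 5,
    "DepTimeBlk": "0800-0859",
    "DepDel15": 0,
    "DayofMonth": 24,
    "DayOfWeek": 6
}

producer.send('flights', msg)
producer.flush()
print("Message sent!")


Message sent!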
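The sample message hard-codes `DepTimeBlk` and `DepDel15`, although both follow from other fields. To send more varied test records, a helper can derive them. This sketch assumes BTS-style conventions: hourly departure blocks with early morning grouped as `"0001-0559"`, and `DepDel15 = 1` when `DepDelay` is at least 15 minutes. `dep_time_block` and `build_flight_message` are hypothetical helpers.

```python
import json


def dep_time_block(crs_dep_time: int) -> str:
    """Map an hhmm time (e.g. 830) to a block label such as "0800-0859".

    Assumption: BTS-style blocks, with 00:01-05:59 grouped as "0001-0559"
    and 2400 treated as the last hourly block.
    """
    hour = min(crs_dep_time // 100, 23)
    if hour < 6:
        return "0001-0559"
    return f"{hour:02d}00-{hour:02d}59"


def build_flight_message(airline, origin, dest, crs_dep_time, distance,
                         dep_delay, month, day_of_month, day_of_week) -> dict:
    """Build a message with the same keys (and key order) as the sample above."""
    return {
        "Airline": airline,
        "Origin": origin,
        "Dest": dest,
        "CRSDepTime": crs_dep_time,
        "Distance": float(distance),
        "DepDelay": float(dep_delay),
        "Month": month,
        "DepTimeBlk": dep_time_block(crs_dep_time),
        "DepDel15": 1 if dep_delay >= 15 else 0,
        "DayofMonth": day_of_month,
        "DayOfWeek": day_of_week,
    }


msg = build_flight_message("Delta Air Lines Inc.", "ATL", "JFK", 830, 760, 5.0, 5, 24, 6)
print(json.dumps(msg))
```

Each dict can then go through `producer.send('flights', msg)` unchanged, since the producer's `value_serializer` already applies `json.dumps(...).encode('utf-8')`.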



In [37]:
# 6. Apply the trained model to generate predictions
# On a streaming DataFrame this is lazy: rows are scored only when the query in step 7 runs
predicoes = model.transform(flights_df)
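Assuming the last pipeline stage is a random forest classifier for `DepDel15` (as the model path suggests), `transform` adds `rawPrediction`, `probability` and `prediction` columns under their default names. The sketch approximates, in plain Python, how Spark ML's probabilistic classifiers pick `prediction`: the index of the largest probability, or of the largest `p / t` ratio when per-class `thresholds` are set (assumed positive here). `prediction_from_probability` is a hypothetical name.

```python
from typing import Optional, Sequence


def prediction_from_probability(probability: Sequence[float],
                                thresholds: Optional[Sequence[float]] = None) -> float:
    """Pick the predicted class index from a probability vector.

    Without thresholds: argmax of the probabilities.
    With thresholds: argmax of p / t per class (thresholds assumed > 0).
    Ties go to the lowest index.
    """
    if thresholds is None:
        scores = list(probability)
    else:
        scores = [p / t for p, t in zip(probability, thresholds)]
    best = max(range(len(scores)), key=lambda i: scores[i])
    return float(best)


print(prediction_from_probability([0.7, 0.3]))              # 0.0
print(prediction_from_probability([0.7, 0.3], [0.9, 0.2]))  # 1.0
```

Lowering the threshold of one class in this way makes that class easier to predict, which can matter when delayed flights are the minority.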

In [39]:
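# 7. Write the predictions to the console (for debugging only)
# No checkpointLocation is set, so Spark creates a temporary checkpoint directory (first WARN below)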
query = predicoes.select("Airline", "Origin", "Dest", "DepDelay", "prediction") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

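# Block for up to 300 seconds; returns False if the query is still running when the timeout expires
query.awaitTermination(300)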

25/05/24 23:45:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/zn/qww_x74n5_d1b332p060685c0000gn/T/temporary-40b430f6-a6ba-4c23-b595-e02cfa5da626. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/24 23:45:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/24 23:45:34 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+------+----+--------+----------+
|Airline|Origin|Dest|DepDelay|prediction|
+-------+------+----+--------+----------+
+-------+------+----+--------+----------+



False