In [1]:
import os, json, sys
from typing import List
from fastapi import FastAPI
from pydantic import BaseModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml import PipelineModel

# --- Runtime environment ---------------------------------------------------
# Run the PySpark driver and workers on the same interpreter as this kernel.
os.environ["PYSPARK_PYTHON"]        = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Local Hadoop install (presumably for winutils -- confirm). This is only
# meaningful on Windows: on other platforms the "C:\..." path is bogus and a
# ';' separator would corrupt the first PATH entry, so it is skipped there.
# An already-exported HADOOP_HOME takes precedence over the default location.
if os.name == "nt":
    os.environ.setdefault("HADOOP_HOME", r"C:\hadoop")
    os.environ["PATH"] = (
        os.path.join(os.environ["HADOOP_HOME"], "bin")
        + os.pathsep
        + os.environ.get("PATH", "")
    )

# --- AWS credentials -------------------------------------------------------
# SECURITY: credentials must never be written into the notebook. Supply
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY through the environment (shell
# export, .env loader, secrets manager) before starting the kernel.
# NOTE(review): a key pair was previously committed in this cell; treat it as
# compromised and rotate it.
for _required in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    if not os.environ.get(_required):
        raise RuntimeError(
            f"Environment variable {_required} is not set; "
            "export your AWS credentials before starting the service."
        )
os.environ["AWS_DEFAULT_REGION"] = "eu-north-1"

# --- SparkSession & S3A config ---------------------------------------------
# Local-mode session with the hadoop-aws connector so that s3a:// URIs resolve.
# The S3A keys are forwarded from the environment validated above.
spark = (SparkSession.builder
    .appName("FlightDelayService")
    .master("local[*]")
    .config("spark.pyspark.python",        sys.executable)
    .config("spark.pyspark.driver.python", sys.executable)
    .config("spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.499"
    )
    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key",    os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key",    os.environ["AWS_SECRET_ACCESS_KEY"])
    .config("spark.hadoop.fs.s3a.endpoint",      "s3.eu-north-1.amazonaws.com")
    .config("spark.hadoop.fs.s3a.path.style.access","true")
    .getOrCreate()
)

In [2]:
# Fitted Spark ML pipelines, loaded from S3 in this order:
#   clf_model  -- classifier whose 0/1 prediction routes each row,
#   norm_model -- regressor applied to rows classified as 0,
#   ext_model  -- regressor applied to rows classified as 1.
clf_model, norm_model, ext_model = [
    PipelineModel.load(model_uri)
    for model_uri in (
        "s3a://ceng476/models/model_classification_v1",
        "s3a://ceng476/models/model_normal_v1",
        "s3a://ceng476/models/model_extreme_v1",
    )
]

In [3]:
# FastAPI application instance; route handlers below register on it via decorators.
app = FastAPI(title="Flight Delay Prediction API")

# Request schema for a single record posted to /predict. Pydantic validates
# each field against its annotated type; every field is required.
# NOTE(review): the field names look like a monthly carrier/airport delay
# summary (counts `*_ct` and totals `*_delay` per cause) -- confirm the
# source dataset and units with the training notebook.
class FlightEvent(BaseModel):
    year: int
    month: int  # drives the is_summer / is_winter / is_holiday_month features; presumably 1-12, not validated here
    carrier: str
    carrier_name: str
    airport: str
    airport_name: str
    arr_flights: float  # denominator of delay_ratio and avg_delay_per_flight (guarded against <= 0 in predict)
    arr_del15: float  # numerator of delay_ratio
    carrier_ct: float
    weather_ct: float
    nas_ct: float
    security_ct: float
    late_aircraft_ct: float
    arr_cancelled: float
    arr_diverted: float
    # NOTE(review): the endpoint returns `arr_delay_pred`, yet `arr_delay` is a
    # required input and feeds avg_delay_per_flight -- confirm this is not
    # target leakage.
    arr_delay: float
    carrier_delay: float
    weather_delay: float
    nas_delay: float
    security_delay: float
    late_aircraft_delay: float


In [4]:
@app.post("/predict")
def predict(events: List[FlightEvent]):
    """Predict the arrival delay for a batch of flight records.

    Each record is first classified by ``clf_model``; rows predicted as class 0
    are scored by ``norm_model`` and rows predicted as class 1 by ``ext_model``.

    Args:
        events: Records to score. May be empty.

    Returns:
        ``{"predictions": [...]}`` with exactly one entry per input event, in
        the same order as ``events``. An entry is ``None`` if no model produced
        a row for that event (e.g. the classifier emitted a class other than
        0 or 1, or a pipeline stage dropped the row).
    """
    # Nothing to score; Spark also cannot infer a schema from an empty list.
    if not events:
        return {"predictions": []}

    # Helper column carrying each record's position in the request. The
    # normal/extreme split and union below do not preserve input order, so the
    # position is needed to hand predictions back aligned with `events`.
    # NOTE(review): assumes the pipelines pass unknown columns through
    # untouched (the usual behaviour of Spark ML stages) -- confirm.
    row_id = "_row_id"

    # 1) JSON -> Spark DataFrame
    rows = [{**event.dict(), row_id: position} for position, event in enumerate(events)]
    events_df = spark.createDataFrame(rows)

    # 2) Feature engineering, mirroring the training notebook
    features_df = (events_df
        .withColumn("is_summer",        when((col("month")>=6)&(col("month")<=8),1).otherwise(0))
        .withColumn("is_winter",        when((col("month")==12)|(col("month")<=2),1).otherwise(0))
        .withColumn("is_holiday_month", when((col("month")==12)|(col("month")==7),1).otherwise(0))
        .withColumn("delay_ratio",      when(col("arr_flights")>0, col("arr_del15")/col("arr_flights")).otherwise(0))
        .withColumn("avg_delay_per_flight",
            when(col("arr_flights")>0, col("arr_delay")/col("arr_flights")).otherwise(0))
    )

    # 3) Classification, then the regressor matching each predicted class.
    #    The classifier's `prediction` column is dropped so each regressor can
    #    write its own.
    classified  = clf_model.transform(features_df)
    normal_df   = classified.filter(col("prediction")==0).drop("prediction")
    extreme_df  = classified.filter(col("prediction")==1).drop("prediction")
    normal_preds  = norm_model.transform(normal_df)
    extreme_preds = ext_model.transform(extreme_df)
    combined = (normal_preds
        .unionByName(extreme_preds, allowMissingColumns=True)
        .withColumnRenamed("prediction","arr_delay_pred")
    )

    # 4) Collect and place every prediction at its original request position
    predictions = [None] * len(events)
    for row in combined.select(row_id, "arr_delay_pred").collect():
        predictions[row[row_id]] = row["arr_delay_pred"]
    return {"predictions": predictions}