In [None]:
pip install psycopg2-binary

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, LongType
import psycopg2

# Spark session shared by the whole notebook.
spark = (
    SparkSession.builder
    .appName("FlightStream")
    .getOrCreate()
)

# Expected layout of the JSON document carried in each Kafka message value.
schema = (
    StructType()
    .add("flight_id", StringType())
    .add("origin", StringType())
    .add("destination", StringType())
    .add("status", StringType())
    .add("departure_time", LongType())
    .add("arrival_time", LongType())
)

# Raw Kafka source: subscribes to the "flights" topic and, with no
# checkpoint to resume from, starts at the newest offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "flights")
    .option("startingOffsets", "latest")
    .load()
)

# Decode the message value as a string, parse it with `schema`, and
# flatten the resulting struct into top-level columns.
flights = (
    df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

def foreach_batch(df, epoch_id):
    """Upsert one streaming micro-batch of flight records into Postgres.

    Spark's ``foreachBatch`` sink calls this once per micro-batch.

    Args:
        df: Micro-batch DataFrame with the columns flight_id, origin,
            destination, status, departure_time and arrival_time.
        epoch_id: Micro-batch identifier supplied by Spark (unused).

    The batch is collected to the driver and written in one transaction.
    A new ``flight_id`` is inserted. For an existing one, only ``status`` and
    ``arrival_time`` are overwritten, so re-applying the same batch leaves
    the table unchanged. Rows without a ``flight_id`` are skipped because
    they cannot be matched on the conflict key. If any statement fails, the
    transaction is rolled back, the connection is closed, and the exception
    propagates to Spark.
    """
    upsert_sql = """
        INSERT INTO flights (flight_id, origin, destination, status, departure_time, arrival_time)
        VALUES (%s,%s,%s,%s,%s,%s)
        ON CONFLICT (flight_id) DO UPDATE
        SET status = EXCLUDED.status, arrival_time = EXCLUDED.arrival_time
    """

    # Values are passed through unchanged. The columns are already typed by
    # the stream schema, and wrapping them in str() would turn a null into
    # the literal text 'None'. Rows with a null key (for example, messages
    # that from_json could not parse) are dropped.
    params = [
        (
            row.flight_id,
            row.origin,
            row.destination,
            row.status,
            row.departure_time,
            row.arrival_time,
        )
        for row in df.collect()
        if row.flight_id is not None
    ]
    if not params:
        # Nothing to write: avoid opening a database connection.
        return

    # NOTE(review): credentials are hard-coded; consider reading them from
    # configuration or environment variables.
    conn = psycopg2.connect(
        dbname="flights_project", user="admin", password="admin", host="postgres_general"
    )
    try:
        # `with conn` commits on success and rolls back on error, but does not
        # close the connection. `with conn.cursor()` closes the cursor.
        with conn, conn.cursor() as cur:
            cur.executemany(upsert_sql, params)
    finally:
        conn.close()

# Route every micro-batch of `flights` through `foreach_batch`, then block
# this cell until the streaming query stops or fails.
# NOTE(review): no checkpointLocation is set, so progress is not persisted
# across restarts; consider adding one if resuming is required.
query = (
    flights.writeStream
    .foreachBatch(foreach_batch)
    .start()
)
query.awaitTermination()
