In [None]:
pip install numpy pandas seaborn matplotlib plotly SQLAlchemy psycopg2-binary

In [None]:
# Import related libraries
import numpy as np
import pandas as pd
import os
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit



In [None]:

# Build (or reuse) the Spark session used by every cell below.
# - Runs in local mode on all available cores with 2g of executor memory.
# - Pulls the PostgreSQL JDBC driver and the Spark Cassandra connector via
#   spark.jars.packages.
# - Cassandra credentials are taken from the CASSANDRA_USERNAME /
#   CASSANDRA_PASSWORD environment variables so they do not have to live in the
#   notebook; the previous literal values remain as fallbacks so existing
#   setups keep working.
spark = (
    SparkSession.builder
    .appName("PySpark PostgreSQL and Cassandra Example")
    .master("local[*]")
    .config("spark.executor.memory", "2g")
    .config(
        "spark.jars.packages",
        "org.postgresql:postgresql:42.7.2,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0",
    )
    .config("spark.cassandra.connection.host", "cassandra")
    .config("spark.cassandra.connection.port", "9042")
    .config("spark.cassandra.auth.username", os.environ.get("CASSANDRA_USERNAME", "admin"))
    .config("spark.cassandra.auth.password", os.environ.get("CASSANDRA_PASSWORD", "admin"))
    .getOrCreate()
)

In [7]:
# Load the `bos_air_traffic` table from the `demo` PostgreSQL database over JDBC.
# The user and password come from POSTGRES_USER / POSTGRES_PASSWORD when set;
# the previous literal values are kept only as fallbacks so the notebook still
# runs unchanged in the existing environment.
df_postgres = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/demo")
    .option("dbtable", "bos_air_traffic")
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "1234qwer"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Print the first rows (show() defaults to 20) to verify the load.
df_postgres.show()



+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+
|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|
+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+
|4b1881|SWR52   |   Switzerland|   1713398182|  1713398182| -71.0071| 42.3607|       -68.58|    false|   70.63|     20.02|        -0.65|   NULL|      -15.24|  NULL|false|              0|
|a0311d|DAL2420 | United States|   1713398182|  1713398182| -71.0167| 42.3573|         NULL|     true|    2.83|    236.25|         NULL|   NULL|        NULL|  NULL|false|              0|
|49530a|TAP215  |      Portugal|   1713397890|  1713397890| -71.0

In [6]:
# Read the `flight_data` table of the `demo` keyspace through the
# Spark Cassandra connector data source.
cassandra_reader = spark.read.format("org.apache.spark.sql.cassandra")
df_cassandra = (
    cassandra_reader
    .option("keyspace", "demo")
    .option("table", "flight_data")
    .load()
)

# Print the first rows as a quick sanity check of the loaded content.
df_cassandra.show()

+------+------------+-------------+--------+------------+--------+---------+---------+--------------+---------------+-------+-----+------+-------------+----------+--------+-------------+
|icao24|last_contact|baro_altitude|callsign|geo_altitude|latitude|longitude|on_ground|origin_country|position_source|sensors|  spi|squawk|time_position|true_track|velocity|vertical_rate|
+------+------------+-------------+--------+------------+--------+---------+---------+--------------+---------------+-------+-----+------+-------------+----------+--------+-------------+
|ade9c0|  1713400710|         NULL|JBU561  |        NULL|  42.369| -71.0166|     true| United States|              0|       |false|      |   1713400710|     33.75|    1.29|         NULL|
|ade9c0|  1713400977|         NULL|JBU561  |        NULL| 42.3701| -71.0157|     true| United States|              0|       |false|      |   1713400977|     28.12|    3.34|         NULL|
|ade9c0|  1713401036|         NULL|JBU561  |        NULL| 42.3696

In [14]:

from pyspark.sql.functions import col, length, lit, row_number
from pyspark.sql.window import Window

# unionByName needs both frames to carry the same column names, so a null
# "Id" placeholder column is added to each side before combining them.
df_postgres_without_id = df_postgres.withColumn("Id", lit(None))

# NOTE(review): `df_cassandra_with_id` was referenced here without being
# defined anywhere in the notebook, so this cell raised NameError on a fresh
# kernel. It is now derived from `df_cassandra` the same way as the PostgreSQL
# side -- confirm this matches the definition that was lost. "Id" is not used
# by anything below.
df_cassandra_with_id = df_cassandra.withColumn("Id", lit(None))

df_combined = df_postgres_without_id.unionByName(df_cassandra_with_id)

# Rank the rows of each callsign from newest to oldest by `last_contact`.
windowSpec = Window.partitionBy("callsign").orderBy(col("last_contact").desc())

# row_num == 1 marks the most recent record of each callsign.
df_combined_with_row_number = df_combined.withColumn(
    "row_num", row_number().over(windowSpec)
)

# Keep only the latest record per callsign, drop rows whose callsign is null
# or the empty string, and project the columns written out by later cells.
# NOTE(review): callsigns in the source data appear space-padded; a
# whitespace-only callsign still passes the length check -- confirm intended.
df_latest = df_combined_with_row_number.filter(
    (col("row_num") == 1)
    & col("callsign").isNotNull()
    & (length(col("callsign")) > 0)
).select(
    "callsign", "longitude", "latitude", "on_ground", "squawk"
)


df_latest.show()

+--------+---------+--------+---------+------+
|callsign|longitude|latitude|on_ground|squawk|
+--------+---------+--------+---------+------+
|AAL2148 | -71.0186| 42.3602|     true|2450.0|
|ASA311  | -71.0172| 42.3636|     true|  NULL|
|DAL1707 | -71.0245| 42.3639|     true|5751.0|
|DAL2420 | -71.0167| 42.3573|     true|  NULL|
|DAL385  | -70.9707| 42.3635|    false|  NULL|
|DAL937  | -71.0023| 42.3746|     true|2070.0|
|EJA377  | -71.0217| 42.3742|     true|2234.0|
|FDX3714 | -70.9791| 42.3796|    false|4075.0|
|JBU1246 | -71.0147| 42.3662|     true|3042.0|
|JBU1303 |  -71.016| 42.3676|     true|  NULL|
|JBU1559 | -71.0141| 42.3654|     true|1535.0|
|JBU1734 | -71.0143|  42.368|     true|3220.0|
|JBU2017 | -71.0137| 42.3661|     true|  NULL|
|JBU2052 | -71.0505|  42.271|    false|7371.0|
|JBU2185 | -71.0142| 42.3674|     true|  NULL|
|JBU2230 | -70.9704| 42.2293|    false|3440.0|
|JBU2754 | -71.0139| 42.3664|     true|6516.0|
|JBU33   | -71.0178| 42.3681|     true|  NULL|
|JBU561  | -7

In [None]:

# Persist the latest-per-callsign snapshot to the `bos_flight_data` table of
# the `demo` PostgreSQL database over JDBC. mode("overwrite") replaces any
# existing contents of that table on every run.
# The user and password come from POSTGRES_USER / POSTGRES_PASSWORD when set;
# the previous literal values are kept only as fallbacks.
(
    df_latest.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/demo")
    .option("dbtable", "bos_flight_data")
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "1234qwer"))
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

In [16]:
# Append the latest-per-callsign snapshot to the `latest_flight_data` table of
# the `demo` keyspace through the Spark Cassandra connector.
cassandra_writer = df_latest.write.format("org.apache.spark.sql.cassandra")
(
    cassandra_writer
    .mode('append')
    .option("keyspace", "demo")
    .option("table", "latest_flight_data")
    .save()
)