In [1]:
#  Step 1: Import Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, explode, desc

#  Step 2: Create Spark Session
spark = SparkSession.builder \
    .appName("Flight Telemetry Analysis") \
    .master("local[*]") \
    .getOrCreate()


In [2]:
#  Step 3: Define File Paths
small_telemetry_path = "D:/Project/Spark_project/combined_file_small.txt"
large_telemetry_path = "D:/Project/Spark_project/combined_file_large.txt"
aircraft_info_path = "D:/GitHub/Apache_spark_/spark/basic-ac-db.json"


In [3]:
#  Step 4: Load JSON Files
df_flight = spark.read.option("multiline", "true").json(small_telemetry_path)
df_aircraft = spark.read.option("multiline", "true").json(aircraft_info_path)

# Show schema
df_flight.printSchema()
df_aircraft.printSchema()


root
 |-- dt: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- alt_baro: long (nullable = true)
 |    |-- alt_geom: long (nullable = true)
 |    |-- baro_rate: long (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- emergency: string (nullable = true)
 |    |-- flight: string (nullable = true)
 |    |-- gs: double (nullable = true)
 |    |-- gva: long (nullable = true)
 |    |-- hex: string (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |    |-- messages: long (nullable = true)
 |    |-- mlat: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- nac_p: long (nullable = true)
 |    |-- nac_v: long (nullable = true)
 |    |-- nav_altitude_mcp: long (nullable = true)
 |    |-- nav_heading: double (nullable = true)
 |    |-- nav_qnh: double (nullable = true)
 |    |-- nic: long (nullable = true)
 |    |-- nic_baro: long (nullable = true)
 |    |-- rc: long (n

In [4]:
df_flight.columns


['dt', 'payload', 'type']

In [5]:
df_flight.printSchema()


root
 |-- dt: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- alt_baro: long (nullable = true)
 |    |-- alt_geom: long (nullable = true)
 |    |-- baro_rate: long (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- emergency: string (nullable = true)
 |    |-- flight: string (nullable = true)
 |    |-- gs: double (nullable = true)
 |    |-- gva: long (nullable = true)
 |    |-- hex: string (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |    |-- messages: long (nullable = true)
 |    |-- mlat: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- nac_p: long (nullable = true)
 |    |-- nac_v: long (nullable = true)
 |    |-- nav_altitude_mcp: long (nullable = true)
 |    |-- nav_heading: double (nullable = true)
 |    |-- nav_qnh: double (nullable = true)
 |    |-- nic: long (nullable = true)
 |    |-- nic_baro: long (nullable = true)
 |    |-- rc: long (n

In [6]:
# Load full telemetry and extract all payload fields in Spark
df_spark_small = spark.read.json(small_telemetry_path)

df_spark_small_flat = df_spark_small.select(
    col("dt").alias("timestamp"),
    col("payload.hex").alias("hex"),
    col("payload.alt_baro").alias("alt_baro"),
    col("payload.alt_geom").alias("alt_geom"),
    col("payload.gs").alias("ground_speed"),
    col("payload.track").alias("track"),
    col("payload.baro_rate").alias("baro_rate"),
    col("payload.squawk").alias("squawk"),
    col("payload.emergency").alias("emergency"),
    col("payload.category").alias("category"),
    col("payload.nav_qnh").alias("nav_qnh"),
    col("payload.nav_altitude_mcp").alias("nav_altitude_mcp"),
    col("payload.nav_heading").alias("nav_heading"),
    col("payload.lat").alias("lat"),
    col("payload.lon").alias("lon"),
    col("payload.nic").alias("nic"),
    col("payload.rc").alias("rc"),
    col("payload.seen_pos").alias("seen_pos"),
    col("payload.version").alias("version"),
    col("payload.nic_baro").alias("nic_baro"),
    col("payload.nac_p").alias("nac_p"),
    col("payload.nac_v").alias("nac_v"),
    col("payload.sil").alias("sil"),
    col("payload.sil_type").alias("sil_type"),
    col("payload.gva").alias("gva"),
    col("payload.sda").alias("sda"),
    col("payload.mlat").alias("mlat"),
    col("payload.tisb").alias("tisb"),
    col("payload.messages").alias("messages"),
    col("payload.seen").alias("seen"),
    col("payload.rssi").alias("rssi"),
    col("payload.flight").alias("flight")
)

df_spark_small_flat.show(5)


+--------------------+------+--------+--------+------------+-----+---------+------+---------+--------+-------+----------------+-----------+---------+----------+---+---+--------+-------+--------+-----+-----+---+--------+---+---+----+----+--------+----+-----+--------+
|           timestamp|   hex|alt_baro|alt_geom|ground_speed|track|baro_rate|squawk|emergency|category|nav_qnh|nav_altitude_mcp|nav_heading|      lat|       lon|nic| rc|seen_pos|version|nic_baro|nac_p|nac_v|sil|sil_type|gva|sda|mlat|tisb|messages|seen| rssi|  flight|
+--------------------+------+--------+--------+------------+-----+---------+------+---------+--------+-------+----------------+-----------+---------+----------+---+---+--------+-------+--------+-----+-----+---+--------+---+---+----+----+--------+----+-----+--------+
|2025-01-31 00:00:...|ab35d3|   37000|   36475|       552.7| 41.3|        0|  5740|     none|      A5| 1013.6|           36992|        0.0|44.218048|-75.741316|  8|186|     0.4|      2|       1|    9

In [7]:
df_spark_small_flat.count()


1829647

In [8]:
# Query 1 :  

df_spark_small_flat.select("hex", "alt_baro", "ground_speed", "track", "lat", "lon") \
                   .show(5, truncate=False)


+------+--------+------------+-----+---------+----------+
|hex   |alt_baro|ground_speed|track|lat      |lon       |
+------+--------+------------+-----+---------+----------+
|ab35d3|37000   |552.7       |41.3 |44.218048|-75.741316|
|c03f37|23925   |352.3       |22.7 |44.900757|-75.38511 |
|c06a75|24725   |482.7       |47.5 |44.762677|-75.105152|
|c078ba|15175   |420.6       |48.1 |45.025955|-74.698661|
|c027da|30000   |343.8       |229.1|44.001297|-75.327695|
+------+--------+------------+-----+---------+----------+
only showing top 5 rows



In [9]:
# Query 2 : Filter rows where altitude > 30000
df_spark_small_flat.filter(col("alt_baro") > 30000) \
                   .select("hex", "alt_baro", "ground_speed", "lat", "lon") \
                   .show(5, truncate=False)


+------+--------+------------+---------+----------+
|hex   |alt_baro|ground_speed|lat      |lon       |
+------+--------+------------+---------+----------+
|ab35d3|37000   |552.7       |44.218048|-75.741316|
|ab35d3|37000   |552.7       |44.220064|-75.738875|
|c02fe7|34750   |586.9       |NULL     |NULL      |
|0c20a8|34475   |451.9       |43.909653|-74.10157 |
|ab35d3|37000   |552.7       |44.222076|-75.736411|
+------+--------+------------+---------+----------+
only showing top 5 rows



In [10]:
# Query 3 : Count Number of records per aircraft
df_spark_small_flat.groupBy("hex") \
                   .count() \
                   .orderBy(desc("count")) \
                   .show(5, truncate=False)





+------+-----+
|hex   |count|
+------+-----+
|c05efe|12062|
|c0481d|11536|
|a96805|11474|
|c08497|11169|
|c060b7|10633|
+------+-----+
only showing top 5 rows



In [11]:
# Query 4 : Average ground speed per aircraft
df_spark_small_flat.groupBy("hex") \
                   .agg(avg("ground_speed").alias("avg_ground_speed")) \
                   .orderBy(desc("avg_ground_speed")) \
                   .show(5, truncate=False)

+------+-----------------+
|hex   |avg_ground_speed |
+------+-----------------+
|3c458c|668.5544959128063|
|c0610d|667.7281179138303|
|406e0f|662.2459107806709|
|89644c|656.7955223880596|
|aac366|651.2352941176473|
+------+-----------------+
only showing top 5 rows



In [12]:
# Query 5: Distinct Squawk Codes
df_spark_small_flat.select("squawk").distinct().show(10)


+------+
|squawk|
+------+
|  0671|
|  3015|
|  3441|
|  5316|
|  6732|
|  6252|
|  4276|
|  7013|
|  6264|
|  1043|
+------+
only showing top 10 rows



In [13]:
# Query to add a columnn for speed in knots

from pyspark.sql.functions import round

df_spark_small_flat.withColumn("speed_knots", round(col("ground_speed") * 0.539957, 2)) \
    .select("hex", "ground_speed", "speed_knots") \
    .show(5)


+------+------------+-----------+
|   hex|ground_speed|speed_knots|
+------+------------+-----------+
|ab35d3|       552.7|     298.43|
|c03f37|       352.3|     190.23|
|c06a75|       482.7|     260.64|
|c078ba|       420.6|     227.11|
|c027da|       343.8|     185.64|
+------+------------+-----------+
only showing top 5 rows



In [None]:
# Query to fiter out rows missing GPS (Null values in lat/lon)
# Filter out rows with missing GPS data

df_spark_small_flat.filter(col("lat").isNotNull() & col("lon").isNotNull()) \
    .select("hex", "lat", "lon") \
    .show(5, truncate=False)


+------+---------+----------+
|hex   |lat      |lon       |
+------+---------+----------+
|ab35d3|44.218048|-75.741316|
|c03f37|44.900757|-75.38511 |
|c06a75|44.762677|-75.105152|
|c078ba|45.025955|-74.698661|
|c027da|44.001297|-75.327695|
+------+---------+----------+
only showing top 5 rows



In [23]:
# Create cleaned join keys (optional but recommended)
from pyspark.sql.functions import upper, trim



# Perform INNER JOIN to keep only matches
df_matched = df_spark_small_flat.join(
    df_aircraft,
    df_spark_small_flat.hex == df_aircraft.icao,
    how="left"  # Use "left" to keep all telemetry data
)

# Show result
df_matched.select("hex", "alt_baro", "manufacturer", "model", "reg") \
          .show(5, truncate=False)


+------+--------+------------+-----+----+
|hex   |alt_baro|manufacturer|model|reg |
+------+--------+------------+-----+----+
|c03f37|23925   |NULL        |NULL |NULL|
|c06a75|24725   |NULL        |NULL |NULL|
|c078ba|15175   |NULL        |NULL |NULL|
|ab35d3|37000   |NULL        |NULL |NULL|
|ab35d3|37000   |NULL        |NULL |NULL|
+------+--------+------------+-----+----+
only showing top 5 rows

