In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m755.1 kB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0mta [36m0:00:01[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a392f49845313e65f24b7f9bf2a9f7dbea190e242c682bc7df14d93ad2265b3d
  Stored in directory: /home/jovyan/.cache/pip/wheels/95/13/41/f7f135ee114175605fb4f0a89e7389f3742aa6c1e1a5bcb657
Successfully built pyspark
Installing collect

In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, desc, row_number , pandas_udf , expr , udf, max
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import math

# Initialiser Spark
spark = SparkSession.builder.appName("FlightRadarAnalysis").getOrCreate()

# Charger les données
flights_df = spark.read.parquet("/home/jovyan/work/flights/*/*/*/*/*.parquet").na.replace(["N/A","NaN",""],None)
airlines_df = spark.read.parquet("/home/jovyan/work/airlines").na.replace(["N/A","NaN",""],None)
airports_df = spark.read.parquet("/home/jovyan/work/airports").na.replace(["N/A","NaN",""],None)
zones_df = spark.read.parquet("/home/jovyan/work/zones")






#### 1 La compagnie avec le + de vols en cours

In [52]:
grouped_active_flights = flights_df.filter("on_ground==0").filter(col("airline_icao").isNotNull()).groupBy("airline_icao").agg({"id": "count"}).withColumnRenamed("count(id)", "count")
max_count = grouped_active_flights.agg({"count": "max"}).collect()[0][0]
most_active_airline=grouped_active_flights.filter(col("count") == max_count)


most_active_airline.collect()

[Row(airline_icao='QTR', count=8641)]

#### 2 Compagnie avec le plus de vols régionaux actifs par continent:

In [53]:
def get_continent(latitude, longitude, zones):
    for continent, value in zones.items():
        if float(value["tl_x"]) <= float(latitude) <= float(value["br_x"]) and float(value["br_y"]) <= float(longitude) <= float(value["tl_y"]):
            return continent
    return None

zones = zones_df.rdd.map(lambda row: (row["zone"], {"tl_x": row["tl_x"], "br_x": row["br_x"], "br_y": row["br_y"], "tl_y": row["tl_y"]})).collectAsMap()

spark.udf.register("get_continent_udf", lambda latitude, longitude: get_continent(latitude, longitude, zones), StringType())




<function __main__.<lambda>(latitude, longitude)>

In [54]:
airports_with_continents = airports_df.withColumn("continent", expr("get_continent_udf(latitude, longitude)"))

flights_with_airports = flights_df.join(airports_with_continents, flights_df.origin_airport_iata == airports_with_continents.iata, "left") \
    .select(flights_df["*"], airports_with_continents["continent"].alias("origin_continent"))

flights_with_airports = flights_with_airports.join(airports_with_continents, flights_with_airports.destination_airport_iata == airports_with_continents.iata, "left") \
    .select(flights_with_airports["*"], airports_with_continents["continent"].alias("destination_continent"))

regional_flights = flights_with_airports.filter(col("origin_continent") == col("destination_continent"))
regional_flights.groupBy("origin_continent", "airline_iata").count().withColumnRenamed("count", "flight_count") \
    .withColumn("max_count", F.row_number().over(Window.partitionBy("origin_continent").orderBy(desc("flight_count")))) \
    .filter(col("max_count") == 1) \
    .show()


+----------------+------------+------------+---------+
|origin_continent|airline_iata|flight_count|max_count|
+----------------+------------+------------+---------+
|          africa|          U2|         490|        1|
+----------------+------------+------------+---------+



#### 3 .Le vol en cours avec le trajet le plus distant


In [43]:
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in kilometers
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)
    
    a = math.sin(delta_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    
    distance = R * c
    return int(distance)

haversine_udf = udf(haversine, IntegerType())




In [44]:

origin_airports = airports_df.select(col("iata").alias("origin_iata"), col("latitude").alias("origin_latitude"), col("longitude").alias("origin_longitude"))
destination_airports = airports_df.select(col("iata").alias("destination_iata"), col("latitude").alias("destination_latitude"), col("longitude").alias("destination_longitude"))

flights_with_airports_org_des = flights_df.distinct() \
    .join(origin_airports, flights_df.origin_airport_iata == origin_airports.origin_iata, "left") \
    .join(destination_airports, flights_df.destination_airport_iata == destination_airports.destination_iata, "left")

flights_with_airports_org_des = flights_with_airports_org_des.filter(
    col("origin_latitude").isNotNull() & col("origin_longitude").isNotNull() &
    col("destination_latitude").isNotNull() & col("destination_longitude").isNotNull()
)
# Add a column for the distance using the Haversine formula
flights_with_airports_distance = flights_with_airports_org_des.withColumn("distance", haversine_udf(
    col("origin_latitude").cast("double"), col("origin_longitude").cast("double"),
    col("destination_latitude").cast("double"), col("destination_longitude").cast("double")
))

flights_with_airports_distance.filter(col("on_ground")!= "0").filter(col("distance").isNotNull()).orderBy(desc("distance")).first()

Row(id='36294d7b', icao_24bit='76CCE1', latitude='40.6866', longitude='-74.174', heading='25', altitude='0', ground_speed='0', aircraft_code='A359', registration='9V-SGA', time=1721124698, origin_airport_iata='SIN', destination_airport_iata='EWR', number='SQ22', airline_iata='SQ', on_ground='1', vertical_speed='0', callsign='SIA22', airline_icao='SIA', origin_iata='SIN', origin_latitude='1.350189', origin_longitude='103.9944', destination_iata='EWR', destination_latitude='40.692501', destination_longitude='-74.168602', distance=15336)

#### 3 Le vol en cours avec le trajet le plus long

In [45]:
flights_with_airports_distance.filter(col("on_ground")!= "0").filter(col("distance").isNotNull()).orderBy(desc("time")).first()

Row(id='362b78a9', icao_24bit='7105B2', latitude='53.3533', longitude='-2.2819', heading='28', altitude='0', ground_speed='14', aircraft_code='B78X', registration='HZ-AR33', time=1721128570, origin_airport_iata='JED', destination_airport_iata='MAN', number='SV123', airline_iata='SV', on_ground='1', vertical_speed='0', callsign='SVA123', airline_icao='SVA', origin_iata='JED', origin_latitude='21.67956', origin_longitude='39.156528', destination_iata='MAN', destination_latitude='53.353741', destination_longitude='-2.27495', distance=4957)

#### Pour chaque continent, la distance du vol moyenne

In [46]:
avg_flight_length = flights_with_airports_distance.join(
    flights_with_airports,
    flights_with_airports_distance["id"] == flights_with_airports["id"],
    "inner"
).select(
    flights_with_airports_distance["*"],
    flights_with_airports["origin_continent"]
)

avg_flight_ldistance_per_continent = avg_flight_length.groupBy("origin_continent") \
    .agg(F.avg("distance").alias("avg_distance_km")) \
    .orderBy("origin_continent")

avg_flight_ldistance_per_continent.show()

+----------------+------------------+
|origin_continent|   avg_distance_km|
+----------------+------------------+
|            NULL|7587.7126084256515|
|          africa| 7372.251713344654|
|        atlantic| 7267.141355067504|
|          europe| 4369.982864403703|
|   northatlantic|            3131.0|
|    southamerica|            5465.0|
+----------------+------------------+



#### 4. Pour chaque continent, le temps du vol moyenne

In [47]:
average_flight_duration_per_continent = flights_df.withColumn("continent",expr("get_continent_udf(latitude, longitude)"))\
    .groupBy("continent").avg("time").withColumnRenamed("avg(time)", "average_time")
avg_flight_distance_per_continent.show()

+----------------+--------------------+
|origin_continent|            avg_time|
+----------------+--------------------+
|            NULL|1.7211224011026514E9|
|          africa|1.7211224006078608E9|
|        atlantic|1.7211223912878788E9|
|    southamerica|      1.7211224018E9|
+----------------+--------------------+



#### 5. L'entreprise constructeur d'avions avec le plus de vols actifs


In [23]:
most_active_aircraft = flights_df.filter("on_ground==0").groupBy("aircraft_code").agg({"id": "count"}).withColumnRenamed("count(id)", "count")\
        .orderBy(col("count").desc()).first()
print(most_active_aircraft)

Row(aircraft_code='B77W', count=841)


In [27]:
flights_df.first()

Row(id='356a8712', icao_24bit='A25CE6', latitude='43.7712', longitude='-96.1506', heading='308', altitude='65100', ground_speed='6', aircraft_code='BALL', registration='N251TH', time=1721122281, origin_airport_iata=None, destination_airport_iata=None, number=None, airline_iata=None, on_ground='0', vertical_speed='0', callsign='HBAL684', airline_icao=None)

# 6. Pour chaque pays de compagnie aérienne, le top 3 des modèles d'avion en usage


In [50]:
top_3_aircrafts_per_country = flights_df.groupBy(["airline_iata","aircraft_code"]).agg({"id": "count"}).withColumnRenamed("count(id)", "count") \
        .orderBy(F.col("count").desc())

windows=Window.partitionBy("airline_iata").orderBy(F.col("count").desc())

top_3_aircrafts_per_country=top_3_aircrafts_per_country.withColumn("row_number", F.row_number().over(windows)).filter(F.col("row_number") <= 3).drop("row_number")

top_3_aircrafts_per_country.show()

+------------+-------------+-----+
|airline_iata|aircraft_code|count|
+------------+-------------+-----+
|        NULL|         GLEX|  864|
|        NULL|         GLID|  842|
|        NULL|         BALL|  525|
|          2C|         B77L|    2|
|          2F|         C208|   19|
|          2S|         B773|   16|
|          2W|         A333|  105|
|          2W|         A359|    2|
|          3K|         A320|   51|
|          3L|         A320|   64|
|          3S|         B77L|  652|
|          3U|         A332|  105|
|          3U|         A333|   35|
|          3V|         B744|   50|
|          4G|         B737|   55|
|          4S|         B738|    9|
|          4Y|         A332|   45|
|          4Z|         E135|   14|
|          5J|         A339|   81|
|          5J|         A21N|   41|
+------------+-------------+-----+
only showing top 20 rows

