In [37]:
from FlightRadar24.api import FlightRadar24API

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [38]:
# FlightRadar24 client shared by every extract_*_toDF function below.
fr_api = FlightRadar24API()

# Local Spark session (master "local[8]").
spark = (
    SparkSession.builder
    .master("local[8]")
    .getOrCreate()
)

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
def extract_zones_toDF():
    """Flatten the FlightRadar24 zone tree into a Spark DataFrame.

    Every zone, and recursively every one of its subzones, becomes one row
    ``(Subzone, Zone, br_x, br_y, tl_x, tl_y)``:

    - ``Subzone`` is the zone's own name.
    - ``Zone`` is the name of its parent. For top-level zones this is the
      string "None", because the root parent ``None`` is passed through ``str``.

    Both name columns are title-cased with ``initcap``.

    Returns:
        pyspark.sql.DataFrame following ``schema_zones``.
    """
    def process_zones(tmp, result, root):
        for el in tmp:
            zone = tmp[el]
            # Spark maps list rows onto the schema by POSITION, so values must
            # be appended in schema order (br_x, br_y, tl_x, tl_y). Appending
            # tl_y, tl_x, br_y, br_x put every coordinate in the wrong column.
            result.append([
                str(el),
                str(root),
                float(zone['br_x']),
                float(zone['br_y']),
                float(zone['tl_x']),
                float(zone['tl_y']),
            ])

            if 'subzones' in zone:
                process_zones(zone['subzones'], result, el)

    array_zones = []
    zones = fr_api.get_zones()
    process_zones(zones, array_zones, None)

    schema_zones = StructType([
        StructField("Subzone", StringType(), True),
        StructField("Zone", StringType(), True),
        StructField("br_x", FloatType(), True),
        StructField("br_y", FloatType(), True),
        StructField("tl_x", FloatType(), True),
        StructField("tl_y", FloatType(), True)
    ])

    df_zones = spark.createDataFrame(array_zones, schema=schema_zones)
    df_zones = df_zones.withColumn("Subzone", initcap(col('Subzone')))
    df_zones = df_zones.withColumn("Zone", initcap(col('Zone')))

    return df_zones

In [None]:
def extract_airlines_toDF():
    """Fetch the FlightRadar24 airline list as a Spark DataFrame.

    Each airline record's ``Code``, ``ICAO`` and ``Name`` values are coerced
    to ``str`` in place. The three StringType columns therefore never receive
    a non-string value (a missing value becomes the string "None").

    Returns:
        pyspark.sql.DataFrame with columns Code, ICAO, Name.
    """
    string_fields = ("Code", "ICAO", "Name")

    airline_records = fr_api.get_airlines()
    for record in airline_records:
        for field in string_fields:
            record[field] = str(record[field])

    airline_schema = StructType(
        [StructField(field, StringType(), True) for field in string_fields]
    )
    return spark.createDataFrame(airline_records, schema=airline_schema)

La colonne 'alt' contient quelques valeurs, comme "-1", typées en string et non en int.
Les colonnes 'lon' et 'lat' contiennent quelques valeurs typées en int et non en float comme la majorité ; chaque champ est donc converti explicitement avant la création du DataFrame.

In [None]:
def extract_airports_toDF():
    """Fetch the FlightRadar24 airport list as a Spark DataFrame.

    Each airport record is coerced in place to the column types of the schema:
    name/iata/icao/country -> str, lat/lon -> float, alt -> int. This smooths
    over API values typed inconsistently, such as "-1" strings in ``alt`` or
    ints in ``lat``/``lon``.

    Returns:
        pyspark.sql.DataFrame with columns name, iata, icao, country, lat, lon, alt.
    """
    # Column -> Python converter, in schema order.
    converters = {
        "name": str,
        "iata": str,
        "icao": str,
        "country": str,
        "lat": float,
        "lon": float,
        "alt": int,
    }
    # Spark type matching each Python converter.
    spark_types = {str: StringType, float: FloatType, int: IntegerType}

    airport_records = fr_api.get_airports()
    for record in airport_records:
        for field, convert in converters.items():
            record[field] = convert(record[field])

    airport_schema = StructType([
        StructField(field, spark_types[convert](), True)
        for field, convert in converters.items()
    ])
    return spark.createDataFrame(airport_records, schema=airport_schema)

In [None]:
def extract_flights_toDF():
    """Fetch live flights from FlightRadar24 as a Spark DataFrame.

    Every flight object returned by ``fr_api.get_flights()`` is typed in place,
    through its ``__dict__``, to match the schema built below.

    ``fr_api.get_flight_details`` is called once per flight. When it returns a
    dict holding both a real departure and a real arrival time, those are
    stored on the flight as ``time_depart`` / ``time_arrive``. Otherwise the
    attributes are left unset and the corresponding columns are null.

    Returns:
        pyspark.sql.DataFrame with one row per flight.
    """
    # (attribute, converter) in schema order; the converter also selects the Spark type.
    field_types = (
        ("id", str),
        ("icao_24bit", str),
        ("latitude", float),
        ("longitude", float),
        ("heading", int),
        ("altitude", int),
        ("ground_speed", int),
        ("squawk", str),
        ("aircraft_code", str),
        ("registration", str),
        ("time", int),
        ("origin_airport_iata", str),
        ("destination_airport_iata", str),
        ("number", str),
        ("airline_iata", str),
        ("on_ground", int),
        ("vertical_speed", int),
        ("callsign", str),
        ("airline_icao", str),
    )
    spark_types = {str: StringType, float: FloatType, int: IntegerType}

    def real_times(details):
        """Return ``(departure, arrival)`` as ints, or None when either is unavailable.

        ``.get`` is used at every level. A details payload without a ``time``
        or ``real`` entry therefore leaves the times null instead of raising
        KeyError and aborting the whole extraction.
        """
        if not isinstance(details, dict):
            return None
        real = (details.get('time') or {}).get('real') or {}
        departure, arrival = real.get('departure'), real.get('arrival')
        if departure is None or arrival is None:
            return None
        return int(departure), int(arrival)

    flights = fr_api.get_flights()
    for flight in flights:
        attrs = flight.__dict__  # mutating this dict mutates the flight object itself
        for field, convert in field_types:
            attrs[field] = convert(attrs[field])

        times = real_times(fr_api.get_flight_details(attrs['id']))
        if times is not None:
            attrs['time_depart'], attrs['time_arrive'] = times

    schema = StructType(
        [StructField(field, spark_types[convert](), True) for field, convert in field_types]
        + [
            StructField("time_depart", IntegerType(), True),
            StructField("time_arrive", IntegerType(), True),
        ]
    )

    df_flights = spark.createDataFrame(flights, schema=schema)
    return df_flights

Recherche d'anomalies dans les DataFrames extraits (df_airlines, df_airports, df_flights)

In [None]:
def check_column_quality(df, columnName):
    """Print total, distinct and "empty" value counts for one column of ``df``.

    A value counts as empty when it is null, the empty string, or the literal
    "N/A".

    Args:
        df: Spark DataFrame to inspect.
        columnName: name of the column to report on.
    """
    column = df[columnName]
    # Same as df.select(columnName).count(): projecting a column keeps every row.
    vals = df.count()
    distinct_vals = df.select(columnName).distinct().count()
    # `column == None` compiles to `column = NULL`, which is never true in
    # Spark SQL, so real nulls were silently not counted; isNull() matches them.
    null_vals = df.filter(column.isNull() | (column == "") | (column == "N/A")).count()
    print(f"Column {(columnName)} values : {(vals)}")
    print(f"Column {(columnName)} distinct values : {(distinct_vals)}")
    print(f"Column {(columnName)} null values : {(null_vals)}\n")

In [None]:
def check_df_quality(df, df_name):
    """Print row and distinct-row counts of ``df``, then a per-column report.

    Args:
        df: Spark DataFrame to inspect.
        df_name: label used in the printed report.
    """
    total_rows = df.count()
    unique_rows = df.distinct().count()
    print(f"Dataframe {(df_name)}  values : {(total_rows)}")
    print(f"Dataframe {(df_name)} distinct values : {(unique_rows)}\n")

    column_names = [name for name, _dtype in df.dtypes]
    for name in column_names:
        check_column_quality(df, name)
    print("------------------------------------------------")

In [None]:
# Pull each dataset from the FlightRadar24 API into Spark. Each extractor makes
# one list request; extract_flights_toDF additionally makes one details request per flight.
df_flights = extract_flights_toDF()
df_airports = extract_airports_toDF()
df_airlines = extract_airlines_toDF()
# NOTE(review): df_zones is not cached, quality-checked or saved by any later cell visible here.
df_zones = extract_zones_toDF()

In [None]:
# Mark the extracted DataFrames for caching: the quality checks and every task
# below reuse them several times.
for cached_frame in (df_flights, df_airports, df_airlines):
    cached_frame.cache()

DataFrame[Code: string, ICAO: string, Name: string]

In [36]:
# Preview the flights DataFrame (show()'s defaults written out explicitly).
df_flights.show(n=20, truncate=True)


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/gorgeous/Documents/.venv/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/gorgeous/Documents/.venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/gorgeous/Documents/.venv/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/gorgeous/Documents/.venv/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    rai

Py4JError: An error occurred while calling o672.showString

In [16]:
import datetime

# Run timestamp shared by every save_hdfs call of this notebook run.
today = datetime.datetime.today()
# Fixed-width "YYYYMMDDhhmmssffffff" stamp. Stripping ".-: " from str(today)
# gives the same digits EXCEPT when today.microsecond == 0: str(datetime) then
# omits the fractional part and the stamp shrank from 20 to 14 characters.
cleaned_timestamp = today.strftime("%Y%m%d%H%M%S%f")

In [17]:
def save_hdfs(df, path, file_prefix):
    """Write ``df`` as a CSV with header under a date-partitioned directory.

    Target: ``<path>/tech_year=Y/tech_month=Y-M/tech_day=Y-M-D/<file_prefix><cleaned_timestamp>.csv``,
    with month and day not zero-padded. ``today`` and ``cleaned_timestamp`` are
    read from the notebook's global scope. Existing output at the target is
    overwritten.

    Args:
        df: Spark DataFrame to write.
        path: base directory of the dataset zone.
        file_prefix: prefix of the output name, followed by the run timestamp.
    """
    year, month, day = today.year, today.month, today.day
    partitions = "/".join([
        f"tech_year={year}",
        f"tech_month={year}-{month}",
        f"tech_day={year}-{month}-{day}",
    ])
    target = path + "/" + partitions + "/" + file_prefix + cleaned_timestamp + ".csv"
    (
        df.write
        .format("csv")
        .option("header", "true")
        .mode("overwrite")
        .save(target)
    )


In [18]:
# Print a data-quality report for each extracted DataFrame.
quality_targets = (
    (df_airlines, "Airlines"),
    (df_airports, "Airports"),
    (df_flights, "Flights"),
)
for quality_df, quality_label in quality_targets:
    check_df_quality(quality_df, quality_label)

                                                                                

Dataframe Airlines  values : 1988
Dataframe Airlines distinct values : 1988

Column Code values : 1988
Column Code distinct values : 937
Column Code null values : 950

Column ICAO values : 1988
Column ICAO distinct values : 1988
Column ICAO null values : 0

Column Name values : 1988
Column Name distinct values : 1973
Column Name null values : 0

------------------------------------------------
Dataframe Airports  values : 5094
Dataframe Airports distinct values : 5094

Column name values : 5094
Column name distinct values : 5076
Column name null values : 0



                                                                                

Column iata values : 5094
Column iata distinct values : 5094
Column iata null values : 0

Column icao values : 5094
Column icao distinct values : 5092
Column icao null values : 0

Column country values : 5094
Column country distinct values : 228
Column country null values : 0

Column lat values : 5094
Column lat distinct values : 5078
Column lat null values : 0

Column lon values : 5094
Column lon distinct values : 5086
Column lon null values : 0

Column alt values : 5094
Column alt distinct values : 2044
Column alt null values : 0

------------------------------------------------
Dataframe Flights  values : 1500
Dataframe Flights distinct values : 1500

Column id values : 1500
Column id distinct values : 1500
Column id null values : 0

Column icao_24bit values : 1500
Column icao_24bit distinct values : 1482
Column icao_24bit null values : 19

Column latitude values : 1500
Column latitude distinct values : 1498
Column latitude null values : 0

Column longitude values : 1500
Column long

In [19]:
# Persist the raw extracts into each dataset's "rawzone".
raw_outputs = (
    (df_flights, "Flights/rawzone", "flights"),
    (df_airports, "Airports/rawzone", "airports"),
    (df_airlines, "Airlines/rawzone", "airlines"),
)
for raw_df, raw_path, raw_prefix in raw_outputs:
    save_hdfs(raw_df, raw_path, raw_prefix)

                                                                                

### Tâche n° 1 : la compagnie avec le plus de vols en cours

In [20]:
# Airborne flights whose airline has a known ICAO code.
# NOTE: this rebinds df_flights; every later cell works on this filtered set.
df_flights = df_flights.filter((df_flights.airline_icao != 'N/A') & (df_flights.on_ground == 0))

# Number of such flights per airline, busiest first.
airlines = df_flights.groupby(['airline_icao']) \
    .count() \
    .sort('count', ascending=False)

# first() returns None when no flight survives the filter; indexing None
# would raise TypeError, so handle that case explicitly.
top_airline = airlines.first()
if top_airline is None:
    print("No airborne flight with a known airline ICAO code.")
else:
    df_airlines.filter(df_airlines.ICAO == top_airline['airline_icao']).show()


+----+----+---------------+
|Code|ICAO|           Name|
+----+----+---------------+
|  UA| UAL|United Airlines|
+----+----+---------------+



In [21]:
# Persist the task-1 ranking and expose it to Spark SQL.
save_hdfs(df=airlines, path="Airlines/processedzone", file_prefix="airlines_most_flights")
airlines.createOrReplaceTempView(name="airlines_most_flights")

### Tâche n° 2 : pour chaque continent, la compagnie avec le plus de vols régionaux actifs (continent d'origine == continent de destination)

In [22]:
# First letter of an airport's ICAO code -> continent. Letters missing from
# this table produce a null continent.
# NOTE(review): this is a coarse mapping of ICAO regions (e.g. 'P' and 'U'
# each span several continents) — confirm it suits the analysis.
ICAO_PREFIX_TO_CONTINENT = {
    'A': 'Oceania', 'B': 'Europe', 'C': 'America', 'D': 'Africa',
    'E': 'Europe', 'F': 'Africa', 'G': 'Africa', 'H': 'Africa',
    'K': 'America', 'L': 'Europe', 'M': 'America', 'N': 'Oceania',
    'O': 'Asia', 'P': 'Oceania', 'R': 'Asia', 'S': 'America',
    'T': 'America', 'U': 'Asia', 'V': 'Asia', 'W': 'Asia',
    'Y': 'Oceania', 'Z': 'Asia',
}

# Build when(...).when(...)... in table order.
continent_expr = None
for prefix, continent in ICAO_PREFIX_TO_CONTINENT.items():
    matches_prefix = df_airports.icao.like(prefix + '%')
    if continent_expr is None:
        continent_expr = when(matches_prefix, continent)
    else:
        continent_expr = continent_expr.when(matches_prefix, continent)

df_airports_continents = (
    df_airports
    .select("name", "country", "iata", "icao")
    .withColumn("continent", continent_expr)
)

In [23]:
# Expose the continent mapping to Spark SQL, then persist it.
df_airports_continents.createOrReplaceTempView(name="airports_continents")
save_hdfs(df=df_airports_continents, path="Airports/processedzone", file_prefix="airports_continents")

                                                                                

In [24]:
# Airborne flights whose origin and destination airports are both known.
has_known_route = (
    (df_flights.origin_airport_iata != 'N/A')
    & (df_flights.on_ground == 0)
    & (df_flights.destination_airport_iata != 'N/A')
)
df_flights_filtered = df_flights.filter(has_known_route)

In [25]:
# Attach airport name, country and continent to each end of every route.
filtered_flights_origin = df_flights_filtered.join(
    df_airports_continents,
    df_flights_filtered.origin_airport_iata == df_airports_continents.iata,
    "inner",
)
filtered_flights_dest = df_flights_filtered.join(
    df_airports_continents,
    df_flights_filtered.destination_airport_iata == df_airports_continents.iata,
    "inner",
)

# Compact per-end views, aliased so later joins can address origin.* / dest.*.
filtered_flights_origin_compact = filtered_flights_origin.select(
    "id", "origin_airport_iata", "name", "country", "continent", "airline_iata"
).alias("origin")
filtered_flights_dest_compact = filtered_flights_dest.select(
    "id", "destination_airport_iata", "name", "country", "continent", "airline_iata"
).alias("dest")

In [26]:
# Register both per-end views for Spark SQL, then persist them.
for view_df, view_name in (
    (filtered_flights_origin_compact, "flights_origin_continent"),
    (filtered_flights_dest_compact, "flights_destination_continent"),
):
    view_df.createOrReplaceTempView(view_name)

for out_df, out_prefix in (
    (filtered_flights_origin_compact, "flights_origin_continent"),
    (filtered_flights_dest_compact, "flights_dest_continent"),
):
    save_hdfs(out_df, "Flights/processedzone", out_prefix)

                                                                                

In [27]:
# Routes whose origin and destination lie on the same continent. Both sides
# share one lineage, so the join key is addressed through the "origin"/"dest"
# aliases, exactly like the continent comparison.
same_flight = col("origin.id") == col("dest.id")
same_continent = col("origin.continent") == col("dest.continent")
joined_flights = (
    filtered_flights_origin_compact
    .join(filtered_flights_dest_compact, same_flight, "inner")
    .where(same_continent)
)

joined_flights.createOrReplaceTempView("flights_joined_continent")

In [28]:
# Regional (same-continent) flights per (continent, airline). Task 2 asks for
# the top airline PER CONTINENT; grouping by airline alone ranked airlines
# across all continents at once. Empty / "N/A" IATA codes are excluded: many
# airlines share those placeholder Codes, so they would fan out in the join
# with df_airlines.
regional_counts = (
    joined_flights
    .where(~col("origin.airline_iata").isin("", "N/A"))
    .groupby(['origin.continent', 'origin.airline_iata'])
    .count()
    .withColumnRenamed("count", "nb_flights")
)

# Keep the busiest airline of each continent (ties broken by IATA code so the
# pick is deterministic), busiest continents first.
by_continent = Window.partitionBy("continent").orderBy(col("nb_flights").desc(), col("airline_iata"))
groupe = (
    regional_counts
    .withColumn("rank_in_continent", row_number().over(by_continent))
    .where(col("rank_in_continent") == 1)
    .drop("rank_in_continent")
    .sort('nb_flights', ascending=False)
)

In [29]:
# Attach airline names to the task-2 result.
# NOTE(review): IATA codes are not unique in df_airlines. The quality report
# shows 937 distinct Code values over 1988 rows, 950 of them empty or "N/A".
# One code can therefore match several airlines here; joining on ICAO would
# require carrying airline_icao through task 2.
airlines_most_flights = df_airlines.join(groupe, groupe.airline_iata == df_airlines.Code).sort('nb_flights', ascending=False).drop("Code")
# Distinct prefix: the task-1 cell already writes "airlines_most_flights" to
# this same path with the same run timestamp in overwrite mode, so reusing the
# name silently replaced the task-1 output.
save_hdfs(airlines_most_flights, "Airlines/processedzone", "airlines_most_regional_flights")

### Tâche n° 3 : le vol en cours avec le trajet le plus long

In [30]:
# Airborne flights with known origin and destination airports.
route_filter = (
    (df_flights.origin_airport_iata != 'N/A')
    & (df_flights.on_ground == 0)
    & (df_flights.destination_airport_iata != 'N/A')
)
df_flights_flt = df_flights.select(
    "id", "latitude", "longitude", "origin_airport_iata", "destination_airport_iata",
    "on_ground", "aircraft_code", "registration",
).filter(route_filter)

# Origin airport name/country/coordinates, suffixed _org.
df_flights_flt_origin = (
    df_flights_flt
    .join(df_airports, df_flights_flt.origin_airport_iata == df_airports.iata, "inner")
    .select(
        "id", "origin_airport_iata",
        col("name").alias("name_org"), col("country").alias("country_org"),
        col("lat").alias("lat_org"), col("lon").alias("lon_org"),
        "aircraft_code", "registration",
    )
)
# Destination airport name/country/coordinates, suffixed _dest.
df_flights_flt_dest = (
    df_flights_flt
    .join(df_airports, df_flights_flt.destination_airport_iata == df_airports.iata, "inner")
    .select(
        "id", "destination_airport_iata",
        col("name").alias("name_dest"), col("country").alias("country_dest"),
        col("lat").alias("lat_dest"), col("lon").alias("lon_dest"),
    )
)

# One row per flight with both ends; the id columns are dropped afterwards.
df_flights_flt_joined = df_flights_flt_origin.join(
    df_flights_flt_dest, df_flights_flt_origin.id == df_flights_flt_dest.id, "inner"
).drop("id")


In [32]:
# Route length = great-circle (haversine) distance in km between the origin and
# destination airports. The previous Euclidean distance on raw (lat, lon)
# degrees ignored that a degree of longitude shrinks with cos(latitude), and it
# ignored the +/-180 deg wrap: a transpacific route measured ~250 degrees of
# longitude instead of crossing the Pacific. That made the ranking wrong.
# Native Spark functions also avoid pushing every row through a Python UDF.
EARTH_RADIUS_KM = 6371.0

lat_org_rad, lon_org_rad = radians(col("lat_org")), radians(col("lon_org"))
lat_dest_rad, lon_dest_rad = radians(col("lat_dest")), radians(col("lon_dest"))
haversine = (
    sin((lat_dest_rad - lat_org_rad) / 2) ** 2
    + cos(lat_org_rad) * cos(lat_dest_rad) * sin((lon_dest_rad - lon_org_rad) / 2) ** 2
)
# least(..., 1.0) keeps asin's argument in its domain despite rounding error.
length_km = (2 * EARTH_RADIUS_KM * asin(least(sqrt(haversine), lit(1.0)))).cast(FloatType())

df_flights_flt_res = df_flights_flt_joined.withColumn('Length', length_km) \
    .orderBy(col('Length').desc())

save_hdfs(df_flights_flt_res, "Flights/processedzone", "current_farest_flight")

                                                                                

### Tâche n° 4 : pour chaque continent, la longueur moyenne des vols

In [33]:
# Task 4 inputs:
# - the airport lookup. df_airports_continents has no "alt" column, so only
#   "country" is actually dropped;
# - airborne flights with known routes, minus squawk/callsign. df_flights_filtered
#   already excludes 'N/A' origins and destinations, so no further filter is needed.
df_processed_airports = df_airports_continents.drop("alt", "country")
df_flights_filtered_cleaned = df_flights_filtered.drop("squawk", "callsign")

# Per-side renames so both ends can coexist after joining on the flight id.
# NOTE(review): latitude/longitude come from the flights data (the aircraft's
# position), not from the airports, despite the lat_org/lat_dest names.
origin_renames = {
    "name": "name_airport_origin",
    "iata": "iata_airport_origin",
    "continent": "continent_airport_origin",
    "icao": "icao_airport_origin",
    "latitude": "lat_org",
    "longitude": "lon_org",
}
dest_renames = {
    "name": "name_airport_dest",
    "iata": "iata_airport_dest",
    "continent": "continent_airport_dest",
    "icao": "icao_airport_dest",
    "latitude": "lat_dest",
    "longitude": "lon_dest",
}

filtered_flights_origin = df_flights_filtered_cleaned.join(
    df_processed_airports,
    df_flights_filtered_cleaned.origin_airport_iata == df_processed_airports.iata,
    "inner",
).alias("df_origin")
for old_name, new_name in origin_renames.items():
    filtered_flights_origin = filtered_flights_origin.withColumnRenamed(old_name, new_name)
filtered_flights_origin = filtered_flights_origin.select(
    "id", "origin_airport_iata", "airline_iata", "on_ground", "vertical_speed", "airline_icao",
    "name_airport_origin", "iata_airport_origin", "icao_airport_origin", "continent_airport_origin",
    "lat_org", "lon_org", "ground_speed", "time_depart", "time_arrive",
)

filtered_flights_dest = df_flights_filtered_cleaned.join(
    df_processed_airports,
    df_flights_filtered_cleaned.destination_airport_iata == df_processed_airports.iata,
    "inner",
).alias("df_dest")
for old_name, new_name in dest_renames.items():
    filtered_flights_dest = filtered_flights_dest.withColumnRenamed(old_name, new_name)
filtered_flights_dest = filtered_flights_dest.select(
    "id", "destination_airport_iata", "name_airport_dest",
    "iata_airport_dest", "icao_airport_dest", "continent_airport_dest", "lat_dest", "lon_dest",
)

# Intra-continental routes: both ends on the same continent, one id column kept.
filtered_flights = (
    filtered_flights_origin
    .join(filtered_flights_dest, col("df_origin.id") == col("df_dest.id"), "inner")
    .filter(col("continent_airport_dest") == col("continent_airport_origin"))
    .drop(col("df_dest.id"))
)

In [34]:
def get_duration_flight(df):
    """Return ``df`` with a ``duration`` column: elapsed flight time in hours.

    Computed as ``(time_arrive - time_depart) / 3600``. Both columns are the
    integer real times filled in by extract_flights_toDF, presumably Unix
    seconds (confirm). Rows missing either time get a null duration.

    ``hour(from_unixtime(diff))`` is not used because it reads the elapsed
    seconds as an epoch timestamp in the session time zone and keeps its
    hour-of-day. That result is shifted by the zone's UTC offset, wraps after
    24 h, and drops the minutes.
    """
    return df.withColumn("duration", (df.time_arrive - df.time_depart) / 3600)

# Intra-continental flights with a known duration, longest first.
df4 = filtered_flights.transform(get_duration_flight).filter("duration is not null").orderBy(col("duration").desc())

# Task 4 result: mean duration (hours) of intra-continental flights per continent.
avg_duration_by_continent = (
    df4.groupBy("continent_airport_origin")
    .agg(avg("duration").alias("avg_duration_hours"))
)

In [35]:
# Persist the per-flight durations of task 4.
save_hdfs(df=df4, path="Flights/processedzone", file_prefix="current_longest_flight")

23/06/06 12:33:36 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1572513 ms exceeds timeout 120000 ms
23/06/06 12:33:36 WARN SparkContext: Killing executors is not supported by current scheduler.
23/06/06 12:33:40 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 49026)
Traceback (most recent call last):
  File "/usr/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/gorgeous/Documents/.venv/lib/python3.8/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/gorgeous/Documents/.venv/lib/python3.8/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/home/gorgeous/Documents/.venv/lib/python3.8/site-packages/pyspark/accumulators.py", line 257, in accum_up