In [None]:
from src.extract import *
from src.transform import *
from FlightRadar24.api import FlightRadar24API
from pyspark.sql import Window

In [None]:
import os

# Root of the pipeline's data folder. Override with the SPARK_PIPELINE_DATA_DIR
# environment variable instead of editing an absolute, machine-specific path.
DATA_DIR = os.environ.get(
    'SPARK_PIPELINE_DATA_DIR',
    '/home/sadi/Bureau/exaltKata/spark_pipeline/data',
)

# Raw flights dump (cleaned later in this notebook).
not_cleaned_data = os.path.join(DATA_DIR, 'without_cleaning')
# Airports and airlines are saved straight into the "clean" folder.
cleaned_data = os.path.join(DATA_DIR, 'clean')

flights_path = os.path.join(not_cleaned_data, 'flights.json')
airports_path = os.path.join(cleaned_data, 'airports.json')
airlines_path = os.path.join(cleaned_data, 'airlines.json')

In [None]:
# Create the FlightRadar24 client, pull the current flight list, enrich it via
# `get_flight_details`, and persist the result to `flights_path`.
fr_api = FlightRadar24API()
flights = fr_api.get_flights()

flight_details = get_flight_details(flights, fr_api)
save_original_file(flight_details, flights_path)

In [None]:
# Download the airport list from FlightRadar24 and store it at `airports_path`.
airports = fr_api.get_airports()
save_original_file(airports, airports_path)

In [None]:
# Download the airline list from FlightRadar24 and store it at `airlines_path`.
airlines = fr_api.get_airlines()
save_original_file(airlines, airlines_path)

### Transform flights DataFrame


In [None]:
# Read the raw flights file written by the extraction cell back into a DataFrame.
flights_df = load_original_data(flights_path)

In [None]:
# Apply the project's `transform_flights_df` to get the cleaned flights frame used below.
flights_clean_df = transform_flights_df(flights_df)

In [None]:
# Inspect the columns and types produced by the transformation.
flights_clean_df.printSchema()

##### Airlines with the most active flights worldwide

In [None]:
# Count flights whose status icon is "green" (active) per airline; show the top 5.
(
    flights_clean_df
    .select('airline_short_name', 'status_icon')
    .where(F.col('status_icon') == 'green')
    .groupBy('airline_short_name', 'status_icon')
    .count()
    .orderBy(F.desc('count'))
    .show(5)
)

##### Per country, the airline with the most active domestic flights

In [None]:
# For each destination country, keep the airline with the most active ("green")
# flights whose origin and destination are in the same country (`sameCountry`).
# Ties on `count` are broken alphabetically by airline so the result is deterministic.
w = Window.partitionBy('airport_country').orderBy(
    F.col('count').desc(),
    F.col('airline_short_name').asc(),
)

(
    flights_clean_df
    # Parenthesise each comparison: in Python `&` binds tighter than `==`.
    .filter((F.col('status_icon') == 'green') & (F.col('sameCountry') == True))
    .select(
        'airline_short_name',
        'status_icon',
        F.col('destination_airport_country_name').alias('airport_country'),
        'sameCountry',
    )
    .groupBy('airport_country', 'airline_short_name')
    .count()
    .withColumn('r_number', F.row_number().over(w))
    .filter(F.col('r_number') == 1)
    .drop('r_number')
    .show()
)

#### Longest route among active flights

In [None]:

# Show the single longest distance (km) among active ("green") flights,
# together with its aircraft code and airline.
(
    flights_clean_df
    .where(F.col('status_icon') == 'green')
    .select('aircraft_code', 'airline_short_name', 'distance - km')
    .distinct()
    .orderBy(F.desc('distance - km'))
    .show(1)
)

#### Average flight distance by origin country

In [None]:
# Mean flight distance (km) grouped by the origin airport's country.
(
    flights_clean_df
    .groupBy('origin_airport_country_name')
    .avg('distance - km')
    .show()
)

##### Most common aircraft model by origin country

In [None]:
# For each origin country, keep the aircraft model flown most often.
# Ties on `count` are broken alphabetically by model so the result is deterministic.
w = Window.partitionBy('origin_airport_country_name').orderBy(
    F.col('count').desc(),
    F.col('aircraft_model').asc(),
)

(
    flights_clean_df
    .select('origin_airport_country_name', 'aircraft_model')
    # Drop rows with an unknown country or model, as the destination-airport
    # ranking does, so a null model cannot be reported as the most common one.
    .dropna(how='any')
    .groupBy('origin_airport_country_name', 'aircraft_model')
    .count()
    .withColumn('r_number', F.row_number().over(w))
    .filter(F.col('r_number') == 1)
    .drop('r_number')
    .show()
)


##### By country, most popular destination airport

In [None]:
# For each destination country, keep the airport receiving the most flights.
# Ties on `count` are broken alphabetically by airport name so the result is deterministic.
w = Window.partitionBy('destination_airport_country_name').orderBy(
    F.col('count').desc(),
    F.col('destination_airport_name').asc(),
)

(
    flights_clean_df
    .select('destination_airport_country_name', 'destination_airport_name')
    # Rows with an unknown country or airport cannot be attributed.
    .dropna(how='any')
    .groupBy('destination_airport_country_name', 'destination_airport_name')
    .count()
    .withColumn('r_number', F.row_number().over(w))
    .filter(F.col('r_number') == 1)
    .drop('r_number')
    .show()
)

##### Inbound vs. outbound flights per airport

In [None]:
# Load the saved airport reference data and display how many rows it holds.
airports_df = load_original_data(airports_path)

airports_df.count()

In [None]:
# Inspect the airport DataFrame's column names and types.
airports_df.printSchema()

In [None]:
# Number of flights arriving at each airport (keyed by destination airport name).
inbound_df = (
    flights_clean_df
    .groupBy('destination_airport_name')
    .count()
    .withColumnRenamed('count', 'inbound')
)

# Number of flights departing from each airport (keyed by origin airport name).
outbound_df = (
    flights_clean_df
    .groupBy('origin_airport_name')
    .count()
    .withColumnRenamed('count', 'outbound')
)



In [None]:
# Combine per-airport inbound and outbound counts; diff > 0 means more arrivals
# than departures. A full outer join keeps airports seen only as a destination or
# only as an origin (an inner join would drop them); the missing side counts as 0.
in_out_bound_df = (
    inbound_df
    .join(
        outbound_df,
        inbound_df.destination_airport_name == outbound_df.origin_airport_name,
        'full_outer',
    )
    .select(
        F.coalesce(F.col('destination_airport_name'), F.col('origin_airport_name')).alias('airport_name'),
        F.coalesce(F.col('inbound'), F.lit(0)).alias('inbound'),
        F.coalesce(F.col('outbound'), F.lit(0)).alias('outbound'),
    )
    # Rows without an airport name cannot be attributed to any airport.
    .filter(F.col('airport_name').isNotNull())
    .withColumn('diff', F.col('inbound') - F.col('outbound'))
    .orderBy(F.col('diff').desc())
)
in_out_bound_df.show()

##### By origin country, average ground speed of active flights

In [None]:
# Average ground speed of active ("green") flights, grouped by origin country.
(
    flights_clean_df
    .where(col('status_icon') == 'green')
    .select('origin_airport_country_name', 'ground_speed')
    .groupBy('origin_airport_country_name')
    .avg('ground_speed')
    .show()
)