In [0]:
dbutils.widgets.text("snowflake_database","USER_RANJITH")
dbutils.widgets.text("snowflake_schema","NORTHWOODS_AIRLINE")
dbutils.widgets.text("snowflake_warehouse","INTERVIEW_WH")

In [0]:
snowflake_warehouse = dbutils.widgets.get('snowflake_warehouse').strip()
snowflake_database = dbutils.widgets.get('snowflake_database').strip()
snowflake_schema = dbutils.widgets.get('snowflake_schema').strip()

In [0]:
%run "/Users/ranjithreddythipparthi@gmail.com/config/ETL_Config"

In [0]:
airlines_df = spark.read.format('csv').option("header","true").option("inferSchema", "true").load(file_path+'airlines.csv')
airports_df = spark.read.format('csv').option("header","true").option("inferSchema", "true").load(file_path+'airports.csv')
flights_df = spark.read.option("header", "true").option("inferSchema", "true").csv(flights_path)

In [0]:
options = {
  "sfUrl": snowflake_url,
  "sfUser": snowflake_user,
  "sfPassword": snowflake_password,
  "sfDatabase": snowflake_database ,
  "sfSchema": snowflake_schema ,
  "sfWarehouse": snowflake_warehouse
}

In [0]:
airlines_df.write.format("snowflake").options(**options).option("dbtable","airlines").mode("overwrite").save()
airports_df.write.format("snowflake").options(**options).option("dbtable","airports").mode("overwrite").save()
flights_df.write.format("snowflake").options(**options).option("dbtable","flights").mode("overwrite").save()

In [0]:
airlines_df.createOrReplaceTempView("airlines")
airports_df.createOrReplaceTempView("airports")
flights_df.createOrReplaceTempView("flights")

In [0]:
%sql
select
  a.AIRLINE,
  c.Airport,
  f.month,
  count(*) as number_of_flights
from
  flights as f
  join airlines as a on f.AIRLINE = a.IATA_CODE
  join airports as c on f.ORIGIN_AIRPORT = c.IATA_CODE
group by
  a.AIRLINE,
  c.Airport,
  f.month
order by
  number_of_flights

AIRLINE,Airport,month,number_of_flights
Skywest Airlines Inc.,Central Wisconsin Airport,1,1
American Eagle Airlines Inc.,Gunnison-Crested Butte Regional Airport,3,1
Delta Air Lines Inc.,Newport News/Williamsburg International Airport,3,1
Skywest Airlines Inc.,Evansville Regional Airport,1,1
Skywest Airlines Inc.,Lehigh Valley International Airport,6,1
Delta Air Lines Inc.,Green Bay-Austin Straubel International Airport,1,1
Skywest Airlines Inc.,Chicago Midway International Airport,3,1
Atlantic Southeast Airlines,Phoenix Sky Harbor International Airport,1,1
US Airways Inc.,Wilmington International Airport,1,1
Skywest Airlines Inc.,Albany International Airport,3,1


In [0]:
%sql 
with query1 as (
  select
    a.AIRLINE,
    count(*) as on_time
  from
    flights as f
    join airlines as a on f.AIRLINE = a.IATA_CODE
  where
    f.year = 2015
    and (
      f.ARRIVAL_DELAY is null
      or f.ARRIVAL_DELAY = 0
    )
    and (
      f.DEPARTURE_DELAY is null
      or f.DEPARTURE_DELAY = 0
    )
    and (
      f.AIR_SYSTEM_DELAY is null
      or f.AIR_SYSTEM_DELAY = 0
    )
    and (
      f.SECURITY_DELAY is null
      or f.SECURITY_DELAY = 0
    )
    and (
      f.AIRLINE_DELAY is null
      or f.AIRLINE_DELAY = 0
    )
    and (
      f.LATE_AIRCRAFT_DELAY is null
      or f.LATE_AIRCRAFT_DELAY = 0
    )
    and (
      f.WEATHER_DELAY is null
      or f.WEATHER_DELAY = 0
    )group by
    a.airline
),
query2 as (
  select
    count(*) as total_flights,
    a.AIRLINE
  from
    flights as f
    join airlines as a on f.AIRLINE = a.IATA_CODE
  where
    CANCELLED = 0
  group by
    a.AIRLINE
)
select
  a.AIRLINE,
  a.on_time,
  b.total_flights,
  ((a.on_time / b.total_flights) * 100) as on_time_percentage
from
  query1 as a,
  query2 as b
where
  a.AIRLINE = b.AIRLINE

AIRLINE,on_time,total_flights,on_time_percentage
Skywest Airlines Inc.,7575,389773,1.9434388733955403
American Eagle Airlines Inc.,12593,197476,6.376977455488261
Virgin America,490,39938,1.2269016976313285
United Air Lines Inc.,5438,336715,1.6150156660677428
Frontier Airlines Inc.,490,58545,0.8369630198992228
Southwest Airlines Co.,14629,832428,1.7573892276569263
JetBlue Airways,4083,175084,2.33202348586964
Hawaiian Airlines Inc.,305,51467,0.5926127421454523
Atlantic Southeast Airlines,12482,383628,3.253672828886317
Alaska Airlines Inc.,661,114903,0.5752678346083219


In [0]:
%sql
select
  a.AIRLINE,
  count(*) as delays 
from
  flights as f
  join airlines as a on f.AIRLINE = a.IATA_CODE
where
  (
    f.ARRIVAL_DELAY is not null
    or f.ARRIVAL_DELAY != 0
  )
  or (
    f.DEPARTURE_DELAY is not null
    or f.DEPARTURE_DELAY != 0
  )
  or (
    f.AIR_SYSTEM_DELAY is not null
    or f.AIR_SYSTEM_DELAY != 0
  )
  or (
    f.SECURITY_DELAY is not null
    or f.SECURITY_DELAY != 0
  )
  or (
    f.AIRLINE_DELAY is not null
    or f.AIRLINE_DELAY != 0
  )
  or (
    f.LATE_AIRCRAFT_DELAY is null
    or f.LATE_AIRCRAFT_DELAY != 0
  )
  or (
    f.WEATHER_DELAY is not null
    or f.WEATHER_DELAY != 0
  )
group by
  a.airline
order by delays DESC

AIRLINE,delays
Southwest Airlines Co.,845173
Delta Air Lines Inc.,585399
American Airlines Inc.,424882
Skywest Airlines Inc.,397038
Atlantic Southeast Airlines,395816
United Air Lines Inc.,341961
American Eagle Airlines Inc.,210150
US Airways Inc.,198715
JetBlue Airways,178962
Alaska Airlines Inc.,115374


In [0]:
%sql
select 
  a.AIRPORT,
  f.CANCELLATION_REASON
from
  flights as f
  join airports as a on f.ORIGIN_AIRPORT = a.IATA_CODE
where  f.CANCELLATION_REASON  is not null
group by  f.CANCELLATION_REASON,a.AIRPORT
  


AIRPORT,CANCELLATION_REASON
Indianapolis International Airport,A
Idaho Falls Regional Airport,B
Yeager Airport,A
Palm Beach International Airport,C
Abilene Regional Airport,B
Meadows Field,A
Eppley Airfield,C
Louis Armstrong New Orleans International Airport,A
Quad City International Airport,B
Fresno Yosemite International Airport,C


In [0]:
%sql with query1 as(
  select
    a.AIRPORT,
    case
      when f.ARRIVAL_DELAY is not null then "ARRIVAL_DELAY"
    end as ARRIVAL_DELAY,
    case
      when f.DEPARTURE_DELAY is not null then "DEPARTURE_DELAY"
    end as DEPARTURE_DELAY,
    case
      when f.AIR_SYSTEM_DELAY is not null then "AIR_SYSTEM_DELAY"
    end as AIR_SYSTEM_DELAY,
    case
      when f.SECURITY_DELAY is not null then "SECURITY_DELAY"
    end as SECURITY_DELAY,
    case
      when f.AIRLINE_DELAY is not null then "AIRLINE_DELAY"
    end as AIRLINE_DELAY,
    case
      when f.LATE_AIRCRAFT_DELAY is not null then "LATE_AIRCRAFT_DELAY"
    end as LATE_AIRCRAFT_DELAY,
    case
      when f.WEATHER_DELAY is not null then "WEATHER_DELAY"
    end as WEATHER_DELAY
  from
    flights as f
    join airports as a on f.ORIGIN_AIRPORT = a.IATA_CODE
  where
    (
      f.ARRIVAL_DELAY is not null
      or f.ARRIVAL_DELAY != 0
    )
    or (
      f.DEPARTURE_DELAY is not null
      or f.DEPARTURE_DELAY != 0
    )
    or (
      f.AIR_SYSTEM_DELAY is not null
      or f.AIR_SYSTEM_DELAY != 0
    )
    or (
      f.SECURITY_DELAY is not null
      or f.SECURITY_DELAY != 0
    )
    or (
      f.AIRLINE_DELAY is not null
      or f.AIRLINE_DELAY != 0
    )
    or (
      f.LATE_AIRCRAFT_DELAY is null
      or f.LATE_AIRCRAFT_DELAY != 0
    )
    or (
      f.WEATHER_DELAY is not null
      or f.WEATHER_DELAY != 0
    )
),
query2 as (
  select
    f.airport,
    coalesce(f.ARRIVAL_DELAY, "ARRIVAL_DELAY") as ARRIVAL_DELAY,
    count(f.ARRIVAL_DELAY) as count_ARRIVAL_DELAY,
    coalesce(f.DEPARTURE_DELAY, "DEPARTURE_DELAY") as DEPARTURE_DELAY,
    count(f.DEPARTURE_DELAY) as count_DEPARTURE_DELAY,
    coalesce(f.AIR_SYSTEM_DELAY, "AIR_SYSTEM_DELAY") as AIR_SYSTEM_DELAY,
    count(f.AIR_SYSTEM_DELAY) as count_AIR_SYSTEM_DELAY,
    coalesce(f.SECURITY_DELAY, "SECURITY_DELAY") as SECURITY_DELAY,
    count(f.SECURITY_DELAY) as count_SECURITY_DELAY,
    coalesce(f.AIRLINE_DELAY, "AIRLINE_DELAY") as AIRLINE_DELAY,
    count(f.AIRLINE_DELAY) as count_AIRLINE_DELAY,
    coalesce(f.LATE_AIRCRAFT_DELAY, "LATE_AIRCRAFT_DELAY") as LATE_AIRCRAFT_DELAY,
    count(f.LATE_AIRCRAFT_DELAY) as count_LATE_AIRCRAFT_DELAY,
    coalesce(f.WEATHER_DELAY, "WEATHER_DELAY") as WEATHER_DELAY,
    count(f.WEATHER_DELAY) as count_WEATHER_DELAY
  from
    query1 as f
  group by
    f.airport,
    f.ARRIVAL_DELAY,
    f.DEPARTURE_DELAY,
    f.AIR_SYSTEM_DELAY,
    f.SECURITY_DELAY,
    f.AIRLINE_DELAY,
    f.LATE_AIRCRAFT_DELAY,
    f.WEATHER_DELAY
),
query3 as (
  select
    f.airport,
    f.ARRIVAL_DELAY,
    cast(sum(f.count_ARRIVAL_DELAY) as string) as count_ARRIVAL_DELAY,
    f.DEPARTURE_DELAY,
    cast(sum(f.count_DEPARTURE_DELAY) as string) as count_DEPARTURE_DELAY,
    f.AIR_SYSTEM_DELAY,
    cast(sum(f.count_AIR_SYSTEM_DELAY) as string) as count_AIR_SYSTEM_DELAY,
    f.SECURITY_DELAY,
    cast(sum(f.count_SECURITY_DELAY) as string) as count_SECURITY_DELAY,
    f.AIRLINE_DELAY,
    cast(sum(f.count_AIRLINE_DELAY) as string) as count_AIRLINE_DELAY,
    f.LATE_AIRCRAFT_DELAY,
    cast(sum(f.count_LATE_AIRCRAFT_DELAY) as string) as count_LATE_AIRCRAFT_DELAY,
    f.WEATHER_DELAY,
    cast(sum(f.count_WEATHER_DELAY) as string) as count_WEATHER_DELAY
  from
    query2 as f
  group by
    f.airport,
    f.ARRIVAL_DELAY,
    f.DEPARTURE_DELAY,
    f.AIR_SYSTEM_DELAY,
    f.SECURITY_DELAY,
    f.AIRLINE_DELAY,
    f.LATE_AIRCRAFT_DELAY,
    f.WEATHER_DELAY
)
-- query4 as (
--   select
--     f.airport,
--     CONCAT(
--       collect_set(f.ARRIVAL_DELAY),
--       collect_set(f.count_ARRIVAL_DELAY),
--       collect_set(f.DEPARTURE_DELAY),
--       collect_set(f.count_DEPARTURE_DELAY),
--       collect_set(f.AIR_SYSTEM_DELAY),
--       collect_set(f.count_AIR_SYSTEM_DELAY),
--       collect_set(f.SECURITY_DELAY),
--       collect_set(f.count_SECURITY_DELAY),
--       collect_set(f.AIRLINE_DELAY),
--       collect_set(f.count_AIRLINE_DELAY),
--       collect_set(f.LATE_AIRCRAFT_DELAY),
--       collect_set(f.count_LATE_AIRCRAFT_DELAY),
--       collect_set(f.WEATHER_DELAY),
--       collect_set(f.WEATHER_DELAY)
--     ) as Delay_reasons
--   from
--     query3 as f
--   group by
--     f.airport
-- )
select
  *
from
  query3

airport,ARRIVAL_DELAY,count_ARRIVAL_DELAY,DEPARTURE_DELAY,count_DEPARTURE_DELAY,AIR_SYSTEM_DELAY,count_AIR_SYSTEM_DELAY,SECURITY_DELAY,count_SECURITY_DELAY,AIRLINE_DELAY,count_AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,count_LATE_AIRCRAFT_DELAY,WEATHER_DELAY,count_WEATHER_DELAY
Hattiesburg-Laurel Regional Airport,ARRIVAL_DELAY,409,DEPARTURE_DELAY,410,AIR_SYSTEM_DELAY,83,SECURITY_DELAY,83,AIRLINE_DELAY,83,LATE_AIRCRAFT_DELAY,83,WEATHER_DELAY,83
Newark Liberty International Airport,ARRIVAL_DELAY,71100,DEPARTURE_DELAY,71435,AIR_SYSTEM_DELAY,16202,SECURITY_DELAY,16202,AIRLINE_DELAY,16202,LATE_AIRCRAFT_DELAY,16202,WEATHER_DELAY,16202
Southwest Georgia Regional Airport,ARRIVAL_DELAY,632,DEPARTURE_DELAY,632,AIR_SYSTEM_DELAY,112,SECURITY_DELAY,112,AIRLINE_DELAY,112,LATE_AIRCRAFT_DELAY,112,WEATHER_DELAY,112
Eagle County Regional Airport,ARRIVAL_DELAY,986,DEPARTURE_DELAY,991,AIR_SYSTEM_DELAY,201,SECURITY_DELAY,201,AIRLINE_DELAY,201,LATE_AIRCRAFT_DELAY,201,WEATHER_DELAY,201
Atlantic City International Airport,ARRIVAL_DELAY,2685,DEPARTURE_DELAY,2690,AIR_SYSTEM_DELAY,650,SECURITY_DELAY,650,AIRLINE_DELAY,650,LATE_AIRCRAFT_DELAY,650,WEATHER_DELAY,650
Flagstaff Pulliam Airport,ARRIVAL_DELAY,1176,DEPARTURE_DELAY,1176,AIR_SYSTEM_DELAY,179,SECURITY_DELAY,179,AIRLINE_DELAY,179,LATE_AIRCRAFT_DELAY,179,WEATHER_DELAY,179
Tucson International Airport,ARRIVAL_DELAY,11073,DEPARTURE_DELAY,11115,AIR_SYSTEM_DELAY,1751,SECURITY_DELAY,1751,AIRLINE_DELAY,1751,LATE_AIRCRAFT_DELAY,1751,WEATHER_DELAY,1751
Pocatello Regional Airport,ARRIVAL_DELAY,466,DEPARTURE_DELAY,466,AIR_SYSTEM_DELAY,28,SECURITY_DELAY,28,AIRLINE_DELAY,28,LATE_AIRCRAFT_DELAY,28,WEATHER_DELAY,28
Natrona County International Airport,ARRIVAL_DELAY,1374,DEPARTURE_DELAY,1375,AIR_SYSTEM_DELAY,166,SECURITY_DELAY,166,AIRLINE_DELAY,166,LATE_AIRCRAFT_DELAY,166,WEATHER_DELAY,166
Dallas/Fort Worth International Airport,ARRIVAL_DELAY,173137,DEPARTURE_DELAY,173892,AIR_SYSTEM_DELAY,40492,SECURITY_DELAY,40492,AIRLINE_DELAY,40492,LATE_AIRCRAFT_DELAY,40492,WEATHER_DELAY,40492


In [0]:
%sql with query1 as(
  select
    distinct a.AIRPORT,
    case
      when f.ARRIVAL_DELAY is not null then "ARRIVAL_DELAY"
    end as DELAY_REASON1,
    case
      when f.DEPARTURE_DELAY is not null then "DEPARTURE_DELAY"
    end as DELAY_REASON2,
    case
      when f.AIR_SYSTEM_DELAY is not null then "AIR_SYSTEM_DELAY"
    end as DELAY_REASON3,
    case
      when f.SECURITY_DELAY is not null then "SECURITY_DELAY"
    end as DELAY_REASON4,
    case
      when f.AIRLINE_DELAY is not null then "AIRLINE_DELAY"
    end as DELAY_REASON5,
    case
      when f.LATE_AIRCRAFT_DELAY is not null then "LATE_AIRCRAFT_DELAY"
    end as DELAY_REASON6,
    case
      when f.WEATHER_DELAY is not null then "WEATHER_DELAY"
    end as DELAY_REASON7
  from
    flights as f
    join airports as a on f.ORIGIN_AIRPORT = a.IATA_CODE
  where
    (
      f.ARRIVAL_DELAY is not null
      or f.ARRIVAL_DELAY != 0
    )
    or (
      f.DEPARTURE_DELAY is not null
      or f.DEPARTURE_DELAY != 0
    )
    or (
      f.AIR_SYSTEM_DELAY is not null
      or f.AIR_SYSTEM_DELAY != 0
    )
    or (
      f.SECURITY_DELAY is not null
      or f.SECURITY_DELAY != 0
    )
    or (
      f.AIRLINE_DELAY is not null
      or f.AIRLINE_DELAY != 0
    )
    or (
      f.LATE_AIRCRAFT_DELAY is null
      or f.LATE_AIRCRAFT_DELAY != 0
    )
    or (
      f.WEATHER_DELAY is not null
      or f.WEATHER_DELAY != 0
    )
),
query2 as (
  select
    f.AIRPORT,
    CONCAT(
      collect_set(f.DELAY_REASON1),
      collect_set(f.DELAY_REASON2),
      collect_set(f.DELAY_REASON3),
      collect_set(f.DELAY_REASON4),
      collect_set(f.DELAY_REASON5),
      collect_set(f.DELAY_REASON6),
      collect_set(f.DELAY_REASON7)
    ) as Delay_reasons
  from
    query1 as f
  group by
    f.AIRPORT
)
select
  *
from
  query2

AIRPORT,Delay_reasons
Aberdeen Regional Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Abilene Regional Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Abraham Lincoln Capital Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Adak Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Akron-Canton Regional Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Albany International Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Albert J. Ellis Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Albuquerque International Sunport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Alexandria International Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"
Alpena County Regional Airport,"List(ARRIVAL_DELAY, DEPARTURE_DELAY, AIR_SYSTEM_DELAY, SECURITY_DELAY, AIRLINE_DELAY, LATE_AIRCRAFT_DELAY, WEATHER_DELAY)"


In [0]:
%sql
select
  AIRLINE,
  ORIGIN_AIRPORT,
  DESTINATION_AIRPORT
from
  (
    select
      a.AIRLINE,
      f.ORIGIN_AIRPORT,
      f.DESTINATION_AIRPORT,
      row_number() over(
        partition by case
          when f.ORIGIN_AIRPORT < f.DESTINATION_AIRPORT then f.ORIGIN_AIRPORT
          else f.DESTINATION_AIRPORT
        end,
        case
          when f.ORIGIN_AIRPORT > f.DESTINATION_AIRPORT then f.ORIGIN_AIRPORT
          else f.DESTINATION_AIRPORT
        end
        order by
          f.ORIGIN_AIRPORT
      ) as rnum
    from
      flights as f
      join airlines as a on f.AIRLINE = a.IATA_CODE
  ) t
where
  rnum = 1

AIRLINE,ORIGIN_AIRPORT,DESTINATION_AIRPORT
Spirit Air Lines,ACY,MCO
Spirit Air Lines,ACY,MYR
Alaska Airlines Inc.,ADQ,ANC
Southwest Airlines Co.,AMA,DAL
Skywest Airlines Inc.,APN,DTW
Delta Air Lines Inc.,ATL,BZN
Atlantic Southeast Airlines,ATL,GRB
Atlantic Southeast Airlines,ATL,GSP
Delta Air Lines Inc.,ATL,HDN
Delta Air Lines Inc.,ATL,LGA


In [0]:
# Total number of flights by airline and airport on a monthly basis 
# On time percentage of each airline for the year 2015
# Airlines with the largest number of delays 
# Cancellation reasons by airport 
# Delay reasons by airport 
# Airline with the most unique routes 

In [0]:
Total_number_of_flights_by_airline_and_airport_on_a_monthly_basis  ="""select
  a.AIRLINE,
  c.Airport,
  f.month,
  count(*) as number_of_flights
from
  flights as f
  join airlines as a on f.AIRLINE = a.IATA_CODE
  join airports as c on f.ORIGIN_AIRPORT = c.IATA_CODE
group by
  a.AIRLINE,
  c.Airport,
  f.month
order by
  number_of_flights"""
flights_monthly_basis = spark.sql(Total_number_of_flights_by_airline_and_airport_on_a_monthly_basis)
flights_monthly_basis.write.format("snowflake").options(**options).option("dbtable","Total_Flights_Monthly").mode("overwrite").save()

In [0]:
On_time_percentage_airline ="""with query1 as (
  select
    a.AIRLINE,
    count(*) as on_time
  from
    flights as f
    join airlines as a on f.AIRLINE = a.IATA_CODE
  where
    f.year = 2015
    and (
      f.ARRIVAL_DELAY is null
      or f.ARRIVAL_DELAY = 0
    )
    and (
      f.DEPARTURE_DELAY is null
      or f.DEPARTURE_DELAY = 0
    )
    and (
      f.AIR_SYSTEM_DELAY is null
      or f.AIR_SYSTEM_DELAY = 0
    )
    and (
      f.SECURITY_DELAY is null
      or f.SECURITY_DELAY = 0
    )
    and (
      f.AIRLINE_DELAY is null
      or f.AIRLINE_DELAY = 0
    )
    and (
      f.LATE_AIRCRAFT_DELAY is null
      or f.LATE_AIRCRAFT_DELAY = 0
    )
    and (
      f.WEATHER_DELAY is null
      or f.WEATHER_DELAY = 0
    )group by
    a.airline
),
query2 as (
  select
    count(*) as total_flights,
    a.AIRLINE
  from
    flights as f
    join airlines as a on f.AIRLINE = a.IATA_CODE
  where
    CANCELLED = 0
  group by
    a.AIRLINE
)
select
  a.AIRLINE,
  a.on_time,
  b.total_flights,
  ((a.on_time / b.total_flights) * 100) as on_time_percentage
from
  query1 as a,
  query2 as b
where
  a.AIRLINE = b.AIRLINE"""
On_time_percentage = spark.sql(On_time_percentage_airline)
On_time_percentage.write.format("snowflake").options(**options).option("dbtable","On_time_airline_percentage").mode("overwrite").save()

In [0]:
Airlines_largest_number_delays  ="""select
  a.AIRLINE,
  count(*) as delays 
from
  flights as f
  join airlines as a on f.AIRLINE = a.IATA_CODE
where
  (
    f.ARRIVAL_DELAY is not null
    or f.ARRIVAL_DELAY != 0
  )
  or (
    f.DEPARTURE_DELAY is not null
    or f.DEPARTURE_DELAY != 0
  )
  or (
    f.AIR_SYSTEM_DELAY is not null
    or f.AIR_SYSTEM_DELAY != 0
  )
  or (
    f.SECURITY_DELAY is not null
    or f.SECURITY_DELAY != 0
  )
  or (
    f.AIRLINE_DELAY is not null
    or f.AIRLINE_DELAY != 0
  )
  or (
    f.LATE_AIRCRAFT_DELAY is null
    or f.LATE_AIRCRAFT_DELAY != 0
  )
  or (
    f.WEATHER_DELAY is not null
    or f.WEATHER_DELAY != 0
  )
group by
  a.airline
order by delays DESC"""
largest_number_delays = spark.sql(Airlines_largest_number_delays)
largest_number_delays.write.format("snowflake").options(**options).option("dbtable","Airlines_largest_number_delays").mode("overwrite").save()

In [0]:
Cancellation_reasons_by_airport  ="""select 
  a.AIRPORT,
  f.CANCELLATION_REASON
from
  flights as f
  join airports as a on f.ORIGIN_AIRPORT = a.IATA_CODE
where  f.CANCELLATION_REASON  is not null
group by  f.CANCELLATION_REASON,a.AIRPORT"""
Cancellation_reasons = spark.sql(Cancellation_reasons_by_airport)
Cancellation_reasons.write.format("snowflake").options(**options).option("dbtable","Cancellation_reasons").mode("overwrite").save()

In [0]:
Delay_reasons_by_airport  ="""with query1 as(
  select
    distinct a.AIRPORT,
    case
      when f.ARRIVAL_DELAY is not null then "ARRIVAL_DELAY"
    end as DELAY_REASON1,
    case
      when f.DEPARTURE_DELAY is not null then "DEPARTURE_DELAY"
    end as DELAY_REASON2,
    case
      when f.AIR_SYSTEM_DELAY is not null then "AIR_SYSTEM_DELAY"
    end as DELAY_REASON3,
    case
      when f.SECURITY_DELAY is not null then "SECURITY_DELAY"
    end as DELAY_REASON4,
    case
      when f.AIRLINE_DELAY is not null then "AIRLINE_DELAY"
    end as DELAY_REASON5,
    case
      when f.LATE_AIRCRAFT_DELAY is not null then "LATE_AIRCRAFT_DELAY"
    end as DELAY_REASON6,
    case
      when f.WEATHER_DELAY is not null then "WEATHER_DELAY"
    end as DELAY_REASON7
  from
    flights as f
    join airports as a on f.ORIGIN_AIRPORT = a.IATA_CODE
  where
    (
      f.ARRIVAL_DELAY is not null
      or f.ARRIVAL_DELAY != 0
    )
    or (
      f.DEPARTURE_DELAY is not null
      or f.DEPARTURE_DELAY != 0
    )
    or (
      f.AIR_SYSTEM_DELAY is not null
      or f.AIR_SYSTEM_DELAY != 0
    )
    or (
      f.SECURITY_DELAY is not null
      or f.SECURITY_DELAY != 0
    )
    or (
      f.AIRLINE_DELAY is not null
      or f.AIRLINE_DELAY != 0
    )
    or (
      f.LATE_AIRCRAFT_DELAY is null
      or f.LATE_AIRCRAFT_DELAY != 0
    )
    or (
      f.WEATHER_DELAY is not null
      or f.WEATHER_DELAY != 0
    )
),
query2 as (
  select
    f.AIRPORT,
    CONCAT(
      collect_set(f.DELAY_REASON1),
      collect_set(f.DELAY_REASON2),
      collect_set(f.DELAY_REASON3),
      collect_set(f.DELAY_REASON4),
      collect_set(f.DELAY_REASON5),
      collect_set(f.DELAY_REASON6),
      collect_set(f.DELAY_REASON7)
    ) as Delay_reasons
  from
    query1 as f
  group by
    f.AIRPORT
)
select
  *
from
  query2"""
Delay_reasons = spark.sql(Delay_reasons_by_airport)
Delay_reasons.write.format("snowflake").options(**options).option("dbtable","Delay_reasons").mode("overwrite").save()

In [0]:
Airline_with_the_most_unique_routes  ="""select
  AIRLINE,
  ORIGIN_AIRPORT,
  DESTINATION_AIRPORT
from
  (
    select
      a.AIRLINE,
      f.ORIGIN_AIRPORT,
      f.DESTINATION_AIRPORT,
      row_number() over(
        partition by case
          when f.ORIGIN_AIRPORT < f.DESTINATION_AIRPORT then f.ORIGIN_AIRPORT
          else f.DESTINATION_AIRPORT
        end,
        case
          when f.ORIGIN_AIRPORT > f.DESTINATION_AIRPORT then f.ORIGIN_AIRPORT
          else f.DESTINATION_AIRPORT
        end
        order by
          f.ORIGIN_AIRPORT
      ) as rnum
    from
      flights as f
      join airlines as a on f.AIRLINE = a.IATA_CODE
  ) t
where
  rnum = 1"""
unique_routes = spark.sql(Airline_with_the_most_unique_routes)
unique_routes.write.format("snowflake").options(**options).option("dbtable","Airline_most_unique_routes").mode("overwrite").save()