In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Make project_bronze the current schema so that the unqualified table names
# written by the cells below ("airlines", "airports", "flights") resolve to it.
# NOTE(review): `spark` is never created in this notebook even though
# SparkSession is imported; presumably the runtime injects it -- confirm.
spark.sql("USE SCHEMA project_bronze")

DataFrame[]

In [0]:
# --- Airlines lookup table -------------------------------------------------
# Explicit schema: both columns are nullable strings.
airlines_schema = (
    StructType()
    .add("iata_code", StringType(), True)
    .add("airline", StringType(), True)
)

# Read the CSV with its first line treated as a header and the schema above
# applied instead of letting Spark infer types.
df_airlines = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(airlines_schema)
    .load("/Volumes/workspace/project/project_volume/airlines.csv")
)

# Replace any existing "airlines" table with the freshly loaded data.
(
    df_airlines.write
    .mode("overwrite")
    .saveAsTable("airlines")
)

# --- Airports lookup table -------------------------------------------------
# Explicit schema, built from (column name, Spark type) pairs in file order.
# Every column is nullable.
airports_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("iata_code", StringType()),
        ("airport", StringType()),
        ("city", StringType()),
        ("state", StringType()),
        ("country", StringType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
    ]
])

# Read the CSV with its first line treated as a header and the schema above
# applied instead of letting Spark infer types.
df_airports = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(airports_schema)
    .load("/Volumes/workspace/project/project_volume/airports.csv")
)

# Replace any existing "airports" table with the freshly loaded data.
(
    df_airports.write
    .mode("overwrite")
    .saveAsTable("airports")
)


# --- Flights fact table: schema and raw read -------------------------------
# Column names in schema order. Every column is nullable; the ones listed in
# flights_string_columns are strings and all the others are integers.
flights_columns = [
    "year", "month", "day", "day_of_week",
    "airline", "flight_number", "tail_number",
    "origin_airport", "destination_airport",
    "scheduled_departure", "departure_time", "departure_delay",
    "taxi_out", "wheels_off",
    "scheduled_time", "elapsed_time", "air_time", "distance",
    "wheels_on", "taxi_in",
    "scheduled_arrival", "arrival_time", "arrival_delay",
    "diverted", "cancelled", "cancellation_reason",
    "air_system_delay", "security_delay", "airline_delay",
    "late_aircraft_delay", "weather_delay",
]
flights_string_columns = {
    "airline",
    "tail_number",
    "origin_airport",
    "destination_airport",
    "cancellation_reason",
}

flights_schema = StructType([
    StructField(
        column_name,
        StringType() if column_name in flights_string_columns else IntegerType(),
        True,
    )
    for column_name in flights_columns
])

# Read the CSV with its first line treated as a header and the schema above
# applied instead of letting Spark infer types.
df_flights = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(flights_schema)
    .load("/Volumes/workspace/project/project_volume/flights.csv")
)

def clean_data(df):
    """Filter and null-fill a flights DataFrame, returning a new DataFrame.

    Steps, applied in this order:
      1. Keep only rows whose ``origin_airport`` and ``destination_airport``
         match the pattern ``^[A-Z]{3}$`` (three uppercase letters).
      2. Keep only rows where ``distance`` is greater than 0.
      3. Replace nulls in the delay columns with 0 and nulls in
         ``cancellation_reason`` with the string ``'NAN'``.
      4. Drop every row that still contains a null in any column.

    Args:
        df: DataFrame containing at least the columns referenced above.

    Returns:
        The cleaned DataFrame; the input is not modified.

    NOTE(review): step 4 calls ``dropna()`` without a subset, so rows with a
    null in any column not filled in step 3 (for example ``departure_time``
    or ``tail_number``) are removed as well -- confirm this is intended.
    """
    airport_code_pattern = "^[A-Z]{3}$"
    delay_columns = [
        "departure_delay",
        "arrival_delay",
        "air_system_delay",
        "security_delay",
        "airline_delay",
        "late_aircraft_delay",
        "weather_delay",
    ]

    # Origin first, then destination: both must look like a 3-letter code.
    flights = df
    for airport_column in ("origin_airport", "destination_airport"):
        flights = flights.filter(col(airport_column).rlike(airport_code_pattern))

    return (
        flights
        .filter(col("distance") > 0)
        .fillna(0, subset=delay_columns)
        .fillna('NAN', subset="cancellation_reason")
        .dropna()
    )

# Apply the cleaning rules; df_flights now refers to the cleaned frame.
df_flights = clean_data(df_flights)

# Replace any existing "flights" table with the cleaned data.
(
    df_flights.write
    .mode("overwrite")
    .saveAsTable("flights")
)

In [0]:
%sql
-- Preview up to 20 rows of the saved flights table. There is no ORDER BY,
-- so which rows are returned is not guaranteed to be stable between runs.
SELECT *
FROM project_bronze.flights
LIMIT 20;

year,month,day,day_of_week,airline,flight_number,tail_number,origin_airport,destination_airport,scheduled_departure,departure_time,departure_delay,taxi_out,wheels_off,scheduled_time,elapsed_time,air_time,distance,wheels_on,taxi_in,scheduled_arrival,arrival_time,arrival_delay,diverted,cancelled,cancellation_reason,air_system_delay,security_delay,airline_delay,late_aircraft_delay,weather_delay
2015,4,5,7,B6,601,N708JB,JFK,FLL,600,555,-5,15,610,179,175,147,1069,837,13,859,850,-9,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,605,N595JB,EWR,FLL,600,557,-3,15,612,176,164,141,1065,833,8,856,841,-15,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,154,N509JB,PBI,JFK,600,554,-6,11,605,157,160,142,1028,827,7,837,834,-3,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,892,N608JB,TPA,BOS,600,554,-6,11,605,176,176,158,1185,843,7,856,850,-6,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,776,N236JB,MSY,JFK,600,553,-7,10,603,174,169,151,1182,934,8,954,942,-12,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,794,N834JB,AUS,JFK,600,557,-3,13,610,217,202,182,1521,1012,7,1037,1019,-18,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,721,N520JB,BOS,PBI,600,555,-5,28,623,192,196,164,1197,907,4,912,911,-1,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,727,N296JB,BOS,BWI,600,558,-2,19,617,99,91,67,369,724,5,739,729,-10,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,735,N821JB,SFO,LGB,600,551,-9,19,610,79,83,60,354,710,4,719,714,-5,0,0,NAN,0,0,0,0,0
2015,4,5,7,B6,954,N554JB,SJU,FLL,600,634,34,10,644,169,160,145,1046,909,5,849,914,25,0,0,NAN,0,0,0,25,0
