In [1]:
val dbUserName = dbutils.secrets.get("snowflakeSecrets", "userid")
val dbPassword = dbutils.secrets.get("snowflakeSecrets", "password")

val snowflakeURL = dbutils.widgets.get("snowflakeUrl")
val snowflakeDB = dbutils.widgets.get("snowflakeDBName")
val snowflakeWarehouse = dbutils.widgets.get("snowflakeWarehouse")
val snowflakeSchema = dbutils.widgets.get("snowflakeSchema")

val snowflakeConfig = Map(
  "sfUrl" -> snowflakeURL,
  "sfUser" -> dbUserName,
  "sfPassword" -> dbPassword,
  "sfDatabase" -> snowflakeDB,
  "sfWarehouse" -> snowflakeWarehouse,
  "sfSchema" -> snowflakeSchema
)

In [2]:
val filesBasePath = dbutils.widgets.get("fileBasePath")

In [3]:
import org.apache.spark.sql.types._

val airportsSchema = StructType(Array(
  StructField("IATA_CODE", StringType, true),
  StructField("AIRPORT", StringType, true),
  StructField("CITY", StringType, true),
  StructField("STATE", StringType, true),
  StructField("COUNTRY", StringType, true),
  StructField("LATITUDE", DoubleType, true),
  StructField("LONGITUDE", DoubleType, true)
))

val flightsSchema = StructType(Array(
  StructField("YEAR", IntegerType, true),
  StructField("MONTH", IntegerType, true),
  StructField("DAY", IntegerType, true),
  StructField("DAY_OF_WEEK", IntegerType, true),
  StructField("AIRLINE", StringType, true),
  StructField("FLIGHT_NUMBER", StringType, true),
  StructField("TAIL_NUMBER", StringType, true),
  StructField("ORIGIN_AIRPORT", StringType, true),
  StructField("DESTINATION_AIRPORT", StringType, true),
  StructField("SCHEDULED_DEPARTURE", IntegerType, true),
  StructField("DEPARTURE_TIME", StringType, true),
  StructField("DEPARTURE_DELAY", IntegerType, true),
  StructField("TAXI_OUT", IntegerType, true),
  StructField("WHEELS_OFF", StringType, true),
  StructField("SCHEDULED_TIME", IntegerType, true),
  StructField("ELAPSED_TIME", IntegerType, true),
  StructField("AIR_TIME", IntegerType, true),
  StructField("DISTANCE", IntegerType, true),
  StructField("WHEELS_ON", IntegerType, true),
  StructField("TAXI_IN", IntegerType, true),
  StructField("SCHEDULED_ARRIVAL", IntegerType, true),
  StructField("ARRIVAL_TIME", StringType, true),
  StructField("ARRIVAL_DELAY", StringType, true),
  StructField("DIVERTED", IntegerType, true),
  StructField("CANCELLED", IntegerType, true),
  StructField("CANCELLATION_REASON", StringType, true),
  StructField("AIR_SYSTEM_DELAY", IntegerType, true),
  StructField("SECURITY_DELAY", IntegerType, true),
  StructField("AIRLINE_DELAY", IntegerType, true),
  StructField("LATE_AIRCRAFT_DELAY", IntegerType, true),
  StructField("WEATHER_DELAY", IntegerType, true)
))

In [4]:

val airlinesDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(filesBasePath+"airlines.csv")
val airportsDF = spark.read.format("csv").option("header", "true").schema(airportsSchema).load(filesBasePath+"airports.csv")
val flightsDF = spark.read.format("csv").option("header", "true").schema(flightsSchema).load(filesBasePath+"flights")


In [5]:
import org.apache.spark.sql.SaveMode

airlinesDF.write.format("snowflake").options(snowflakeConfig).option("dbtable", "AIRLINES").mode(SaveMode.Overwrite).save()
airportsDF.write.format("snowflake").options(snowflakeConfig).option("dbtable", "AIRPORTS").mode(SaveMode.Overwrite).save()
flightsDF.write.format("snowflake").options(snowflakeConfig).option("dbtable", "FLIGHTS").mode(SaveMode.Overwrite).save()



Created temp tables in spark for each file. This will be used to create views that will be written  to tables in snowflake. This approach is used because snowflake free edition does not support materialized views. Using just regular views is expensive

In [7]:
airlinesDF.createOrReplaceTempView("airlines")
airportsDF.createOrReplaceTempView("airports")
flightsDF.createOrReplaceTempView("flights")

In [8]:
val flightsByAirlineAndAirport = spark.sql(s"""
SELECT A.AIRLINE, COUNT(*) AS TOTAL_NUM_FLIGHTS, AP.AIRPORT, F.MONTH
FROM FLIGHTS F JOIN AIRLINES A ON F.AIRLINE = A.IATA_CODE
JOIN AIRPORTS AP ON F.ORIGIN_AIRPORT = AP.IATA_CODE
GROUP BY A.AIRLINE, AP.AIRPORT, F.MONTH
ORDER BY TOTAL_NUM_FLIGHTS DESC
""")

flightsByAirlineAndAirport.write.format("snowflake").options(snowflakeConfig).option("dbtable", "TOTAL_FLIGHTS_BY_AIRLINE_MONTHLY").mode(SaveMode.Overwrite).save()

display(flightsByAirlineAndAirport)

AIRLINE,TOTAL_NUM_FLIGHTS,AIRPORT,MONTH
Delta Air Lines Inc.,21635,Hartsfield-Jackson Atlanta International Airport,8
Delta Air Lines Inc.,21591,Hartsfield-Jackson Atlanta International Airport,7
Delta Air Lines Inc.,20962,Hartsfield-Jackson Atlanta International Airport,3
Delta Air Lines Inc.,20891,Hartsfield-Jackson Atlanta International Airport,5
Delta Air Lines Inc.,20814,Hartsfield-Jackson Atlanta International Airport,6
Delta Air Lines Inc.,20235,Hartsfield-Jackson Atlanta International Airport,4
Delta Air Lines Inc.,18542,Hartsfield-Jackson Atlanta International Airport,1
Delta Air Lines Inc.,17443,Hartsfield-Jackson Atlanta International Airport,2
American Airlines Inc.,13163,Dallas/Fort Worth International Airport,7
American Airlines Inc.,12862,Dallas/Fort Worth International Airport,8


In [9]:
import net.snowflake.spark.snowflake.Utils

var onTimePercentageForAirline = spark.sql(s"""
SELECT TABLE2.AIRLINE, TABLE2.IATA_CODE, TABLE2.YEAR,TOTAL_FLIGHTS, TOTAL_ONTIME,  ROUND((TOTAL_ONTIME/TOTAL_FLIGHTS)*100) AS PERECTAGE_ON_TIME
FROM (
    SELECT COUNT(*) AS TOTAL_ONTIME, AIRLINE, YEAR
    FROM FLIGHTS F 
    WHERE DEPARTURE_DELAY <= 0
    GROUP BY AIRLINE, YEAR) AS TABLE1
JOIN (
    SELECT YEAR, COUNT(*) AS TOTAL_FLIGHTS, F.AIRLINE AS IATA_CODE, A.AIRLINE
    FROM FLIGHTS F JOIN AIRLINES A
    ON F.AIRLINE = A.IATA_CODE
    GROUP BY YEAR, F.AIRLINE, A.AIRLINE) AS TABLE2
WHERE TABLE1.YEAR = TABLE2.YEAR 
  AND TABLE1.AIRLINE = TABLE2.IATA_CODE
  ORDER BY PERECTAGE_ON_TIME DESC;
""")

onTimePercentageForAirline.write.format("snowflake").options(snowflakeConfig).option("dbtable", "PERCENTAGE_FLIGHTS_ON_TIME").mode(SaveMode.Overwrite).save()

display(onTimePercentageForAirline)
//Utils.runQuery(snowflakeConfig, """SELECT * FROM PERCENTAGE_FLIGHTS_ON_TIME""")

AIRLINE,IATA_CODE,YEAR,TOTAL_FLIGHTS,TOTAL_ONTIME,PERECTAGE_ON_TIME
Alaska Airlines Inc.,AS,2015,115374,85092,74.0
Hawaiian Airlines Inc.,HA,2015,51592,37675,73.0
Skywest Airlines Inc.,OO,2015,397038,270050,68.0
US Airways Inc.,US,2015,198715,132260,67.0
Atlantic Southeast Airlines,EV,2015,395816,259001,65.0
Delta Air Lines Inc.,DL,2015,585399,381692,65.0
American Airlines Inc.,AA,2015,424882,262103,62.0
Virgin America,VX,2015,40337,24662,61.0
JetBlue Airways,B6,2015,178962,107784,60.0
American Eagle Airlines Inc.,MQ,2015,210150,123597,59.0


In [10]:

var airlineDelays = spark.sql(s"""
SELECT   A.AIRLINE, COUNT(*) AS TOTAL_DELAYS
FROM FLIGHTS F JOIN AIRLINES A
ON F.AIRLINE = A.IATA_CODE
WHERE F.DEPARTURE_DELAY >= 0
GROUP BY  A.AIRLINE
ORDER BY 2 DESC;
""")
airlineDelays.write.format("snowflake").options(snowflakeConfig).option("dbtable", "AIRLINE_DELAYS_COUNT").mode(SaveMode.Overwrite).save()

display(airlineDelays)

AIRLINE,TOTAL_DELAYS
Southwest Airlines Co.,453503
Delta Air Lines Inc.,245266
United Air Lines Inc.,203770
American Airlines Inc.,174631
Atlantic Southeast Airlines,140626
Skywest Airlines Inc.,138698
American Eagle Airlines Inc.,84893
JetBlue Airways,76045
US Airways Inc.,71793
Spirit Air Lines,39396


In [11]:
val airportCancelationReasons = spark.sql(s"""
SELECT A.AIRPORT, A.IATA_CODE, 
  CASE
    WHEN F.CANCELLATION_REASON = 'A' THEN 'Airline/Carrier'
    WHEN F.CANCELLATION_REASON = 'B' THEN 'Weather'
    WHEN F.CANCELLATION_REASON = 'C' THEN 'Nationl Air System'
    WHEN F.CANCELLATION_REASON = 'D' THEN 'Security'
    ELSE 'Not Provided'
  END AS CANCELLATION_REASON,
COUNT(*) AS CANCEL_REASON_COUNT
 FROM FLIGHTS F JOIN AIRPORTS A
ON F.ORIGIN_AIRPORT = A.IATA_CODE
GROUP BY A.AIRPORT, CANCELLATION_REASON, A.IATA_CODE
""")

airportCancelationReasons.write.format("snowflake").options(snowflakeConfig).option("dbtable", "AIRPORT_CANCELATION_REASONS").mode(SaveMode.Overwrite).save()

val cancellationReasons = spark.read.format("snowflake") 
                          .options(snowflakeConfig) 
                          .option("query", "SELECT * FROM AIRPORT_CANCELATION_REASONS WHERE NOT CANCELLATION_REASON = 'Not Provided' ") 
                          .load()


display(cancellationReasons)

AIRPORT,IATA_CODE,CANCELLATION_REASON,CANCEL_REASON_COUNT
Arcata Airport,ACV,Weather,22
Aspen-Pitkin County Airport,ASE,Weather,169
Chicago Midway International Airport,MDW,Nationl Air System,53
Charleston International Airport/Charleston AFB,CHS,Nationl Air System,59
Range Regional Airport (Chisholm-Hibbing Airport),HIB,Weather,10
Dane County Regional Airport,MSN,Weather,109
Orlando International Airport,MCO,Airline/Carrier,319
Richmond International Airport,RIC,Airline/Carrier,96
Central Illinois Regional Airport at Bloomington-Normal,BMI,Weather,61
Atlantic City International Airport,ACY,Weather,19


In [12]:
val airportDelayReasons = spark.sql(s"""
SELECT A.AIRPORT, SUM(F.AIR_SYSTEM_DELAY) AS TOTAL_AIR_SYSTEM_DETALY_TIME, SUM(F.SECURITY_DELAY) AS TOTAL_SECURITY_DELAY_TIME, 
SUM(F.AIRLINE_DELAY) AS TOTAL_AIRLINE_DELAY_TIME, SUM(F.LATE_AIRCRAFT_DELAY) AS TOTAL_AIRCRAFT_DELAY_TIME, SUM(F.WEATHER_DELAY) AS TOTAL_WEATHER_DELAY_TIME
FROM FLIGHTS F JOIN AIRPORTS A
ON F.ORIGIN_AIRPORT = A.IATA_CODE
GROUP BY A.AIRPORT
""")

airportDelayReasons.write.format("snowflake").options(snowflakeConfig).option("dbtable", "AIRPORT_DELAY_REASONS").mode(SaveMode.Overwrite).save()

display(airportDelayReasons)

AIRPORT,TOTAL_AIR_SYSTEM_DETALY_TIME,TOTAL_SECURITY_DELAY_TIME,TOTAL_AIRLINE_DELAY_TIME,TOTAL_AIRCRAFT_DELAY_TIME,TOTAL_WEATHER_DELAY_TIME
Melbourne International Airport,2517,0,2238,3645,534
San Diego International Airport (Lindbergh Field),108525,613,133996,245908,10694
Eppley Airfield,28769,19,40232,55248,5949
Kahului Airport,7465,253,36581,53516,2290
Austin-Bergstrom International Airport,64982,148,91687,151162,13757
Port Columbus International Airport,56214,121,68634,92683,13045
Waco Regional Airport,1617,15,3298,11656,106
Sacramento International Airport,43174,38,70175,149926,4294
Meadows Field,5142,0,6057,10632,441
Brownsville/South Padre Island International Airport,4098,0,5295,9813,1409


In [13]:
val airlineRoutes = spark.sql(s"""
SELECT DISTINCT A.AIRLINE, CONCAT(F.ORIGIN_AIRPORT, '-', F.DESTINATION_AIRPORT) AS ROUTE 
FROM FLIGHTS F JOIN AIRLINES A 
ON F.AIRLINE = A.IATA_CODE 
""")

airlineRoutes.write.format("snowflake").options(snowflakeConfig).option("dbtable", "AIRLINE_ROUTES").mode(SaveMode.Overwrite).save()

val airlineUniqueRoutes = spark.read.format("snowflake") 
                          .options(snowflakeConfig) 
                          .option("query", "SELECT AIRLINE,  COUNT(ROUTE) AS UNIQUE_ROUTES FROM AIRLINE_ROUTES GROUP BY AIRLINE ORDER BY UNIQUE_ROUTES") 
                          .load()

display(airlineUniqueRoutes)


AIRLINE,UNIQUE_ROUTES
Hawaiian Airlines Inc.,56
Virgin America,65
Alaska Airlines Inc.,241
Spirit Air Lines,308
Frontier Airlines Inc.,313
JetBlue Airways,324
US Airways Inc.,340
American Eagle Airlines Inc.,403
United Air Lines Inc.,641
American Airlines Inc.,656
