## 5.2 Programming Exercise: Create a Small Data Warehouse

### Import Libraries and Start Spark Session

In [1]:
import os
import glob
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
        .appName("Week 5 Assignment") \
        .config("spark.sql.warehouse.dir", "./warehouse") \
        .getOrCreate()

### 1) Gazetteer Data

#### a) Create Unmanaged Tables

In [3]:
# Define Paths to Data
path_2017 = "data/gazetteer/2017/"
path_2018 = "data/gazetteer/2018/"

# Create Empty Arrays to Store Table Names
tables = []

# Get Tables Names in 2017 Directory (Same as 2018 Directory)
for file in glob.glob(path_2017 + "*.csv"):
    tables.append(os.path.splitext(os.path.basename(file))[0])

# Show Tables
print(tables)

['secondary_schools', 'congressional_district', 'urban_areas', 'elementary_schools', 'tracts', 'places', 'counties', 'core_based_statistical_areas', 'zip_code_tabulation_areas', 'county_subdivisions', 'unified_school_districts']


In [4]:
# Function to Create New Table
def create_new_table(table_name):
    
    # Load Paths for 2017 and 2018 Files
    file_2017 = path_2017 + table_name + ".csv"
    file_2018 = path_2018 + table_name + ".csv"
    
    # Load Data into Dataframes
    df_2017 = spark.read.load(file_2017, format="csv", sep=",", inferSchema=True, header=True)
    df_2018 = spark.read.load(file_2018, format="csv", sep=",", inferSchema=True, header=True)
    
    # Create a Union
    df = df_2017.unionAll(df_2018)
    
    # Save Table to Warehouse
    df.write.saveAsTable(table_name)
    
    # Print Number of Rows
    print("Number of rows in {} table: {}".format(table_name, df.count()))

In [5]:
# Loop Through and Create New Tables in Warehouse
for table in tables:
    create_new_table(table)

Number of rows in secondary_schools table: 974
Number of rows in congressional_district table: 880
Number of rows in urban_areas table: 7202
Number of rows in elementary_schools table: 3926
Number of rows in tracts table: 148002
Number of rows in places table: 59151
Number of rows in counties table: 6440
Number of rows in core_based_statistical_areas table: 1890
Number of rows in zip_code_tabulation_areas table: 66288
Number of rows in county_subdivisions table: 73261
Number of rows in unified_school_districts table: 21779


In [6]:
# Loop Through and Get Count via SQL
for table in tables:
    print(table)
    spark.sql("SELECT COUNT(*) FROM {}".format(table)).show()

secondary_schools
+--------+
|count(1)|
+--------+
|     974|
+--------+

congressional_district
+--------+
|count(1)|
+--------+
|     880|
+--------+

urban_areas
+--------+
|count(1)|
+--------+
|    7202|
+--------+

elementary_schools
+--------+
|count(1)|
+--------+
|    3926|
+--------+

tracts
+--------+
|count(1)|
+--------+
|  148002|
+--------+

places
+--------+
|count(1)|
+--------+
|   59151|
+--------+

counties
+--------+
|count(1)|
+--------+
|    6440|
+--------+

core_based_statistical_areas
+--------+
|count(1)|
+--------+
|    1890|
+--------+

zip_code_tabulation_areas
+--------+
|count(1)|
+--------+
|   66288|
+--------+

county_subdivisions
+--------+
|count(1)|
+--------+
|   73261|
+--------+

unified_school_districts
+--------+
|count(1)|
+--------+
|   21779|
+--------+



#### b) Load and Query Tables

In [8]:
# Create SQL Statements
sql = """SELECT a.state as State, a.year as Year, a.cnt Unified, b.cnt Elementary, c.cnt as Secondary 
         FROM (SELECT state, year, count(*) as cnt
               FROM unified_school_districts
               WHERE state IN('NE','IA')
               GROUP BY state, year) a
         LEFT JOIN
           (SELECT state, year, count(*) as cnt
            FROM elementary_schools
            WHERE state IN('NE','IA')
            GROUP BY state, year) b
         ON a.state = b.state
         AND a.year = b.year
         LEFT JOIN
           (SELECT state, year, count(*) as cnt
            FROM secondary_schools
            WHERE state IN('NE','IA')
            GROUP BY state, year) c
         ON a.state = c.state
         AND a.year = c.year
         ORDER BY a.state, a.year"""

In [9]:
spark.sql(sql).show()

+-----+----+-------+----------+---------+
|State|Year|Unified|Elementary|Secondary|
+-----+----+-------+----------+---------+
|   IA|2017|    336|      null|     null|
|   IA|2018|    333|      null|     null|
|   NE|2017|    251|      null|     null|
|   NE|2018|    246|      null|     null|
+-----+----+-------+----------+---------+



### 2) Flight Data

In [10]:
# Load Flight Data
flights = spark.read.parquet("data/domestic-flights/flights.parquet")
airport_codes = spark.read.format("csv").options(header="true", inferSchema="true").load("data/airport-codes/airport-codes.csv")

# Save Table to Warehouse
flights.write.saveAsTable("flights")
airport_codes.write.saveAsTable("airport_codes")
    
# Print Number of Rows
print("Number of rows in {} table: {}".format("flights", flights.count()))
print("Number of rows in {} table: {}".format("airport_codes", airport_codes.count()))

Number of rows in flights table: 3606803
Number of rows in airport_codes table: 54591


In [11]:
spark.sql("SELECT COUNT(*) AS flight_counts FROM {}".format("flights")).show()

+-------------+
|flight_counts|
+-------------+
|      3606803|
+-------------+



In [12]:
spark.sql("SELECT COUNT(*) AS airport_codes_count FROM {}".format("airport_codes")).show()

+-------------------+
|airport_codes_count|
+-------------------+
|              54591|
+-------------------+



In [13]:
# SQL Statement
sql1 = """CREATE OR REPLACE TEMPORARY VIEW flights_origin AS
          SELECT
            a.*,
            b.type as origin_airport_type,
            b.name as origin_airport_name,
            b.elevation_ft as origin_airport_elevation_ft,
            b.iso_region as origin_airport_region,
            b.municipality as origin_airport_municipality,
            b.gps_code as origin_airport_gps_code,
            b.coordinates as origin_airport_coordinates
          FROM flights a
          LEFT JOIN airport_codes b
          ON a.origin_airport_code = b.iata_code"""

# Execute SQL Statement
spark.sql(sql1)

DataFrame[]

In [14]:
spark.table('flights_origin').printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)



In [15]:
# SQL Statement
sql2 = """CREATE OR REPLACE TEMPORARY VIEW flights_combined AS
          SELECT
            a.*,
            b.type as destination_airport_type,
            b.name as destination_airport_name,
            b.elevation_ft as destination_airport_elevation_ft,
            b.iso_region as destination_airport_region,
            b.municipality as destination_airport_municipality,
            b.gps_code as destination_airport_gps_code,
            b.coordinates as destination_airport_coordinates
          FROM flights_origin a
          LEFT JOIN airport_codes b
          ON a.origin_airport_code = b.iata_code"""

# Execute SQL Statement
spark.sql(sql2)

DataFrame[]

In [16]:
spark.table('flights_combined').printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- destination_airport_type:

In [17]:
sql3 = """SELECT
            origin_airport_name AS Name,
            origin_airport_code AS `IATA Code`,
            SUM(passengers) AS `Total Inbound Passengers`,
            SUM(flights) AS `Total Inbound Flights`,
            AVG(passengers) AS `Average Inbound Passengers`,
            AVG(flights) AS `Average Inbound Flights`,
            DENSE_RANK() OVER (ORDER BY SUM(passengers) DESC) AS Rank
          FROM flights_combined
          WHERE flight_year = 2008
          GROUP BY origin_airport_name, origin_airport_code"""

# Execute SQL Statement
spark.sql(sql3).show(10)

+--------------------+---------+------------------------+---------------------+--------------------------+-----------------------+----+
|                Name|IATA Code|Total Inbound Passengers|Total Inbound Flights|Average Inbound Passengers|Average Inbound Flights|Rank|
+--------------------+---------+------------------------+---------------------+--------------------------+-----------------------+----+
|Hartsfield Jackso...|      ATL|                35435896|               395729|         4097.109029945658|      45.75430685628396|   1|
|Chicago O'Hare In...|      ORD|                26422032|               357181|        2799.5371900826444|     37.844988344988344|   2|
|Dallas Fort Worth...|      DFW|                22835496|               270055|         4659.354417465824|       55.1020199959192|   3|
|Los Angeles Inter...|      LAX|                19757561|               215359|         4029.688150112176|      43.92392412808485|   4|
|McCarran Internat...|      LAS|                