# 5.2 Programming Exercise: Create a Small Data Warehouse

In this exercise, you will create a small data warehouse using Spark to save data as if it were a table in a typical relational database. Once you create this data warehouse, you can query the tables you created using Structured Query Language (SQL).

For this exercise, you will execute your Spark SQL within a Python program, but if you are using a typical Hadoop distribution, there are many ways to connect these tables to existing tools as if they were a normal relational database. Spark SQL natively supports reading and writing data managed by Apache Hive. Spark can also act as a distributed SQL engine, so any tool with JDBC/ODBC support can connect to it. You can also integrate Spark with big data tools such as Apache Phoenix and with traditional relational databases.

For this exercise, you will create tables from the U.S. Gazetteer files provided by the United States Census Bureau. These files list the geographic areas for selected area types. You can find the Gazetteer files for 2017 and 2018 in the gazetteer folder of the data directory. These directories contain data for congressional districts, core-based statistical areas, counties, county subdivisions, school districts, census tracts, urban areas, ZIP Code Tabulation Areas, and places (incorporated places and census-designated places). You will combine the data from 2017 and 2018 and name each table after its source file (e.g., places.csv is saved in the places table).



# 1. Gazetteer Data

## a. Create Unmanaged Tables



The first step of this assignment is to load the data from the CSV files, combine each file with the matching file for the other year, and save the result to disk as a table. The following code provides a template for combining tables and saving them to the warehouse directory.

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
#import pandas

In [2]:
# Create file paths including filenames
file_2017_path = r'/home/ram/share/650/dsc650-master/data/gazetteer/2017/places.csv'

file_2018_path = r'/home/ram/share/650/dsc650-master/data/gazetteer/2018/places.csv'

warehouse_dir = r'/home/ram/Documents/spark-warehouse/'

In [3]:
spark = SparkSession.builder.appName('week5') \
        .config("spark.sql.warehouse.dir", warehouse_dir) \
        .getOrCreate()

In [4]:
df1 = spark.read.load(
  file_2017_path,
  format='csv',
  sep=',',
  inferSchema=True,
  header=True
)

df2 = spark.read.load(
  file_2018_path,
  format='csv',
  sep=',',
  inferSchema=True,
  header=True
)


In [5]:
df = df1.unionAll(df2)
df.head(3)

[Row(state='AL', geoid=100100, ansi_code=2582661, name='Abanda CDP', legal_stat_area_desc='57', functional_status='S', land_area_meters_sq=7764034, water_area_meters_sq=34284, land_area_miles_sq=2.998, water_area_miles_sq=0.013000000000000001, latitude=33.091627, longitude=-85.527029, year=2017),
 Row(state='AL', geoid=100124, ansi_code=2403054, name='Abbeville city', legal_stat_area_desc='25', functional_status='A', land_area_meters_sq=40255362, water_area_meters_sq=107642, land_area_miles_sq=15.543, water_area_miles_sq=0.042, latitude=31.564689, longitude=-85.259124, year=2017),
 Row(state='AL', geoid=100460, ansi_code=2403063, name='Adamsville city', legal_stat_area_desc='25', functional_status='A', land_area_meters_sq=65211854, water_area_meters_sq=14122, land_area_miles_sq=25.178, water_area_miles_sq=0.005, latitude=33.605748, longitude=-86.974649, year=2017)]
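`unionAll` (like `union`) matches columns by position, not by name, so if the 2017 and 2018 files listed their columns in a different order, values would silently land under the wrong headers. A cheap safeguard is to compare the two header rows before combining. The sketch below uses only Python's standard `csv` module; the helper name `same_header` is hypothetical and not part of Spark.

```python
import csv


def same_header(path_a, path_b):
    """Return True if two CSV files have identical header rows in the same order.

    DataFrame.union/unionAll pair columns by position, so a header mismatch
    would put values under the wrong column names without raising an error.
    Assumes both files are non-empty.
    """
    with open(path_a, newline="") as fa, open(path_b, newline="") as fb:
        return next(csv.reader(fa)) == next(csv.reader(fb))
```

For example, `same_header(file_2017_path, file_2018_path)` should return `True` before the result of `df1.unionAll(df2)` is trusted. If the headers differ only in order, Spark's `DataFrame.unionByName` (available since Spark 2.3) aligns columns by name instead.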

In [6]:
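# store the table's files in their own subdirectory instead of the warehouse root,
# and overwrite so the cell can be re-run without a "table already exists" error
df.write.saveAsTable('places', mode = 'overwrite', path = warehouse_dir + 'places')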

In [7]:
#df.write.saveAsTable('places', mode = 'overwrite', path = '/home/ram/share/650/spark-warehouse/')

For each CSV file in the 2017 and 2018 directories, load the data into Spark, combine it with the corresponding data from the other year and save it to disk.

In [8]:
import os

files_2017_path = r'/home/ram/share/650/dsc650-master/data/gazetteer/2017'

files_2018_path = r'/home/ram/share/650/dsc650-master/data/gazetteer/2018'

#os.listdir("/home/ram/share/650/dsc650-master/data/gazetteer/2017")
fileslist_2017 = os.listdir(files_2017_path)
fileslist_2018 = os.listdir(files_2018_path)

In [9]:
# getting list of files in the directory
import glob
print(glob.glob("/home/ram/share/650/dsc650-master/data/gazetteer/2017/*.csv"))

['/home/ram/share/650/dsc650-master/data/gazetteer/2017/congressional_district.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/core_based_statistical_areas.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/counties.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/county_subdivisions.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/elementary_schools.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/places.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/secondary_schools.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/tracts.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/unified_school_districts.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/urban_areas.csv', '/home/ram/share/650/dsc650-master/data/gazetteer/2017/zip_code_tabulation_areas.csv']


In [10]:
# list of files in 2017 directory
fileslist_2017

['congressional_district.csv',
 'core_based_statistical_areas.csv',
 'counties.csv',
 'county_subdivisions.csv',
 'elementary_schools.csv',
 'places.csv',
 'secondary_schools.csv',
 'tracts.csv',
 'unified_school_districts.csv',
 'urban_areas.csv',
 'zip_code_tabulation_areas.csv']

In [11]:
# list of files in 2018 directory
fileslist_2018

['congressional_district.csv',
 'core_based_statistical_areas.csv',
 'counties.csv',
 'county_subdivisions.csv',
 'elementary_schools.csv',
 'places.csv',
 'secondary_schools.csv',
 'tracts.csv',
 'unified_school_districts.csv',
 'urban_areas.csv',
 'zip_code_tabulation_areas.csv']
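The loop below iterates over `fileslist_2017` only and assumes 2018 contains a file with the same name; the two listings above show that this holds here. A more defensive variant, sketched under the same file-naming assumption, pairs files by name and reports any that exist in only one year. The helper `pair_year_files` is hypothetical; it also skips non-CSV entries that `os.listdir` would otherwise return (for example, a `.ipynb_checkpoints` folder).

```python
import os


def pair_year_files(dir_a, dir_b, ext=".csv"):
    """Match files present in both year directories by filename.

    Returns (pairs, only_a, only_b):
      pairs  -- sorted list of (table_name, path_in_dir_a, path_in_dir_b)
      only_a -- filenames found only in dir_a
      only_b -- filenames found only in dir_b
    """
    files_a = {f for f in os.listdir(dir_a) if f.endswith(ext)}
    files_b = {f for f in os.listdir(dir_b) if f.endswith(ext)}
    pairs = [
        # table name is the filename without its extension, e.g. places.csv -> places
        (os.path.splitext(f)[0], os.path.join(dir_a, f), os.path.join(dir_b, f))
        for f in sorted(files_a & files_b)
    ]
    return pairs, sorted(files_a - files_b), sorted(files_b - files_a)
```

Used here, `pairs, only_2017, only_2018 = pair_year_files(files_2017_path, files_2018_path)` would give the (table name, 2017 path, 2018 path) triples to load, and non-empty `only_2017` or `only_2018` lists would flag files that cannot be combined.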

In [12]:
def saveastable(file, warehouse_dir):
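    """
    Combines a CSV file from the 2017 and 2018 directories and saves it as a Spark table.
    inputs: file - CSV filename present in both year directories
            warehouse_dir - root directory for the tables' data files
    output: none
    side effect: writes the combined data as a Parquet table in
                 warehouse_dir/<table name>, overwriting any existing table
    """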
    
    file1_path = os.path.join(files_2017_path,file)
    file2_path = os.path.join(files_2018_path,file)
    df1 = spark.read.load(
      file1_path,
      format='csv',
      sep=',',
      inferSchema=True,
      header=True
    )

    df2 = spark.read.load(
      file2_path,
      format='csv',
      sep=',',
      inferSchema=True,
      header=True
    )

    df = df1.unionAll(df2)
    
    # derive the table name from the argument, not from the caller's loop variable
    tablename = os.path.splitext(file)[0]
    tblwarehouse_dir = os.path.join(warehouse_dir,tablename)
    df.write.saveAsTable(tablename, mode = 'overwrite', path = tblwarehouse_dir )
    print(" Table created for - ",tablename)

In [13]:
# loops through each file and merges the 2017 & 2018 data for same file and saves in warehouse directory
table_names = []
for i in fileslist_2017:
    filename = i
    tablename = os.path.splitext(i)[0]
    table_names.append(tablename)
    file_path_name = os.path.join(files_2017_path,i)
    print(filename)    
    print(tablename)
    print(file_path_name)
    #call function to merge and create unified parquet file and save it in warehouse directory
    saveastable(filename, warehouse_dir)
        

congressional_district.csv
congressional_district
/home/ram/share/650/dsc650-master/data/gazetteer/2017/congressional_district.csv
 Table created for -  congressional_district
core_based_statistical_areas.csv
core_based_statistical_areas
/home/ram/share/650/dsc650-master/data/gazetteer/2017/core_based_statistical_areas.csv
 Table created for -  core_based_statistical_areas
counties.csv
counties
/home/ram/share/650/dsc650-master/data/gazetteer/2017/counties.csv
 Table created for -  counties
county_subdivisions.csv
county_subdivisions
/home/ram/share/650/dsc650-master/data/gazetteer/2017/county_subdivisions.csv
 Table created for -  county_subdivisions
elementary_schools.csv
elementary_schools
/home/ram/share/650/dsc650-master/data/gazetteer/2017/elementary_schools.csv
 Table created for -  elementary_schools
places.csv
places
/home/ram/share/650/dsc650-master/data/gazetteer/2017/places.csv
 Table created for -  places
secondary_schools.csv
secondary_schools
/home/ram/share/650/dsc650-m

Once you have finished saving all of the files as tables, verify that you have loaded the files properly by loading the tables into Spark, and performing a simple row count on each table.
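A nonzero count shows that a table exists, but not that both years made it in. One cross-check, sketched below with the standard `csv` module (the helper names are hypothetical), compares each table's `count(*)` with the number of data rows in its two source files. Note that `csv.reader` keeps a quoted field containing a newline inside one record, while Spark's CSV reader splits records on newlines unless the `multiLine` option is enabled, so the two counts can legitimately differ for such files.

```python
import csv


def csv_data_rows(path):
    """Count the data rows in a CSV file, excluding the header and blank lines."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row; an empty file yields 0
        return sum(1 for row in reader if row)


def expected_table_rows(path_2017, path_2018):
    """Rows a combined table should hold: UNION ALL keeps every row from both years."""
    return csv_data_rows(path_2017) + csv_data_rows(path_2018)
```

For example, `expected_table_rows(os.path.join(files_2017_path, 'places.csv'), os.path.join(files_2018_path, 'places.csv'))` can be compared with `spark.sql('select count(*) from places').first()[0]`.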

In [14]:
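# I did not use this code, as the tables were already created in the step above.
# spark.catalog.createTable registers an external (unmanaged) table when a path is given;
# it replaces spark.catalog.createExternalTable, which is deprecated since Spark 2.2.
def create_external_table(table_name):
    table_dir = os.path.join(warehouse_dir, table_name)
    return spark.catalog.createTable(table_name, path=table_dir)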

def create_external_tables():
    for table_name in table_names:
        create_external_table(table_name)

In [15]:
# names of the tables created in the loop above
table_names

['congressional_district',
 'core_based_statistical_areas',
 'counties',
 'county_subdivisions',
 'elementary_schools',
 'places',
 'secondary_schools',
 'tracts',
 'unified_school_districts',
 'urban_areas',
 'zip_code_tabulation_areas']

In [16]:
#Getting list of existing tables in spark warehouse
spark.catalog.listTables()

[Table(name='congressional_district', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='core_based_statistical_areas', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='counties', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='county_subdivisions', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='elementary_schools', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='places', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='secondary_schools', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='tracts', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='unified_school_districts', database='default', description=None, tableType='EXTERNAL', isTempor

In [17]:
spark.sql("select count(*) as row_count from places").show()

+---------+
|row_count|
+---------+
|    59151|
+---------+



In [18]:
# printing count of rows from each of the merged tables
for i in table_names:
    sqlstring = "select count(*) as row_count from " + i
    print("Count of rows from table - ", i)
    spark.sql(sqlstring).show()

Count of rows from table -  congressional_district
+---------+
|row_count|
+---------+
|      880|
+---------+

Count of rows from table -  core_based_statistical_areas
+---------+
|row_count|
+---------+
|     1890|
+---------+

Count of rows from table -  counties
+---------+
|row_count|
+---------+
|     6440|
+---------+

Count of rows from table -  county_subdivisions
+---------+
|row_count|
+---------+
|    73261|
+---------+

Count of rows from table -  elementary_schools
+---------+
|row_count|
+---------+
|     3926|
+---------+

Count of rows from table -  places
+---------+
|row_count|
+---------+
|    59151|
+---------+

Count of rows from table -  secondary_schools
+---------+
|row_count|
+---------+
|      974|
+---------+

Count of rows from table -  tracts
+---------+
|row_count|
+---------+
|   148002|
+---------+

Count of rows from table -  unified_school_districts
+---------+
|row_count|
+---------+
|    21779|
+---------+

Count of rows from table -  urban_areas
+-
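The loop above issues one query per table and prints a separate small result for each. The same check can be written as a single statement that returns one row per table. The sketch below builds that SQL text with a hypothetical helper, `row_count_query`; the result could then be run with `spark.sql(row_count_query(table_names)).show()`. Because the generated SQL is plain `SELECT ... UNION ALL ...`, it is exercised here against SQLite purely as a stand-in so it runs without a Spark session.

```python
def row_count_query(tables):
    """Build one SQL statement returning (table_name, row_count) for every table.

    Table names are pasted into the SQL text, so they must come from a trusted
    list such as table_names, never from user input.
    """
    selects = [
        f"SELECT '{t}' AS table_name, COUNT(*) AS row_count FROM {t}"
        for t in tables
    ]
    return "\nUNION ALL\n".join(selects)
```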

## b. Load and Query Tables


Now that we have saved the data to external tables, we will load the tables back into Spark and use Spark SQL to build a report on school districts in Nebraska and Iowa from the elementary_schools, secondary_schools, and unified_school_districts tables.

Using Spark SQL, create a report with the following information.

In [19]:
spark.sql("select * from elementary_schools").show(3)

+-----+------+--------------------+---------------------+----------------------+-------------------+--------------------+------------------+--------------------+---------+-------------------+----+
|state| geoid|                name|lowest_grade_provided|highest_grade_provided|land_area_meters_sq|water_area_meters_sq|land_area_miles_sq| water_area_miles_sq| latitude|          longitude|year|
+-----+------+--------------------+---------------------+----------------------+-------------------+--------------------+------------------+--------------------+---------+-------------------+----+
|   AL|100195|Pike Road City Sc...|                   KG|                    12|           81830289|              758331|            31.595|               0.293|32.274993| -86.02787099999999|2017|
|   AZ|400004|Clarkdale-Jerome ...|                   PK|                     8|          211351941|              380983| 81.60300000000001|               0.147|34.748857|-112.11202800000001|2017|
|   AZ|400005|S

In [20]:
spark.sql("select * from secondary_schools").show(3)

+-----+------+--------------------+---------------------+----------------------+-------------------+--------------------+------------------+-------------------+------------------+-------------------+----+
|state| geoid|                name|lowest_grade_provided|highest_grade_provided|land_area_meters_sq|water_area_meters_sq|land_area_miles_sq|water_area_miles_sq|          latitude|          longitude|year|
+-----+------+--------------------+---------------------+----------------------+-------------------+--------------------+------------------+-------------------+------------------+-------------------+----+
|   AZ|400082|Colorado River Un...|                    9|                    12|         2711174277|            74672820|           1046.79|             28.831|35.105214000000004|-114.46783400000001|2017|
|   AZ|400450|Agua Fria Union H...|                    9|                    12|          244640927|              740286|            94.456|0.28600000000000003|33.481427000000004| 

In [21]:
spark.sql("select * from unified_school_districts").show(3)

+-----+------+--------------------+---------------------+----------------------+-------------------+--------------------+------------------+-------------------+---------+----------+----+
|state| geoid|                name|lowest_grade_provided|highest_grade_provided|land_area_meters_sq|water_area_meters_sq|land_area_miles_sq|water_area_miles_sq| latitude| longitude|year|
+-----+------+--------------------+---------------------+----------------------+-------------------+--------------------+------------------+-------------------+---------+----------+----+
|   AL|100001|Fort Rucker Schoo...|                   KG|                    12|          233007250|             2735224|            89.965|              1.056|31.409737|-85.745807|2017|
|   AL|100003|Maxwell AFB Schoo...|                   KG|                    12|            8446722|              566857|             3.261|0.21899999999999997|32.380944|-86.363749|2017|
|   AL|100005|Albertville City ...|                   KG|        

In [22]:
# reading spark tables and printing schema
elementary_schools_df = spark.read.table("elementary_schools")
print("Number of rows ", elementary_schools_df.count())
elementary_schools_df.printSchema()

Number of rows  3926
root
 |-- state: string (nullable = true)
 |-- geoid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lowest_grade_provided: string (nullable = true)
 |-- highest_grade_provided: integer (nullable = true)
 |-- land_area_meters_sq: long (nullable = true)
 |-- water_area_meters_sq: long (nullable = true)
 |-- land_area_miles_sq: double (nullable = true)
 |-- water_area_miles_sq: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- year: integer (nullable = true)



In [23]:
# reading spark tables and printing schema
secondary_schools_df = spark.read.table("secondary_schools")
print("Number of rows ", secondary_schools_df.count())
secondary_schools_df.printSchema()

Number of rows  974
root
 |-- state: string (nullable = true)
 |-- geoid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lowest_grade_provided: integer (nullable = true)
 |-- highest_grade_provided: integer (nullable = true)
 |-- land_area_meters_sq: long (nullable = true)
 |-- water_area_meters_sq: integer (nullable = true)
 |-- land_area_miles_sq: double (nullable = true)
 |-- water_area_miles_sq: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- year: integer (nullable = true)



In [24]:
# reading spark tables and printing schema
unified_school_districts_df = spark.read.table("unified_school_districts")
print("Number of rows ", unified_school_districts_df.count())
unified_school_districts_df.printSchema()

Number of rows  21779
root
 |-- state: string (nullable = true)
 |-- geoid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lowest_grade_provided: string (nullable = true)
 |-- highest_grade_provided: integer (nullable = true)
 |-- land_area_meters_sq: long (nullable = true)
 |-- water_area_meters_sq: long (nullable = true)
 |-- land_area_miles_sq: double (nullable = true)
 |-- water_area_miles_sq: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- year: integer (nullable = true)



es_ilne = spark.sql("""select state, year, count(*) as Elementary 
                                    from elementary_schools 
                                    where state = 'IA' or state = 'NE'
                                    or state = 'NJ'  or state = 'IL'
                                    group by state, year""").collect()

I joined the whole data set on state and year instead of filtering to just those two states, because the elementary and secondary school tables do not contain any rows for Iowa or Nebraska. The report is filtered to those two states at the end.

In [25]:
es_ilne = spark.sql("""select state, year, count(*) as Elementary 
                                    from elementary_schools 
                                     group by state, year""").collect()

#converting list to data frame
es_ilne_df = sc.parallelize(es_ilne).toDF()
es_ilne_df.printSchema()

root
 |-- state: string (nullable = true)
 |-- year: long (nullable = true)
 |-- Elementary: long (nullable = true)



In [26]:
es_ilne_df.show()

+-----+----+----------+
|state|year|Elementary|
+-----+----+----------+
|   MI|2017|        29|
|   WY|2017|         1|
|   MI|2018|        28|
|   NJ|2017|       171|
|   OR|2017|         9|
|   VA|2017|         1|
|   ME|2018|        11|
|   VT|2017|         4|
|   RI|2018|         5|
|   VA|2018|         1|
|   WI|2018|        44|
|   WY|2018|         1|
|   KY|2018|         5|
|   MT|2017|       248|
|   IL|2018|       372|
|   WI|2017|        46|
|   NY|2018|        11|
|   TN|2018|        16|
|   RI|2017|         5|
|   NY|2017|        11|
+-----+----+----------+
only showing top 20 rows



In [27]:
ss_ilne = spark.sql("""select state, year, count(*) as Secondary 
                                    from secondary_schools 
                                     group by state, year""").collect()
#converting list to data frame
ss_ilne_df = sc.parallelize(ss_ilne).toDF()
ss_ilne_df.printSchema()

root
 |-- state: string (nullable = true)
 |-- year: long (nullable = true)
 |-- Secondary: long (nullable = true)



In [28]:
ss_ilne_df.show()

+-----+----+---------+
|state|year|Secondary|
+-----+----+---------+
|   NJ|2017|       46|
|   OR|2017|        1|
|   ME|2018|        6|
|   VT|2017|        1|
|   RI|2018|        1|
|   WI|2018|       10|
|   KY|2018|        5|
|   MT|2017|      102|
|   IL|2018|      105|
|   WI|2017|       10|
|   NY|2018|        3|
|   TN|2018|       16|
|   RI|2017|        1|
|   NY|2017|        3|
|   IL|2017|      102|
|   MA|2017|       31|
|   TX|2017|        4|
|   AZ|2018|       15|
|   TX|2018|        3|
|   SC|2017|        2|
+-----+----+---------+
only showing top 20 rows



In [29]:
usd_ilne = spark.sql("""select state, year, count(*) as Unified 
                                    from unified_school_districts 
                                    group by state, year""").collect()

#converting list to data frame
usd_ilne_df = sc.parallelize(usd_ilne).toDF()
usd_ilne_df.printSchema()

root
 |-- state: string (nullable = true)
 |-- year: long (nullable = true)
 |-- Unified: long (nullable = true)



In [30]:
# rename the join keys so they stay distinct from the state and year columns of the tables joined next
usd_ilne_df = usd_ilne_df.withColumnRenamed("state","JState")\
                        .withColumnRenamed("year","JYear")
usd_ilne_df.show()

+------+-----+-------+
|JState|JYear|Unified|
+------+-----+-------+
|    MI| 2017|    518|
|    IA| 2018|    333|
|    DE| 2017|     16|
|    MD| 2018|     24|
|    WY| 2017|     48|
|    MI| 2018|    516|
|    HI| 2018|      1|
|    WA| 2017|    295|
|    WA| 2018|    295|
|    NE| 2018|    246|
|    NV| 2017|     17|
|    LA| 2018|     69|
|    NJ| 2017|    342|
|    IA| 2017|    336|
|    OR| 2017|    188|
|    VA| 2017|    136|
|    PR| 2018|      1|
|    ID| 2018|    115|
|    KS| 2018|    286|
|    AK| 2017|     53|
+------+-----+-------+
only showing top 20 rows



In [31]:
joinexpression =  [usd_ilne_df.JState == es_ilne_df.state, usd_ilne_df.JYear == es_ilne_df.year]
joinType = "left_outer"

In [32]:
reportdf1 = usd_ilne_df.join(es_ilne_df,joinexpression,joinType)#.collect()
reportdf1 = reportdf1.drop("state", "year")
reportdf1.show()

+------+-----+-------+----------+
|JState|JYear|Unified|Elementary|
+------+-----+-------+----------+
|    FL| 2017|     67|      null|
|    AL| 2017|    138|         1|
|    CT| 2017|    115|        44|
|    CO| 2017|    178|      null|
|    UT| 2017|     41|      null|
|    IL| 2018|    386|       372|
|    DC| 2018|      1|      null|
|    LA| 2018|     69|      null|
|    TN| 2018|    126|        16|
|    AR| 2017|    234|      null|
|    IN| 2018|    290|      null|
|    KS| 2017|    286|      null|
|    HI| 2018|      1|      null|
|    AZ| 2017|     98|       104|
|    DE| 2018|     16|      null|
|    RI| 2017|     31|         5|
|    DE| 2017|     16|      null|
|    MT| 2018|     60|       243|
|    MI| 2017|    518|        29|
|    NJ| 2018|    342|       171|
+------+-----+-------+----------+
only showing top 20 rows



In [33]:
joinexpression =  [reportdf1.JState == ss_ilne_df.state, reportdf1.JYear == ss_ilne_df.year]
joinType = "left_outer"

In [34]:
reportdf = reportdf1.join(ss_ilne_df,joinexpression,joinType)#.collect()
reportdf = reportdf.drop("state", "year")

reportdf = reportdf.withColumnRenamed("JState","State")\
                        .withColumnRenamed("JYear","Year")

reportdf = reportdf.select("State", "Year", "Elementary", "Secondary", "Unified")\
                    .sort("State", "Year")
reportdf.show()

+-----+----+----------+---------+-------+
|State|Year|Elementary|Secondary|Unified|
+-----+----+----------+---------+-------+
|   AK|2017|      null|     null|     53|
|   AK|2018|      null|     null|     53|
|   AL|2017|         1|     null|    138|
|   AL|2018|         1|     null|    138|
|   AR|2017|      null|     null|    234|
|   AR|2018|      null|     null|    235|
|   AZ|2017|       104|       15|     98|
|   AZ|2018|       104|       15|     98|
|   CA|2017|       526|      116|    344|
|   CA|2018|       524|      112|    345|
|   CO|2017|      null|     null|    178|
|   CO|2018|      null|     null|    178|
|   CT|2017|        44|        8|    115|
|   CT|2018|        44|        8|    115|
|   DC|2017|      null|     null|      1|
|   DC|2018|      null|     null|      1|
|   DE|2017|      null|     null|     16|
|   DE|2018|      null|     null|     16|
|   FL|2017|      null|     null|     67|
|   FL|2018|      null|     null|     67|
+-----+----+----------+---------+-

In [35]:
# saving the summarized report for all states as spark table in warehouse
tablename = 'Allstates_counts'
tblwarehouse_dir = os.path.join(warehouse_dir,tablename)
reportdf.write.saveAsTable(tablename, mode = 'overwrite', path = tblwarehouse_dir )

In [41]:
# use Spark SQL to pull the report rows for Nebraska and Iowa
summary_fewstates = spark.sql("""select *
                                    from Allstates_counts 
                                    where state In ('NE' ,'IA') 
                                    order by state""")

#print the report output for states IA & NE
summary_fewstates.show()

+-----+----+----------+---------+-------+
|State|Year|Elementary|Secondary|Unified|
+-----+----+----------+---------+-------+
|   IA|2017|      null|     null|    336|
|   IA|2018|      null|     null|    333|
|   NE|2017|      null|     null|    251|
|   NE|2018|      null|     null|    246|
+-----+----+----------+---------+-------+
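The multi-cell pipeline above (three `collect()` calls, re-parallelizing, renaming, and two left joins) can also be expressed as one Spark SQL statement. The sketch below mirrors that logic, counting rows per state and year in each table and left-joining the elementary and secondary counts onto the unified counts, with one design change: `COALESCE(..., 0)` reports missing counts as 0 instead of null. It avoids SQLite-only syntax, so `spark.sql(REPORT_SQL).show()` should work against the warehouse tables, but that portability is an assumption to verify. SQLite and the made-up rows in the hypothetical `build_school_demo_db` helper are only a stand-in so the query can run without Spark.

```python
import sqlite3

# Count rows per (state, year) in each school-district table, then left-join the
# elementary and secondary counts onto the unified counts, as the cells above do.
REPORT_SQL = """
WITH u AS (SELECT state, year, COUNT(*) AS n
           FROM unified_school_districts GROUP BY state, year),
     e AS (SELECT state, year, COUNT(*) AS n
           FROM elementary_schools GROUP BY state, year),
     s AS (SELECT state, year, COUNT(*) AS n
           FROM secondary_schools GROUP BY state, year)
SELECT u.state AS State,
       u.year AS Year,
       COALESCE(e.n, 0) AS Elementary,
       COALESCE(s.n, 0) AS Secondary,
       u.n AS Unified
FROM u
LEFT JOIN e ON e.state = u.state AND e.year = u.year
LEFT JOIN s ON s.state = u.state AND s.year = u.year
WHERE u.state IN ('NE', 'IA')
ORDER BY State, Year
"""


def build_school_demo_db():
    """Return an in-memory SQLite database holding a few made-up rows for illustration."""
    con = sqlite3.connect(":memory:")
    for table in ("unified_school_districts", "elementary_schools", "secondary_schools"):
        con.execute(f"CREATE TABLE {table} (state TEXT, year INTEGER, name TEXT)")
    con.executemany(
        "INSERT INTO unified_school_districts VALUES (?, ?, ?)",
        [("NE", 2017, "u1"), ("NE", 2017, "u2"), ("IA", 2018, "u3"), ("IL", 2018, "u4")],
    )
    con.executemany(
        "INSERT INTO elementary_schools VALUES (?, ?, ?)",
        [("NE", 2017, "e1"), ("IL", 2018, "e2")],
    )
    return con
```

Like the DataFrame version, starting from the unified counts drops any state and year that has elementary or secondary districts but no unified districts; a `FULL OUTER JOIN` would keep them if that matters.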



# 2. Flight Data



In the previous exercise, you joined data from flights and airport codes to create a report. Create external tables named domestic_flights and airport_codes from the domestic-flights/flights.parquet and airport-codes/airport-codes.csv files, respectively. Recreate the report of the top ten airports for 2008 using Spark SQL instead of DataFrames.
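A sketch of the report as a single SQL statement follows, assuming the two external tables are registered as domestic_flights and airport_codes (for example with `spark.catalog.createTable("domestic_flights", path=parquet_file_path, source="parquet")`, which creates an unmanaged table when a path is given). It ranks destination airports by total inbound passengers, which assumes that is how the previous exercise defined "top ten". It also assumes each `iata_code` appears at most once in airport_codes, since duplicate codes would multiply the passenger sums. SQLite and the made-up rows in the hypothetical `build_flight_demo_db` helper are only a stand-in so the query runs without Spark; under Spark, `spark.sql(TOP_TEN_SQL).show()` should produce the report.

```python
import sqlite3

# Top ten destination airports by inbound passengers in 2008. Column names follow
# the flights and airport-codes data loaded below; output aliases are illustrative.
TOP_TEN_SQL = """
SELECT a.name AS airport_name,
       f.destination_airport_code AS iata_code,
       SUM(f.passengers) AS total_inbound_passengers,
       SUM(f.flights) AS total_flights
FROM domestic_flights f
LEFT JOIN airport_codes a ON f.destination_airport_code = a.iata_code
WHERE f.flight_year = 2008
GROUP BY a.name, f.destination_airport_code
ORDER BY total_inbound_passengers DESC
LIMIT 10
"""


def build_flight_demo_db():
    """Return an in-memory SQLite database with a few made-up rows for illustration."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE domestic_flights (destination_airport_code TEXT, "
                "passengers INTEGER, flights INTEGER, flight_year INTEGER)")
    con.execute("CREATE TABLE airport_codes (iata_code TEXT, name TEXT)")
    con.executemany(
        "INSERT INTO domestic_flights VALUES (?, ?, ?, ?)",
        [("AAA", 100, 2, 2008), ("AAA", 50, 1, 2008), ("BBB", 120, 3, 2008),
         ("BBB", 999, 9, 2007), ("CCC", 10, 1, 2008)],
    )
    con.executemany(
        "INSERT INTO airport_codes VALUES (?, ?)",
        [("AAA", "Alpha Field"), ("BBB", "Bravo Regional")],
    )
    return con
```

The left join keeps flights whose destination code has no match in airport_codes (the name comes back null), so unmatched airports still appear in the ranking rather than silently disappearing.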

In [42]:
# Create file paths including filenames
parquet_file_path = r'/home/ram/share/650/dsc650-master/data/domestic-flights/flights.parquet'

airportdata_filepath = r'/home/ram/share/650/dsc650-master/data/airport-codes/airport-codes.csv'


In [43]:
df_flight = spark.read.parquet(parquet_file_path)
df_flight.head(5)

[Row(origin_airport_code='MHK', destination_airport_code='AMW', origin_city='Manhattan, KS', destination_city='Ames, IA', passengers=21, seats=30, flights=1, distance=254.0, origin_population=122049, destination_population=86219, flight_year=2008, flight_month=10, __index_level_0__=0),
 Row(origin_airport_code='EUG', destination_airport_code='RDM', origin_city='Eugene, OR', destination_city='Bend, OR', passengers=41, seats=396, flights=22, distance=103.0, origin_population=284093, destination_population=76034, flight_year=1990, flight_month=11, __index_level_0__=1),
 Row(origin_airport_code='EUG', destination_airport_code='RDM', origin_city='Eugene, OR', destination_city='Bend, OR', passengers=88, seats=342, flights=19, distance=103.0, origin_population=284093, destination_population=76034, flight_year=1990, flight_month=12, __index_level_0__=2),
 Row(origin_airport_code='EUG', destination_airport_code='RDM', origin_city='Eugene, OR', destination_city='Bend, OR', passengers=11, seats=7

In [44]:
df_airpot_codes = spark.read.load(airportdata_filepath, format="csv", sep=",", inferschema=True, header=True)

df_airpot_codes.head(5)

[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft=11.0, continent=None, iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125'),
 Row(ident='00AA', type='small_airport', name='Aero B Ranch Airport', elevation_ft=3435.0, continent=None, iso_country='US', iso_region='US-KS', municipality='Leoti', gps_code='00AA', iata_code=None, local_code='00AA', coordinates='-101.473911, 38.704022'),
 Row(ident='00AK', type='small_airport', name='Lowell Field', elevation_ft=450.0, continent=None, iso_country='US', iso_region='US-AK', municipality='Anchor Point', gps_code='00AK', iata_code=None, local_code='00AK', coordinates='-151.695999146, 59.94919968'),
 Row(ident='00AL', type='small_airport', name='Epps Airpark', elevation_ft=820.0, continent=None, iso_country='US', iso_region='US-AL', municipality='Harvest', gps_code='00AL', iata_code=None, local_code='00AL', coordinat

## Join to Origin Airport

In [45]:
joinexpression =  df_flight['origin_airport_code'] == df_airpot_codes['iata_code']
joinType = "left_outer"

In [46]:
#df_flight.join(df_airpot_codes,joinexpression,joinType).show(3)
df_merged = df_flight.join(df_airpot_codes,joinexpression,joinType)
df_merged_modified = df_merged.drop("__index_level_0__","ident","local_code","continent","iso_country","iata_code")
df_merged_modified.head(2)

[Row(origin_airport_code='MHK', destination_airport_code='AMW', origin_city='Manhattan, KS', destination_city='Ames, IA', passengers=21, seats=30, flights=1, distance=254.0, origin_population=122049, destination_population=86219, flight_year=2008, flight_month=10, type='medium_airport', name='Manhattan Regional Airport', elevation_ft=1057.0, iso_region='US-KS', municipality='Manhattan', gps_code='KMHK', coordinates='-96.6707992553711, 39.14099884033203'),
 Row(origin_airport_code='EUG', destination_airport_code='RDM', origin_city='Eugene, OR', destination_city='Bend, OR', passengers=41, seats=396, flights=22, distance=103.0, origin_population=284093, destination_population=76034, flight_year=1990, flight_month=11, type='medium_airport', name='Mahlon Sweet Field', elevation_ft=374.0, iso_region='US-OR', municipality='Eugene', gps_code='KEUG', coordinates='-123.21199798583984, 44.12459945678711')]

In [47]:
df_merged_modified2 = df_merged_modified.withColumnRenamed("type","origin_airport_type")\
                                        .withColumnRenamed("name","origin_airport_name")\
                                        .withColumnRenamed("elevation_ft","origin_airport_elevation_ft")\
                                        .withColumnRenamed("iso_region","origin_airport_region")\
                                        .withColumnRenamed("municipality","origin_airport_municipality")\
                                        .withColumnRenamed("gps_code","origin_airport_gps_code")\
                                        .withColumnRenamed("coordinates","origin_airport_coordinates")

In [48]:
df_merged_modified2.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)



## Join to Destination Airport

In [49]:
joinexpression2 =  df_merged_modified2['destination_airport_code'] == df_airpot_codes['iata_code']
joinType2 = "left_outer"


In [50]:
#df_merged_modified2.join(df_airpot_codes,joinexpression2,joinType2).show(2)
df_merged_modified_dest= df_merged_modified2.join(df_airpot_codes,joinexpression2,joinType2)

df_merged_modified_dest2 = df_merged_modified_dest.drop("__index_level_0__","ident","local_code","continent","iso_country")


df_merged_modified_dest_final = df_merged_modified_dest2.withColumnRenamed("type","destination_airport_type")\
                                        .withColumnRenamed("name","destination_airport_name")\
                                        .withColumnRenamed("elevation_ft","destination_airport_elevation_ft")\
                                        .withColumnRenamed("iso_region","destination_airport_region")\
                                        .withColumnRenamed("municipality","destination_airport_municipality")\
                                        .withColumnRenamed("gps_code","destination_airport_gps_code")\
                                        .withColumnRenamed("coordinates","destination_airport_coordinates")

In [51]:
df_merged_modified_dest_final.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- destination_airport_type: string (nullable = true)
 |-- destination_airp

## Top Ten Airports By Inbound Passengers

In [52]:
# Register the joined DataFrame as a temporary view so spark.sql() can query it
df_merged_modified_dest_final.createOrReplaceTempView("dfTable")

In [53]:
# dataframe with data from 2008
df_2008 = spark.sql("SELECT * FROM dfTable where flight_year = 2008")
df_2008.head(2)

[Row(origin_airport_code='MHK', destination_airport_code='AMW', origin_city='Manhattan, KS', destination_city='Ames, IA', passengers=21, seats=30, flights=1, distance=254.0, origin_population=122049, destination_population=86219, flight_year=2008, flight_month=10, origin_airport_type='medium_airport', origin_airport_name='Manhattan Regional Airport', origin_airport_elevation_ft=1057.0, origin_airport_region='US-KS', origin_airport_municipality='Manhattan', origin_airport_gps_code='KMHK', origin_airport_coordinates='-96.6707992553711, 39.14099884033203', destination_airport_type='small_airport', destination_airport_name='Ames Municipal Airport', destination_airport_elevation_ft=956.0, destination_airport_region='US-IA', destination_airport_municipality='Ames', destination_airport_gps_code='KAMW', iata_code='AMW', destination_airport_coordinates='-93.621803, 41.992001'),
 Row(origin_airport_code='SEA', destination_airport_code='RDM', origin_city='Seattle, WA', destination_city='Bend, OR'

In [54]:
# Save the 2008 data as a table; passing path makes it an unmanaged (external)
# table whose files live at that location
df_2008.write.saveAsTable('Flight_2008_data', mode='overwrite',
                          path='/home/ram/Documents/spark-warehouse/flight_data')
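Because `path` is supplied, `Flight_2008_data` is an unmanaged table. `DROP TABLE` removes only its catalog entry and leaves the files in place. If the session was started without Hive support (an assumption, since the session setup is not shown here), the catalog lives in memory, and a later session must point a table name back at those files. The sketch below builds that statement. `register_table_sql` is a hypothetical helper. It assumes the files use Spark's default format, which is parquet unless `spark.sql.sources.default` was changed.

```python
def register_table_sql(table, path, fmt="parquet"):
    """Build a Spark SQL statement that points a table name at existing files.

    Assumes the files were written in Spark's default format (parquet unless
    spark.sql.sources.default was changed) and that path has no single quotes.
    """
    return f"CREATE TABLE IF NOT EXISTS {table} USING {fmt} LOCATION '{path}'"


stmt = register_table_sql("Flight_2008_data",
                          "/home/ram/Documents/spark-warehouse/flight_data")
print(stmt)
# In a new session: spark.sql(stmt), then spark.table("Flight_2008_data")
```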

In [55]:
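# Rank 2008 destination airports by inbound traffic and keep the top 10 (dense rank).
# Note: count(passengers) and count(flights) count rows, not passengers or flights,
# which is why the two totals in the output are identical; sum() gives the totals.
# mean() averages per row, not per day.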
Top10 = spark.sql(""" SELECT * FROM 
                    (
                    SELECT  Airport_Name,
                            Airpot_Code,
                            dense_rank() Over(ORDER BY Total_Inbound_Passengers DESC) as Rank,
                            Total_Inbound_Passengers,
                            Total_Inbound_Flights,
                            Average_Daily_Passengers,
                            Average_DailyFlights
                        FROM 
                        (
                            SELECT 
                            destination_airport_name as Airport_Name, 
                            destination_airport_code as Airpot_Code,  
                            count(passengers) as Total_Inbound_Passengers,
                            count(flights) as Total_Inbound_Flights,
                            mean(passengers) as Average_Daily_Passengers,
                            mean(flights) as Average_DailyFlights                            
                            FROM Flight_2008_data 
                            GROUP BY destination_airport_name, destination_airport_code
                            ) Temp
                      ) RankedTable 
                          WHERE Rank < 11
                """)
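The ranking difference between `count()` and `sum()` is easy to see on a handful of rows. The plain-Python sketch below models the query with summed passengers and `dense_rank()` ordering. `top_inbound`, its record keys, and the sample data are hypothetical stand-ins for `Flight_2008_data`.

```python
from collections import defaultdict


def top_inbound(records, n=10):
    """Rank destinations by summed passengers with dense ranking.

    records: iterable of dicts with hypothetical keys 'dest', 'passengers',
    'flights', one per row of Flight_2008_data.
    Returns (dest, rank, row_count, total_passengers, total_flights) tuples.
    """
    totals = defaultdict(lambda: {"rows": 0, "passengers": 0, "flights": 0})
    for rec in records:
        agg = totals[rec["dest"]]
        agg["rows"] += 1                         # what count(passengers) returns
        agg["passengers"] += rec["passengers"]   # what sum(passengers) returns
        agg["flights"] += rec["flights"]
    # dense_rank(): equal totals share a rank and the next rank has no gap.
    distinct = sorted({agg["passengers"] for agg in totals.values()}, reverse=True)
    rank_of = {total: i + 1 for i, total in enumerate(distinct)}
    ranked = sorted(
        (dest, rank_of[agg["passengers"]], agg["rows"], agg["passengers"], agg["flights"])
        for dest, agg in totals.items()
    )
    ranked.sort(key=lambda row: row[1])  # stable sort: ties stay alphabetical
    return [row for row in ranked if row[1] <= n]


sample = [
    {"dest": "ORD", "passengers": 100, "flights": 2},
    {"dest": "ORD", "passengers": 50, "flights": 1},
    {"dest": "ATL", "passengers": 300, "flights": 3},
    {"dest": "AMW", "passengers": 150, "flights": 1},
]
for row in top_inbound(sample):
    print(row)
# ('ATL', 1, 1, 300, 3)
# ('AMW', 2, 1, 150, 1)
# ('ORD', 2, 2, 150, 3)
```

Ranking by row count would put ORD first because it has two rows. Ranking by summed passengers puts ATL first. In the Spark query, the corresponding change is `sum(passengers)` and `sum(flights)` in place of the two `count(...)` calls.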

In [56]:
Top10.show()

+--------------------+-----------+----+------------------------+---------------------+------------------------+--------------------+
|        Airport_Name|Airpot_Code|Rank|Total_Inbound_Passengers|Total_Inbound_Flights|Average_Daily_Passengers|Average_DailyFlights|
+--------------------+-----------+----+------------------------+---------------------+------------------------+--------------------+
|Chicago O'Hare In...|        ORD|   1|                    9479|                 9479|      2784.9765798079966|   37.61683721911594|
|Hartsfield Jackso...|        ATL|   2|                    8775|                 8775|       4052.626210826211|   45.03612535612535|
|Charlotte Douglas...|        CLT|   3|                    6152|                 6152|      2444.4878088426526|   33.32899869960988|
|Minneapolis-St Pa...|        MSP|   4|                    5988|                 5988|      2354.3926185704745|  29.740313961255843|
|Philadelphia Inte...|        PHL|   5|                    5880|     