# Creating a Data Warehouse with Spark SQL

### Configure and start a SparkSession

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
import pandas as pd
import re
import os

# Directory where Spark SQL should create the data warehouse (managed tables
# written with saveAsTable are stored here). Replace with a real path.
warehouse_dir = 'PATH/TO/YOUR/DATA/WAREHOUSE'

# Session settings, applied to the builder in the order listed.
spark_settings = {
    "spark.sql.warehouse.dir": warehouse_dir,
    "spark.executor.memory": "4g",
    "spark.driver.memory": "16g",
    "spark.driver.extraJavaOptions": "-XX:+UseCompressedOops",
}

session_builder = SparkSession.builder.appName("Building a Data Warehouse")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Reuses an already-running session in this kernel instead of starting a second one.
spark = session_builder.getOrCreate()

### Create a dictionary of corresponding files to combine

In [None]:
# Base names of the geography tables that exist for both survey years.
# Each becomes one warehouse table built from its 2017 and 2018 CSVs.
geographies = [
    'places',
    'congressional_district',
    'core_based_statistical_areas',
    'counties',
    'county_subdivisions',
    'elementary_schools',
    'tracts',
    'unified_school_districts',
    'urban_areas',
    'zip_code_tabulation_areas',
    'secondary_schools',
]

# fp maps each 2017 CSV path to the 2018 CSV path holding the same geography.
fp = {'2017/' + name + '.csv': '2018/' + name + '.csv' for name in geographies}

### Iterate through all the files we just listed, isolate table names, and create the data warehouse

In [None]:
# Build one warehouse table per geography by stacking its 2017 and 2018 CSVs.
# NOTE(review): presumably each CSV carries its own `year` column (the report
# query below groups by `year`); otherwise the two years are indistinguishable
# once combined. Confirm against the source data.
for path_2017, path_2018 in fp.items():
    # Table name = file base name without extension, e.g. '2017/places.csv' -> 'places'.
    tableName = os.path.splitext(os.path.basename(path_2017))[0]

    df_2017 = spark.read.load(path_2017, format='csv', sep=',', inferSchema=True, header=True)
    df_2018 = spark.read.load(path_2018, format='csv', sep=',', inferSchema=True, header=True)

    # unionByName (Spark 2.3+) aligns columns by header name rather than by
    # position, so a reordered column in one year's file cannot silently shift
    # values into the wrong column; mismatched column names raise an error instead.
    df = df_2017.unionByName(df_2018)
    df.write.saveAsTable(tableName, mode='overwrite')  # replaces any earlier build of this table

#### Before moving on, make sure all of the tables were created and correctly labeled

In [None]:
# Show the tables in the current catalog database; there should be one per key
# in `fp`, each named after its CSV file (e.g. `places`, `tracts`).
spark.catalog.listTables()

#### Now take a look at how many rows are in each table

In [None]:
# Print each catalog table's name followed by a one-row table with its row count.
for table in spark.catalog.listTables():
    table_name = table.name  # named field instead of positional index into the Table record
    print(table_name)
    # Backtick-quote the identifier (doubling any embedded backticks) so the
    # name is always parsed as a single table identifier.
    quoted_name = "`" + table_name.replace("`", "``") + "`"
    spark.sql("SELECT COUNT(*) AS row_count FROM " + quoted_name).show()

## Loading and Querying Tables with Spark SQL

Now that we have saved the data in our warehouse, we can run some queries in Spark SQL and create a report on school districts for the states of Nebraska and Iowa using the elementary_schools, secondary_schools and unified_school_districts tables.

In [None]:
# Per-state, per-year counts of elementary, secondary, and unified school
# districts for Nebraska and Iowa.
# Each table is aggregated by (state, year) on its own and then joined, so every
# row reports the counts for that state and year. Uncorrelated scalar subqueries
# would instead repeat the NE+IA total across all years on every row.
# NOTE(review): assumes elementary_schools and secondary_schools have the same
# `state` and `year` columns as unified_school_districts — confirm.
NE_IA_counts = spark.sql("""
    WITH unified AS (
        SELECT state, year, COUNT(*) AS unified
        FROM unified_school_districts
        WHERE state IN ('NE', 'IA')
        GROUP BY state, year
    ),
    elementary AS (
        SELECT state, year, COUNT(*) AS elementary
        FROM elementary_schools
        WHERE state IN ('NE', 'IA')
        GROUP BY state, year
    ),
    secondary AS (
        SELECT state, year, COUNT(*) AS secondary
        FROM secondary_schools
        WHERE state IN ('NE', 'IA')
        GROUP BY state, year
    )
    SELECT u.state,
           u.year,
           COALESCE(e.elementary, 0) AS elementary,
           COALESCE(s.secondary, 0) AS secondary,
           u.unified
    FROM unified u
    LEFT JOIN elementary e ON e.state = u.state AND e.year = u.year
    LEFT JOIN secondary s ON s.state = u.state AND s.year = u.year
    ORDER BY u.state, u.year
""")

NE_IA_counts.show()

The table above shows the number of elementary, secondary, and unified school districts in each state for each year.
***
Many of the tables haven't been queried yet, so try writing your own queries against them to get a feel for data exploration in Spark SQL.

### MAKE SURE YOU RUN THE FOLLOWING CELL WHEN YOU'RE ALL DONE WITH SPARK
*Otherwise, the Spark session will keep running in the background until it is stopped.*

In [None]:
# Shut down the Spark session and its underlying SparkContext; `spark` can no
# longer run queries after this call.
spark.stop()