In [41]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, substring
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
from pyspark.sql.functions import isnan, when, count, avg
from pyspark.sql.functions import sum as _sum
from pyspark.sql.types import *
import pyspark.sql.functions as func


import sys

In [42]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [43]:
os.environ['AWS_ACCESS_KEY_ID']=config.get('aws', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('aws', 'AWS_SECRET_ACCESS_KEY')

In [44]:
def create_spark_session():
    """ Create spark session """

    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [45]:
def drop_rows_and_removeDuplicates(df, dropColumns):
    print("drop rows with nulls in all columns of data frame: ", dropColumns)
    print("Rows before dropping nulls: ", df.count())
    df = df.na.drop(subset=dropColumns)                                       # In order to remove Rows with NULL values on all columns of PySpark DataFrame
    print("Rows after dropping nulls: ", df.count())
    df = df.distinct()
    print("Rows after dropping duplicate rows: ", df.count())
    return df
        

In [46]:
# These values are obtained by running other notebook named 'Capstone Project Template.ipynb', esentially getting columns that have null values exceeding 5%.

dropList_immigration = ['visapost', 'occup', 'entdepu', 'gender', 'insnum']
dropList_airportCodes =  ['elevation_ft', 'continent', 'municipality', 'gps_code', 'iata_code', 'local_code']
dropList_stateTemperature = []
dropList_demographics = []


In [47]:
def create_US_demographics_data(spark, input_data, output_data):
    """
    Fetch us-cities-demographics data from S3, processes it and extract demographics from it.
    Convert the data frames to parquet file and loaded back to S3 as output_data.
        
    Parameters:
        spark       : Spark Session
        input_data  : CSV file, location in S3 bucket
        output_data : Parquet format stored in S3

    """

    # get filepath to us-cities-demographics data file
    data = input_data + 'us-cities-demographics.csv'

    # read data file
    df = spark.read.csv(data, inferSchema=True, header=True,  sep=';')
    
    # remove specific columns as per notebook 'Capstone Project Template.ipynb':
    df = df.drop(*dropList_demographics)
    
    # perform data cleaning
    df = drop_rows_and_removeDuplicates(df, df.columns)
    
    df = df.withColumnRenamed('State Code', 'State_Code') \
        .withColumnRenamed('Male Population', 'Male_Population') \
        .withColumnRenamed('Female Population', 'Female_Population') \
        .withColumnRenamed('Total Population', 'Total_Population') \
        .withColumnRenamed('Number of Veterans', 'Number_Of_Veterans') \
        .withColumnRenamed('Average Household Size', 'Average_Household_Size') \
        .withColumnRenamed('Foreign-born', 'Foreign_Born') \
        .withColumnRenamed('Median Age', 'Median_Age') \
    
    # modify data frame to meet requirements
    df = df.groupBy("State_Code", "State") \
        .agg(
             _sum("Male_Population").alias("Male_Population"), \
             _sum("Female_Population").alias("Female_Population"), \
             _sum("Total_Population").alias("Total_Population"), \
             _sum("Number_Of_Veterans").alias("Number_Of_Veterans"), \
             _sum("Foreign_Born").alias("Foreign_Born"), \
             avg("Average_Household_Size").alias("Average_Household_Size"), \
             func.mean(df["Median_Age"]).alias("Median_Age") \
         ) 
    
    # Adding an id column
    df = df.withColumn("Id", row_number().over(Window.orderBy(monotonically_increasing_id())))

    # write dimension to parquet file
    df.write.parquet(output_data + "Demographics", mode="overwrite")

    df.show()
    
    df.printSchema()
    
    return df

In [48]:
def process_airport_codes_data(spark, input_data, output_data):
    """
    Fetch airport_codes data from S3, processes it and extract airport details from it.
    Convert the data frames to parquet files and loaded back to S3 as output_data.
        
    Parameters:
        spark       : Spark Session
        input_data  : CSV file, location in S3 bucket
        output_data : Parquet format stored in S3

    """

    # get filepath to airport_codes data file
    data = input_data + 'airport-codes_csv.csv'
    
    # read airport_codes data file
    df = spark.read.csv(data, inferSchema=True, header=True,  sep=',')  
    
    # remove specific columns as per notebook 'Capstone Project Template.ipynb':
    df = df.drop(*dropList_airportCodes)

    # extract columns to create airport_codes dimension table 
    df = drop_rows_and_removeDuplicates(df, df.columns)
    
    # add a new state_code column:
    df = df.select(df.ident, df.type, df.name, df.iso_country, df.iso_region, substring('iso_region', 4, 2).alias('state_code'), df.coordinates)
    
    # Adding an id column
    df = df.withColumn('Id', row_number().over(Window.orderBy(monotonically_increasing_id())))

    # write dimension to parquet file
    df.write.parquet(output_data + "Airport_Details", mode="overwrite")
    
    df.show()
    
    df.printSchema()
    
    return df

In [49]:
state_codes = {'alabama': 'AL', 'alaska': 'AK', 'arizona': 'AZ', 'arkansas': 'AR', 'california': 'CA', 'colorado': 'CO', 'connecticut': 'CT', 'delaware': 'DE', 'dist. of columbia': 'DC', 'florida': 'FL', 'georgia': 'GA', 'guam': 'GU', 'hawaii': 'HI', 'idaho': 'ID', 'illinois': 'IL', 'indiana': 'IN', 'iowa': 'IA', 'kansas': 'KS', 'kentucky': 'KY', 'louisiana': 'LA', 'maine': 'ME', 'maryland': 'MD', 'massachusetts': 'MA', 'michigan': 'MI', 'minnesota': 'MN', 'mississippi': 'MS', 'missouri': 'MO', 'montana': 'MT', 'n. carolina': 'NC', 'n. dakota': 'ND', 'nebraska': 'NE', 'nevada': 'NV', 'new hampshire': 'NH', 'new jersey': 'NJ', 'new mexico': 'NM', 'new york': 'NY', 'ohio': 'OH', 'oklahoma': 'OK', 'oregon': 'OR', 'pennsylvania': 'PA', 'puerto rico': 'PR', 'rhode island': 'RI', 's. carolina': 'SC', 's. dakota': 'SD', 'tennessee': 'TN', 'texas': 'TX', 'utah': 'UT', 'vermont': 'VT', 'virgin islands': 'VI', 'virginia': 'VA', 'w. virginia': 'WV', 'washington': 'WA', 'wisconson': 'WI', 'wyoming': 'WY', 'all other codes': '99'}

In [50]:
def convertStateToStateCodes(value):
    value = str.lower(value)
    if value in state_codes.keys(): 
      return state_codes[value]
    else:
      return 'Unknown'

In [51]:
def process_GlobalLandTemperaturesByState_data(spark, input_data, output_data):
    """
    Fetch GlobalLandTemperaturesByState data from S3, processes it and extract GlobalLandTemperatures from it.
    Convert the data frames to parquet files and loaded back to S3 as output_data.
        
    Parameters:
        spark       : Spark Session
        input_data  : Input json files location in S3 bucket
        output_data : Parquet format stored in S3

    """

    # get filepath to data file
    data = input_data + 'GlobalLandTemperaturesByState.csv'
    
    # read data file
    df = spark.read.csv(data, inferSchema=True, header=True,  sep=',')  
    
    # remove specific columns as per notebook 'Capstone Project Template.ipynb':
    df = df.drop(*dropList_stateTemperature)
    
    # filtering results for United States
    df = df.filter("Country == 'United States'")
    
    # extract columns to create GlobalLandTemperaturesByCity dimension table  
    df = drop_rows_and_removeDuplicates(df, df.columns)
    
    # rounding temperature columns
    df = df.withColumn("AverageTemperature", func.round(df["AverageTemperature"], 2))
    df = df.withColumn("AverageTemperatureUncertainty", func.round(df["AverageTemperatureUncertainty"], 2))
    
    # add state_codes column to the data frame
    udf_Function = func.udf(convertStateToStateCodes, StringType())
    df = df.withColumn("State_Codes", udf_Function("State"))

    # Adding an id column
    df = df.withColumn('Id', row_number().over(Window.orderBy(monotonically_increasing_id())))
    
    # write dimension to parquet file
    df.write.parquet(output_data + "Temperature_Details", mode="overwrite")
    df.show()
    df.printSchema()
    
    return df


In [52]:
def extractColumnsForDateTime(df):
    """
    Fetch date and time details from timestamp data. 
        
    Parameters:
        input_data  : Spark Data Frame
        output_data : Spark Data Frame

    """
    df = df.select(col('Id'), 
                        col('datetime').alias('Start_Time'), 
                        hour(col('datetime')).alias('Hour'),
                        dayofmonth(col('datetime')).alias('Day'),
                        weekofyear(col('datetime')).alias('Week'),
                        month(col('datetime')).alias('Month'),
                        year(col('datetime')).alias('Year'),
                        date_format(col('datetime'), "u").alias('Weekday')
                        ).dropDuplicates()
    
    df.printSchema()
    
    return df

In [53]:
def createArrivalDateTimeDimensionTable(df, output_data):
    """
    Used to create ArrivalDateTime dimension table.
    
    Parameters:
        df: spark dataframe of Immigration data
        output_data: path to write dimension dataframe to
        return: spark dataframe of DateTime dimension
    
    """
    
    # create datetime column from original timestamp column
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000)))
    df = df.withColumn("datetime", get_datetime(df.arrdate))
    
    # extract columns
    df = extractColumnsForDateTime(df)
    
    # write time table to parquet files partitioned by year and month
    df.write.partitionBy("year", "month").mode('overwrite').parquet(output_data + "ArrivalDateTime.parquet")
    df.printSchema()
    
    return df

In [54]:
def createDepartureDateTimeDimensionTable(df, output_data):
    """
    Used to create DepartureDateTime dimension table.
    
    Parameters:
        df: spark dataframe of Immigration data
        output_data: path to write dimension dataframe to
        return: spark dataframe of DateTime dimension
    
    """
    
    # create datetime column from original timestamp column
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000)))
    df = df.withColumn("datetime", get_datetime(df.depdate))
    
    # extract columns
    df = extractColumnsForDateTime(df)
    
    # write time table to parquet files partitioned by year and month
    df.write.partitionBy("year", "month").mode('overwrite').parquet(output_data + "DepartureDateTime.parquet")
    df.printSchema()
    
    return df

In [55]:
def process_immigration_data(spark, input_data, output_data):
    """
    Fetch Immigration data from S3 and processes it.
    Convert the data frames to parquet files and loaded back to S3 as output_data.
        
    Parameters:
        spark       : Spark Session
        input_data  : Input parquet files location in S3 bucket
        output_data : Parquet format stored in S3

    """

    # get filepath to immigration data file
    data = input_data + 'sas_data/' + '*.parquet'
    
    # read immigration data file
    df = spark.read.parquet(data) 
    
    # remove specific columns as per notebook 'Capstone Project Template.ipynb':
    df = df.drop(*dropList_immigration)
    
    # extract columns to create immigration table    
    df = drop_rows_and_removeDuplicates(df, df.columns)
    
    # Adding an id column
    df = df.withColumn("Id", row_number().over(Window.orderBy(monotonically_increasing_id())))
    
    # create ArrivalDateTime dimension table
    arrival_df = createArrivalDateTimeDimensionTable(df, output_data)
    
    # create DepartureDateTime dimension table
    departure_df = createDepartureDateTimeDimensionTable(df, output_data)
    
    print("ArrivalDateTimeDimensionTable")
    arrival_df.show()
    
    print("DepartureDateTimeDimensionTable")
    departure_df.show()
    
    dropColumns = ['depdate', 'arrdate']
    df = df.drop(*dropColumns)
    
    # write Fact Table to parquet file
    df.write.parquet(output_data + "Immigration_Details", mode="overwrite")
    df.show()
    df.printSchema()
    
    return df
    
    


In [56]:
def performDataQualityChecks(df):
    """
    Perform quality checks on data.
        
    Parameters:
        df       : Spark data frame 

    """
    if df.count() < 1:
        raise ValueError("Number of rows in data frame is 0 !!")
    if len(df.columns) < 1:
        raise ValueError("Number of columns in data frame is 0 !!")
    if 'pyspark' not in str(type(df)):
        raise TypeError("Unexpected input type")    

In [57]:
def run():
    """
    Main method to clean input data and create Fact and Dimension parquet tables.
    """

    spark = create_spark_session()
    #input_data = "s3://aws-logs-464689066216-us-west-2/data/"
    #output_data = "s3://aws-logs-464689066216-us-west-2/data/"
    
    input_data = "/home/workspace/data/"
    output_data = "/home/workspace/data/"
    
    df = create_US_demographics_data(spark, input_data, output_data)
    performDataQualityChecks(df)
    print("\n\n")
    df = process_airport_codes_data(spark, input_data, output_data)
    performDataQualityChecks(df)
    print("\n\n")
    df = process_GlobalLandTemperaturesByState_data(spark, input_data, output_data)
    performDataQualityChecks(df)
    print("\n\n")
    df = process_immigration_data(spark, input_data, output_data)
    performDataQualityChecks(df)
    

In [19]:
run()

drop rows with nulls in all columns of data frame:  ['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race', 'Count']
Rows before dropping nulls:  2891
Rows after dropping nulls:  2875
Rows after dropping duplicate rows:  2875
+----------+--------------------+---------------+-----------------+----------------+------------------+------------+----------------------+------------------+---+
|State_Code|               State|Male_Population|Female_Population|Total_Population|Number_Of_Veterans|Foreign_Born|Average_Household_Size|        Median_Age| Id|
+----------+--------------------+---------------+-----------------+----------------+------------------+------------+----------------------+------------------+---+
|        MT|             Montana|         438535|           467935|          906470|             69270|       29885|    2.2749999999999995|              35.5|  1|


## Querying on the resulting parquet files:

In [None]:
result_data = "/home/workspace/data/"

In [58]:
spark = create_spark_session()

In [59]:
data_immigration = result_data + 'Immigration_Details/part-*.parquet'
data_arrival =  result_data + 'ArrivalDateTime.parquet/year=1970/month=1/part-*.parquet'
data_demographics = result_data + 'Demographics/part-*.parquet'

In [60]:
df_immigration = spark.read.parquet(data_immigration) 
df_arrival = spark.read.parquet(data_arrival) 
df_demographics = spark.read.parquet(data_demographics) 

In [61]:
df_immigration.createOrReplaceTempView("view_immigration")
df_arrival.createOrReplaceTempView("view_arrival")
df_demographics.createOrReplaceTempView("view_demographics")


In [81]:
# query to find details of passenger with specific Id and associated demographics info
query1 = "select i.id, i.i94cit, i.i94port, i.i94mode, i.i94addr, i.airline, i.fltno, d.male_population, d.female_population, d.median_age from view_immigration as i inner join view_demographics as d on i.i94addr = d.state_code where i.id = 1000"

In [None]:
# query to find details of passenger with specific Id and associated arrival times info
query2 = "select i.id, i.i94cit, i.i94port, i.i94mode, i.i94addr, i.airline, i.fltno, a.hour, a.week, a.day from view_immigration as i inner join view_arrival as a on i.id = a.id where i.id = 1000"

In [77]:
spark.sql(query1).show()

+----+------+-------+-------+-------+-------+-----+---------------+-----------------+-----------------+
|  id|i94cit|i94port|i94mode|i94addr|airline|fltno|male_population|female_population|       median_age|
+----+------+-------+-------+-------+-------+-----+---------------+-----------------+-----------------+
|1000| 117.0|    CHI|    1.0|     IL|     TK|00005|       10943864|         11570526|35.70879120879121|
+----+------+-------+-------+-------+-------+-----+---------------+-----------------+-----------------+



In [82]:
spark.sql(query2).show()

+----+------+-------+-------+-------+-------+-----+----+----+---+
|  id|i94cit|i94port|i94mode|i94addr|airline|fltno|hour|week|day|
+----+------+-------+-------+-------+-------+-----+----+----+---+
|1000| 117.0|    CHI|    1.0|     IL|     TK|00005|   0|   1|  1|
+----+------+-------+-------+-------+-------+-----+----+----+---+

