In [15]:
import pandas as pd
from datetime import datetime
import datetime as dt
import numpy as np

## Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add, col, split, year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld
from pyspark.sql.types import StringType as Str, DoubleType as Dbl, IntegerType as Int, DateType as Date, LongType as Long
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

In [16]:
def create_spark_session():
    """Create or retrieve a Spark session
    """
    
    spark = SparkSession.builder.\
        config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
        enableHiveSupport().\
        getOrCreate()
    return spark

In [17]:
def process_immigration_data(spark, input_data, output_data):
    """Extract immigration data from provided folder in local machine, process it using Spark and 
    loads it into S3 as a set of tables in parquet format:
        - a fact table with immigration data and 
        - a dimension table with correspondent dates available
    @spark: spark session defined previously
    @input_data: location path for the dataset
    @output_data: S3 bucket name where to load processed output data
    """
    
    
    # Read in immigration dataset
    print('Loading immigration data...')
    #df_spark = spark.read.csv('immigration_data_sample.csv', header = 'True') # to use sample dataset
    df_spark = spark.read.format('com.github.saurfang.sas.spark').\
        load(input_data)

    
    # Read in mapping codes datasets
    print('Loading mapping codes...')
    map_mode_codes = spark.read.csv('map_codes/map_mode_codes.csv', header = 'True')
    map_visa_codes = spark.read.csv('map_codes/map_visa_codes.csv', header = 'True')
    map_state_codes = spark.read.csv('map_codes/map_state_codes.csv', header = 'True')

    map_port_codes = spark.read.csv('map_codes/map_port_codes.csv', header = 'True')
    map_country_codes = spark.read.csv('map_codes/map_country_codes.csv', header = 'True')
    
    # Create temporary tables to be able to use SparkSQL
    print('Processing immigration data...')
    #df_spark_sample.createOrReplaceTempView("df_immigration")  # to use sample dataset
    df_spark.createOrReplaceTempView("df_immigration")  # to use full dataset
    map_mode_codes.createOrReplaceTempView("map_mode_codes")
    map_visa_codes.createOrReplaceTempView("map_visa_codes")
    map_state_codes.createOrReplaceTempView("map_state_codes")

    map_port_codes.createOrReplaceTempView("map_port_codes")
    map_country_codes.createOrReplaceTempView("map_country_codes")
    
    # Join immigration data with mapping codes, convert SAS dates and rename fields
    df_immigration_joined = spark.sql("""
        SELECT
            i.i94yr as year,
            i.i94mon as month,
            i.i94cit as citizenship_country,
            i.i94res as residence_country,
            i.i94port as port,
            i.arrdate,
            date_add(to_date('1960-01-01'), i.arrdate) AS arrival_date,
            coalesce(m.mode, 'Not reported') as arrival_mode,
            coalesce(s.code, 'Not reported') as us_state,
            i.depdate,
            date_add(to_date('1960-01-01'), i.depdate) AS departure_date,
            i.i94bir as respondent_age,
            coalesce(v.visa, 'Not reported') as visa,
            i.dtadfile as date_added,
            i.visapost as visa_issued_department,
            i.occup as occupation,
            i.entdepa as arrival_flag,
            i.entdepd as departure_flag,
            i.entdepu as update_flag,
            i.matflag as match_arrival_departure_fag,
            i.biryear as birth_year,
            i.dtaddto as allowed_to_stay_date,
            i.insnum as ins_number,
            i.airline as airline,
            i.admnum as admission_number,
            i.fltno as flight_number,
            i.visatype as visa_type
        from df_immigration i 
        left join map_mode_codes m on i.i94mode = m.code
        left join map_visa_codes v on i.i94visa = v.code
        left join map_state_codes s on i.i94addr = s.code
    """)
    
    # Additional cleaning and prepare final fact table
    df_immigration_joined.createOrReplaceTempView("df_immigration_joined")
    df_immigration_cleaned = spark.sql("""
        select *
        from df_immigration_joined
        where 1=1
            and respondent_age >= 0
            and arrival_date IS NOT NULL 
            and departure_date IS NOT NULL
            and arrival_date <= departure_date
    """)
    
    fact_immigration = df_immigration_cleaned.\
        drop('arrdate', 'depdate').\
        withColumn("immigration_id", monotonically_increasing_id())
    
    # Write fact table into S3 in parquet format
    print('Writing immigration fact table in parquet...')
    
    fact_immigration.write.partitionBy("year", "month", "us_state").parquet((output_data+"fact_immigration"),'overwrite')
    print('Fact immigration table created in parquet')
    
    # Generate a dimension time table using the Immigration fact table
    print('Extracting time data...')
    fact_immigration.createOrReplaceTempView("fact_immigration")
    dim_time = spark.sql("""
        SELECT DISTINCT arrival_date AS date
        FROM fact_immigration
        WHERE arrival_date IS NOT NULL
        UNION
        SELECT DISTINCT departure_date AS date
        FROM fact_immigration
        WHERE departure_date IS NOT NULL
    """)

    dim_time.createOrReplaceTempView("dim_time")
    
    
    dim_time = dim_time.select(
        col('date').alias('date'),
        dayofmonth('date').alias('day'),
        weekofyear('date').alias('week'),
        month('date').alias('month'),
        year('date').alias('year')
    )
    
    print('Writing time dimension table in parquet...')
    dim_time.write.parquet((output_data+"dim_time"),'overwrite')
    print('Time dimension table created in parquet')

In [18]:
spark = create_spark_session()
input_data = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
output_data = "udacity_dend_capstone/"
process_immigration_data(spark=spark, input_data=input_data, output_data=output_data)

Loading immigration data...
Loading mapping codes...
Processing immigration data...


AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

In [24]:
#read fact table 
table=spark.read.parquet("udacity_dend_capstone/fact_immigration")
print(table.count())
table.show(3)

2953435
+-------------------+-----------------+----+------------+------------+--------------+--------------+--------+----------+----------------------+----------+------------+--------------+-----------+---------------------------+----------+--------------------+----------+-------+----------------+-------------+---------+------+-----+--------+
|citizenship_country|residence_country|port|arrival_date|arrival_mode|departure_date|respondent_age|    visa|date_added|visa_issued_department|occupation|arrival_flag|departure_flag|update_flag|match_arrival_departure_fag|birth_year|allowed_to_stay_date|ins_number|airline|admission_number|flight_number|visa_type|  year|month|us_state|
+-------------------+-----------------+----+------------+------------+--------------+--------------+--------+----------+----------------------+----------+------------+--------------+-----------+---------------------------+----------+--------------------+----------+-------+----------------+-------------+---------+----

In [25]:
#read dim time table 
table=spark.read.parquet("udacity_dend_capstone/dim_time")
print(table.count())
table.show(3)

175
+----------+---+----+-----+----+
|      date|day|week|month|year|
+----------+---+----+-----+----+
|2016-06-30| 30|  26|    6|2016|
|2016-06-03|  3|  22|    6|2016|
|2016-07-20| 20|  29|    7|2016|
+----------+---+----+-----+----+
only showing top 3 rows



In [10]:
def process_airports_data(spark, input_data, output_data):
    """Extract airport data from provided folder in local machine, process it using Spark and 
    loads it into S3 as a dimension table in parquet format:
    @spark: spark session defined previously
    @input_data: location path for the dataset
    @output_data: S3 bucket name where to load processed output data
    """
        
    # Read in airport data
    print('Loading airport data...')
    df_airports = spark.read.csv(input_data, header = 'True')
    
    #Cleaning: keep only US data, extract state and lat/long
    print('Processing airports data...')
    df_airports_us_clean = df_airports.filter("iso_country == 'US'")\
        .withColumn("state", split(col("iso_region"), "-")[1])\
        .withColumn("latitude", split(col("coordinates"), ",")[0].cast(Dbl()))\
        .withColumn("longitude", split(col("coordinates"), ",")[1].cast(Dbl()))
    
    dim_airports = df_airports_us_clean.\
        drop('continent', 'iso_region', 'coordinates')
    
    # Write dim table into S3 in parquet format
    print('Writing airports dimension table in parquet...')
    
    dim_airports.write.partitionBy("state").parquet((output_data+"dim_airports"),'overwrite')
    print('Dim airports table created in parquet')


In [11]:
#spark = create_spark_session()
input_data = 'airport-codes_csv.csv'
output_data = "udacity_dend_capstone/"
process_airports_data(spark=spark, input_data=input_data, output_data=output_data)

Loading airport data...
Processing airports data...
Writing airports dimension table in parquet...
Dim airports table created in parquet


In [12]:
#read dim airports table 
table=spark.read.parquet("udacity_dend_capstone/dim_airports")
print(table.count())
table.show(3)

22757
+-----+-------------+--------------------+------------+-----------+------------+--------+---------+----------+------------------+------------------+-----+
|ident|         type|                name|elevation_ft|iso_country|municipality|gps_code|iata_code|local_code|          latitude|         longitude|state|
+-----+-------------+--------------------+------------+-----------+------------+--------+---------+----------+------------------+------------------+-----+
| 00TA|     heliport|Sw Region Faa Hel...|         598|         US|  Fort Worth|    00TA|     null|      00TA|-97.30580139160156|32.826900482177734|   TX|
| 00TE|     heliport|Tcjc-Northeast Ca...|         600|         US|  Fort Worth|    00TE|     null|      00TE|-97.18949890136719|32.847599029541016|   TX|
| 00TS|small_airport|Alpine Range Airport|         670|         US|     Everman|    00TS|     null|      00TS|-97.24199676513672|32.607601165771484|   TX|
+-----+-------------+--------------------+------------+---------

In [19]:
def process_cities_data(spark, input_data, output_data):
    """Extract US cities demographics data from provided folder in local machine, process it using Spark and 
    loads it into S3 as a dimension table in parquet format:
    @spark: spark session defined previously
    @input_data: location path for the dataset
    @output_data: S3 bucket name where to load processed output data
    """
        
    # Read in US cities data
    print('Loading cities data...')
    df_demographics = spark.read.csv(input_data, header = 'True', sep = ";")
    #pd.read_csv(input_data, sep=';')
    
    #Cleaning: no cleaning needed
    print('Processing cities data...')
    df_demographics_clean = df_demographics.select(
        col('City').alias('city'),
        col('State').alias('state_name'),
        col('Median Age').alias('median_age'),
        col('Male Population').alias('male_population'),
        col('Female Population').alias('female_population'),
        col('Number of Veterans').alias('number_veterans'),
        col('Foreign-born').alias('foreign_born'),
        col('Average Household Size').alias('avg_household_size'),
        col('State Code').alias('state_code'),
        col('Race').alias('race'),
        col('Count').alias('count'),
    )

    
    dim_cities_demographics = df_demographics_clean
    
    # Write dim table into S3 in parquet format
    print('Writing cities dimension table in parquet...')
    
    dim_cities_demographics.write.partitionBy("state_code").parquet((output_data+"dim_cities_demographics"),'overwrite')
    print('Dim cities table created in parquet')

In [20]:
#spark = create_spark_session()
input_data = 'us-cities-demographics.csv'
output_data = "udacity_dend_capstone/"
process_cities_data(spark=spark, input_data=input_data, output_data=output_data)

Loading cities data...
Processing cities data...
Writing cities dimension table in parquet...
Dim cities table created in parquet


In [21]:
#read dim cities table 
table=spark.read.parquet("udacity_dend_capstone/dim_cities_demographics")
print(table.count())
table.show(3)

2891
+----------------+----------+----------+---------------+-----------------+---------------+------------+------------------+--------------------+-----+----------+
|            city|state_name|median_age|male_population|female_population|number_veterans|foreign_born|avg_household_size|                race|count|state_code|
+----------------+----------+----------+---------------+-----------------+---------------+------------+------------------+--------------------+-----+----------+
|Rancho Cucamonga|California|      34.5|          88127|            87105|           5821|       33878|              3.18|Black or African-...|24437|        CA|
|     West Covina|California|      39.8|          51629|            56860|           3800|       37038|              3.56|               Asian|32716|        CA|
|          Folsom|California|      40.9|          41051|            35317|           4187|       13234|              2.62|  Hispanic or Latino| 5822|        CA|
+----------------+----------+

In [4]:
def process_temperature_data(spark, input_data, output_data):
    """Extract world temperature data from provided folder in local machine, process it using Spark and 
    loads it into S3 as a dimension table in parquet format:
    @spark: spark session defined previously
    @input_data: location path for the dataset
    @output_data: S3 bucket name where to load processed output data
    """
        
    # Read in US cities data
    print('Loading temperature data...')
    df_temperature = spark.read.csv(input_data, header = 'True')
    df_temperature.createOrReplaceTempView("df_temperature")
    
    #Cleaning: remove missing data, unnecessary fields and dates and compute avg. statistics by date/city/country
    print('Processing temperature data...')
    df_temperature_clean = spark.sql("""
        SELECT 
            dt as date,
            City as city,
            Country as country,
            avg(AverageTemperature) as avg_temperature,
            avg(AverageTemperatureUncertainty) as avg_temperature_uncertainty
            --count(*)
        FROM df_temperature
        WHERE 1=1
            and dt >='1960-01-01'
            --and dt >= (select min(date) from dim_time) --make sure we cover all dates we have in the fact table
            and AverageTemperature is not null
        group by date, city, country
    """)

    
    dim_world_temperatures = df_temperature_clean
    
    # Write dim table into S3 in parquet format
    print('Writing temperature dimension table in parquet...')
    
    dim_world_temperatures.write.partitionBy("country").parquet((output_data+"dim_world_temperatures"),'overwrite')
    print('Dim temperatures table created in parquet')

In [6]:
spark = create_spark_session()
input_data = '../../data2/GlobalLandTemperaturesByCity.csv'
output_data = "udacity_dend_capstone/"
process_temperature_data(spark=spark, input_data=input_data, output_data=output_data)

Loading temperature data...
Processing temperature data...
Writing temperature dimension table in parquet...
Dim temperatures table created in parquet


In [7]:
#read dim temperatures table 
table=spark.read.parquet("udacity_dend_capstone/dim_world_temperatures")
print(table.count())
table.show(3)

2247991
+----------+------+---------------+---------------------------+-------+
|      date|  city|avg_temperature|avg_temperature_uncertainty|country|
+----------+------+---------------+---------------------------+-------+
|1979-04-01|Abohar|         28.851|        0.28300000000000003|  India|
|1983-03-01|Abohar|         20.208|                      0.179|  India|
|1991-06-01|Abohar|         35.069|                      0.469|  India|
+----------+------+---------------+---------------------------+-------+
only showing top 3 rows

