## Testing in notebook using files, stored locally and in S3 bucket in workspace.

In [22]:
import configparser
import os
import logging
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType , StructField, StructType, IntegerType, DoubleType, LongType
from pyspark.sql.functions import udf, col, lit, when, year, month, upper, to_date, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
# setup logging 
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [3]:
#AWS configuration
config = configparser.ConfigParser()
config.read('dwh.cfg', encoding='utf-8-sig')

['dwh.cfg']

In [4]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [None]:
# spark = SparkSession.builder\
#         .config("spark.jars.packages",\
#                 "saurfang:spark-sas7bdat:2.0.0-s_2.11")\
#         .enableHiveSupport().getOrCreate()

In [5]:
def SAS_to_date_format(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
SAS_to_date_udf = udf(SAS_to_date_format, DateType())

In [6]:
def rename_columns(table, new_columns):
    for original, new in zip(table.columns, new_columns):
        table = table.withColumnRenamed(original, new)
    return table

In [8]:
input_data = 's3a://source-bucket-1726/'
output_data = 's3a://destination-bucket-1726/'

In [9]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [10]:
#Create Spark session
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .enableHiveSupport() \
    .getOrCreate()
print("spark session created")

spark session created


#### Explore immigration data set

In [11]:
df_spark =spark.read.load('./sas_data')
df_spark.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [13]:
logging.info("Start processing immigration table")
# extracting columns to create immigration table
immigration_data = df_spark.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr',\
                             'arrdate', 'depdate', 'i94mode', 'i94visa').distinct()\
                     .withColumn("immigration_id", monotonically_increasing_id())

INFO:root:Start processing immigration table


In [14]:
immigration_data.show(5)

+------+------+------+-------+-------+-------+-------+-------+-------+--------------+
| cicid| i94yr|i94mon|i94port|i94addr|arrdate|depdate|i94mode|i94visa|immigration_id|
+------+------+------+-------+-------+-------+-------+-------+-------+--------------+
|  27.0|2016.0|   4.0|    BOS|     MA|20545.0|20549.0|    1.0|    1.0|             0|
| 233.0|2016.0|   4.0|    NYC|     NY|20545.0|20551.0|    1.0|    2.0|             1|
|1103.0|2016.0|   4.0|    NEW|     NY|20545.0|20553.0|    1.0|    2.0|             2|
|1123.0|2016.0|   4.0|    NEW|     PA|20545.0|20552.0|    1.0|    1.0|             3|
|1446.0|2016.0|   4.0|    NYC|     NY|20545.0|20551.0|    1.0|    2.0|             4|
+------+------+------+-------+-------+-------+-------+-------+-------+--------------+
only showing top 5 rows



In [15]:
#Renaming columns of immigration_data table
new_columns = ['cic_id', 'year', 'month', 'city_code', 'state_code','arrival_date', \
                   'departure_date', 'mode', 'visa']
immigration_data = rename_columns(immigration_data, new_columns)
immigration_data.show(5)

+------+------+-----+---------+----------+------------+--------------+----+----+--------------+
|cic_id|  year|month|city_code|state_code|arrival_date|departure_date|mode|visa|immigration_id|
+------+------+-----+---------+----------+------------+--------------+----+----+--------------+
|  27.0|2016.0|  4.0|      BOS|        MA|     20545.0|       20549.0| 1.0| 1.0|             0|
| 233.0|2016.0|  4.0|      NYC|        NY|     20545.0|       20551.0| 1.0| 2.0|             1|
|1103.0|2016.0|  4.0|      NEW|        NY|     20545.0|       20553.0| 1.0| 2.0|             2|
|1123.0|2016.0|  4.0|      NEW|        PA|     20545.0|       20552.0| 1.0| 1.0|             3|
|1446.0|2016.0|  4.0|      NYC|        NY|     20545.0|       20551.0| 1.0| 2.0|             4|
+------+------+-----+---------+----------+------------+--------------+----+----+--------------+
only showing top 5 rows



In [16]:
immigration_data = immigration_data.withColumn('country', lit('United States'))
immigration_data = immigration_data.withColumn('arrival_date', SAS_to_date_udf(col('arrival_date')))                            
immigration_data = immigration_data.withColumn('departure_date', SAS_to_date_udf(col('departure_date')))
logging.info("data wrangling completed with country as United States ")
immigration_data.show(5)

INFO:root:data wrangling completed with country as United States 


+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+
|cic_id|  year|month|city_code|state_code|arrival_date|departure_date|mode|visa|immigration_id|      country|
+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+
|  27.0|2016.0|  4.0|      BOS|        MA|  2016-04-01|    2016-04-05| 1.0| 1.0|             0|United States|
| 233.0|2016.0|  4.0|      NYC|        NY|  2016-04-01|    2016-04-07| 1.0| 2.0|             1|United States|
|1103.0|2016.0|  4.0|      NEW|        NY|  2016-04-01|    2016-04-09| 1.0| 2.0|             2|United States|
|1123.0|2016.0|  4.0|      NEW|        PA|  2016-04-01|    2016-04-08| 1.0| 1.0|             3|United States|
|1446.0|2016.0|  4.0|      NYC|        NY|  2016-04-01|    2016-04-07| 1.0| 2.0|             4|United States|
+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+
only showi

In [17]:
immigration_data.printSchema()

root
 |-- cic_id: double (nullable = true)
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- mode: double (nullable = true)
 |-- visa: double (nullable = true)
 |-- immigration_id: long (nullable = false)
 |-- country: string (nullable = false)



In [18]:
# write immigration table to parquet files partitioned by state_code
immigration_data.write.mode("overwrite").partitionBy('state_code')\
                    .parquet(path= output_data + 'immigration_data')          
logging.info("Created parquet files from immigration table, partitioned by state")

INFO:root:Created parquet files from immigration table, partitioned by state


In [19]:
logging.info("Start processing immigration_personal")
# extract columns to create immigration_personal table
immigration_personal = df_spark.select('cicid', 'i94cit', 'i94res',\
                                'biryear', 'gender', 'insnum').distinct()\
                                .withColumn("immi_personal_id", monotonically_increasing_id())

INFO:root:Start processing immigration_personal


In [20]:
# data wrangling to match data model
new_columns = ['cic_id', 'citizen_country', 'residence_country',\
               'birth_year', 'gender', 'ins_num']
immigration_personal = rename_columns(immigration_personal, new_columns)
immigration_personal.show(5)

+---------+---------------+-----------------+----------+------+-------+----------------+
|   cic_id|citizen_country|residence_country|birth_year|gender|ins_num|immi_personal_id|
+---------+---------------+-----------------+----------+------+-------+----------------+
|5748907.0|          249.0|            249.0|    1954.0|     M|   null|               0|
|5748930.0|          249.0|            249.0|    1958.0|     F|   null|               1|
|5749074.0|          251.0|            135.0|    1977.0|     M|   null|               2|
|5749388.0|          251.0|            251.0|    1966.0|     F|   null|               3|
|5749881.0|          251.0|            251.0|    1961.0|     F|   null|               4|
+---------+---------------+-----------------+----------+------+-------+----------------+
only showing top 5 rows



In [21]:
# write immigration_personal table to parquet files
immigration_personal.write.mode("overwrite")\
                 .parquet(path = output_data + 'immigration_personal')
logging.info("Created parquet files from immigration_personal table.")

INFO:root:Created parquet files from immigration_personal table.


In [23]:
logging.info("Start processing immigration_airline")
# extract columns to create immigration_airline table
immigration_airline = df_spark.select('cicid', 'airline', 'admnum', 'fltno', 'visatype').distinct()\
                     .withColumn("immi_airline_id", monotonically_increasing_id())
immigration_airline.show(5)

INFO:root:Start processing immigration_airline


+---------+-------+---------------+-----+--------+---------------+
|    cicid|airline|         admnum|fltno|visatype|immi_airline_id|
+---------+-------+---------------+-----+--------+---------------+
|5749101.0|     DL| 9.492750593E10|00469|      B2|              0|
|5749130.0|     AA| 9.496847443E10|02572|      B2|              1|
|5749305.0|     UA| 9.500357363E10|00085|      B1|              2|
|5750351.0|     KE|5.9513939033E10|00035|      WB|              3|
|5750515.0|     AA|5.9555506133E10|00280|      WT|              4|
+---------+-------+---------------+-----+--------+---------------+
only showing top 5 rows



In [24]:
# Renaming columns of immigration_airline table
new_columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
immigration_airline = rename_columns(immigration_airline, new_columns)
immigration_airline.show(5)

+---------+-------+---------------+-------------+---------+---------------+
|   cic_id|airline|      admin_num|flight_number|visa_type|immi_airline_id|
+---------+-------+---------------+-------------+---------+---------------+
|5749101.0|     DL| 9.492750593E10|        00469|       B2|              0|
|5749130.0|     AA| 9.496847443E10|        02572|       B2|              1|
|5749305.0|     UA| 9.500357363E10|        00085|       B1|              2|
|5750351.0|     KE|5.9513939033E10|        00035|       WB|              3|
|5750515.0|     AA|5.9555506133E10|        00280|       WT|              4|
+---------+-------+---------------+-------------+---------+---------------+
only showing top 5 rows



In [25]:
immigration_airline.printSchema()

root
 |-- cic_id: double (nullable = true)
 |-- airline: string (nullable = true)
 |-- admin_num: double (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- immi_airline_id: long (nullable = false)



In [26]:
# write immigration_airline table to parquet files
immigration_airline.write.mode("overwrite")\
                .parquet(path=output_data + 'immigration_airline')
logging.info("Created parquet files from immigration_airline table.")

INFO:root:Created parquet files from immigration_airline table.


In [27]:
immigration_data.show(5)

+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+
|cic_id|  year|month|city_code|state_code|arrival_date|departure_date|mode|visa|immigration_id|      country|
+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+
|  27.0|2016.0|  4.0|      BOS|        MA|  2016-04-01|    2016-04-05| 1.0| 1.0|             0|United States|
| 233.0|2016.0|  4.0|      NYC|        NY|  2016-04-01|    2016-04-07| 1.0| 2.0|             1|United States|
|1103.0|2016.0|  4.0|      NEW|        NY|  2016-04-01|    2016-04-09| 1.0| 2.0|             2|United States|
|1123.0|2016.0|  4.0|      NEW|        PA|  2016-04-01|    2016-04-08| 1.0| 1.0|             3|United States|
|1446.0|2016.0|  4.0|      NYC|        NY|  2016-04-01|    2016-04-07| 1.0| 2.0|             4|United States|
+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+
only showi

In [28]:
#Processing for immigration_date season table
immigration_data = immigration_data.withColumn('arrival_month',month(immigration_data.arrival_date))
immigration_data = immigration_data.withColumn('arrival_year',year(immigration_data.arrival_date))
immigration_data = immigration_data.withColumn('arrival_day',dayofmonth(immigration_data.arrival_date))
immigration_data = immigration_data.withColumn('day_of_week',dayofweek(immigration_data.arrival_date))
immigration_data = immigration_data.withColumn('arrival_week_of_year',weekofyear(immigration_data.arrival_date))
immigration_data.show(5)

+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+-------------+------------+-----------+-----------+--------------------+
|cic_id|  year|month|city_code|state_code|arrival_date|departure_date|mode|visa|immigration_id|      country|arrival_month|arrival_year|arrival_day|day_of_week|arrival_week_of_year|
+------+------+-----+---------+----------+------------+--------------+----+----+--------------+-------------+-------------+------------+-----------+-----------+--------------------+
|  27.0|2016.0|  4.0|      BOS|        MA|  2016-04-01|    2016-04-05| 1.0| 1.0|             0|United States|            4|        2016|          1|          6|                  13|
| 233.0|2016.0|  4.0|      NYC|        NY|  2016-04-01|    2016-04-07| 1.0| 2.0|             1|United States|            4|        2016|          1|          6|                  13|
|1103.0|2016.0|  4.0|      NEW|        NY|  2016-04-01|    2016-04-09| 1.0| 2.0|          

In [29]:
immigration_date = immigration_data.select('arrival_date','arrival_month','day_of_week','arrival_year',\
                                           'arrival_day','arrival_week_of_year').dropDuplicates()
immigration_date.show(5)

+------------+-------------+-----------+------------+-----------+--------------------+
|arrival_date|arrival_month|day_of_week|arrival_year|arrival_day|arrival_week_of_year|
+------------+-------------+-----------+------------+-----------+--------------------+
|  2016-04-20|            4|          4|        2016|         20|                  16|
|  2016-04-04|            4|          2|        2016|          4|                  14|
|  2016-04-19|            4|          3|        2016|         19|                  16|
|  2016-04-11|            4|          2|        2016|         11|                  15|
|  2016-04-22|            4|          6|        2016|         22|                  16|
+------------+-------------+-----------+------------+-----------+--------------------+
only showing top 5 rows



In [30]:
immigration_date.printSchema()

root
 |-- arrival_date: date (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- arrival_week_of_year: integer (nullable = true)



In [31]:
# Create temporary sql table
immigration_date.createOrReplaceTempView("immigration_date")

In [32]:
# Add seasons to immigration_date dimension table
immigration_date_season = spark.sql('''SELECT arrival_date,
                         arrival_month,
                         day_of_week,
                         arrival_year,
                         arrival_day,
                         arrival_week_of_year,
                         CASE WHEN arrival_month IN (12, 1, 2) THEN 'winter' 
                                WHEN arrival_month IN (3, 4, 5) THEN 'spring' 
                                WHEN arrival_month IN (6, 7, 8) THEN 'summer' 
                                ELSE 'autumn' 
                         END AS date_season from immigration_date''')

In [33]:
immigration_date_season.show(5)

+------------+-------------+-----------+------------+-----------+--------------------+-----------+
|arrival_date|arrival_month|day_of_week|arrival_year|arrival_day|arrival_week_of_year|date_season|
+------------+-------------+-----------+------------+-----------+--------------------+-----------+
|  2016-04-20|            4|          4|        2016|         20|                  16|     spring|
|  2016-04-04|            4|          2|        2016|          4|                  14|     spring|
|  2016-04-19|            4|          3|        2016|         19|                  16|     spring|
|  2016-04-11|            4|          2|        2016|         11|                  15|     spring|
|  2016-04-22|            4|          6|        2016|         22|                  16|     spring|
+------------+-------------+-----------+------------+-----------+--------------------+-----------+
only showing top 5 rows



In [34]:
immigration_date_season.printSchema()

root
 |-- arrival_date: date (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- arrival_week_of_year: integer (nullable = true)
 |-- date_season: string (nullable = false)



In [35]:
# write immigration_date dimension table to parquet file partitioned by year and month
immigration_date_season.write.mode("overwrite")\
                        .partitionBy("arrival_year", "arrival_month").parquet(path= output_data + 'immigration_date')
logging.info("Created parquet files from immigration_date_season table.")

INFO:root:Created parquet files from immigration_date_season table.


#### Explore I94_SAS_Labels_Descriptions data set

In [36]:
logging.info("Start processing label descriptions")
label_file = os.path.join("./I94_SAS_Labels_Descriptions.SAS")
with open(label_file) as f:
    contents = f.readlines()

INFO:root:Start processing label descriptions


In [37]:
country_code = {}
for countries in contents[10:298]:
    set = countries.split('=')
    code, country = set[0].strip(), set[1].strip().strip("'")
    country_code[code] = country
country_code = spark.createDataFrame(country_code.items(), ['code', 'country'])

In [38]:
country_code.show(5)

+----+-----------+
|code|    country|
+----+-----------+
| 236|AFGHANISTAN|
| 101|    ALBANIA|
| 316|    ALGERIA|
| 102|    ANDORRA|
| 324|     ANGOLA|
+----+-----------+
only showing top 5 rows



In [39]:
country_code.write.mode("overwrite")\
     .parquet(path = output_data + 'country_code')
logging.info("Created country_code parquet files")

INFO:root:Created country_code parquet files


In [40]:
city_code = {}
for cities in contents[303:962]:
    set = cities.split('=')
    code, city = set[0].strip("\t").strip().strip("'"),\
                 set[1].strip('\t').strip().strip("''")
    city_code[code] = city
city_code = spark.createDataFrame(city_code.items(), ['code', 'city'])

In [41]:
city_code.show(5)

+----+--------------------+
|code|                city|
+----+--------------------+
| ANC|ANCHORAGE, AK    ...|
| BAR|BAKER AAF - BAKER...|
| DAC|DALTONS CACHE, AK...|
| PIZ|DEW STATION PT LA...|
| DTH|DUTCH HARBOR, AK ...|
+----+--------------------+
only showing top 5 rows



In [42]:
city_code.write.mode("overwrite")\
     .parquet(path = output_data + 'city_code')
logging.info("Created city_code parquet files")

INFO:root:Created city_code parquet files


In [43]:
state_code = {}
for states in contents[982:1036]:
    set = states.split('=')
    code, state = set[0].strip('\t').strip("'"), set[1].strip().strip("'")
    state_code[code] = state
state_code = spark.createDataFrame(state_code.items(), ['code', 'state'])

In [44]:
state_code.show(5)

+----+----------+
|code|     state|
+----+----------+
|  AK|    ALASKA|
|  AZ|   ARIZONA|
|  AR|  ARKANSAS|
|  CA|CALIFORNIA|
|  CO|  COLORADO|
+----+----------+
only showing top 5 rows



In [45]:
state_code.write.mode("overwrite")\
     .parquet(path = output_data + 'state_code')
logging.info("Created state_code parquet files")

INFO:root:Created state_code parquet files


#### Explore Temperature data set

In [46]:
logging.info("Start processing temperature_data")
# read temperature data file
temperature_data = os.path.join(input_data + 'GlobalLandTemperaturesByCity.csv')
df = spark.read.csv(temperature_data, header=True)
df.show(5)

INFO:root:Start processing temperature_data


+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [47]:
df = df.where(df['Country'] == 'United States')
temperature_data = df.select(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty',\
                     'City', 'Country']).distinct()
temperature_data.show(5)

+----------+------------------+-----------------------------+-------+-------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|
+----------+------------------+-----------------------------+-------+-------------+
|1821-06-01|            25.768|                        2.653|Abilene|United States|
|1830-11-01|            13.302|                        2.715|Abilene|United States|
|1836-11-01|             8.827|           2.1719999999999997|Abilene|United States|
|1846-07-01|28.258000000000003|           2.0069999999999997|Abilene|United States|
|1863-04-01|            18.553|           1.1740000000000002|Abilene|United States|
+----------+------------------+-----------------------------+-------+-------------+
only showing top 5 rows



In [48]:
new_columns = ['date', 'avg_temp', 'avg_temp_uncertainty', 'city', 'country']
temperature_data = rename_columns(temperature_data, new_columns)
temperature_data.show(5)

+----------+------------------+--------------------+-------+-------------+
|      date|          avg_temp|avg_temp_uncertainty|   city|      country|
+----------+------------------+--------------------+-------+-------------+
|1821-06-01|            25.768|               2.653|Abilene|United States|
|1830-11-01|            13.302|               2.715|Abilene|United States|
|1836-11-01|             8.827|  2.1719999999999997|Abilene|United States|
|1846-07-01|28.258000000000003|  2.0069999999999997|Abilene|United States|
|1863-04-01|            18.553|  1.1740000000000002|Abilene|United States|
+----------+------------------+--------------------+-------+-------------+
only showing top 5 rows



In [49]:
temperature_data = temperature_data.withColumn('date', to_date(col('date')))
temperature_data = temperature_data.withColumn('year', year(temperature_data['date']))
temperature_data = temperature_data.withColumn('month', month(temperature_data['date']))
temperature_data.show(5)

+----------+------------------+--------------------+-------+-------------+----+-----+
|      date|          avg_temp|avg_temp_uncertainty|   city|      country|year|month|
+----------+------------------+--------------------+-------+-------------+----+-----+
|1821-06-01|            25.768|               2.653|Abilene|United States|1821|    6|
|1830-11-01|            13.302|               2.715|Abilene|United States|1830|   11|
|1836-11-01|             8.827|  2.1719999999999997|Abilene|United States|1836|   11|
|1846-07-01|28.258000000000003|  2.0069999999999997|Abilene|United States|1846|    7|
|1863-04-01|            18.553|  1.1740000000000002|Abilene|United States|1863|    4|
+----------+------------------+--------------------+-------+-------------+----+-----+
only showing top 5 rows



In [50]:
temperature_data.printSchema()

root
 |-- date: date (nullable = true)
 |-- avg_temp: string (nullable = true)
 |-- avg_temp_uncertainty: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [51]:
# write temperature_data table to parquet files
temperature_data.write.mode("overwrite")\
               .parquet(path=output_data + 'temperature_data')
logging.info("Created temperature_data parquet files")

INFO:root:Created temperature_data parquet files


In [52]:
logging.info("Start processing demographics_data")
# read demographics_data file
demographics_data = os.path.join(input_data + 'us-cities-demographics.csv')
df = spark.read.format('csv').options(header=True, delimiter=';').load(demographics_data)
df.show(5)

INFO:root:Start processing demographics_data


+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [53]:
demographics_population = df.select(['City', 'State', 'Male Population', 'Female Population', \
                          'Number of Veterans', 'Foreign-born', 'Race']).distinct().na.drop() \
                          .withColumn("demog_pop_id", monotonically_increasing_id())
demographics_population.show(5)

+-------------+----------+---------------+-----------------+------------------+------------+------------------+------------+
|         City|     State|Male Population|Female Population|Number of Veterans|Foreign-born|              Race|demog_pop_id|
+-------------+----------+---------------+-----------------+------------------+------------+------------------+------------+
| Saint Joseph|  Missouri|          37688|            38408|              5846|        3755|             Asian|           0|
| Fort Collins|  Colorado|          80893|            80288|              8425|        9704|Hispanic or Latino|           1|
|        Nampa|     Idaho|          45651|            44199|              4736|        6607|             White|           2|
|        Davis|California|          33493|            34163|              2176|       13997|Hispanic or Latino|           3|
|Redondo Beach|California|          34855|            33330|              3014|       13536|Hispanic or Latino|           4|


In [54]:
new_columns = ['city', 'state', 'male_population', 'female_population', \
               'num_of_vetarans', 'foreign_born', 'race']
demographics_population = rename_columns(demographics_population, new_columns)
demographics_population.show(5)

+-------------+----------+---------------+-----------------+---------------+------------+------------------+------------+
|         city|     state|male_population|female_population|num_of_vetarans|foreign_born|              race|demog_pop_id|
+-------------+----------+---------------+-----------------+---------------+------------+------------------+------------+
| Saint Joseph|  Missouri|          37688|            38408|           5846|        3755|             Asian|           0|
| Fort Collins|  Colorado|          80893|            80288|           8425|        9704|Hispanic or Latino|           1|
|        Nampa|     Idaho|          45651|            44199|           4736|        6607|             White|           2|
|        Davis|California|          33493|            34163|           2176|       13997|Hispanic or Latino|           3|
|Redondo Beach|California|          34855|            33330|           3014|       13536|Hispanic or Latino|           4|
+-------------+---------

In [55]:
demographics_population.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- num_of_vetarans: string (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- race: string (nullable = true)
 |-- demog_pop_id: long (nullable = false)



In [56]:
# write demographics_population table to parquet files
demographics_population.write.mode("overwrite")\
                    .parquet(path=output_data + 'demographics_population')
logging.info("Created demographics_population parquet files")

INFO:root:Created demographics_population parquet files


In [57]:
logging.info("Start processing demographics_stats")
demographics_stats = df.select(['City', 'State', 'Median Age', 'Average Household Size'])\
                         .distinct().na.drop()\
                         .withColumn("demog_stat_id", monotonically_increasing_id())
demographics_stats.show(5)

INFO:root:Start processing demographics_stats


+---------------+----------+----------+----------------------+-------------+
|           City|     State|Median Age|Average Household Size|demog_stat_id|
+---------------+----------+----------+----------------------+-------------+
|       Milpitas|California|      36.8|                  3.32|            0|
|Rochester Hills|  Michigan|      41.2|                  2.66|            1|
|     Buena Park|California|      35.7|                  3.55|            2|
|      Daly City|California|      39.7|                  3.26|   8589934592|
|       Longview|     Texas|      36.8|                  2.55|  17179869184|
+---------------+----------+----------+----------------------+-------------+
only showing top 5 rows



In [58]:
 # Renaming columns of demographics_stats table
new_columns = ['city', 'state', 'median_age', 'avg_household_size']
demographics_stats = rename_columns(demographics_stats, new_columns)
demographics_stats.show(5)

+---------------+----------+----------+------------------+-------------+
|           city|     state|median_age|avg_household_size|demog_stat_id|
+---------------+----------+----------+------------------+-------------+
|       Milpitas|California|      36.8|              3.32|            0|
|Rochester Hills|  Michigan|      41.2|              2.66|            1|
|     Buena Park|California|      35.7|              3.55|            2|
|      Daly City|California|      39.7|              3.26|   8589934592|
|       Longview|     Texas|      36.8|              2.55|  17179869184|
+---------------+----------+----------+------------------+-------------+
only showing top 5 rows



In [59]:
# Converting all data in city and state columns to Uppercase.
demographics_stats = demographics_stats.withColumn('city', upper(col('city')))
demographics_stats = demographics_stats.withColumn('state', upper(col('state')))
demographics_stats.show(5)

+---------------+----------+----------+------------------+-------------+
|           city|     state|median_age|avg_household_size|demog_stat_id|
+---------------+----------+----------+------------------+-------------+
|       MILPITAS|CALIFORNIA|      36.8|              3.32|            0|
|ROCHESTER HILLS|  MICHIGAN|      41.2|              2.66|            1|
|     BUENA PARK|CALIFORNIA|      35.7|              3.55|            2|
|      DALY CITY|CALIFORNIA|      39.7|              3.26|   8589934592|
|       LONGVIEW|     TEXAS|      36.8|              2.55|  17179869184|
+---------------+----------+----------+------------------+-------------+
only showing top 5 rows



In [60]:
demographics_stats.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- avg_household_size: string (nullable = true)
 |-- demog_stat_id: long (nullable = false)



In [61]:
# write demographics_stats table to parquet files
demographics_stats.write.mode("overwrite")\
                    .parquet(path = output_data + 'demographics_stats')
logging.info("Created demographics_stats parquet files")

INFO:root:Created demographics_stats parquet files


## Data Quality Checks

#### 1. Data schema of every dimensional table matches data model

In [62]:
from pathlib import Path

In [63]:
path = output_data
s3_bucket = Path(path)

In [64]:
for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        print("Table: " + path.split('/')[-1])
        schema = df.printSchema()

Table: immigration_airline
root
 |-- cic_id: double (nullable = true)
 |-- airline: string (nullable = true)
 |-- admin_num: double (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- immi_airline_id: long (nullable = true)

Table: city_code
root
 |-- code: string (nullable = true)
 |-- city: string (nullable = true)

Table: temperature_data
root
 |-- date: date (nullable = true)
 |-- avg_temp: string (nullable = true)
 |-- avg_temp_uncertainty: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

Table: demographics_population
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- num_of_vetarans: string (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- race: string (nu

#### 2. Tables are not empty after running ETL data pipeline

In [65]:
for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        record_num = df.count()
        if record_num <= 0:
            raise ValueError("This table is empty!")
        else:
            print("Table: " + path.split('/')[-1] + f" is not empty. It contains total {record_num} records.")

Table: immigration_airline is not empty. It contains total 3096313 records.
Table: city_code is not empty. It contains total 659 records.
Table: temperature_data is not empty. It contains total 687004 records.
Table: demographics_population is not empty. It contains total 2875 records.
Table: immigration_date is not empty. It contains total 30 records.
Table: immigration_personal is not empty. It contains total 3096313 records.
Table: country_code is not empty. It contains total 288 records.
Table: state_code is not empty. It contains total 54 records.
Table: demographics_stats is not empty. It contains total 588 records.
Table: immigration is not empty. It contains total 3096313 records.


In [66]:
#Let's check some things in our data
immigration_data.createOrReplaceTempView("fact_immigration_data")
print("fact_immigration_data view created")
immigration_personal.createOrReplaceTempView("dim_immigration_personal")
print("dim_immigration_personal view created")
immigration_airline.createOrReplaceTempView("dim_immigration_airline")
print("dim_immigration_airline view created")
immigration_date_season.createOrReplaceTempView("dim_immigration_date_season")
print("dim_immigration_date_season view created")
country_code.createOrReplaceTempView("country_code")
print("country_code view created")
city_code.createOrReplaceTempView("city_code")
print("city_code view created")
state_code.createOrReplaceTempView("state_code")
print("state_code view created")
temperature_data.createOrReplaceTempView("dim_temperature_data")
print("dim_temperature_data view created")
demographics_population.createOrReplaceTempView("dim_demographics_population")
print("dim_demographics_population view created")
demographics_stats.createOrReplaceTempView("dim_demographics_stats")
print("dim_demographics_stats view created")

fact_immigration_data view created
dim_immigration_personal view created
dim_immigration_airline view created
dim_immigration_date_season view created
country_code view created
city_code view created
state_code view created
dim_temperature_data view created
dim_demographics_population view created
dim_demographics_stats view created


In [67]:
# we define the following function to check for null values
def nullValueIdentify(spark_context, tables_to_check):
    """
    This function performs null value checks on specific columns of given tables received as parameters and raises a ValueError exception when null values are encountered.
    It receives the following parameters:
    spark_context : spark context where the data quality check is to be performed
    tables_to_check: A dictionary containing (table, columns) pairs specifying for each table, which column is to be checked for null values.   
    """  
    for table in tables_to_check:
        print(f"Performing data quality check on table {table}.")
        for column in tables_to_check[table]:
            returnedVal = spark_context.sql(f"""SELECT COUNT(*) as nbr FROM {table} WHERE {column} IS NULL""")
            if returnedVal.head()[0] > 0:
                raise ValueError(f"Data quality check failed! Found NULL values in {column} column!")
        print(f"Table {table} passed quality check.")

In [68]:
#dictionary of tables and columns to be checked
tables_to_check = { 'fact_immigration_data' : ['cic_id'], 'dim_immigration_personal':['residence_country'],
                   'dim_immigration_airline': ['visa_type'], 'country_code':['code','country'] ,
                    'city_code':['code','city'] ,'state_code':['code','state'] ,'dim_temperature_data':['date','city'] ,
                   'dim_demographics_population':['city','male_population'] ,'dim_demographics_stats':['city','avg_household_size']
                  }

#We call our function on the spark context
nullValueIdentify(spark, tables_to_check)

Performing data quality check on table fact_immigration_data...
Table fact_immigration_data passed quality check.
Performing data quality check on table dim_immigration_personal...
Table dim_immigration_personal passed quality check.
Performing data quality check on table dim_immigration_airline...
Table dim_immigration_airline passed quality check.
Performing data quality check on table country_code...
Table country_code passed quality check.
Performing data quality check on table city_code...
Table city_code passed quality check.
Performing data quality check on table state_code...
Table state_code passed quality check.
Performing data quality check on table dim_temperature_data...
Table dim_temperature_data passed quality check.
Performing data quality check on table dim_demographics_population...
Table dim_demographics_population passed quality check.
Performing data quality check on table dim_demographics_stats...
Table dim_demographics_stats passed quality check.


In [70]:
#To identify the distinct combinations of city and state in our 'fact_immigration_data' table
spark.sql("""
SELECT COUNT(DISTINCT city_code, state_code) as Distinct_city_state
FROM fact_immigration_data
""").show()

+-------------------+
|Distinct_city_state|
+-------------------+
|               6408|
+-------------------+

