In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

In [2]:
# Create (or reuse) the notebook's SparkSession with Hive support enabled.
# spark.jars.packages pulls in the spark-sas7bdat reader plus hadoop-aws
# (presumably for S3 access -- confirm it is still needed).
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2")
    .enableHiveSupport()
    .getOrCreate()
)

## I94 Data

In [3]:
# Read the April 2016 I94 extract through the spark-sas7bdat data source.
df_i94 = (
    spark.read
    .format("com.github.saurfang.sas.spark")
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
)

In [4]:
# Print the column names and types of the I94 frame.
df_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [9]:
# Preview the first 5 rows of the raw I94 data.
df_i94.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [13]:
# Total number of rows in the I94 frame.
# Kept in `rows` because it is the denominator of the per-column NULL ratios.
rows = df_i94.count()

In [14]:
# Display the I94 row count.
rows

3096313

In [16]:
# Fraction of NULL values per column (printed as a 0-1 ratio, not 0-100).
#
# All NULL counts are computed in a single aggregation pass instead of running
# a separate filter/count job for every column.
# The loop variable is named `column_name` rather than `col` so it does not
# overwrite the `col` function imported from pyspark.sql.functions.
null_counts = df_i94.selectExpr(
    *[
        f"sum(cast(`{column_name}` is null as int)) as `{column_name}`"
        for column_name in df_i94.columns
    ]
).first()

for column_name in df_i94.columns:
    # Index the Row by name: attribute access would clash with Row.count for
    # the column that is literally called `count`.
    print(f"The percentage of NULL value in {column_name} is :", null_counts[column_name] / rows)

The percentage of NULL value in cicid is : 0.0
The percentage of NULL value in i94yr is : 0.0
The percentage of NULL value in i94mon is : 0.0
The percentage of NULL value in i94cit is : 0.0
The percentage of NULL value in i94res is : 0.0
The percentage of NULL value in i94port is : 0.0
The percentage of NULL value in arrdate is : 0.0
The percentage of NULL value in i94mode is : 7.718857880324114e-05
The percentage of NULL value in i94addr is : 0.04928183940060323
The percentage of NULL value in depdate is : 0.04600859150867499
The percentage of NULL value in i94bir is : 0.00025901774142342846
The percentage of NULL value in i94visa is : 0.0
The percentage of NULL value in count is : 0.0
The percentage of NULL value in dtadfile is : 3.2296476486711774e-07
The percentage of NULL value in visapost is : 0.6075774639062653
The percentage of NULL value in occup is : 0.9973755883206898
The percentage of NULL value in entdepa is : 7.686561403837402e-05
The percentage of NULL value in entdepd i

Columns `occup`, `entdepu` and `insnum` have over 90% NULL values, so they should not be used in the analysis.

In [17]:
# Keep only the I94 columns used downstream, casting the numeric codes to int
# and giving several columns more readable names.
# NOTE(review): `age` is left as a double while the other numeric columns are
# cast to int -- confirm whether that is intentional.
df_immigrant = df_i94.selectExpr(
    'cast(cicid as int)',
    'cast(i94yr as int) as entry_year',
    'cast(i94mon as int) as entry_month',
    'cast(i94res as int) as origin_country_code',
    'i94port as entry_port_code',
    'cast(i94mode as int)',
    'i94bir as age',
    'gender',
    'visatype',
)

In [18]:
# Preview the first 5 rows of the selected and renamed immigrant columns.
df_immigrant.show(5)

+-----+----------+-----------+-------------------+---------------+-------+----+------+--------+
|cicid|entry_year|entry_month|origin_country_code|entry_port_code|i94mode| age|gender|visatype|
+-----+----------+-----------+-------------------+---------------+-------+----+------+--------+
|    6|      2016|          4|                692|            XXX|   null|37.0|  null|      B2|
|    7|      2016|          4|                276|            ATL|      1|25.0|     M|      F1|
|   15|      2016|          4|                101|            WAS|      1|55.0|     M|      B2|
|   16|      2016|          4|                101|            NYC|      1|28.0|  null|      B2|
|   17|      2016|          4|                101|            NYC|      1| 4.0|  null|      B2|
+-----+----------+-----------+-------------------+---------------+-------+----+------+--------+
only showing top 5 rows



### US City Monthly Temperature

In [19]:
# Load the city-level temperature CSV; the first line is the header and column
# types are inferred by Spark.
df_temp = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('../../data2/GlobalLandTemperaturesByCity.csv')
)

In [20]:
# Print the inferred column names and types of the temperature frame.
df_temp.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [21]:
# Preview the first 5 rows of the temperature data.
df_temp.show(5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [24]:
# List distinct Country values; show() with no argument prints only the first
# 20 rows.
df_temp.select('Country').distinct().show()

+-----------+
|    Country|
+-----------+
|       Chad|
|     Russia|
|   Paraguay|
|      Yemen|
|    Senegal|
|     Sweden|
|     Guyana|
|      Burma|
|Philippines|
|    Eritrea|
|   Djibouti|
|   Malaysia|
|  Singapore|
|     Turkey|
|     Malawi|
|       Iraq|
|    Germany|
|Afghanistan|
|   Cambodia|
|     Jordan|
+-----------+
only showing top 20 rows



This data set contains temperatures for cities in many countries. We only need `United States` for our analysis, so we filter the data to keep US cities only.

In [18]:
# Preview the US-only temperature rows with shorter column names.
# Nothing is assigned here: the result is only displayed.
(
    df_temp
    .where("Country = 'United States'")
    .selectExpr(
        'dt as date',
        'AverageTemperature as avg_temp',
        'AverageTemperatureUncertainty as temp_std',
        'city as city',
        'Country as country',
    )
    .show(5)
)

+-------------------+------------------+--------+-------+-------------+
|               date|          avg_temp|temp_std|   city|      country|
+-------------------+------------------+--------+-------+-------------+
|1820-01-01 00:00:00|2.1010000000000004|   3.217|Abilene|United States|
|1820-02-01 00:00:00|             6.926|   2.853|Abilene|United States|
|1820-03-01 00:00:00|            10.767|   2.395|Abilene|United States|
|1820-04-01 00:00:00|17.988999999999994|   2.202|Abilene|United States|
|1820-05-01 00:00:00|            21.809|   2.036|Abilene|United States|
+-------------------+------------------+--------+-------+-------------+
only showing top 5 rows



### City Airport

In [3]:
# Load the airport codes CSV; the first line is the header and column types
# are inferred by Spark.
df_air = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('airport-codes_csv.csv')
)

In [4]:
# Print the inferred column names and types of the airport frame.
df_air.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [5]:
# Preview the first 5 rows of the airport data.
df_air.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [6]:
# Collect the airport frame to the driver as a pandas DataFrame for local
# exploration. The former select("*") was a no-op projection, so it is dropped
# to match how the demographics frame is converted.
df_air_p = df_air.toPandas()

In [11]:
# List the distinct ISO country codes present in the airport data
# (this returns the values themselves, not how many there are).
df_air_p.iso_country.unique()

array(['US', 'PR', 'MH', 'MP', 'GU', 'SO', 'AQ', 'GB', 'PG', 'AD', 'SD',
       'SA', 'AE', 'SS', 'ES', 'CN', 'AF', 'LK', 'SB', 'CO', 'AU', 'MG',
       'TD', 'AL', 'AM', 'MX', 'MZ', 'PW', 'NR', 'AO', 'AR', 'AS', 'AT',
       'ZZ', 'GA', 'AZ', 'BA', 'BB', 'BE', 'DE', 'BF', 'BG', 'GL', 'BH',
       'BI', 'IS', 'BJ', 'OM', 'XK', 'BM', 'KE', 'PH', 'BO', 'BR', 'BS',
       'CV', 'BW', 'FJ', 'BY', 'UA', 'LR', 'BZ', 'CA', 'CD', 'CF', 'CG',
       'MR', 'CH', 'CL', 'CM', 'MA', 'CR', 'CU', 'CY', 'CZ', 'SK', 'PA',
       'DZ', 'ID', 'GH', 'RU', 'CI', 'DK', 'NG', 'DO', 'NE', 'HR', 'TN',
       'TG', 'EC', 'EE', 'FI', 'EG', 'GG', 'JE', 'IM', 'FK', 'EH', 'NL',
       'IE', 'FO', 'LU', 'NO', 'PL', 'ER', 'MN', 'PT', 'SE', 'ET', 'LV',
       'LT', 'ZA', 'SZ', 'GQ', 'SH', 'MU', 'IO', 'ZM', 'FM', 'KM', 'YT',
       'RE', 'TF', 'ST', 'FR', 'SC', 'ZW', 'MW', 'LS', 'NA', 'ML', 'GM',
       'GE', 'GF', 'SL', 'GW', 'GN', 'SN', 'GR', 'GT', 'TZ', 'GY', 'SR',
       'DJ', 'HK', 'LY', 'HN', 'VN', 'KZ', 'RW', 'H

In [15]:
# Count rows per (iso_region, municipality) among US airports to see whether a
# single city maps to more than one row.
(
    df_air
    .where("iso_country='US'")
    .groupBy("iso_region", "municipality")
    .count()
    .show()
)

+----------+--------------------+-----+
|iso_region|        municipality|count|
+----------+--------------------+-----+
|     US-CA|         Pine Valley|    2|
|     US-TX|              Austin|   25|
|     US-ID|                Kuna|    4|
|     US-OR|         Eagle Point|    4|
|     US-PA|      Fairless Hills|    1|
|     US-AR|         Springfield|    1|
|     US-IL|             Donovan|    2|
|     US-AK|Fairbanks /Ft Wai...|    2|
|     US-AZ|              Mcneal|    1|
|     US-CA|            Torrance|    4|
|     US-IL|               Flora|    2|
|     US-NY|           Esperance|    3|
|     US-MS|           Tylertown|    2|
|     US-OK|             Shawnee|    4|
|     US-TX|           Kingsland|    3|
|     US-NJ|               Salem|    5|
|     US-NY|             Kendall|    1|
|     US-VA|              Norton|    3|
|     US-KS|           Mc Donald|    2|
|     US-OR|             Dillard|    1|
+----------+--------------------+-----+
only showing top 20 rows



In [12]:
# A city can have many rows, one per facility (heliports, closed airports, ...).
# Austin, TX is used as the example.
df_air_p.loc[(df_air_p.iso_region == 'US-TX') & (df_air_p.municipality == 'Austin')]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
296,04TA,heliport,Capitol National Bank Building Heliport,540.0,,US,US-TX,Austin,04TA,,04TA,"-97.7446975708, 30.271041666699997"
1712,18TS,heliport,Brackenridge Hospital Heliport,519.0,,US,US-TX,Austin,18TS,,18TS,"-97.73419952392578, 30.273799896240234"
1766,19TS,heliport,Kvue-Tv Heliport,780.0,,US,US-TX,Austin,19TS,,19TS,"-97.73729705810547, 32.3651008605957"
2375,1XS5,heliport,Austin Diagnostic Medical Center Heliport,720.0,,US,US-TX,Austin,1XS5,,1XS5,"-97.7074966430664, 30.412200927734375"
2546,22TX,heliport,South Austin Medical Center Heliport,687.0,,US,US-TX,Austin,22TX,,22TX,"-97.773889, 30.225556"
3783,32TS,heliport,Seton Medical Center H-4 Heliport,582.0,,US,US-TX,Austin,32TS,,32TS,"-97.74579620361328, 30.304399490356445"
4712,3XA6,heliport,Dell Children'S Medical Center Heliport,644.0,,US,US-TX,Austin,3XA6,,3XA6,"-97.708, 30.304667"
5651,4TS9,heliport,Ossiport Heliport,818.0,,US,US-TX,Austin,4TS9,,4TS9,"-97.8677978515625, 30.32379913330078"
7555,6R4,closed,Bird's Nest Airport,615.0,,US,US-TX,Austin,6R4,,6R4,"-97.56729888919999, 30.3966007233"
8528,7TX2,heliport,Falcon's Nest Heliport,900.0,,US,US-TX,Austin,7TX2,,7TX2,"-97.87670135498047, 30.419200897216797"


In [21]:
# Register the airport frame as the temp view `airport_table` so it can be
# queried with spark.sql.
df_air.createOrReplaceTempView("airport_table")

In [78]:
# Aggregate US airports that have an IATA code into one row per
# (iso_region, municipality):
#   - state: the last two characters of iso_region (e.g. 'US-TX' -> 'TX')
#   - small_airport / medium_airport / large_airport: number of rows of that type
# Rows of any other type (e.g. heliport, closed) add 0 to all three counters.
df_air_transformed = spark.sql("""
SELECT iso_country, RIGHT(iso_region, 2) as state, municipality as city, 
        SUM(CASE WHEN type='small_airport' THEN 1 ELSE 0 END) AS small_airport,
        SUM(CASE WHEN type='medium_airport' THEN 1 ELSE 0 END) AS medium_airport,
        SUM(CASE WHEN type='large_airport' THEN 1 ELSE 0 END) AS large_airport
FROM airport_table
WHERE iso_country = 'US' AND iata_code IS NOT NULL
GROUP BY iso_country, iso_region, municipality
""")

In [79]:
# Preview the first 5 rows of the per-city airport counts.
df_air_transformed.show(5)

+-----------+-----+--------------------+-------------+--------------+-------------+
|iso_country|state|                city|small_airport|medium_airport|large_airport|
+-----------+-----+--------------------+-------------+--------------+-------------+
|         US|   WA|Burlington/Mount ...|            1|             0|            0|
|         US|   KS|               Colby|            1|             0|            0|
|         US|   OR|           La Grande|            1|             0|            0|
|         US|   VA|            Dahlgren|            1|             0|            0|
|         US|   AZ|          Scottsdale|            1|             0|            0|
+-----------+-----+--------------------+-------------+--------------+-------------+
only showing top 5 rows



### US City Demographics

In [16]:
# Load the US city demographics file: semicolon-separated, with a header line
# and column types inferred by Spark.
df_demo = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True, sep=';')
    .load('us-cities-demographics.csv')
)

In [17]:
# Print the inferred column names and types of the demographics frame.
df_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [18]:
# Preview the first 5 rows of the demographics data.
df_demo.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [19]:
# Collect the demographics frame to the driver as a pandas DataFrame for local
# exploration.
df_demo_p = df_demo.toPandas()

In [20]:
# Show the first rows of the pandas copy.
df_demo_p.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [21]:
# Summary statistics for the numeric columns. A `count` below the frame's row
# count means that column has missing values.
df_demo_p.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [23]:
# (rows, columns) of the demographics data.
df_demo_p.shape

(2891, 12)

In [33]:
# Register the demographics frame as the temp view `demo_table` so it can be
# queried with spark.sql.
df_demo.createOrReplaceTempView("demo_table")

In [45]:
# Build city-level demographic ratios from demo_table.
# Each population figure is divided by `Total Population` to give a share.
# Race and Count are not selected, so SELECT DISTINCT collapses rows that
# differ only in those columns (presumably one row per race per city -- confirm).
# A ratio is NULL whenever its numerator is NULL.
df_demo_transformed = spark.sql("""
    SELECT DISTINCT City AS city, State As state, `Median Age` AS median_age, `Male Population`/`Total Population` AS male_pct,
        `Female Population`/`Total Population` AS female_pct, `Number of Veterans`/`Total Population` AS veteran_pct,
        `Foreign-born`/`Total Population` AS foreigner_pct
    FROM demo_table
""")

In [46]:
# Preview the first 5 rows of the per-city demographic ratios.
df_demo_transformed.show(5)

+---------------+-----------+----------+------------------+------------------+-------------------+-------------------+
|           city|      state|median_age|          male_pct|        female_pct|        veteran_pct|      foreigner_pct|
+---------------+-----------+----------+------------------+------------------+-------------------+-------------------+
|       Carolina|Puerto Rico|      42.0|0.4558303886925795|0.5441696113074205|               null|               null|
|           Ames|       Iowa|      23.0|0.5197995449794011|0.4802004550205989|0.03481829920678842|0.13229416466826538|
|Highlands Ranch|   Colorado|      39.6|0.4800179570007905|0.5199820429992095|0.04723471947065885|0.08614480759659207|
|          Salem|     Oregon|      35.4| 0.491294069027549| 0.508705930972451|0.05885999404411005|0.11953106482803887|
|        Fishers|    Indiana|      34.5|0.4898397834346063|0.5101602165653937|0.02868562001138595|0.07131321815710286|
+---------------+-----------+----------+--------