In [1]:
# Do all imports and installs here
from pyspark.sql import SparkSession 
import configparser 
# CONFIG
# config = configparser.ConfigParser()
# config.read('config.cfg')

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? 

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
# Input locations for every data set used in this notebook.

# 1. Temperature (CSV)
f_temperature = "../../data2/GlobalLandTemperaturesByCity.csv"

# 2. Immigration: CSV sample, a single parquet part file, and the parquet folder
f_immigration_csv = "immigration_data_sample.csv"
f_immigration_sas = "sas_data/part-00013-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet"
f_immigration_folder = "./sas_data"

# 3. Airport codes (CSV)
f_airport = "airport-codes_csv.csv"

# 4. US cities demographics (CSV)
f_cities = "us-cities-demographics.csv"

# 5. I94_SAS_Labels_Descriptions.SAS (listed as a source; no path is defined for it here)

# Create (or reuse) the Spark session, requesting the hadoop-aws package.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)
 

In [3]:
# First look at the immigration data using a single parquet part file.
df_immigration = spark.read.format("parquet").load(f_immigration_sas)
df_immigration.show(n=10)
immigration_rows = df_immigration.count()
print(immigration_rows)


+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [4]:
# Load the temperature CSV; the first line of the file provides the column names.
df_temperature = spark.read.csv(f_temperature, header=True)
df_temperature.show(n=10)
temperature_rows = df_temperature.count()
print(temperature_rows)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|                   

#### Airport table  

In [5]:
# Load the airport codes CSV (header row supplies column names) and expose it
# to Spark SQL as the "airport" view used by later queries.
df_airport = spark.read.csv(f_airport, header=True)
df_airport.createOrReplaceTempView("airport")

# Preview the first rows, then report the total row count.
df_airport.show(n=10)
airport_rows = df_airport.count()
print(airport_rows)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [10]:
#### Cities Table 

In [6]:
# Load the cities demographics file; it is semicolon-separated with a header row.
df_cities = spark.read.csv(f_cities, header=True, sep=";")

# Preview the first rows, then report the total row count.
df_cities.show(n=10)
cities_rows = df_cities.count()
print(cities_rows)

# Expose the raw cities data to Spark SQL as the "cities" view.
df_cities.createOrReplaceTempView("cities")

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
####Cities
1. The field name `State Code` contains a space; wrap it in backticks (`` `State Code` ``) when selecting it.
2. One city can have multiple races (one row per race).

In [48]:

# Duplicate check for the cities data: count the rows for every
# (City, State, State Code, Race) combination and list the largest groups first.
# `State Code` contains a space, so it is quoted with backticks and exposed as
# State_Code in the result.
# Note: this reassigns df_cities to the grouped result.
sqlstr = (
    "select City, State, cast(`State Code` as varchar(10) ) State_Code, Race, Count(*) count from cities"
    "        group by City, State,`State Code`, Race"
    "        order by count desc "
)

df_cities = spark.sql(sqlstr)
grouped_city_rows = df_cities.count()
print(grouped_city_rows)
df_cities.show(n=49)


2891
+----------------+--------------+----------+--------------------+-----+
|            City|         State|State_Code|                Race|count|
+----------------+--------------+----------+--------------------+-----+
|          Quincy| Massachusetts|        MA|               White|    1|
|      Wilmington|North Carolina|        NC|               Asian|    1|
|           Tampa|       Florida|        FL|  Hispanic or Latino|    1|
|        Gastonia|North Carolina|        NC|               Asian|    1|
|           Tyler|         Texas|        TX|American Indian a...|    1|
|          Rialto|    California|        CA|Black or African-...|    1|
|           Sandy|          Utah|        UT|American Indian a...|    1|
|    Arden-Arcade|    California|        CA|Black or African-...|    1|
|          Upland|    California|        CA|Black or African-...|    1|
|      Cape Coral|       Florida|        FL|Black or African-...|    1|
|   Coral Springs|       Florida|        FL|American Indian

In [None]:
#### Airport 
`local_code` is used as the primary key (PK).

In [9]:

# Duplicate check for US airports: keep rows that have a municipality and a
# local_code, derive the state code from the part of iso_region after the "-",
# and count how many rows share each combination (largest groups first).
# Note: this reassigns df_airport to the grouped result.
sqlstr = (
    "select local_code, name, SPLIT(iso_region,'-')[1] as state_code, type, municipality, count(*) count from airport"
    " where  municipality is not null and iso_country ='US' and local_code is not null "
    " group by local_code, name, SPLIT(iso_region,'-')[1], type, municipality"
    " order by count desc"
)
df_airport = spark.sql(sqlstr)
grouped_airport_rows = df_airport.count()
print(grouped_airport_rows)
df_airport.show(n=10)

21213
+----------+--------------------+----------+-------------+-------------+-----+
|local_code|                name|state_code|         type| municipality|count|
+----------+--------------------+----------+-------------+-------------+-----+
|       MNG|Montana ARNG Heli...|        MT|       closed|     Billings|    2|
|       EDC|Austin Executive ...|        TX|small_airport|       Austin|    2|
|      08AK|      Fisher Airport|        AK|small_airport|     Big Lake|    1|
|      0CA5|Hoffman Private A...|        CA|small_airport| Santa Ysabel|    1|
|      0AZ1|        Taylor Field|        AZ|small_airport|       Marana|    1|
|      0XS9|        French Field|        TX|small_airport|      Bullard|    1|
|      0IA0|Knoxville Area Co...|        IA|     heliport|    Knoxville|    1|
|       0N6|Albanna Aviation ...|        DE|small_airport|       Felton|    1|
|      0TS7|    Flying U Airport|        TX|small_airport|Mineral Wells|    1|
|      11MA|    Bulljump Airport|        MA|sm

In [29]:
 #### Temperature
    

In [10]:
# Register the raw temperature data as the "temperature" SQL view.
# The file is re-read here instead of reusing `df_temperature`, because this
# cell reassigns `df_temperature` to the aggregated result below: re-running the
# cell would otherwise register the aggregate as the view and silently
# aggregate it a second time.
spark.read.option("header", "true").csv(f_temperature).createOrReplaceTempView("temperature")

# Per-city summary for the United States: average of the non-null temperature
# readings and the number of readings, with the most-measured cities first.
# NOTE(review): rows are grouped by city name and country only, so US cities
# that share a name are merged into one row -- confirm this is intended
# (the source also has Latitude/Longitude columns).
sqlstr = (
    "select City, Country, avg(AverageTemperature) AverageTemperature, count(*) count"
    " from temperature "
    " where country='United States' and AverageTemperature is not null  "
    " group by City, Country "
    " order by count desc"
)
df_temperature = spark.sql(sqlstr)
print(df_temperature.count())
df_temperature.show(10)


248
+------------+-------------+------------------+-----+
|        City|      Country|AverageTemperature|count|
+------------+-------------+------------------+-----+
| Springfield|United States|10.647931343609901| 9147|
|    Columbus|United States|14.017228598909899| 6238|
|      Aurora|United States| 9.423826269638306| 5474|
|   Arlington|United States|14.542532329169752| 5444|
|      Peoria|United States|14.423363446969729| 5280|
|    Richmond|United States|13.971595172684463| 5096|
|    Pasadena|United States| 18.22948024174803| 4302|
|    Glendale|United States| 18.56516010689992| 4116|
|     Buffalo|United States|7.7269057624960205| 3141|
|Cedar Rapids|United States| 7.975768545049324| 3141|
+------------+-------------+------------------+-----+
only showing top 10 rows



In [97]:
####immigration table
Convert data types using the `cast` command.
`i94port` => `airport.local_code`

In [12]:
# Load the immigration parquet folder and expose it to Spark SQL.
df_immigration = spark.read.parquet(f_immigration_folder)
df_immigration.createOrReplaceTempView("immigration")

# Aggregate arrivals that have a destination state (i94addr): numeric codes are
# cast to int and identical attribute combinations are counted, largest first.
# Every cast column carries an explicit alias so the output column names do not
# depend on how Spark names an un-aliased cast expression; the aliases match the
# names the original query produced.
sqlstr = """
    select cast(i94cit as int) as i94cit,
           cast(i94res as int) as i94res,
           i94port,
           i94addr,
           cast(biryear as int) as biryear,
           gender,
           cast(i94visa as int) as i94visa,
           visatype,
           cast(I94BIR as int) as I94BIR,
           count(*) count
    from immigration
    where i94addr is not null
    group by i94cit, i94res, i94port, i94addr, biryear, gender, i94visa, visatype, I94BIR
    order by count desc
"""
df_immigration = spark.sql(sqlstr)
print(df_immigration.count())
df_immigration.show(10)


 

1102499
+------+------+-------+-------+-------+------+-------+--------+------+-----+
|i94cit|i94res|i94port|i94addr|biryear|gender|i94visa|visatype|I94BIR|count|
+------+------+-------+-------+-------+------+-------+--------+------+-----+
|   209|   209|    HHW|     HI|   1988|     F|      2|      WT|    28| 1141|
|   209|   209|    HHW|     HI|   1986|     F|      2|      WT|    30| 1123|
|   209|   209|    HHW|     HI|   1987|     F|      2|      WT|    29| 1069|
|   209|   209|    HHW|     HI|   1989|     F|      2|      WT|    27| 1057|
|   209|   209|    HHW|     HI|   1985|     F|      2|      WT|    31| 1019|
|   209|   209|    HHW|     HI|   1984|     F|      2|      WT|    32|  982|
|   209|   209|    HHW|     HI|   1990|     F|      2|      WT|    26|  968|
|   209|   209|    HHW|     HI|   1983|     F|      2|      WT|    33|  872|
|   209|   209|    HHW|     HI|   1986|     M|      2|      WT|    30|  815|
|   209|   209|    HHW|     HI|   1987|     M|      2|      WT|    2

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [28]:
from datetime import datetime

# Folder name for this run of the immigration fact table, as zero-padded
# YYYYMMDD. Plain f"{year}{month}{day}" is ambiguous without padding
# (e.g. 2021-01-11 and 2021-11-01 would both become "2021111").
dateTimeObj = datetime.now()
dateStr = dateTimeObj.strftime("%Y%m%d")

# Write the aggregated immigration data, replacing any previous output for the same date.
df_immigration.write.mode('overwrite').parquet(f"model_immigration/{dateStr}")


#### Dimension tables:
airport, cities, temperature tables

In [33]:
# Persist each dimension table as parquet, replacing any previous output.
dimension_outputs = (
    (df_airport, "model_airport"),
    (df_temperature, "model_temperature"),
    (df_cities, "model_cities"),
)
for dimension_frame, output_path in dimension_outputs:
    dimension_frame.write.mode('overwrite').parquet(output_path)
 

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

In [55]:
def fn_zero(dataframe):
    """Report whether a table is empty.

    Calls ``dataframe.count()`` once, prints ``True`` when the count is zero
    (``False`` otherwise), and returns the same boolean so callers can act on
    the result instead of only reading the printed output.

    Args:
        dataframe: any object exposing a ``count()`` method that returns the
            number of rows.

    Returns:
        bool: ``True`` if the table has no rows, ``False`` otherwise.
    """
    is_empty = dataframe.count() == 0
    print(is_empty)
    return is_empty

 
# Read every model table back from disk so the check runs against what was
# actually written, not against the in-memory frames.
qa_airport, qa_cities, qa_temperature, qa_immigration = (
    spark.read.parquet(model_path)
    for model_path in (
        "model_airport",
        "model_cities",
        "model_temperature",
        f"model_immigration/{dateStr}",
    )
)
tables = [qa_airport, qa_cities, qa_temperature, qa_immigration]

# Print, for each table in turn, whether it is empty (True means no rows).
for tableObj in tables:
    fn_zero(tableObj)
    

 

False
False
False
False


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Model paths
model_immigration/YYYYMMDD/__.parquet
model_airport/___.parquet
model_cities/___.parquet
model_temperature/___.parquet



#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [None]:
-The database needed to be accessed by 100+ people.

In [None]:
-The data populates a dashboard that must be updated on a daily basis by 7am every day.
Use an Airflow schedule to run the pipeline daily so that the dashboard is refreshed before 7am.

In [None]:
-The data was increased by 100x.
If the data source is on a file system, I would use the COPY command to load it into Redshift.