# Udacity Capstone Project

###### This Jupyter notebook processes the dimension data and the fact data that only need to be uploaded once.
###### Only the final data-processing steps are shown; the intermediate, exploratory steps are not included.
###### This project assumes that the data source file names, their formats, and the type of each field remain unchanged.

#### Import modules

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import asc, col, lit, udf
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import (StructType,
                              StructField,
                              StringType,
                              IntegerType,
                              DoubleType,
                              DateType)

Starting Spark application


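| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 2 | application_1577482778537_0015 | pyspark | idle | Link | Link | ✔ |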


SparkSession available as 'spark'.

#### Create an instance of SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Capstone") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.8.5") \
    .getOrCreate()

#### Specify the input and output folders here

In [4]:
input_folder = "s3://joezcrmdb/data_source/"
output_folder = "s3://joezcrmdb/data/final/"

## Common Dimension: cities

#### Define a custom schema, read the cities data, and select the needed columns

In [17]:
cities_schema = StructType([
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("median_age", DoubleType()),
    StructField("male_population", IntegerType()),
    StructField("female_population", IntegerType()),
    StructField("total_population", IntegerType()),
    StructField("veteran_number", IntegerType()),
    StructField("foreign_born", IntegerType()),
    StructField("household_size", DoubleType()),
    StructField("state_code", StringType()),
    StructField("race", StringType()),
    StructField("population_by_race", IntegerType())
])
cities_df = spark.read.csv(input_folder + "us-cities-demographics.csv", \
    sep = ";", schema = cities_schema, header = True)
race_df = cities_df.select(["city", "state", "race", "population_by_race"]) \
    .dropDuplicates()
cities_df = cities_df.select(["city", "state_code", "state", "median_age", \
    "male_population", "female_population", "total_population", \
    "veteran_number", "foreign_born", "household_size"]).dropDuplicates()

#### Add an id column to each dataframe

In [18]:
val_for_race = Window.orderBy(asc("state"), asc("city"), asc("race")) \
    .rangeBetween(Window.unboundedPreceding, 0)
race_df = race_df.withColumn("counter", lit(1)) \
    .withColumn("record_id", Fsum("counter").over(val_for_race))
val_for_cities = Window.orderBy(asc("state"), asc("city")).rangeBetween( \
    Window.unboundedPreceding, 0)
cities_df = cities_df.withColumn("counter", lit(1)) \
    .withColumn("city_id", Fsum("counter").over(val_for_cities))
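
The ids above come from a running sum over a RANGE frame. In a RANGE frame, rows with equal ordering keys are peers and all receive the same cumulative value, so the ids are distinct only when the ordering keys are unique (this is checked below for (state, city)). `row_number()` over the same window would give distinct ids regardless. The plain-Python sketch below mimics both semantics without Spark; the function names and sample keys are illustrative only.

```python
from itertools import groupby

def range_frame_ids(keys):
    """Mimic SUM(1) OVER (ORDER BY key RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).

    Rows with equal keys are peers: they share one frame, so each of them
    gets the cumulative count up to and including the last peer.
    """
    ids = []
    running = 0
    for key, group in groupby(sorted(keys)):
        peers = list(group)
        running += len(peers)
        ids.extend((key, running) for _ in peers)
    return ids

def row_number_ids(keys):
    """Mimic ROW_NUMBER() OVER (ORDER BY key): every row gets its own id."""
    return [(key, i) for i, key in enumerate(sorted(keys), start=1)]

# Hypothetical keys in which one (state, city) pair is duplicated
keys = [("CA", "Concord"), ("NC", "Concord"), ("CA", "Concord")]
print([i for _, i in range_frame_ids(keys)])  # [2, 2, 3]
print([i for _, i in row_number_ids(keys)])   # [1, 2, 3]
```

In PySpark, the distinct-id variant would be `row_number().over(Window.orderBy("state", "city"))`, with `row_number` imported from `pyspark.sql.functions`. Either way, a window without `partitionBy` moves all rows into a single partition, which is acceptable only for small tables like these.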

#### Create temp views

In [19]:
race_df.createOrReplaceTempView("races")
cities_df.createOrReplaceTempView("cities")

#### Check the assumption that city and state uniquely identify each city record

In [20]:
query = """
    SELECT COUNT(*)
    FROM
    (
        SELECT COUNT(*) AS num
        FROM cities
        GROUP BY (city, state)
    ) AS ci
    WHERE ci.num > 1
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

#### Same check as the cell above, but using state_code

In [21]:
query = """
    SELECT COUNT(*)
    FROM
    (
        SELECT COUNT(*) AS num
        FROM cities
        GROUP BY (city, state_code)
    ) AS ci
    WHERE ci.num > 1
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+
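
Both queries count the key groups that occur more than once, so a result of zero means the key is unique. The same check recurs later for port_id and country_code, so it may be worth writing once. A plain-Python sketch over lists of dicts; the helper name and the sample rows are illustrative, not part of the project:

```python
from collections import Counter

def count_duplicate_keys(rows, key_fields):
    """Return how many distinct keys appear in more than one row.

    Mirrors: SELECT COUNT(*) FROM (SELECT COUNT(*) AS num FROM t
             GROUP BY <key_fields>) AS g WHERE g.num > 1
    """
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    return sum(1 for n in counts.values() if n > 1)

rows = [
    {"city": "Concord", "state": "California", "state_code": "CA"},
    {"city": "Concord", "state": "North Carolina", "state_code": "NC"},
]
print(count_duplicate_keys(rows, ["city", "state"]))  # 0: (city, state) is unique
print(count_duplicate_keys(rows, ["city"]))           # 1: "Concord" repeats
```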

#### Define a custom schema and read the temperature data

In [22]:
temp_schema = StructType([
    StructField("date", DateType()),
    StructField("average_temp", DoubleType()),
    StructField("uncertainty", DoubleType()),
    StructField("city", StringType()),
    StructField("country", StringType()),
    StructField("latitude", StringType()),
    StructField("longitude", StringType())
])
temp_df = spark.read.csv(input_folder + "GlobalLandTemperaturesByCity.csv", \
    schema = temp_schema, sep = ";", header = True)

#### Define UDFs that convert the latitude and longitude strings to signed numbers

In [23]:
@udf(DoubleType())
def to_numeric_lat(text):
    # "37.78N" -> 37.78, "33.20S" -> -33.2; NULL/empty input or another suffix -> None
    if not text:
        return None
    number_part, hemisphere = text[:-1], text[-1]
    if hemisphere == "N":
        return float(number_part)
    elif hemisphere == "S":
        return -float(number_part)
    return None

@udf(DoubleType())
def to_numeric_long(text):
    # "10.33E" -> 10.33, "122.03W" -> -122.03; NULL/empty input or another suffix -> None
    if not text:
        return None
    number_part, hemisphere = text[:-1], text[-1]
    if hemisphere == "E":
        return float(number_part)
    elif hemisphere == "W":
        return -float(number_part)
    return None

#### Add the numeric coordinate columns and create a temp view

In [24]:
temp_df = temp_df.withColumn("numeric_lat", to_numeric_lat(col("latitude")))\
    .withColumn("numeric_long", to_numeric_long(col("longitude")))
temp_df.createOrReplaceTempView("temperature")
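
The UDFs applied above wrap ordinary Python, so their conversion rule can be checked without a Spark session. A standalone sketch of the same rule; the name `parse_coordinate` is hypothetical, and the sample strings assume the `<number><hemisphere letter>` format the UDFs expect:

```python
def parse_coordinate(text, positive, negative):
    """Convert strings such as "37.78N" or "122.03W" to signed floats.

    `positive` and `negative` are the hemisphere letters mapped to + and -
    ("N"/"S" for latitude, "E"/"W" for longitude). Unknown letters return
    None, as in the UDFs; NULL or empty input also returns None here
    instead of raising.
    """
    if not text:
        return None
    number_part, hemisphere = text[:-1], text[-1]
    if hemisphere == positive:
        return float(number_part)
    if hemisphere == negative:
        return -float(number_part)
    return None

print(parse_coordinate("37.78N", "N", "S"))   # 37.78
print(parse_coordinate("122.03W", "E", "W"))  # -122.03
print(parse_coordinate(None, "N", "S"))       # None
```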

#### Join the cities with coordinates from the US temperature records (matched on city name only)

In [25]:
query = """
    SELECT DISTINCT cities.city_id AS city_id, 
        cities.city AS city, 
        cities.state_code AS state_code,
        cities.state AS state,
        temperature.numeric_lat AS latitude,
        temperature.numeric_long AS longitude,
        cities.median_age AS median_age,
        cities.male_population AS male_population,
        cities.female_population AS female_population,
        cities.total_population AS total_population,
        cities.veteran_number AS veteran_number,
        cities.foreign_born AS foreign_born,
        cities.household_size AS household_size
    FROM
    (
        SELECT city_id, city, state_code, state, median_age, male_population,
            female_population, total_population, veteran_number, foreign_born, 
            household_size
        FROM cities
    ) AS cities
    LEFT JOIN
    (
        SELECT DISTINCT city, numeric_lat, numeric_long
        FROM temperature
        WHERE country = 'United States'
    ) As temperature
    ON cities.city = temperature.city
"""
dimension_cities_df = spark.sql(query)
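
The temperature data has no state column, so the join above matches on city name alone: every US city sharing a name receives every coordinate recorded under that name. A pure-Python sketch of the same LEFT JOIN, using the Concord rows shown later in this notebook (the helper name is hypothetical):

```python
def left_join_on_name(cities, coordinates):
    """LEFT JOIN cities to coordinates on city name only.

    cities:      list of (city_id, city, state_code)
    coordinates: list of (city, latitude, longitude), already DISTINCT
    Unmatched cities get (None, None), like a SQL LEFT JOIN.
    """
    result = []
    for city_id, city, state_code in cities:
        matches = [(lat, lon) for name, lat, lon in coordinates if name == city]
        for lat, lon in matches or [(None, None)]:
            result.append((city_id, city, state_code, lat, lon))
    return result

cities = [(54, "Concord", "CA"), (419, "Concord", "NC")]
coordinates = [("Concord", 37.78, -122.03)]
for row in left_join_on_name(cities, coordinates):
    print(row)
# (54, 'Concord', 'CA', 37.78, -122.03)
# (419, 'Concord', 'NC', 37.78, -122.03)  <- North Carolina gets California's coordinates
```

If a name had two coordinate entries, each matching city would get two rows, so its city_id would no longer be unique.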

#### Create temp view

In [26]:
dimension_cities_df.createOrReplaceTempView("dimension")

#### Find (city, latitude, longitude) combinations that occur in more than one record

In [27]:
query = """
    SELECT cll.cll_row.city,
        cll.cll_row.latitude,
        cll.cll_row.longitude
    FROM
    (
        SELECT (city, latitude, longitude) AS cll_row, COUNT(*) AS num
        FROM
        (
            SELECT * FROM dimension
            WHERE latitude IS NOT NULL
            AND longitude IS NOT NULL
        )
        GROUP BY (city, latitude, longitude)
    ) AS cll
    WHERE cll.num > 1
    """
not_unique_record = spark.sql(query)
not_unique_record.count()

29

#### Show the names of the cities with non-unique records

In [28]:
not_unique_record.select(["city"]).dropDuplicates().show(30)

+------------+
|        city|
+------------+
| Springfield|
|Fayetteville|
|    Portland|
|      Aurora|
|    Columbus|
|    Pasadena|
|    Glendale|
|    Lakewood|
| Westminster|
|   Lafayette|
|   Arlington|
|     Jackson|
|   Rochester|
| Kansas City|
|      Peoria|
|Jacksonville|
|     Norwalk|
|    Richmond|
|    Columbia|
|     Concord|
+------------+

#### Re-run this query for each city listed above to check its coordinates (Concord shown)

In [29]:
query = """
    SELECT * FROM dimension
    WHERE city = 'Concord'
    """
spark.sql(query).show()

+-------+-------+----------+--------------+--------+---------+----------+---------------+-----------------+----------------+--------------+------------+--------------+
|city_id|   city|state_code|         state|latitude|longitude|median_age|male_population|female_population|total_population|veteran_number|foreign_born|household_size|
+-------+-------+----------+--------------+--------+---------+----------+---------------+-----------------+----------------+--------------+------------+--------------+
|     54|Concord|        CA|    California|   37.78|  -122.03|      39.6|          62310|            66358|          128668|          6287|       37428|          2.72|
|    419|Concord|        NC|North Carolina|   37.78|  -122.03|      35.7|          42732|            44961|           87693|          4621|        8847|          2.72|
+-------+-------+----------+--------------+--------+---------+----------+---------------+-----------------+----------------+--------------+------------+--------------+

#### Enter corrected coordinates, keyed by city_id, for the cities listed above

In [30]:
new_data = [
    [493, 'Arlington', 32.95, -96.70],
    [558, 'Arlington', 39.38, -76.99],
    [258, 'Aurora', 40.99, -87.34],
    [169, 'Aurora', 39.38, -104.05],
    [246, 'Columbus', 32.95, -85.21],
    [436, 'Columbus', 39.38, -83.24],
    [73, 'Glendale', 34.56, -118.70],
    [14, 'Glendale', 32.95, -112.02],
    [110, 'Pasadena', 34.56, -118.70],
    [532, 'Pasadena', 29.74, -96.00],
    [17, 'Peoria', 32.95, -112.02],
    [270, 'Peoria', 40.99, -89.47],
    [122, 'Richmond', 37.78, -122.03],
    [567, 'Richmond', 37.78, -77.29],
    [334, 'Springfield', 42.59, -72.00],
    [274, 'Springfield', 39.38, -89.48],
    [373, 'Springfield', 37.78, -93.56],
    [421, 'Fayetteville', 34.56, -79.78],
    [25, 'Fayetteville', None, None],
    [454, 'Portland', 45.81, -123.46],
    [311, 'Portland', None, None],
    [85, 'Lakewood', None, None],
    [178, 'Lakewood', 39.38, -104.05],
    [164, 'Westminster', None, None],
    [183, 'Westminster', 39.38, -104.05],
    [306, 'Lafayette', 29.74, -92.31],
    [284, 'Lafayette', None, None],
    [484, 'Jackson', None, None],
    [364, 'Jackson', 32.95, -90.96],
    [359, 'Rochester', None, None],
    [412, 'Rochester', 42.59, -78.55],
    [367, 'Kansas City', 39.38, -93.64],
    [294, 'Kansas City', 39.38, -93.64],
    [426, 'Jacksonville', None, None],
    [211, 'Jacksonville', 29.74, -81.23],
    [189, 'Norwalk', None, None],
    [102, 'Norwalk', 34.56, -118.70],
    [475, 'Columbia', 34.56, -81.73],
    [365, 'Columbia', None, None],
    [313, 'Columbia', None, None],
    [54, 'Concord', 37.78, -122.03],
    [419, 'Concord', None, None]
]
update_schema = StructType([
    StructField("city_id", IntegerType()),
    StructField("city", StringType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType())
])
update_df = spark.createDataFrame(new_data, update_schema)

#### Create a temp view for the new records

In [31]:
update_df.createOrReplaceTempView("update_data")

#### Build the final cities dimension: keep rows whose city_id is not in update_data and take the corrected coordinates for the rest

In [32]:
query = """
    SELECT DISTINCT city_id, city, state_code, state, latitude, longitude,
        median_age, male_population, female_population, total_population,
        veteran_number, foreign_born, household_size
    FROM dimension
    WHERE city_id NOT IN
    (
        SELECT DISTINCT city_id FROM update_data
    )
    UNION
    (
        SELECT DISTINCT od.city_id AS city_id, 
            od.city AS city, 
            od.state_code AS state_code, 
            od.state AS state,
            ud.latitude AS latitude, 
            ud.longitude AS longitude, 
            od.median_age AS median_age, 
            od.male_population AS male_population,
            od.female_population AS female_population, 
            od.total_population AS total_population, 
            od.veteran_number AS veteran_number,
            od.foreign_born AS foreign_born, 
            od.household_size AS household_size
        FROM
        (
            SELECT DISTINCT city_id, latitude, longitude
            FROM update_data
        ) AS ud
        JOIN
        (
            SELECT DISTINCT city_id, city, state_code, state, latitude,
                longitude, median_age, male_population, female_population,
                total_population, veteran_number, foreign_born, household_size
            FROM dimension
        ) AS od
        ON ud.city_id = od.city_id
    )
    """
final_cities_df = spark.sql(query)
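
The query above applies a per-city_id override: rows whose city_id is absent from update_data pass through unchanged, while the listed city_ids keep their other columns from `dimension` but take their coordinates from update_data. DISTINCT and UNION then collapse the duplicate rows that the name-only join produced for those city_ids into one row each. A plain-Python sketch of that logic on simplified (city_id, city, latitude, longitude) rows; the function name and the "Example" row are hypothetical:

```python
def apply_coordinate_overrides(dimension_rows, overrides):
    """Replace coordinates for city_ids present in `overrides`.

    dimension_rows: list of (city_id, city, latitude, longitude); a city_id may
                    appear several times after the name-only join.
    overrides:      dict city_id -> (latitude, longitude); None means unknown.
    Returns the rows de-duplicated (as SELECT DISTINCT ... UNION does), sorted by city_id.
    """
    result = set()
    for city_id, city, lat, lon in dimension_rows:
        if city_id in overrides:
            lat, lon = overrides[city_id]
        result.add((city_id, city, lat, lon))
    return sorted(result, key=lambda row: row[0])

rows = [
    (54, "Concord", 37.78, -122.03),
    (419, "Concord", 37.78, -122.03),
    (1, "Example", 10.0, 20.0),  # hypothetical city with no override
]
overrides = {54: (37.78, -122.03), 419: (None, None)}
for row in apply_coordinate_overrides(rows, overrides):
    print(row)
# (1, 'Example', 10.0, 20.0)
# (54, 'Concord', 37.78, -122.03)
# (419, 'Concord', None, None)
```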

#### Write data to files

In [33]:
final_cities_df.write.mode("overwrite").partitionBy("state_code") \
    .parquet(output_folder + "common_dimension/dimension_cities.parquet")

#### Check the data quality of the cities dimension file

In [34]:
ci_df = spark.read.parquet(output_folder + "common_dimension/dimension_cities.parquet")
ci_df.createOrReplaceTempView("ci")

#### Count the resulting rows; it is an error if the count is zero

In [35]:
ci_df.count()

596

#### Check whether city_id has NULL values; it is an error if the count is not zero

In [36]:
query = "SELECT COUNT(*) FROM ci WHERE city_id IS NULL"
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+
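
These checks rely on someone reading the printed counts. A sketch of turning them into failures that stop the run, assuming the counts are first pulled to the driver (for example with `ci_df.count()` and `spark.sql(query).first()[0]`); the exception and function names are hypothetical:

```python
class DataQualityError(Exception):
    """Raised when a table fails a data-quality check."""

def check_not_empty(table, row_count):
    # An empty output usually means an upstream read or join failed
    if row_count == 0:
        raise DataQualityError(f"{table}: no rows written")

def check_no_nulls(table, column, null_count):
    # Key columns such as city_id must never be NULL
    if null_count != 0:
        raise DataQualityError(f"{table}: {null_count} NULL value(s) in {column}")

# Values reported by the cells above for the cities dimension
check_not_empty("dimension_cities", 596)
check_no_nulls("dimension_cities", "city_id", 0)
print("dimension_cities passed")
```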

## Fact: races

#### Write race data to parquet

In [37]:
query = """
    SELECT r.record_id AS record_id,
        c.city_id AS city_id,
        r.race AS race,
        r.population_by_race AS population
    FROM
    (
        SELECT DISTINCT record_id, city, state, race, 
            population_by_race
        FROM races
    ) AS r
    JOIN
    (
        SELECT DISTINCT city_id, city, state
        FROM dimension
    ) AS c
    ON r.city = c.city
    AND r.state = c.state
    """
final_race_df = spark.sql(query)
final_race_df.write.mode("overwrite") \
    .partitionBy("city_id") \
    .parquet(output_folder + "races/fact_races.parquet")

#### Check race data quality; it is an error if the count is zero

In [38]:
ra_df = spark.read.parquet(output_folder + "races/fact_races.parquet")
ra_df.count()

2891
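
Because the races are attached to city_id with an inner join on (city, state), any race row whose city is missing from the cities dimension is silently dropped, and a non-zero count does not reveal that. A complementary check lists the unmatched keys; the sketch below uses plain Python sets with sample keys (the last one is made up). In Spark, the same comparison could be done with a left anti join (`how="left_anti"`).

```python
def unmatched_keys(fact_keys, dimension_keys):
    """Return fact keys with no matching dimension key (rows an inner join drops)."""
    return sorted(set(fact_keys) - set(dimension_keys))

# (city, state) keys; "Nowhere" is hypothetical
race_keys = [("Concord", "California"), ("Concord", "North Carolina"), ("Nowhere", "Nevada")]
city_keys = [("Concord", "California"), ("Concord", "North Carolina")]
print(unmatched_keys(race_keys, city_keys))  # [('Nowhere', 'Nevada')]
```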

## Immigration Dimension: ports

#### Read port data file

In [39]:
port_df = spark.read.csv(input_folder + "code/port_code.csv", header = True)

#### Create UDFs that split the port field into port name and region code

In [40]:
@udf(StringType())
def get_name(text):
    # "NAME, REGION" -> "NAME"; NULL input -> None
    try:
        return text.split(",")[0].strip()
    except AttributeError:
        return None

@udf(StringType())
def get_region_code(text):
    # "NAME, REGION" -> "REGION"; NULL input or no comma -> None
    try:
        return text.split(",")[1].strip()
    except (AttributeError, IndexError):
        return None

#### Add the port_name and region_code columns

In [41]:
final_port_df = port_df.select(["port_id", "port"]) \
    .withColumn("port_name", get_name(col("port"))) \
    .withColumn("region_code", get_region_code(col("port")))
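
The two UDFs split the port string on commas and keep the trimmed first and second parts. A pure-Python sketch of the same rule, handy for testing it without Spark; the function name and the sample strings are hypothetical, not taken from port_code.csv. In Spark, the split could also be done without Python UDFs using the built-in `split` and `trim` functions from `pyspark.sql.functions`, which avoids shipping every row to a Python worker.

```python
def split_port(text):
    """Split "NAME, REGION" into (name, region_code), mirroring get_name/get_region_code.

    A NULL value yields (None, None); a string without a comma yields
    (name, None). Parts after the second comma-separated field are ignored.
    """
    if text is None:
        return None, None
    parts = text.split(",")
    name = parts[0].strip()
    region = parts[1].strip() if len(parts) > 1 else None
    return name, region

print(split_port("SAMPLE PORT, AK"))  # ('SAMPLE PORT', 'AK')
print(split_port("NO REGION GIVEN"))  # ('NO REGION GIVEN', None)
```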

#### Write port data to immigration folder

In [42]:
final_port_df.select(["port_id", "port_name", "region_code"]) \
    .write.mode("overwrite") \
    .parquet(output_folder + "immigration/port_code.parquet")

#### Port data quality check

In [43]:
po_df = spark.read.parquet(output_folder + "immigration/port_code.parquet")
po_df.count()

591

#### Check if port id is unique

In [44]:
po_df.createOrReplaceTempView("ports")
query = """
    SELECT COUNT(*) FROM
    (
        SELECT port_id, COUNT(*) AS num
        FROM ports
        GROUP BY port_id
    ) AS po
    WHERE po.num > 1
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

## Immigration Dimension: country

#### Read country data from file

In [45]:
country_schema = StructType([
    StructField("country_code", IntegerType()),
    StructField("country", StringType())
    ])
country_df = spark.read.csv(input_folder + "code/country_code.csv", \
    header = True, schema = country_schema)

#### Write country data to immigration folder

In [46]:
country_df.write.mode("overwrite") \
    .parquet(output_folder + "immigration/dimension_countries.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Data quality check

In [47]:
co_df = spark.read.parquet(output_folder + "immigration/dimension_countries.parquet")
co_df.count()

289

#### Check whether country_code is unique

In [48]:
co_df.createOrReplaceTempView("countries")
query = """
    SELECT COUNT(*) FROM
    (
        SELECT country_code, COUNT(*) AS num
        FROM countries
        GROUP BY country_code
    ) AS co
    WHERE co.num > 1
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

## Immigration Dimension: visa type

#### Read data from file

In [49]:
visa_schema = StructType([
    StructField("visa_id", IntegerType()),
    StructField("visa_type", StringType())
])
visa_df = spark.read.csv(input_folder + "code/visa_type.csv", \
    header = True, schema = visa_schema)

#### Write data to immigration folder

In [50]:
visa_df.write.mode("overwrite") \
    .parquet(output_folder + "immigration/visa_type.parquet")

#### Data quality check; since the table has only a few rows, show them all

In [51]:
vi_df = spark.read.parquet(output_folder + "immigration/visa_type.parquet")
vi_df.show()

+-------+---------+
|visa_id|visa_type|
+-------+---------+
|      1| Business|
|      2| Pleasure|
|      3|  Student|
+-------+---------+

## Immigration Dimension: transportation modes

#### Read data from file

In [52]:
trans_schema = StructType([
    StructField("mode_id", IntegerType()),
    StructField("mode", StringType())
])
trans_df = spark.read.csv(input_folder + "code/transportation_mode.csv", \
    header = True, schema = trans_schema)

#### Write data to file

In [53]:
trans_df.write.mode("overwrite") \
    .parquet(output_folder + "immigration/transportation_mode.parquet")

#### Data quality check; the table is small, so show all rows

In [54]:
tr_df = spark.read.parquet(output_folder + "immigration/transportation_mode.parquet")
tr_df.show()

+-------+------------+
|mode_id|        mode|
+-------+------------+
|      1|         Air|
|      2|         Sea|
|      3|        Land|
|      9|Not reported|
+-------+------------+

## Fact: airports

#### fact_airport.py is run separately with spark-submit
#### Data quality check; it is an error if the count is zero

In [None]:
airport_df = spark.read.parquet(output_folder + "airports/fact_airports.parquet")
airport_df.count()
