# City Immigration, Demographics and Temperatures
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to gain insights into US immigration patterns. This includes understanding popular cities for immigration, demographic information of immigrants, visa types, and age and temperature trends. Data is gathered from several sources, including the I94 immigration dataset from 2016, city temperature data from Kaggle, and US city demographic information from OpenSoft. To organize the data, 4 dimension tables are created: Cities, Immigrants, Monthly Average City Temperature, and Time, with a fact table for Immigration. The ETL process utilizes Spark and the final results are stored in the parquet format for further analysis.

In [1]:
import pandas as pd
import os
import glob
import re

from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

In [2]:
%%time

spark = SparkSession.builder\
                    .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                    .enableHiveSupport().getOrCreate()

df = spark.read.format('com.github.saurfang.sas.spark')\
               .load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")

CPU times: user 132 ms, sys: 24.9 ms, total: 157 ms
Wall time: 45.5 s


In [3]:
# Check Spark session set up correctly

df.createOrReplaceTempView("immigration_table")
df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- validres: double (nullable = true)
 |-- delete_days: double (nullable = true)
 |-- delete_mexl: double (nullable = true)
 |-- delete_dup: double (nullable = true)
 |-- delete_visa: double (nullable = true)
 |-- delete_recdup: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- mat

### Step 1: Read in Data

In [4]:
# Read immigration data
i94_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_df = spark.read.format("com.github.saurfang.sas.spark").load(i94_fname)

In [6]:
# Read demographics data
demo_fname = "us-cities-demographics.csv"
demo_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demo_fname)

In [5]:
# Read temperature data
temperature_fname = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_fname)

### Step 2: Explore and Assess the Data
Data Exploration & Cleaning

#### Immigration data

In [7]:
# Explore immigration data
i94_df.count()

3096313

In [8]:
i94_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [9]:
# Create list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))

659


In [10]:
# Create list of valid states
valid_states = demo_df.toPandas()["State Code"].unique()
print(len(valid_states))
print(valid_states)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [11]:
# Create udf to convert SAS date to PySpark date 
@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

In [12]:
# Create udf to validate state
@udf(StringType())
def validate_state(x):  
    if x in valid_states:
        return x
    return 'other'

In [13]:
# Clean immigration data

# Remove any missing values
cleaned_i94_df = i94_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# Extract valid states 
cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

# Convert arrival_date (SAS format) to PySpark format
cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))

# only keep us related immigration data
cleaned_i94_df = cleaned_i94_df.filter(cleaned_i94_df.i94addr != 'other')

staging_i94_df = cleaned_i94_df.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

staging_i94_df.limit(5).toPandas()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,168.0,2016-04-01,WAS,DC,34.0,M,2.0,1.0
1,383.0,2016-04-01,MIA,FL,40.0,M,2.0,1.0
2,608.0,2016-04-01,TOR,TX,45.0,M,1.0,1.0
3,930.0,2016-04-01,NEW,NY,49.0,F,2.0,1.0
4,1229.0,2016-04-01,NYC,CT,32.0,M,1.0,1.0


In [14]:
staging_i94_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- date: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: double (nullable = true)
 |-- count: double (nullable = true)



#### Temperature data

In [15]:
temperature_df.count()

8599212

In [16]:
temperature_df.limit(3).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [17]:
# Create udf to map city full name to city port abbreviation

@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [18]:
# Clean temperature data

# Remove invalid ports
# Map full name to city port abbreviation
# Only use US temperatures

cleaned_temp_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Only use temperatures from 2013 (the latest year in the dataset)
cleaned_temp_df = cleaned_temp_df.filter(cleaned_temp_df["year"] == 2013)

staging_temp_df = cleaned_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

print(staging_temp_df.count())
staging_temp_df.limit(5).toPandas()

1044


Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W
3,2013,2,POM,5.8,45.81N,123.46W
4,2013,5,PRO,14.3,42.59N,72.00W


In [19]:
staging_temp_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



#### Demographics Data

In [20]:
demo_df.count()

2891

In [21]:
demo_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [22]:
# Clean demographics data

# Calculate percentages of numeric columns and create new ones
cleaned_demo_df = demo_df.withColumn("median_age", demo_df['Median Age']) \
    .withColumn("pct_male_pop", (demo_df['Male Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demo_df['Female Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demo_df['Number of Veterans'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demo_df['Foreign-born'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_race", (demo_df['Count'] / demo_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demo_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demo_df = cleaned_demo_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

cleaned_demo_df.count()

883

In [23]:
# Pivot the race column

pivot_demo_df = cleaned_demo_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_demo_df = pivot_demo_df.withColumn("city_code", city_to_port(pivot_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

staging_demo_df = pivot_demo_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")
print(staging_demo_df.count())
staging_demo_df.limit(10).toPandas()

180


Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.8,50.2,7.2,7.2,4.6,4.6,6.4,43.5,76.1,531674
1,MCA,TX,Allen,37.2,52.3,47.7,3.6,3.6,0.2,16.1,13.4,10.8,71.2,98138
2,CRP,TX,Corpus Christi,35.0,49.5,50.5,7.7,7.7,0.9,2.8,4.6,61.9,90.3,324082
3,FMY,FL,Fort Myers,37.3,49.8,50.2,5.8,5.8,,4.8,23.4,24.1,67.8,74015
4,ORL,FL,Orlando,33.1,48.3,51.7,4.7,4.7,0.9,4.1,25.1,33.0,66.1,270917
5,LOS,CA,Los Angeles,35.0,49.3,50.7,2.2,2.2,1.6,12.9,10.2,48.8,54.8,3971896
6,PRO,RI,Providence,29.9,49.7,50.3,2.8,2.8,2.3,7.5,17.1,43.5,54.6,179204
7,CID,IA,Cedar Rapids,36.2,48.4,51.6,6.0,6.0,1.0,4.1,9.1,4.1,89.6,130405
8,SPI,IL,Springfield,38.8,47.2,52.8,6.4,6.4,1.4,3.3,21.5,2.3,77.2,117809
9,POM,OR,Portland,36.7,49.6,50.4,4.7,4.7,2.4,10.2,7.3,9.7,82.9,632187


In [24]:
staging_demo_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
A star schema is selected as the data model due to its simplicity and effectiveness. This allows users to easily write queries by joining fact and dimension tables to analyze the data. The following tables comprise the schema:

##### Staging Tables
```
staging_i94_df
    id
    date
    city_code
    state_code
    age
    gender
    visa_type
    count
    
staging_temp_df
    year
    month
    city_code
    city_name
    avg_temperature
    lat
    long
    
staging_demo_df
    city_code
    state_code
    city_name
    median_age
    pct_male_pop
    pct_female_pop
    pct_veterans
    pct_foreign_born
    pct_native_american
    pct_asian
    pct_black
    pct_hispanic_or_latino
    pct_white
    total_pop
```

##### Dimension Tables
```
immigrant_df
    id
    gender
    age
    visa_type
    
city_df
    city_code
    state_code
    city_name
    median_age
    pct_male_pop
    pct_female_pop
    pct_veterans
    pct_foreign_born
    pct_native_american
    pct_asian
    pct_black
    pct_hispanic_or_latino
    pct_white
    total_pop
    lat
    long
    
monthly_city_temp_df
    city_code
    year
    month
    avg_temperature
    
time_df
    date
    dayofweek
    weekofyear
    month
```

##### Fact Table
```
immigration_df
    id
    state_code
    city_code
    date
    count
```

#### 3.2 Executing the Data Pipeline
The process of building the data pipeline using the star schema model includes the following steps:

1. Cleaning the data to address issues such as null values, data types, and duplicates.
2. Loading data into staging tables for the I94 immigration dataset, city temperature data, and demographic data.
3. Creating dimension tables for immigrants, cities, monthly city temperature, and time.
4. Creating a fact table for immigration information, linking it to the dimension tables to ensure referential integrity.
5. Saving the processed dimension and fact tables in the parquet format for downstream querying.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipeline to create the data model.

In [25]:
# Create dimension table for immigrant

immigrant_df = staging_i94_df.select("id", "gender", "age", "visa_type").drop_duplicates()

In [26]:
immigrant_df.count()

2435922

In [27]:
immigrant_df.limit(5).toPandas()

Unnamed: 0,id,gender,age,visa_type
0,270799.0,F,49.0,2.0
1,275758.0,M,50.0,2.0
2,445416.0,M,69.0,1.0
3,488015.0,M,6.0,2.0
4,503609.0,M,21.0,2.0


In [28]:
# Create dimension table for city

city_df = staging_demo_df.join(staging_temp_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

In [29]:
city_df.count()

142

In [30]:
city_df.limit(5).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop,lat,long
0,BRO,TX,Brownsville,30.6,47.7,52.3,2.3,2.3,0.6,0.9,0.7,92.5,95.0,183888,26.52N,96.72W
1,HSV,WI,Madison,30.7,49.2,50.8,3.9,3.9,0.9,9.6,8.2,7.9,82.1,248956,34.56N,85.62W
2,ATL,GA,Atlanta,33.8,48.3,51.7,4.0,4.0,1.0,5.2,52.9,4.0,42.3,463875,34.56N,83.68W
3,NEW,NJ,Newark,34.6,49.0,51.0,2.1,2.1,0.8,2.6,51.4,35.6,27.1,281913,40.99N,74.56W
4,NWH,CT,New Haven,29.9,48.9,51.1,2.0,2.0,1.7,6.1,33.3,33.4,43.2,130310,40.99N,72.43W


In [31]:
# Create dimension table for monthly city temperature

monthly_city_temp_df = staging_temp_df.select("city_code", "year", "month", "avg_temperature").drop_duplicates()

In [32]:
monthly_city_temp_df.count()

1043

In [33]:
monthly_city_temp_df.limit(5).toPandas()

Unnamed: 0,city_code,year,month,avg_temperature
0,SAA,2013,6,18.6
1,PHI,2013,5,16.6
2,BOS,2013,5,14.3
3,BUR,2013,3,14.5
4,RNO,2013,2,4.7


In [34]:
# Create dimension table for time

time_df = staging_i94_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
time_df = time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

In [35]:
time_df.count()

30

In [36]:
time_df.limit(5).toPandas()

Unnamed: 0,date,dayofweek,weekofyear,month
0,2016-04-23,7,16,4
1,2016-04-22,6,16,4
2,2016-04-08,6,14,4
3,2016-04-09,7,14,4
4,2016-04-26,3,17,4


In [37]:
# Create fact table for immigration

immigration_df = staging_i94_df.select("id", "state_code", "city_code", "date", "count").drop_duplicates()

In [38]:
immigration_df.count()

2435922

In [39]:
immigration_df.limit(5).toPandas()

Unnamed: 0,id,state_code,city_code,date,count
0,25716.0,CA,LOS,2016-04-01,1.0
1,56083.0,HI,HHW,2016-04-01,1.0
2,261977.0,FL,NYC,2016-04-02,1.0
3,290139.0,NY,NYC,2016-04-02,1.0
4,487570.0,HI,HHW,2016-04-03,1.0


In [None]:
# Write dimension tables to parquet
immigrant_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
city_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
monthly_city_temp_df.write.mode("overwrite").parquet("monthly_city_temperatues")
time_df.write.mode("overwrite").parquet("time")

# Write fact table to parquet
immigration_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

#### 4.2 Data Quality "DQ" Checks
 
Run DQ Checks

In [None]:
# Perform quality checks here

def table_exists(df):
    if df is not None:
        return True
    else:
        return False
        
if table_exists(immigrant_df) & table_exists(city_df) & table_exists(monthly_city_temp_df) & table_exists(time_df) & table_exists(immigration_df):
    print("data quality check passed")
    print("dimension tables and fact table exist")
    print()
else:
    print("data quality check failed")
    print("table missing...")

In [None]:
def table_not_empty(df):
    return df.count() != 0 

if table_not_empty(immigrant_df) & table_not_empty(city_df) & table_not_empty(monthly_city_temp_df) & table_not_empty(time_df) & table_not_empty(immigration_df):
    print("data quality check passed!")
    print("dimension tables and fact table contain records")
    print()
else:
    print("data quality check failed!")
    print("null records...")

#### 4.3 Data dictionary 
Brief description of what the data is and where it came from.

##### Dimension Tables
```
immigrant_df
    id: id of immigrant
    gender: gender of immigrant
    age: age of immigrant
    visa_type: immigrant's visa type
    
city_df
    city_code: city port code
    state_code: state code of the city
    city_name: name of the city
    median_age: median age of the city
    pct_male_pop: city's male population in percentage
    pct_female_pop: city's female population in percentage
    pct_veterans: city's veteran population in percentage
    pct_foreign_born: city's foreign born population in percentage
    pct_native_american: city's native american population in percentage
    pct_asian: city's asian population in percentage
    pct_black: city's black population in percentage
    pct_hispanic_or_latino: city's hispanic or latino population in percentage
    pct_white: city's white population in percentage
    total_pop: city's total population
    lat: latitude of the city
    long: longitude of the city
    
monthly_city_temp_df
    city_code: city port code
    year: year
    month: month 
    avg_temperature: average temperature in city for given month
    
time_df
    date: date
    dayofweek: day of the week
    weekofyear: week of year
    month: month
```

##### Fact Table
```
immigration_df
    id: id
    state_code: state code of arrival city
    city_code: city port code of arrival city
    date: date of arrival
    count: count of immigrant's entries into the US
```

#### Step 5: Complete Project Write Up
Spark is chosen for this project as it is known for processing large amount of data fast (with in-memory compute), scale easily with additional worker nodes, with ability to digest different data formats (e.g. SAS, Parquet, CSV), and integrate nicely with cloud storage like S3 and warehouse like Redshift.

The data update cycle is typically chosen on two criteria. One is the reporting cycle, the other is the availabilty of new data to be fed into the system. For example, if new batch of average temperature can be made available at monthly interval, we might settle for monthly data refreshing cycle.

There are also considerations in terms of scaling existing solution.
* **If the data was increased by 100x:**
We can consider spinning up larger instances of EC2s hosting Spark and/or additional Spark work nodes. With added capacity arising from either vertical scaling or horizontal scaling, we should be able to accelerate processing time.

* **If the data populates a dashboard that must be updated on a daily basis by 7am every day:**
We can consider using Airflow to schedule and automate the data pipeline jobs. Built-in retry and monitoring mechanism can enable us to meet user requirement.

* **If the database needed to be accessed by 100+ people:**
We can consider hosting our solution in production scale data warehouse in the cloud, with larger capacity to serve more users, and workload management to ensure equitable usage of resources across users.