# Facteus Data Lake

## Project Summary
* The project helps Facteus build a data lake hosted on AWS S3 to understand how COVID-19 affects payment transactions.
* I built an ETL pipeline that extracts raw data from S3, processes it with Spark, and transforms it into fact and dimension tables for analytics.

The project follows these steps:
* Scope the Project and Gather Data
* Explore and Assess the Data
* Define the Data Model
* Run ETL to Model the Data
* Project Write Up

In [1]:
### Import all the necessary packages
import os
import configparser
import gzip
from io import BytesIO
import boto3
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark import SparkConf

### 1. Scope the Project and Gather Data

#### Scope 
* The project builds an ETL pipeline that moves millions of payment transaction records from after the COVID-19 outbreak, COVID-19 case tracking data, and other demographic datasets into a data lake hosted on S3.
* The data lake can generate insights into how the pandemic changes people's payment behavior, how business sectors are affected, and how different demographic and geographic groups react to the pandemic.
* I use PySpark to extract data from AWS S3, process it, load it into fact and dimension tables, and store the tables back to S3.

#### Describe and Gather Data 
* **Daily Spending Data**: The dataset comes from Facteus, a provider of financial data business intelligence. It covers payment transactions from 2020/04/17 to 2020/07/07, including payment date, merchant information, cardholder zip code, and payment details.
* **Covid County Data**: Open-source data tracking daily total COVID-19 tests, cases, and deaths by county.
* **U.S. City Demographic Data**: This data comes from the US Census Bureau's 2015 American Community Survey. It includes population and household details for each city.
* **Zip_City Data**: The data comes from Simplemaps. It connects ZIP codes with cities and counties.
* **ZIP Code Crosswalk Data**: The data comes from HUD’s Office of Policy Development and Research. It relates ZIP codes to county FIPS codes, including county composition details.
* **Merchant Category Code Data**: The data comes from the Visa Merchant Data Standards Manual. It relates merchant category codes to merchant category names.

In [2]:
# Read config file
config = configparser.ConfigParser()
config.read('credentials.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# create a spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

#### 1.1 Extract Daily Spending Data from S3

In [4]:
# define the schema explicitly instead of inferring it
paymentSchema = R([
    Fld('Date',Date()),
    Fld('MCC',Long()),
    Fld('Zip_code',Long()),
    Fld('Num_card',Long()),
    Fld('Num_trnsc',Long()),
    Fld('Total_spend',Dbl())
])

In [5]:
# read the csv file to a Spark DataFrame
df_payment = spark.read.csv("s3a://transaction-dataset/daily-spend/*.csv",schema = paymentSchema )

In [6]:
df_payment.show(5)

+----------+----+--------+--------+---------+-----------+
|      Date| MCC|Zip_code|Num_card|Num_trnsc|Total_spend|
+----------+----+--------+--------+---------+-----------+
|2020-06-16|4814|   55414|       7|        7|      853.6|
|2020-06-16|5655|   55414|       1|        1|     161.94|
|2020-06-16|5651|   55414|       4|        4|     242.25|
|2020-06-16|5399|   55414|       1|        1|     127.02|
|2020-06-16|5814|   55414|      59|       84|     645.15|
+----------+----+--------+--------+---------+-----------+
only showing top 5 rows



#### 1.2 Extract Covid County Data from S3

In [7]:
# define the schema explicitly instead of inferring it
covidSchema = R([
    Fld('Date',Date()),
    Fld('County',Long()),
    Fld('Name',Str()),
    Fld('Total_tests',Long()),
    Fld('Total_deaths',Dbl()),
    Fld('Total_cases',Dbl())
])

In [8]:
# read the csv file to a Spark DataFrame
df_covid = spark.read.csv("s3a://transaction-dataset/covid-cases.csv",schema = covidSchema,header = True)

In [9]:
df_covid.toPandas().head()

Unnamed: 0,Date,County,Name,Total_tests,Total_deaths,Total_cases
0,2020-01-21,1001,Autauga,,0.0,0.0
1,2020-01-22,1001,Autauga,,0.0,0.0
2,2020-01-23,1001,Autauga,,0.0,0.0
3,2020-01-24,1001,Autauga,,0.0,0.0
4,2020-01-25,1001,Autauga,,0.0,0.0


#### 1.3 Extract U.S. City Demographic Data from S3

In [10]:
# define the schema explicitly instead of inferring it
citySchema = R([
    Fld('City',Str()),
    Fld('State',Str()),
    Fld('Median_age',Dbl()),
    Fld('Male_pop',Long()),
    Fld('Female_pop',Long()),
    Fld('Total_pop',Long()),
    Fld('Num_vet',Long()),
    Fld('Foreign_born',Long()),
    Fld('Avg_size',Dbl()),
    Fld('State_code',Str()),
    Fld('Race',Str()),
    Fld('Count',Long())
])

In [11]:
# read the csv file to a Spark DataFrame 
df_city = spark.read.option("delimiter", ";").csv("s3a://transaction-dataset/us-cities-demographics.csv", schema = citySchema, header = True)

In [12]:
df_city.toPandas().head()

Unnamed: 0,City,State,Median_age,Male_pop,Female_pop,Total_pop,Num_vet,Foreign_born,Avg_size,State_code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### 1.4 Extract City-Zips Data from S3

In [13]:
# read the csv file to a Spark DataFrame
df_cityzip = spark.read.csv("s3a://transaction-dataset/uszips.csv",header = True)

# change zip type to LongType
df_cityzip = df_cityzip.withColumn('zip',col('zip').cast(Long()))

In [14]:
df_cityzip.toPandas().head()

Unnamed: 0,zip,lat,lng,city,state_id,state_name,zcta,parent_zcta,population,density,county_fips,county_name,county_weights,county_names_all,county_fips_all,imprecise,military,timezone
0,601,18.18004,-66.75218,Adjuntas,PR,Puerto Rico,True,,17242,111.4,72001,Adjuntas,"{'72001':99.43,'72141':0.57}",Adjuntas|Utuado,72001|72141,False,False,America/Puerto_Rico
1,602,18.36073,-67.17517,Aguada,PR,Puerto Rico,True,,38442,523.5,72003,Aguada,{'72003':100},Aguada,72003,False,False,America/Puerto_Rico
2,603,18.45439,-67.12202,Aguadilla,PR,Puerto Rico,True,,48814,667.9,72005,Aguadilla,{'72005':100},Aguadilla,72005,False,False,America/Puerto_Rico
3,606,18.16724,-66.93828,Maricao,PR,Puerto Rico,True,,6437,60.4,72093,Maricao,"{'72093':94.88,'72121':1.35,'72153':3.78}",Maricao|Yauco|Sabana Grande,72093|72153|72121,False,False,America/Puerto_Rico
4,610,18.29032,-67.12243,Anasco,PR,Puerto Rico,True,,27073,312.0,72011,Añasco,"{'72003':0.55,'72011':99.45}",Añasco|Aguada,72011|72003,False,False,America/Puerto_Rico
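
Casting `zip` to `LongType` drops leading zeros, which is why ZIP code 00601 appears as 601 above. The numeric form matches the `Long` ZIP columns in the other schemas of this notebook. When a five-character ZIP string is needed, for display or for matching against external sources, it can be restored by zero-padding. A minimal sketch in plain Python (`format_zip` is a hypothetical helper, not part of the pipeline):

```python
def format_zip(zip_code: int) -> str:
    """Return the 5-character ZIP string for a numeric ZIP code."""
    if not 0 <= zip_code <= 99999:
        raise ValueError(f"not a 5-digit ZIP code: {zip_code}")
    # zfill pads on the left with zeros up to the requested width
    return str(zip_code).zfill(5)


print(format_zip(601))    # 00601
print(format_zip(55414))  # 55414
```

On a Spark column, `F.lpad(col('zip').cast('string'), 5, '0')` should give the same result.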


#### 1.5 Extract ZIP Code Crosswalk Data from S3

In [15]:
countySchema = R([
    Fld('Zipcode',Long()),
    Fld('County',Long()),
    Fld('Res_ratio',Dbl()),
    Fld('Bus_ratio',Dbl()),
    Fld('Oth_ratio',Dbl()),
    Fld('Tot_ratio',Dbl())
])

In [16]:
# read the csv file to a Spark DataFrame
df_county = spark.read.csv("s3a://transaction-dataset/zipcode-county.csv",schema = countySchema,header = True)

In [17]:
df_county.toPandas().head()

Unnamed: 0,Zipcode,County,Res_ratio,Bus_ratio,Oth_ratio,Tot_ratio
0,501,36103,0.0,1.0,0.0,1.0
1,601,72113,0.160758,0.199017,0.128834,0.162397
2,601,72001,0.839242,0.800983,0.871166,0.837603
3,602,72003,1.0,0.998801,1.0,0.999919
4,602,72005,0.0,0.001199,0.0,8.1e-05
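
Each crosswalk row gives the share of a ZIP code's addresses that falls in one county, so the ratios for a ZIP code sum to about 1 across its counties (0.162397 + 0.837603 for ZIP 601 above). One possible use, sketched here as an assumption rather than something this pipeline does, is to split a ZIP-level amount across counties in proportion to `Tot_ratio`. The helper name `allocate_to_counties` and the amount 100.0 are made up for illustration:

```python
from typing import Dict, List, Tuple

# (Zipcode, County, Tot_ratio) rows copied from the output above
CROSSWALK: List[Tuple[int, int, float]] = [
    (501, 36103, 1.0),
    (601, 72113, 0.162397),
    (601, 72001, 0.837603),
]


def allocate_to_counties(zip_code: int, amount: float,
                         crosswalk: List[Tuple[int, int, float]]) -> Dict[int, float]:
    """Split `amount` for one ZIP code across its counties by Tot_ratio."""
    return {
        county: amount * ratio
        for z, county, ratio in crosswalk
        if z == zip_code
    }


# Hypothetical example: 100.0 of spend recorded for ZIP 601
shares = allocate_to_counties(601, 100.0, CROSSWALK)
for county, value in shares.items():
    print(county, round(value, 4))
```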


#### 1.6 Extract Merchant Category Code Data from S3 

In [18]:
mccSchema = R([
    Fld('MCC',Long()),
    Fld('Name',Str())
])

In [19]:
# read the csv file to a Spark DataFrame
df_mcc = spark.read.csv("s3a://transaction-dataset/MCC_List.csv",schema = mccSchema,header = True)

In [20]:
df_mcc.toPandas().head()

Unnamed: 0,MCC,Name
0,742.0,Veterinary Services
1,763.0,Agricultural Cooperatives
2,780.0,Landscaping and Horticultural Services
3,1520.0,General Contractor/Residential Building
4,1711.0,"Heating, Plumbing, Air Conditioning Contractors"


### 2. Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
#### Cleaning the Data
Remove duplicate rows and rows with missing values if necessary. 

#### 2.1 Clean Daily Spending Data

In [21]:
# check Daily Spending Data
df_payment.count()

19509232

In [22]:
df_payment.dropDuplicates().count()

19442575

In [23]:
df_payment.dropna(how = "any").count()

19422738

In [24]:
# drop duplicate rows and missing values
df_payment = df_payment.dropDuplicates().dropna(how = "any")
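
The three counts above show how much each cleaning step removes. A short calculation in plain Python, using only the numbers printed above (the variable names are mine):

```python
total_rows = 19_509_232    # df_payment.count()
after_dedup = 19_442_575   # df_payment.dropDuplicates().count()
after_dropna = 19_422_738  # df_payment.dropna(how="any").count()

# Both differences are measured against the original DataFrame
duplicate_rows = total_rows - after_dedup
rows_with_nulls = total_rows - after_dropna

print(f"duplicate rows removed: {duplicate_rows} ({duplicate_rows / total_rows:.2%})")
print(f"rows with a null value: {rows_with_nulls} ({rows_with_nulls / total_rows:.2%})")
```

Each step removes well under 1% of the rows. Because both counts are taken on the original DataFrame, the two groups of rows may overlap.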

#### 2.2 Clean Covid County Data

In [25]:
# check Covid County Data
df_covid.count()

1048575

In [26]:
df_covid.dropDuplicates().count()

1048575

In [27]:
df_covid.dropna(how = "any", subset = ['Date','County','Total_deaths','Total_cases']).count()

1048438

In [28]:
# inspect the data 
df_covid.createOrReplaceTempView('table')
spark.sql('SELECT * FROM table WHERE Total_deaths IS NULL').show()

+----------+------+----------+-----------+------------+-----------+
|      Date|County|      Name|Total_tests|Total_deaths|Total_cases|
+----------+------+----------+-----------+------------+-----------+
|2021-01-17| 42083|    McKean|       null|        null|       null|
|2021-01-17| 44001|   Bristol|      28896|        null|       null|
|2021-01-17| 44003|      Kent|      81180|        null|       null|
|2021-01-17| 44005|   Newport|      43252|        null|       null|
|2021-01-17| 44007|Providence|     338209|        null|       null|
|2021-01-17| 44009|Washington|      62111|        null|       null|
+----------+------+----------+-----------+------------+-----------+



In [29]:
# drop rows with a null Date, County, Total_deaths, or Total_cases
df_covid = df_covid.dropna(how = "any", subset = ['Date','County','Total_deaths','Total_cases'])

#### 2.3 Clean U.S. City Demographic Data

In [30]:
# check U.S. City Demographic Data
df_city.count()

2891

In [31]:
df_city.dropDuplicates().count()

2891

In [32]:
df_city.dropna(how = "any",subset = ['City']).count()

2891

#### 2.4 Clean City-Zips Data

In [33]:
# check City-Zips Data
df_cityzip.count()

33097

In [34]:
df_cityzip.dropDuplicates().count()

33097

In [35]:
df_cityzip.dropna(how = "any",subset = ['zip','city']).count()

33097

#### 2.5 Clean ZIP Code Crosswalk Data

In [36]:
# check ZIP Code Crosswalk Data
df_county.count()

54197

In [37]:
df_county.dropDuplicates().count()

54197

In [38]:
df_county.dropna(how = "any").count()

54197

#### 2.6 Clean Merchant Category Code Data

In [39]:
# check Merchant Category Code Data
df_mcc.count()

881

In [40]:
df_mcc.dropDuplicates().count()

881

In [41]:
df_mcc.dropna(how = "any").count()

880

In [42]:
# inspect the null value
df_mcc.filter(df_mcc.MCC.isNull()).show()

+----+--------------------+
| MCC|                Name|
+----+--------------------+
|null|Visa - Non-Financ...|
+----+--------------------+



In [43]:
df_mcc = df_mcc.dropna(how = 'any')

### 3. Define the Data Model


#### 3.1 Conceptual Data Model
* The data lake uses a snowflake schema.
* The central fact table is the payment table.
* The snowflake schema normalizes dimensions into multiple related tables, because each dimension (covid, city demographics) contains more data than fits naturally in a single dimension table.
* Normalization also reduces the storage space and the number of places that must be updated when the data changes.

The ER diagram is shown below:
![Database ER diagram](./ER_Diagram.png)
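
As an illustration of how the tables relate, the sketch below joins the payment fact table to the covid table through zip_county. It assumes the three tables are registered as temporary views named `payment`, `zip_county`, and `covid` (as the quality-check cells in section 4.2 do), and that the join keys are `Zip_code`, `County`, and `Date`. The ER diagram remains the authoritative description. Weighting spend by `Tot_ratio` is my assumption for handling ZIP codes that span several counties, and the names `SPEND_VS_CASES_SQL` and `spend_vs_cases` are hypothetical.

```python
# Hypothetical query: county-level daily spend next to cumulative case counts.
SPEND_VS_CASES_SQL = """
SELECT p.Date,
       z.County,
       SUM(p.Total_spend * z.Tot_ratio) AS County_spend,
       MAX(c.Total_cases)               AS Total_cases
FROM payment p
JOIN zip_county z ON p.Zip_code = z.Zip_code
JOIN covid c      ON c.County = z.County AND c.Date = p.Date
GROUP BY p.Date, z.County
ORDER BY p.Date, z.County
"""


def spend_vs_cases(spark):
    """Run the query on an existing SparkSession and return the result DataFrame."""
    return spark.sql(SPEND_VS_CASES_SQL)
```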

#### 3.2 Data Pipelines
1. Create tables by selecting columns from the Spark DataFrames
2. Store the tables as Parquet files in an AWS S3 bucket
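
The two steps can be wrapped in one small helper. This is a sketch, not the notebook's code: `write_table` is a hypothetical name, and partitioning by a column is optional and shown only as one way to lay out date-keyed tables. It relies on the standard `DataFrameWriter` methods `mode`, `partitionBy`, and `parquet`.

```python
from typing import Optional, Sequence


def write_table(df, columns: Sequence[str], output_path: str, name: str,
                partition_by: Optional[str] = None) -> str:
    """Select `columns` from a Spark DataFrame and write them as Parquet.

    Returns the path that was written.
    """
    # Build the target path by hand so the separator is always '/'
    target = f"{output_path.rstrip('/')}/{name}.parquet"

    # Step 1: select the columns; step 2: write, replacing any existing output
    writer = df.select(*columns).write.mode("overwrite")
    if partition_by is not None:
        writer = writer.partitionBy(partition_by)
    writer.parquet(target)
    return target
```

For example, `write_table(df_mcc, ['MCC', 'Name'], 's3a://datalake-payment', 'mcc')` would write the mcc table without partitioning.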

### 4. Run Pipelines to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [44]:
output_path = 's3a://datalake-payment'

##### 4.1.1 Write payment table to parquet file

In [45]:
# extract columns to create payment table
payment_table = df_payment.select('Date',
                                  'MCC',
                                  'Zip_code',
                                  'Num_card',
                                  'Num_trnsc',
                                  'Total_spend').withColumn('id', F.monotonically_increasing_id())

In [58]:
# write payment table to a parquet file (no partitioning is applied)
payment_table.write.parquet(os.path.join(output_path,'payment.parquet'),'overwrite')

##### 4.1.2 Write covid table to parquet file

In [56]:
# extract columns to create covid table
covid_table = df_covid.select('Date',
                              'County',
                              'Name',
                              'Total_tests',
                              'Total_deaths',
                              'Total_cases')

In [57]:
# write covid table to a parquet file (no partitioning is applied)
covid_table.write.parquet(os.path.join(output_path,'covid.parquet'),'overwrite')

##### 4.1.3 Write city table to parquet file

In [54]:
# extract columns to create city table
city_table = df_city.select('City',
                            'State',
                            'State_code',
                            'Total_pop',
                            'Male_pop',
                            'Female_pop',
                            'Avg_size',
                            'Median_age',
                            'Race')

In [55]:
# write city table to a parquet file
city_table.write.parquet(os.path.join(output_path,'city.parquet'),'overwrite')

##### 4.1.4 Write zip_city table to parquet file

In [52]:
# extract columns to create zip_city table
zip_city_table = df_cityzip.selectExpr('zip AS Zip_code',
                            'City',
                            'State_name AS State',
                            'State_id AS State_code')

In [53]:
zip_city_table.write.parquet(os.path.join(output_path,'zip_city.parquet'),'overwrite')

##### 4.1.5 Write zip_county table to parquet file

In [50]:
zip_county_table = df_county.selectExpr('Zipcode AS Zip_code',
                                        'County',
                                        'Res_ratio',
                                        'Bus_ratio',
                                        'Oth_ratio',
                                        'Tot_ratio')

In [51]:
zip_county_table.write.parquet(os.path.join(output_path,'zip_county.parquet'),'overwrite')

##### 4.1.6 Write mcc table to parquet file

In [47]:
mcc_table = df_mcc.select('MCC',
                          'Name')

In [49]:
mcc_table.write.option("mergeSchema", "false").parquet(os.path.join(output_path,'mcc.parquet'),'overwrite')

#### 4.2 Data Quality Checks


In [66]:
# define the function for quality check
def load_table(table):
    
    # load table
    table_path = os.path.join(output_path, (table + '.parquet'))
    df_table = spark.read.parquet(table_path)
    df_table.createOrReplaceTempView(table)
    
    # check number of rows in the table
    print(f"Number of rows in the table {table}: {df_table.count()}")
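
The row counts printed by `load_table` can also be checked automatically. The sketch below is plain Python and independent of Spark. `find_count_mismatches` is a hypothetical helper, and the expected values are the row counts reported in this notebook.

```python
from typing import Dict, List

# Row counts reported in this notebook for each table
EXPECTED_ROWS: Dict[str, int] = {
    "payment": 19_422_738,
    "covid": 1_048_438,
    "city": 2_891,
    "zip_city": 33_097,
    "zip_county": 54_197,
    "mcc": 880,
}


def find_count_mismatches(expected: Dict[str, int],
                          actual: Dict[str, int]) -> List[str]:
    """Return one message per table whose row count is missing or unexpected."""
    problems = []
    for table, want in expected.items():
        got = actual.get(table)
        if got is None:
            problems.append(f"{table}: table was not loaded")
        elif got != want:
            problems.append(f"{table}: expected {want} rows, found {got}")
    return problems
```

To feed it, `load_table` would need to return `df_table.count()` in addition to printing it. An empty list from `find_count_mismatches` means every table has the expected number of rows.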

##### 4.2.1 Check payment table

In [67]:
load_table('payment')

Number of rows in the table payment: 19422738


In [69]:
# integrity check
spark.sql('DESCRIBE payment').show()

+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|       Date|     date|   null|
|        MCC|   bigint|   null|
|   Zip_code|   bigint|   null|
|   Num_card|   bigint|   null|
|  Num_trnsc|   bigint|   null|
|Total_spend|   double|   null|
|         id|   bigint|   null|
+-----------+---------+-------+



In [72]:
# unit test
spark.sql('''SELECT Total_spend/Num_card AS SpendperCard 
             FROM payment
             ORDER BY 1 DESC
             LIMIT 5''').show()

+------------+
|SpendperCard|
+------------+
|   73111.788|
|    58710.84|
|    33046.42|
|     31826.3|
|    25469.77|
+------------+



##### 4.2.2 Check covid table

In [73]:
load_table('covid')

Number of rows in the table covid: 1048438


In [74]:
# integrity check
spark.sql('DESCRIBE covid').show()

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|        Date|     date|   null|
|      County|   bigint|   null|
|        Name|   string|   null|
| Total_tests|   bigint|   null|
|Total_deaths|   double|   null|
| Total_cases|   double|   null|
+------------+---------+-------+



In [77]:
# unit check
spark.sql('''
          SELECT Name, Total_cases
          FROM covid
          WHERE Date = "2020-07-01"
          ORDER BY 2 DESC
          LIMIT 5''').show()

+-----------+-----------+
|       Name|Total_cases|
+-----------+-----------+
|Los Angeles|   105291.0|
|     Queens|    65455.0|
|      Kings|    59742.0|
|   Maricopa|    52266.0|
|      Bronx|    47651.0|
+-----------+-----------+



##### 4.2.3 Check city table

In [78]:
load_table('city')

Number of rows in the table city: 2891


In [79]:
# integrity check
spark.sql('DESCRIBE city').show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|      City|   string|   null|
|     State|   string|   null|
|State_code|   string|   null|
| Total_pop|   bigint|   null|
|  Male_pop|   bigint|   null|
|Female_pop|   bigint|   null|
|  Avg_size|   double|   null|
|Median_age|   double|   null|
|      Race|   string|   null|
+----------+---------+-------+



In [86]:
# unit check
spark.sql('''
            SELECT DISTINCT City,State, Total_pop
            FROM city
            ORDER BY 3 DESC
            LIMIT 5
            ''').show()

+------------+------------+---------+
|        City|       State|Total_pop|
+------------+------------+---------+
|    New York|    New York|  8550405|
| Los Angeles|  California|  3971896|
|     Chicago|    Illinois|  2720556|
|     Houston|       Texas|  2298628|
|Philadelphia|Pennsylvania|  1567442|
+------------+------------+---------+



##### 4.2.4 Check zip_city table

In [87]:
load_table('zip_city')

Number of rows in the table zip_city: 33097


In [88]:
# integrity check
spark.sql('DESCRIBE zip_city').show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|  Zip_code|   bigint|   null|
|      City|   string|   null|
|     State|   string|   null|
|State_code|   string|   null|
+----------+---------+-------+



In [89]:
# unit check
spark.sql('''
            SELECT *
            FROM zip_city
            LIMIT 5
            ''').show()

+--------+---------+-----------+----------+
|Zip_code|     City|      State|State_code|
+--------+---------+-----------+----------+
|     601| Adjuntas|Puerto Rico|        PR|
|     602|   Aguada|Puerto Rico|        PR|
|     603|Aguadilla|Puerto Rico|        PR|
|     606|  Maricao|Puerto Rico|        PR|
|     610|   Anasco|Puerto Rico|        PR|
+--------+---------+-----------+----------+



##### 4.2.5 Check zip_county table

In [90]:
load_table('zip_county')

Number of rows in the table zip_county: 54197


In [91]:
# integrity check
spark.sql('DESCRIBE zip_county').show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
| Zip_code|   bigint|   null|
|   County|   bigint|   null|
|Res_ratio|   double|   null|
|Bus_ratio|   double|   null|
|Oth_ratio|   double|   null|
|Tot_ratio|   double|   null|
+---------+---------+-------+



In [92]:
# unit check
spark.sql('''
            SELECT *
            FROM zip_county
            LIMIT 5
            ''').show()

+--------+------+-----------+-----------+-----------+-----------+
|Zip_code|County|  Res_ratio|  Bus_ratio|  Oth_ratio|  Tot_ratio|
+--------+------+-----------+-----------+-----------+-----------+
|     501| 36103|        0.0|        1.0|        0.0|        1.0|
|     601| 72113|0.160757734|0.199017199|0.128834356|0.162397217|
|     601| 72001|0.839242266|0.800982801|0.871165644|0.837602783|
|     602| 72003|        1.0|0.998800959|        1.0|0.999919368|
|     602| 72005|        0.0|0.001199041|        0.0|  8.0632E-5|
+--------+------+-----------+-----------+-----------+-----------+



##### 4.2.6 Check mcc table

In [93]:
load_table('mcc')

Number of rows in the table mcc: 880


In [94]:
# integrity check
spark.sql('DESCRIBE mcc').show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|     MCC|   bigint|   null|
|    Name|   string|   null|
+--------+---------+-------+



In [95]:
# unit check
spark.sql('''
            SELECT *
            FROM mcc
            LIMIT 5
            ''').show()

+----+--------------------+
| MCC|                Name|
+----+--------------------+
| 742| Veterinary Services|
| 763|Agricultural Coop...|
| 780|Landscaping and H...|
|1520|General Contracto...|
|1711|Heating, Plumbing...|
+----+--------------------+



#### 4.3 Data dictionary
##### payment table
* id: automatically generated identification number
* Date: transaction date
* MCC: Visa Merchant Category Code
* Zip_code: the cardholder’s residential zip code
* Num_card: number of unique cards used on the Date at merchants with that MCC by cardholders in that zip code
* Num_trnsc: number of unique transactions on the Date at merchants with that MCC by cardholders in that zip code
* Total_spend: total amount spent on the Date at merchants with that MCC by cardholders in that zip code

##### covid table
* Date: counting date
* County: county FIPS 
* Name: county name
* Total_tests: total covid tests
* Total_deaths: total deaths caused by covid
* Total_cases: total confirmed covid cases

##### city table
* City: city name
* State: state name
* State_code: abbreviation code for the state
* Total_pop: total population
* Male_pop: male population
* Female_pop: female population
* Avg_size: average household size
* Median_age: median age
* Race: race category (each city has one row per race category)

##### zip_city
* Zip_code: zip code
* City: city name
* State: state name
* State_code: abbreviation code for the state

##### zip_county
* Zip_code: zip code
* County: county FIPS code
* Res_ratio: the share of the zip code's residential addresses that fall in the county
* Bus_ratio: the share of the zip code's business addresses that fall in the county
* Oth_ratio: the share of the zip code's other addresses that fall in the county
* Tot_ratio: the share of the zip code's total addresses that fall in the county

##### mcc
* MCC: Visa Merchant Category Code
* Name: merchant category name

### 5. Project Write Up
**1. Why Data Lake?**
* The key data is payment transactions, but the insights we want to derive are not yet defined. I added COVID-19 data to the data lake to examine how the pandemic changes people's consumption patterns. Other datasets can be added to the data lake later to derive other insights.

**2. Why PySpark?**
* Spark is generally much faster than Hadoop MapReduce for large-scale data processing because it keeps intermediate data in memory, and its DataFrame API is more user-friendly.

**3. How often should the data be updated?**
* The data should be updated weekly, which allows a week for consumption patterns to change.

**4. What if the data was increased by 100x?**
* Spark can handle it by scaling out: adding worker nodes to the cluster lets it process the larger volume in parallel.

**5. What if the data populates a dashboard that must be updated on a daily basis by 7am every day?**
* Update the tables stored in the data lake daily, using append or overwrite mode of the DataFrame writer (`df.write`), with the job scheduled to finish before 7am.

**6. What if the database needed to be accessed by 100+ people?**
* The more people who access the database, the more CPU resources are needed to keep queries fast. A distributed database can use replication and partitioning to return query results faster for each user.