# ETL & Data Warehouse for Traffic Accident Analysis

## Project Summary
The project walks through an ETL process that brings together US traffic accident, COVID case, and city population data from different sources and formats. The goal is to analyze how COVID cases in 2020 and population density affect traffic accident volumes in the US.

**The project follows these steps:**
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1: Scope the Project and Gather Data

**Scope & Data**  
Data will be loaded directly from S3 to Redshift, which avoids setting up a server with large amounts of storage. Redshift serves as the data warehouse, storing and querying large amounts of data using distributed computing and parallel processing.

The following sources will be used for the data:

- **[4.2 Million Traffic accidents](https://www.kaggle.com/sobhanmoosavi/us-accidents)**
    - This is a countrywide car accident dataset from Kaggle that covers 49 US states. The accident data were collected from February 2016 to December 2020. The dataset contains about 4.2 million accident records in CSV format.
    - A description of each field is documented here: https://smoosavi.org/datasets/us_accidents
    - Moosavi, Sobhan, Mohammad Hossein Samavatian, Srinivasan Parthasarathy, and Rajiv Ramnath. “A Countrywide Traffic Accident Dataset.”, 2019.
    - Moosavi, Sobhan, Mohammad Hossein Samavatian, Srinivasan Parthasarathy, Radu Teodorescu, and Rajiv Ramnath. "Accident Risk Prediction based on Heterogeneous Sparse Data: New Dataset and Insights." In proceedings of the 27th ACM SIGSPATIAL International Conference on Advances in Geographic Information Systems, ACM, 2019.
- **[City data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/table/)**
    - Includes US city demographic data stored in JSON format from Opendatasoft
- **[US covid cases](https://ourworldindata.org/covid-cases?country=~USA)**
    - A dataset from ourworldindata.org that includes information about daily COVID cases for the US and other countries



## Step 2: Explore and Assess the Data
We load the three datasets above to review their columns, define the table schemas, and check whether any cleaning steps are required.

In [20]:
import pandas as pd
pd.set_option('display.max_columns', None)

### Traffic accident data exploration and cleaning
We load only the first few rows to view the columns and sample data.

In [22]:
# Traffic accidents data
accident_df = pd.read_csv('capstone_data/US_Accidents_Dec20.csv', nrows=10)
accident_df.head()

Unnamed: 0,ID,Source,TMC,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Number,Street,Side,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
0,A-1,MapQuest,201.0,3,2016-02-08 05:46:00,2016-02-08 11:00:00,39.865147,-84.058723,,,0.01,Right lane blocked due to accident on I-70 Eas...,,I-70 E,R,Dayton,Montgomery,OH,45424,US,US/Eastern,KFFO,2016-02-08 05:58:00,36.9,,91.0,29.68,10.0,Calm,,0.02,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Night
1,A-2,MapQuest,201.0,2,2016-02-08 06:07:59,2016-02-08 06:37:59,39.928059,-82.831184,,,0.01,Accident on Brice Rd at Tussing Rd. Expect del...,2584.0,Brice Rd,L,Reynoldsburg,Franklin,OH,43068-3402,US,US/Eastern,KCMH,2016-02-08 05:51:00,37.9,,100.0,29.65,10.0,Calm,,0.0,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Day
2,A-3,MapQuest,201.0,2,2016-02-08 06:49:27,2016-02-08 07:19:27,39.063148,-84.032608,,,0.01,Accident on OH-32 State Route 32 Westbound at ...,,State Route 32,R,Williamsburg,Clermont,OH,45176,US,US/Eastern,KI69,2016-02-08 06:56:00,36.0,33.3,100.0,29.67,10.0,SW,3.5,,Overcast,False,False,False,False,False,False,False,False,False,False,False,True,False,Night,Night,Day,Day
3,A-4,MapQuest,201.0,3,2016-02-08 07:23:34,2016-02-08 07:53:34,39.747753,-84.205582,,,0.01,Accident on I-75 Southbound at Exits 52 52B US...,,I-75 S,R,Dayton,Montgomery,OH,45417,US,US/Eastern,KDAY,2016-02-08 07:38:00,35.1,31.0,96.0,29.64,9.0,SW,4.6,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Day,Day,Day
4,A-5,MapQuest,201.0,2,2016-02-08 07:39:07,2016-02-08 08:09:07,39.627781,-84.188354,,,0.01,Accident on McEwen Rd at OH-725 Miamisburg Cen...,,Miamisburg Centerville Rd,R,Dayton,Montgomery,OH,45459,US,US/Eastern,KMGY,2016-02-08 07:53:00,36.0,33.3,89.0,29.65,6.0,SW,3.5,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,True,False,Day,Day,Day,Day


### City demographic data exploration and cleaning
We load the JSON data obtained from Opendatasoft.

In [23]:
# City demographic data
import json 
  
# Opening JSON file 
f = open('capstone_data/us-cities-demographics.json') 

data = json.load(f)
f.close()

df = pd.DataFrame(data)
df.head()

Unnamed: 0,datasetid,recordid,fields,record_timestamp
0,us-cities-demographics,0074451cff52969855654d21497e9459f1108d8d,"{'count': 8791, 'city': 'Wichita', 'number_of_...",1969-12-31T16:00:00-08:00
1,us-cities-demographics,54b201cac9c7523363eb0cfeadc352a04fe016af,"{'count': 22304, 'city': 'Allen', 'number_of_v...",1969-12-31T16:00:00-08:00
2,us-cities-demographics,9dc3d4a59d7e3e2ad31ec5a6d3bab5fac67ee462,"{'count': 8454, 'city': 'Danbury', 'number_of_...",1969-12-31T16:00:00-08:00
3,us-cities-demographics,630ac8078919e7c8c96b861a336c66af27ffcc88,"{'count': 67526, 'city': 'Nashville', 'number_...",1969-12-31T16:00:00-08:00
4,us-cities-demographics,ae093b0dc0b8b9116176092b731533f5b008c75b,"{'count': 11013, 'city': 'Stamford', 'number_o...",1969-12-31T16:00:00-08:00


**Check the number of rows in the JSON file**

In [12]:
df.shape[0]

2891

**Cleaning the data by extracting the `fields` column and writing to a CSV file format**

In [24]:
dct_arr = []
for el in df['fields']:
    dct_arr.append(el)
    
labels = []
for key in dct_arr[0]:
    labels.append(key)

labels

['count',
 'city',
 'number_of_veterans',
 'male_population',
 'foreign_born',
 'average_household_size',
 'median_age',
 'state',
 'race',
 'total_population',
 'state_code',
 'female_population']

In [25]:
import csv

try:
    with open('capstone_data/cleaned-us-cities-demographics.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=labels)
        writer.writeheader()
        for elem in dct_arr:
            writer.writerow(elem)
except IOError:
    print("I/O error")

In [26]:
city_df = pd.read_csv('capstone_data/cleaned-us-cities-demographics.csv')
city_df.head()

Unnamed: 0,count,city,number_of_veterans,male_population,foreign_born,average_household_size,median_age,state,race,total_population,state_code,female_population
0,8791,Wichita,23978.0,192354.0,40270.0,2.56,34.6,Kansas,American Indian and Alaska Native,389955,KS,197601.0
1,22304,Allen,5691.0,60626.0,19652.0,2.67,33.5,Pennsylvania,Black or African-American,120207,PA,59581.0
2,8454,Danbury,3752.0,43435.0,25675.0,2.74,37.3,Connecticut,Black or African-American,84662,CT,41227.0
3,67526,Nashville,27942.0,314231.0,88193.0,2.39,34.1,Tennessee,Hispanic or Latino,654596,TN,340365.0
4,11013,Stamford,2269.0,64941.0,44003.0,2.7,35.4,Connecticut,Asian,128877,CT,63936.0
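
The `fields` extraction above can also be done in pandas without the intermediate loops and CSV writer. The following is a minimal sketch, assuming every record carries a flat `fields` dictionary like the ones shown; the two sample records are made up for illustration and use only a few of the real keys.

```python
import pandas as pd

# Two made-up records in the same shape as the Opendatasoft export
records = [
    {"datasetid": "us-cities-demographics", "recordid": "a1",
     "fields": {"city": "Wichita", "state_code": "KS", "total_population": 389955}},
    {"datasetid": "us-cities-demographics", "recordid": "b2",
     "fields": {"city": "Allen", "state_code": "PA", "total_population": 120207}},
]

# Expand the nested `fields` dictionaries into one column per key
flat_df = pd.json_normalize([rec["fields"] for rec in records])
print(flat_df.columns.tolist())
```

Calling `flat_df.to_csv(path, index=False)` on the result would then write the cleaned file in one step.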


### COVID data exploration and cleaning

In [27]:
# COVID case data
covid_df = pd.read_csv('capstone_data/coronavirus-data-explorer.csv')
covid_df.head()

Unnamed: 0,Country name,Code,Day,Unnamed: 3,Population,Population density (people per km²),Unnamed: 6,Unnamed: 7,Unnamed: 8,Unnamed: 9,Unnamed: 10,Unnamed: 11,Unnamed: 12,Unnamed: 13,Unnamed: 14,Unnamed: 15,Unnamed: 16,Unnamed: 17,Unnamed: 18,Unnamed: 19,Median age,Share aged 65+,Share aged 70+,GDP per capita (int.-$),Population in extreme poverty,Human Development Index,Hospital beds (per 1000),Stringency Index,Life expectancy,Unnamed: 29,Total vaccinations,Total vaccinations (per 100),New vaccinations,New vaccinations.1,New vaccinations (per 100),Total deaths,Total deaths (per 1M),New deaths,New deaths.1,New deaths (per 1M),New deaths (per 1M).1,New deaths.2,New deaths.3,New deaths (per 1M).2,New deaths (per 1M).3,Total cases,Total cases (per 1M),New cases,New cases.1,New cases (per 1M),New cases (per 1M).1,Daily new confirmed COVID-19 cases,Daily new confirmed COVID-19 cases.1,New cases (per 1M).2,New cases (per 1M).3,Daily new COVID-19 tests,"Daily new COVID-19 tests per 1,000 people",Total tests,Total tests (per 1K),Daily new COVID-19 tests.1,Daily new COVID-19 tests.2,Tests per case,Positive test rate,Tests conducted per confirmed case of COVID-19,Positive test rate.1,Unnamed: 65,Case fatality rate,Days since the 5th total confirmed death,Days since 5 daily new deaths first reported,Days since total confirmed deaths reached 0.1 per million,Days since the 100th confirmed case,Days since confirmed cases first reached 30 per day,Days since the total confirmed cases per million people reached 1,Unnamed: 73,Unnamed: 74,Unnamed: 75,Unnamed: 76,Weekly confirmed COVID-19 cases,Biweekly confirmed COVID-19 cases,Weekly confirmed COVID-19 deaths,Biweekly confirmed COVID-19 deaths,Week by week change of confirmed COVID-19 cases,Biweekly change of confirmed COVID-19 cases,Week by week change of confirmed COVID-19 deaths,Biweekly change of confirmed COVID-19 deaths,The share of COVID-19 tests that are positive,Cumulative tests conducted per confirmed case of 
COVID-19,Cumulative tests conducted per confirmed case of COVID-20,Tests per case.1,Positive test rate.2,Positive test rate.3,Tests per case.2,reproduction_rate,people_vaccinated,people_fully_vaccinated,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred
0,Afghanistan,AFG,2020-02-24,Asia,38928341.0,54.422,,,,,,,,,,597.029,9.59,,,37.746,18.6,2.581,1.337,1803.987,,0.511,0.5,8.33,64.83,,,,,,,,,,,,,,,,,1.0,0.026,1.0,1.0,0.026,0.026,,,,,,,,,,,,,,,,,,,,,,,1.0,1.0,,,7.0,14.0,,,,,,,,,,,,,,,,,,
1,Afghanistan,AFG,2020-02-25,Asia,38928341.0,54.422,,,,,,,,,,597.029,9.59,,,37.746,18.6,2.581,1.337,1803.987,,0.511,0.5,8.33,64.83,,,,,,,,,,,,,,,,,1.0,0.026,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,,,,,,0.5,0.5,,,3.5,7.0,,,,,,,,,,,,,,,,,,
2,Afghanistan,AFG,2020-02-26,Asia,38928341.0,54.422,,,,,,,,,,597.029,9.59,,,37.746,18.6,2.581,1.337,1803.987,,0.511,0.5,8.33,64.83,,,,,,,,,,,,,,,,,1.0,0.026,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,,,,,,0.333333,0.333333,,,2.333333,4.666667,,,,,,,,,,,,,,,,,,
3,Afghanistan,AFG,2020-02-27,Asia,38928341.0,54.422,,,,,,,,,,597.029,9.59,,,37.746,18.6,2.581,1.337,1803.987,,0.511,0.5,8.33,64.83,,,,,,,,,,,,,,,,,1.0,0.026,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,,,,,,0.25,0.25,,,1.75,3.5,,,,,,,,,,,,,,,,,,
4,Afghanistan,AFG,2020-02-28,Asia,38928341.0,54.422,,,,,,,,,,597.029,9.59,,,37.746,18.6,2.581,1.337,1803.987,,0.511,0.5,8.33,64.83,,,,,,,,,,,,,,,,,1.0,0.026,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,,,,,,0.2,0.2,,,1.4,2.8,,,,,,,,,,,,,,,,,,


**Extract only the relevant columns to reduce the size of the data to load**

In [28]:
covid_df = covid_df[['Country name', 'Day', 'Stringency Index', 'Total vaccinations', 'Total deaths', 'Total cases', 
                     'Daily new confirmed COVID-19 cases', 'Biweekly confirmed COVID-19 cases']]

column_dict = {'Country name':'country_name', 
               'Day':'day',
               'Stringency Index':'stringency_index',
               'Total vaccinations': 'total_vaccinations',
               'Total deaths': 'total_deaths',
               'Total cases': 'total_cases',
               'Daily new confirmed COVID-19 cases': 'daily_new_cases',
               'Biweekly confirmed COVID-19 cases': 'biweekly_cases'}

# Reassign instead of renaming in place, since covid_df is a slice of the original frame
covid_df = covid_df.rename(columns=column_dict)

covid_df.to_csv('capstone_data/cleaned-coronavirus-data-explorer.csv', index=False)

**Check the row count for COVID data**

In [8]:
covid_df.shape[0]

75071

**Check the row count for COVID data specific to the US**

In [9]:
rslt_df = covid_df.loc[covid_df['country_name'] == 'United States']
rslt_df.shape[0]

419

## Step 3: Define the Data Model

### 3.1 Conceptual Data Model

The database is modeled as a *star schema*: a central fact table of accidents surrounded by dimension tables, structured as shown below. This layout limits redundancy in the fact table and keeps analytical queries simple, because each dimension is a single join away.

<img src="assets/img/schema.png" alt="schema" width="650" height="650"/>


### 3.2 Mapping Out Data Pipelines

**The ETL will follow the below steps**
1. Create staging and fact/dimension tables in Redshift
2. COPY data from S3 into staging tables in Redshift
3. Run INSERT INTO statements from staging table to fact/dimension tables
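
The three steps above can be sketched as three lists of SQL statements executed in order. This is only an illustration under stated assumptions: the statements are hypothetical stand-ins, an in-memory SQLite database replaces Redshift so the sketch runs anywhere, and a plain `INSERT` replaces the `COPY` from S3.

```python
import sqlite3

def run_queries(cursor, queries):
    """Execute each SQL statement in order on a DB-API cursor."""
    for query in queries:
        cursor.execute(query)

# Step 1: hypothetical staging and dimension tables
create_queries = [
    "CREATE TABLE IF NOT EXISTS city_staging_table (city TEXT, state_code TEXT)",
    "CREATE TABLE IF NOT EXISTS city_table "
    "(city_key INTEGER PRIMARY KEY, city TEXT, state_code TEXT)",
]
# Step 2 stand-in: Redshift would run COPY ... FROM 's3://...' here
load_queries = [
    "INSERT INTO city_staging_table VALUES "
    "('Dayton', 'OH'), ('Dayton', 'OH'), ('Allen', 'PA')",
]
# Step 3: deduplicate staging rows into the dimension table
insert_queries = [
    "INSERT INTO city_table (city, state_code) "
    "SELECT DISTINCT city, state_code FROM city_staging_table",
]

conn = sqlite3.connect(":memory:")  # in-memory SQLite stands in for Redshift
cur = conn.cursor()
for step in (create_queries, load_queries, insert_queries):
    run_queries(cur, step)
conn.commit()

city_rows = cur.execute("SELECT COUNT(*) FROM city_table").fetchone()[0]
print(city_rows)
```

Keeping each step as its own list makes it easy to rerun a single stage, for example reloading staging tables without recreating them.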


**The following scripts and files will be used for ETL**
- `dwh.cfg` includes the parameters required to access AWS resources such as S3 and Redshift.
- `sql_queries.py` includes all the ETL related SQL queries.
- `create_tables.py` to create the necessary SQL tables.
- `etl.py` to load data from S3 to Redshift and transform it into the star schema.
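
As a rough illustration of how a file like `dwh.cfg` could be read, the sketch below parses an INI-style configuration with Python's standard `configparser`. The section names, keys, and values are hypothetical, since the real file is not shown in this write-up.

```python
import configparser

# Hypothetical contents; the real dwh.cfg is not shown in this write-up
SAMPLE_CFG = """
[CLUSTER]
HOST = example-cluster.abc123.us-west-2.redshift.amazonaws.com
DB_NAME = dwh
DB_PORT = 5439

[IAM_ROLE]
ARN = arn:aws:iam::123456789012:role/exampleRole

[S3]
ACCIDENT_DATA = s3://example-bucket/capstone_data/US_Accidents_Dec20.csv
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)  # with a real file: config.read("dwh.cfg")

db_port = config.getint("CLUSTER", "DB_PORT")
role_arn = config.get("IAM_ROLE", "ARN")
print(db_port, role_arn)
```

Keeping credentials and resource names in a config file keeps them out of the SQL and Python scripts, so the file can be excluded from version control.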

## Step 4: Run Pipelines to Model the Data 

### 4.1 Create the data model

**1. Create tables in Redshift by running create_tables.py**  
`create_tables.py` runs the SQL statements that create both the staging and the fact/dimension tables.


In [29]:
!python create_tables.py

Creating table 
    CREATE TABLE IF NOT EXISTS accident_staging_table (
        id                      VARCHAR,
        source                  VARCHAR,
        tmc                     REAL,
        severity                INT,
        start_time              TIMESTAMP,
        end_time                TIMESTAMP,
        start_lat               NUMERIC(9,6),
        start_lng               NUMERIC(9,6),
        end_lat                 NUMERIC(9,6),
        end_lng                 NUMERIC(9,6),
        distance                REAL,
        description             VARCHAR(MAX),
        number                  REAL,
        street                  VARCHAR,
        side                    VARCHAR(1),
        city                    VARCHAR,
        county                  VARCHAR,
        state                   VARCHAR(5),
        zipcode                 VARCHAR,
        country                 VARCHAR,
        timezone                VARCHAR,
        airport_code            VARCHAR,
    

**2. COPY data from S3 to staging tables in Redshift**  
**3. Run transformations and load from staging to fact/dimension tables**  
etl.py includes both COPY SQL statements and INSERT INTO statements for loading and transformations.
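
The COPY statements all follow the same pattern, so they could be generated from a small template. This is a sketch rather than the actual contents of `etl.py`; the function name `build_copy_sql`, the bucket, and the role ARN are made up for illustration.

```python
def build_copy_sql(table, s3_path, iam_role, region):
    """Return a Redshift COPY statement for a CSV file with one header row."""
    return (
        f"COPY {table} FROM '{s3_path}'\n"
        f"iam_role '{iam_role}'\n"
        "CSV\n"
        "IGNOREHEADER 1\n"
        f"REGION '{region}'"
    )

# Hypothetical bucket and role; substitute the values from your own config
copy_sql = build_copy_sql(
    table="city_staging_table",
    s3_path="s3://example-bucket/capstone_data/cleaned-us-cities-demographics.csv",
    iam_role="arn:aws:iam::123456789012:role/exampleRole",
    region="us-west-2",
)
print(copy_sql)
```

`IGNOREHEADER 1` skips the header row that the cleaning steps wrote into each CSV file.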

In [30]:
!python etl.py

Copying data from S3:  
    COPY covid_staging_table FROM 's3://patrickcapstone/capstone_data/cleaned-coronavirus-data-explorer.csv'
    iam_role 'arn:aws:iam::489883123546:role/dwhRole'
    CSV
    IGNOREHEADER 1
    REGION 'us-west-2'

Copying data from S3:  
    COPY city_staging_table FROM 's3://patrickcapstone/capstone_data/cleaned-us-cities-demographics.csv'
    iam_role 'arn:aws:iam::489883123546:role/dwhRole'
    CSV
    IGNOREHEADER 1
    REGION 'us-west-2'

Copying data from S3:  
    COPY accident_staging_table FROM 's3://patrickcapstone/capstone_data/US_Accidents_Dec20.csv'
    iam_role 'arn:aws:iam::489883123546:role/dwhRole'
    CSV
    IGNOREHEADER 1
    REGION 'us-west-2'

Transforming data:  
    INSERT INTO accident_table (accident_id, severity, start_time_key, end_time, date_key, 
                                description, city_key, temprature, wind_chill, humidity, 
                                pressure, visibility, wind_speed, weather_condition, sunrise_sunset

### 4.2 Data Quality Checks

**Defining tables with variables types**  
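Each table is created with explicit column types, and character lengths are set where appropriate. A **primary key** is declared on the fact and dimension tables to document which columns must be unique and not `Null`. Note that Redshift treats primary key constraints as informational and does not enforce uniqueness, so the row count checks below remain necessary.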

**Check row count for tables**  
Run the SQL queries below to validate that the data was loaded correctly into the staging tables and transformed correctly into the fact/dimension tables.

`Select Count(city)
From city_staging_table`  
Row count should be: 2,891


`Select Count(id)
From accident_staging_table`  
Row count should be: 4,232,541


`Select Count(id)
From covid_staging_table`  
Row count should be: 75,071


`Select Count(date_key)
From covid_table`   
Row count should be: 419


`Select Count(start_time_key)
From time_table`  
Row count should be: 3,634,540


`Select Count(accident_id)
From accident_table`  
Row count should be: 4,232,541


`Select Count(city_key)
From city_table`  
Row count should be: 596
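
These checks can be automated by comparing each table's row count against its expected value. The sketch below assumes a DB-API cursor and uses in-memory SQLite with a made-up three-row table so it runs without a cluster; `check_row_counts` is a hypothetical helper, not part of the project scripts.

```python
import sqlite3

def check_row_counts(cursor, expected_counts):
    """Return {table: (expected, actual)} for every table whose count differs."""
    mismatches = {}
    for table, expected in expected_counts.items():
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        actual = cursor.fetchone()[0]
        if actual != expected:
            mismatches[table] = (expected, actual)
    return mismatches

# Demo on in-memory SQLite; against Redshift, pass a cursor from your driver
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE city_table (city_key INTEGER)")
cur.executemany("INSERT INTO city_table VALUES (?)", [(1,), (2,), (3,)])

print(check_row_counts(cur, {"city_table": 3}))    # matching count
print(check_row_counts(cur, {"city_table": 596}))  # mismatching count
```

The helper counts with `COUNT(*)`, which includes rows where a column is null, whereas `COUNT(column)` skips them. An empty result means every table matched.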


### 4.3 Data dictionary 

A data dictionary is located under: data_dictionary/data_dictionary.xlsx

<img src="assets/data_dictionary/data_dictionary_screenshot.png" alt="dictionary" width="550" height="550"/>

## Step 5: Complete Project Write Up


### Rationale
By extracting and loading data from S3 directly to Redshift, we avoid setting up a server with large amounts of storage. A data warehouse such as Redshift can store and query large amounts of data using distributed computing and parallel processing. In this case, we can easily query 4.2 million rows of US accident data across different cities.

The database is modeled as a *star schema*, which limits redundancy in the fact table while keeping analytical queries simple.

Redshift distribution keys spread data across nodes so that queries continue to scale with large amounts of data. The largest table, `accident_table`, is distributed on the **city_key** column.
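
With key distribution, rows that share a distribution key value are placed on the same slice, so joins and aggregations on that key can run locally on each slice. The sketch below only illustrates the idea: `crc32` is a stand-in for Redshift's internal hash function, and the rows and slice count are made up.

```python
import zlib
from collections import defaultdict

def assign_slice(dist_key, num_slices):
    """Map a distribution key to a slice with a deterministic hash.

    crc32 is only a stand-in; Redshift's own hash function is internal.
    """
    return zlib.crc32(str(dist_key).encode("utf-8")) % num_slices

# Made-up (accident_id, city_key) rows
rows = [("A-1", 10), ("A-2", 20), ("A-3", 10), ("A-4", 30), ("A-5", 20)]

slices = defaultdict(list)
for accident_id, city_key in rows:
    slices[assign_slice(city_key, num_slices=4)].append((accident_id, city_key))

# Rows with the same city_key always appear under the same slice id
for slice_id, slice_rows in sorted(slices.items()):
    print(slice_id, slice_rows)
```

One trade-off to keep in mind: if a few cities account for most accidents, their slices hold more rows than the others, which can skew the workload.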

The data should be refreshed daily to pick up newly published COVID case and accident records.




### Use Cases
With this schema, we can run queries such as how COVID has affected the number of traffic accidents on a given day.

**Query example**
```
With accidents As 
	(Select date_key as date, Count(accident_id) as total_accidents
	From accident_table
	Group By date_key)

Select a.date, a.total_accidents, c.biweekly_cases, c.total_cases
From covid_table c
Join accidents a On a.date = c.date_key
```
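
To see what this query returns, it can be run against a few made-up rows. The sketch below uses in-memory SQLite in place of Redshift, includes only the columns the query touches, and adds an `Order By` so the output order is deterministic; column types are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE accident_table (accident_id TEXT, date_key TEXT)")
cur.execute(
    "CREATE TABLE covid_table (date_key TEXT, biweekly_cases REAL, total_cases REAL)"
)
# Made-up rows for illustration only
cur.executemany(
    "INSERT INTO accident_table VALUES (?, ?)",
    [("A-1", "2020-03-01"), ("A-2", "2020-03-01"), ("A-3", "2020-03-02")],
)
cur.executemany(
    "INSERT INTO covid_table VALUES (?, ?, ?)",
    [("2020-03-01", 60.0, 75.0), ("2020-03-02", 80.0, 100.0)],
)

query = """
With accidents As
    (Select date_key as date, Count(accident_id) as total_accidents
    From accident_table
    Group By date_key)

Select a.date, a.total_accidents, c.biweekly_cases, c.total_cases
From covid_table c
Join accidents a On a.date = c.date_key
Order By a.date
"""
result = cur.execute(query).fetchall()
print(result)
```

Each output row pairs one day's accident count with that day's COVID figures, which is the shape needed for the visualization.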

**Visualization of data from Redshift**  
The chart suggests that traffic accidents did not decrease as COVID cases increased. However, accidents do appear to drop during lockdown periods.
<img src="assets/img/visualization.png" alt="visualization" width="550" height="550"/>


### Further Exploration

**Large datasets**  
If larger volumes of data need to be processed, the Redshift cluster can be scaled up (larger nodes) or scaled out (more nodes). Another option would be to run the ETL pipeline in Spark.


**Automating/scheduling pipeline**  
Airflow can be used to run the ETL process on a regular schedule. Structuring the ETL as automated DAGs will also make the process more reliable.

**Concurrent users**  
If hundreds or thousands of users need to query the data, Redshift supports this with Concurrency Scaling. Spark could also be considered for serving large numbers of concurrent users.
