# Project Title
### Data Engineering Capstone Project

#### Project Summary
I have broken down the 2018 New York City parking ticket data so that it can be analyzed for more insight.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import configparser
from datetime import datetime
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit, concat
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.
I plan on warehousing New York City's parking ticket dataset in order to find more insight into the trends in how parking officers hand out tickets. I'm using the New York City Parking Violations data for fiscal year 2018, along with the parking violation code data, which adds more detail to the main dataset. My end solution will be a data lake with the DIM tables Registration, Vehicle, Violation Location, and Violation Details. My fact table will be the Ticket table. I used Spark and AWS S3 to create the data lake.

#### Describe and Gather Data 
Describe the data sets you're using. 

- The first dataset that I am using is the City of New York parking violation tickets issued in fiscal year 2018, which can be found at this link: https://data.cityofnewyork.us/City-Government/Parking-Violations-Issued-Fiscal-Year-2018/faiq-9dfq.

The dataset includes the following columns:

- Summons Number
- Plate ID
- Registration State
- Plate Type
- Issue Date
- Violation Code
- Vehicle Body Type
- Vehicle Make
- Issuing Agency
- Street Code1
- Street Code2
- Street Code3
- Vehicle Expiration Date
- Violation Location
- Violation Precinct
- Issuer Precinct
- Issuer Code
- Issuer Command
- Issuer Squad
- Violation Time
- Time First Observed
- Violation County
- Violation In Front Of Or Opposite
- House Number
- Street Name
- Intersecting Street
- Date First Observed
- Law Section
- Sub Division
- Violation Legal Code
- Days Parking In Effect
- From Hours In Effect
- To Hours In Effect
- Vehicle Color
- Unregistered Vehicle?
- Vehicle Year
- Meter Number
- Feet From Curb
- Violation Post Code
- Violation Description
- No Standing or Stopping Violation
- Hydrant Violation
- Double Parking Violation


Where did it come from? What type of information is included? 

- The second dataset that I am using is the parking violation code descriptions, which can be found at this link: https://catalog.data.gov/dataset/dof-parking-violation-codes-63051.

The dataset includes the following columns:

- Code
- Description
- Manhattan 96th street & below
- All other areas



In [2]:
spark = SparkSession \
        .builder \
        .appName("Capstone Cluster") \
        .getOrCreate()

#### This first dataset is the main dataset, with 4,001,111 rows

In [3]:
# Read in the data here
df_ticket = spark.read.format("csv").option("header", "true").load("parking-violations-issued-fiscal-year-2018.csv")

In [4]:
pd.set_option('display.max_columns', 999)

In [5]:
df_ticket.count()

4001111

In [6]:
df_ticket.limit(5).toPandas()

|  | Summons Number | Plate ID | Registration State | Plate Type | Issue Date | Violation Code | Vehicle Body Type | Vehicle Make | Issuing Agency | Street Code1 | Street Code2 | Street Code3 | Vehicle Expiration Date | Violation Location | Violation Precinct | Issuer Precinct | Issuer Code | Issuer Command | Issuer Squad | Violation Time | Time First Observed | Violation County | Violation In Front Of Or Opposite | House Number | Street Name | Intersecting Street | Date First Observed | Law Section | Sub Division | Violation Legal Code | Days Parking In Effect | From Hours In Effect | To Hours In Effect | Vehicle Color | Unregistered Vehicle? | Vehicle Year | Meter Number | Feet From Curb | Violation Post Code | Violation Description | No Standing or Stopping Violation | Hydrant Violation | Double Parking Violation |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1105232165 | GLS6001 | NY | PAS | 2018-07-03T00:00:00.000 | 14 | SDN | HONDA | X | 47130 | 13230 | 80030 | 20180702.0 | 78 | 78 | 968 | 86684 | 968 | 0 | 0811P |  | K | F | 2 | HANSON PLACE |  | 0 | 408 | D1 |  | BBYBBBB | ALL | ALL | BLUE | 0 | 2006 | - | 0 |  |  |  |  |  |
| 1 | 1121274900 | HXM7361 | NY | PAS | 2018-06-28T00:00:00.000 | 46 | SDN | NISSA | X | 28990 | 14890 | 15040 | 20200203.0 | 112 | 112 | 968 | 103419 | 968 | 0 | 1145A |  | Q | F | 71-30 | AUSTIN ST |  | 0 | 408 | C |  | BBBBBBB | ALL | ALL | GRY | 0 | 2017 | - | 0 |  |  |  |  |  |
| 2 | 1130964875 | GTR7949 | NY | PAS | 2018-06-08T00:00:00.000 | 24 | SUBN | JEEP | X | 64 | 18510 | 99 | 20180930.0 | 122 | 122 | 835 | 0 | 835 | 0 | 0355P |  | R |  |  | GREAT KILLS BOAT LAU |  | 0 | 408 | D5 |  | BBBBBBB | ALL | ALL | GREEN | 0 | 0 | - | 0 |  |  |  |  |  |
| 3 | 1130964887 | HH1842 | NC | PAS | 2018-06-07T00:00:00.000 | 24 | P-U | FORD | X | 11310 | 39800 | 39735 | 0.0 | 122 | 122 | 835 | 0 | 835 | 0 | 0123P |  | R |  |  | GREAT KILLS PARK BOA |  | 0 | 408 | D5 |  | BBBBBBB | ALL | ALL | WHITE | 0 | 0 | - | 0 |  |  |  |  |  |
| 4 | 1131599342 | HDG7076 | NY | PAS | 2018-06-29T00:00:00.000 | 17 | SUBN | HYUND | X | 47130 | 13230 | 80030 | 20190124.0 | 78 | 78 | 868 | 2354 | 868 | 0 | 0514P |  | K | F | 2 | HANSON PLACE |  | 0 | 408 | C4 |  | BBBBBBB | ALL | ALL | GREEN | 0 | 2007 | - | 0 |  |  |  |  |  |


In [7]:
df_ticket_code = spark.read.json("parking_violation codes.json", multiLine=True)

In [8]:
df_ticket_code.limit(5).toPandas()

|  | data | meta |
|---|---|---|
| 0 | [[row-vc2y~qug8_qh44, 00000000-0000-0000-CF04-... | ((Department of Finance (DOF), http://www.nyc.... |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In the main ticket file, the following columns had a lot of null values:

- Violation Post Code
- Violation Description
- No Standing or Stopping Violation
- Hydrant Violation

It seemed that these columns were not used. The rest of the data seemed fine in terms of completeness, but it contained duplicates simply because the data is not normalized.

The second dataset has complete parking codes, but I only needed the 'Code' and 'Definition' columns. The other columns didn't seem to match the data that I needed, so I removed them.
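
The null check above can be made repeatable by counting missing values per column and flagging the columns above a cutoff. Below is a minimal standard-library sketch of that idea. The rows are hypothetical and trimmed to three columns, though they are modeled on the sample output in Step 1. The helper name `null_ratios` and the 0.9 cutoff are assumptions. In PySpark, a common way to get the same per-column counts is `df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])`, using `count` and `when` from `pyspark.sql.functions`.

```python
from typing import Dict, List


def null_ratios(rows: List[Dict[str, str]]) -> Dict[str, float]:
    """Hypothetical helper: fraction of rows whose value is missing (None or '') per column."""
    if not rows:
        return {}
    columns = rows[0].keys()
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) in (None, ""):
                counts[c] += 1
    return {c: counts[c] / len(rows) for c in columns}


# Hypothetical rows shaped like the ticket sample (only a few columns kept)
sample = [
    {"Summons Number": "1105232165", "Street Name": "HANSON PLACE", "Violation Description": ""},
    {"Summons Number": "1121274900", "Street Name": "AUSTIN ST", "Violation Description": ""},
    {"Summons Number": "1130964875", "Street Name": "GREAT KILLS BOAT LAU", "Violation Description": None},
]

ratios = null_ratios(sample)
# Columns that are (almost) always empty are candidates to drop; 0.9 is an arbitrary cutoff
mostly_null = [c for c, r in ratios.items() if r >= 0.9]
print(mostly_null)  # ['Violation Description']
```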

#### Cleaning Steps
Please see the following steps that I took to clean the codes data. The main ticket data did not really need any cleaning: when creating the tables, I simply didn't select the columns that had lots of nulls or data that wasn't needed. Basically, I dropped those columns.

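- Get the Code and Definition values from the JSON payload and put them in lists

In [9]:
code_list = []
definition_list = []
# Each entry in the 'data' array is one row of the codes file. Positions 8 and 9 hold the
# violation code and its definition; the earlier positions appear to be row metadata.
for data in df_ticket_code.toPandas().data[0]:
    code_list.append(data[8])
    definition_list.append(data[9])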

- Create a dataframe with columns Code and Definition

In [10]:
df_codes = pd.DataFrame(columns=['Code','Definition'])

- Put the lists into the previously created dataframe

In [11]:
df_codes['Code'] = code_list
df_codes['Definition'] = definition_list

- Convert the Pandas DataFrame to a Spark DataFrame

In [12]:
df_codes_spark = spark.createDataFrame(df_codes)

In [13]:
df_codes_spark.toPandas().head()

|  | Code | Definition |
|---|---|---|
| 0 | 30 | NO STOP/STANDNG EXCEPT PAS P/U |
| 1 | 60 | ANGLE PARKING |
| 2 | 13 | NO STANDING-TAXI STAND |
| 3 | 73 | REG STICKER-MUTILATED/C'FEIT |
| 4 | 38 | FAIL TO DSPLY MUNI METER RECPT |
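
The cleaning cell reads positions 8 and 9 of each entry in `data`. Below is a minimal standard-library sketch of the same extraction. It assumes, as those indexes suggest, that each row in the exported JSON begins with eight bookkeeping fields, followed by the code and then its definition. The tiny JSON document is hypothetical; only its shape matters. It also builds `(code, definition)` pairs in one pass instead of two parallel lists. With pandas, `pd.DataFrame(codes, columns=['Code', 'Definition'])` would then produce the same two-column frame as `df_codes`.

```python
import json

# Hypothetical, trimmed payload in the same {"meta": ..., "data": [[...], ...]} shape
payload = json.loads("""
{
  "meta": {},
  "data": [
    ["row-1", "id-1", 0, 1, null, 1, null, "{}", "30", "NO STOP/STANDNG EXCEPT PAS P/U"],
    ["row-2", "id-2", 0, 1, null, 1, null, "{}", "60", "ANGLE PARKING"]
  ]
}
""")

CODE_INDEX, DEFINITION_INDEX = 8, 9  # positions used by the notebook cell above

# Build (code, definition) pairs directly instead of two parallel lists
codes = [(row[CODE_INDEX], row[DEFINITION_INDEX]) for row in payload["data"]]
print(codes)  # [('30', 'NO STOP/STANDNG EXCEPT PAS P/U'), ('60', 'ANGLE PARKING')]
```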


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Please see the ERD PDF that I included.

- I chose to make the Ticket table the fact table because most of the data it contains are timestamps and IDs that link to the descriptive data in the DIM tables.

- The Vehicle table is the first DIM table I created. Its primary key is the license plate number (Plate ID). I thought it was the most appropriate unique identifier for a car, since the VIN is not included in the dataset. The other attributes describe the car: color, make, body type, etc.

- The Registration table has the Plate ID as its primary key. The other attributes describe a car's registration, such as whether it has expired and which state the car is registered in.
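
To make the model concrete, here is an illustrative sketch of the five tables as Python dataclasses. The dataclasses are not part of the pipeline. Field names come from the aliases used in the Step 4 cells. The types are assumptions: every column is read from the CSV as a string unless a schema is supplied.

```python
from dataclasses import dataclass, fields
from typing import Optional


# Illustrative only: these classes are not part of the notebook's pipeline
@dataclass
class Vehicle:  # DIM table, keyed by plate_id
    plate_id: str
    vehicle_make: Optional[str]
    vehicle_body_type: Optional[str]
    vehicle_color: Optional[str]
    vehicle_year: Optional[str]


@dataclass
class Registration:  # DIM table, keyed by plate_id
    plate_id: str
    plate_type: Optional[str]
    registration_state: Optional[str]
    registration_expired_date: Optional[str]
    unregistered_vehicle: Optional[str]


@dataclass
class ViolationLocation:  # DIM table, keyed by street_code_key
    street_code_key: str
    street_code1: str
    street_code2: str
    street_code3: str
    violation_precinct: Optional[str]
    violation_county: Optional[str]
    house_number: Optional[str]
    street_name: Optional[str]
    parking_enforced_days: Optional[str]
    from_enforced_hours: Optional[str]
    to_enforced_hours: Optional[str]


@dataclass
class ViolationDetails:  # DIM table, looked up by code
    code: str
    definition: str
    law_section: Optional[str]
    sub_division: Optional[str]


@dataclass
class TicketFact:  # fact table: one row per summons, with foreign keys into the DIM tables
    summons_number: str
    plate_id: str          # -> Vehicle.plate_id and Registration.plate_id
    issue_date: str
    violation_code: str    # -> ViolationDetails.code
    street_code_key: str   # -> ViolationLocation.street_code_key


# Example fact row built from the first sample ticket in Step 1
ticket = TicketFact("1105232165", "GLS6001", "2018-07-03T00:00:00.000", "14", "47130-13230-80030")
print([f.name for f in fields(ticket)])
# ['summons_number', 'plate_id', 'issue_date', 'violation_code', 'street_code_key']
```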




#### 3.2 Mapping Out Data Pipelines

- Select the Plate ID, Vehicle Make, Vehicle Body Type, Vehicle Color, and Vehicle Year columns from the df_ticket dataframe and assign them to df_vehicle_table.
- Select the Plate ID, Plate Type, Registration State, Vehicle Expiration Date, and Unregistered Vehicle? columns and assign them to df_registration_table.
- Select the Street Code1, Street Code2, Street Code3, Violation Precinct, Violation County, House Number, Street Name, Days Parking In Effect, From Hours In Effect, and To Hours In Effect columns and assign them to df_violation_location_table.
- In df_violation_location_table, concatenate street_code1, street_code2, and street_code3 (separated by '-') into street_code_key to serve as the primary key.
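
The last pipeline step can be checked in isolation with a small plain-Python sketch of the key construction. It mirrors two properties of the Spark cell in Step 4. First, the parts are joined with `-`. Second, like `pyspark.sql.functions.concat`, the result is null (`None` here) if any part is null. The function name `street_code_key` is hypothetical. Spark's `concat_ws` skips nulls instead, but then different combinations can collide. For example, ("1", null, "2") and ("1", "2", null) would both become "1-2".

```python
from typing import Optional


def street_code_key(code1: Optional[str], code2: Optional[str], code3: Optional[str]) -> Optional[str]:
    """Hypothetical helper: join the three street codes with '-', or return None if any is missing.

    This mirrors concat(col1, lit('-'), col2, lit('-'), col3) in Spark, which yields
    null when any input column is null.
    """
    parts = (code1, code2, code3)
    if any(p is None for p in parts):
        return None
    return "-".join(parts)


print(street_code_key("47130", "13230", "80030"))  # 47130-13230-80030
print(street_code_key("64", None, "99"))           # None
```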




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Creating Vehicle Table

In [14]:
df_vehicle_table = df_ticket.select(col('Plate ID').alias('plate_id'), col('Vehicle Make').alias('vehicle_make')\
                                    ,col('Vehicle Body Type').alias('vehicle_body_type'), col('Vehicle Color').alias('vehicle_color')\
                                    ,col('Vehicle Year').alias('vehicle_year'))

### Create registration table

In [15]:
df_registration_table = df_ticket.select(col('Plate ID').alias('plate_id'), col('Plate Type').alias('plate_type')\
                                         ,col('Registration State').alias('registration_state'), col('Vehicle Expiration Date').alias('registration_expired_date')\
                                        ,col('Unregistered Vehicle?').alias('unregistered_vehicle'))
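
Both selects above keep one row per ticket. A plate that was ticketed several times therefore appears several times, even though the model in Step 3 treats `plate_id` as the primary key. Below is a minimal plain-Python sketch of collapsing rows to one per key, keeping the first row seen. The rows and the helper name `one_row_per_key` are hypothetical. In Spark, the closest built-in is `DataFrame.dropDuplicates(['plate_id'])`, which also keeps one row per key but does not guarantee which one.

```python
from typing import Dict, Iterable, List


def one_row_per_key(rows: Iterable[Dict[str, str]], key: str) -> List[Dict[str, str]]:
    """Hypothetical helper: keep the first row seen for each value of `key`, in input order."""
    seen = set()
    result = []
    for row in rows:
        k = row[key]
        if k not in seen:
            seen.add(k)
            result.append(row)
    return result


# Hypothetical registration rows: GLS6001 appears twice, as if it was ticketed twice
rows = [
    {"plate_id": "GLS6001", "registration_state": "NY"},
    {"plate_id": "HH1842", "registration_state": "NC"},
    {"plate_id": "GLS6001", "registration_state": "NY"},
]

deduped = one_row_per_key(rows, "plate_id")
print([r["plate_id"] for r in deduped])  # ['GLS6001', 'HH1842']
```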

### Create Violation Location Table

In [16]:
df_violation_location_table = df_ticket.select(col('Street Code1').alias('street_code1'), col('Street Code2').alias('street_code2')\
                                         ,col('Street Code3').alias('street_code3'), col('Violation Precinct').alias('violation_precinct')\
                                        ,col('Violation County').alias('violation_county'),col('House Number').alias('house_number')
                                        # 'Days Parking In Effect    ' keeps the trailing spaces that appear in the raw CSV header
                                        ,col('Street Name').alias('street_name'),col('Days Parking In Effect    ').alias('parking_enforced_days')
                                        ,col('From Hours In Effect').alias('from_enforced_hours'),col('To Hours In Effect').alias('to_enforced_hours'))

In [17]:
df_violation_location_table = df_violation_location_table.withColumn("street_code_key", \
                                    concat(col("street_code1"), lit('-'),col("street_code2"), lit('-'),col("street_code3"))) 

#### Join the codes table with the main table's code-detail columns

In [31]:
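df_ticket_codes = df_ticket.select(col('Law Section'), col('Sub Division'), col('Violation Code'))
# Inner join on the violation code, with the condition passed to join() rather than a separate where()
df_codes_joined_spark = df_codes_spark.join(df_ticket_codes, df_codes_spark['Code'] == df_ticket_codes['Violation Code'])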

In [32]:
df_codes_joined_spark1 = df_codes_joined_spark.select(col('Code').alias('code'), col('Definition').alias('definition'), col('Law Section').alias('law_section'), col('Sub Division').alias('sub_division')).dropDuplicates()

In [34]:
df_codes_joined_spark1.limit(5).toPandas()

|  | code | definition | law_section | sub_division |
|---|---|---|---|---|
| 0 | 51 | SIDEWALK | 408 | E5 |
| 1 | 11 | NO STANDING-HOTEL LOADING | 408 | C7 |
| 2 | 69 | FAIL TO DISP. MUNI METER RECPT | 408 | H1 |
| 3 | 8 | IDLING | 408 | J2 |
| 4 | 47 | DOUBLE PARKING-MIDTOWN COMML | 408 | L4 |
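
The codes join is an inner join. Any ticket whose `Violation Code` has no match in the codes file is therefore left out silently, and its code is never described in the details table. Below is a minimal plain-Python sketch of the same lookup that also collects the unmatched codes, which may be worth checking before relying on the details table. The code table reuses three definitions from the output above. The ticket codes, including the deliberately unknown "999", and the helper name `lookup_definitions` are hypothetical. In Spark, the codes table is small, so wrapping it in `pyspark.sql.functions.broadcast` is one option for avoiding a shuffle.

```python
from typing import Dict, List, Tuple


def lookup_definitions(
    ticket_codes: List[str], code_table: Dict[str, str]
) -> Tuple[List[Tuple[str, str]], List[str]]:
    """Hypothetical helper: inner-join ticket codes to the code table and report misses."""
    matched: List[Tuple[str, str]] = []
    unmatched: List[str] = []
    for code in ticket_codes:
        if code in code_table:
            matched.append((code, code_table[code]))
        else:
            unmatched.append(code)  # an inner join would drop these rows without warning
    return matched, unmatched


code_table = {"51": "SIDEWALK", "8": "IDLING", "47": "DOUBLE PARKING-MIDTOWN COMML"}
ticket_codes = ["51", "8", "999", "51"]  # "999" is a hypothetical code missing from the table

matched, unmatched = lookup_definitions(ticket_codes, code_table)
print(matched)    # [('51', 'SIDEWALK'), ('8', 'IDLING'), ('51', 'SIDEWALK')]
print(unmatched)  # ['999']
```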


#### Create the Ticket Fact Table

In [None]:
# Build street_code_key straight from each ticket, in the same '-'-separated format as the
# violation location table. Joining against df_violation_location_table instead (it has one row per
# ticket, not one per street-code combination) would repeat each ticket once per matching location row.
ticket_fact_df = df_ticket.withColumn('street_code_key',
                                      concat(col('Street Code1'), lit('-'), col('Street Code2'), lit('-'), col('Street Code3'))).dropDuplicates()

#### Alias the Columns to snake case

In [None]:
ticket_fact_df = ticket_fact_df.select(col('Summons Number').alias('summons_number'), col('Plate ID').alias('plate_id'), col('Issue Date').alias('issue_date'), col('Violation Code').alias('violation_code'), col('street_code_key'))

# Preview a sample without overwriting the Spark DataFrame (the quality checks below need the full table)
ticket_fact_df.limit(100).toPandas()
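
Below is a plain-Python sketch of what each fact row should contain: one row per summons, carrying the keys that point into the DIM tables. It uses the first two sample tickets from Step 1, trimmed to the needed columns. The helper name `to_fact_row` is hypothetical. The assertion at the end checks that the fact table has one row per input ticket. This is a cheap way to catch a join that multiplies rows.

```python
from typing import Dict, List


def to_fact_row(ticket: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical helper: map one raw ticket (original CSV column names) to a snake_case fact row."""
    return {
        "summons_number": ticket["Summons Number"],
        "plate_id": ticket["Plate ID"],
        "issue_date": ticket["Issue Date"],
        "violation_code": ticket["Violation Code"],
        # Same '-'-separated key as the violation location table
        "street_code_key": "-".join(
            (ticket["Street Code1"], ticket["Street Code2"], ticket["Street Code3"])
        ),
    }


tickets = [
    {"Summons Number": "1105232165", "Plate ID": "GLS6001", "Issue Date": "2018-07-03T00:00:00.000",
     "Violation Code": "14", "Street Code1": "47130", "Street Code2": "13230", "Street Code3": "80030"},
    {"Summons Number": "1121274900", "Plate ID": "HXM7361", "Issue Date": "2018-06-28T00:00:00.000",
     "Violation Code": "46", "Street Code1": "28990", "Street Code2": "14890", "Street Code3": "15040"},
]

facts: List[Dict[str, str]] = [to_fact_row(t) for t in tickets]
# A fact table built from N tickets should have exactly N rows and N distinct summons numbers
assert len(facts) == len(tickets) == len({f["summons_number"] for f in facts})
print(facts[1]["street_code_key"])  # 28990-14890-15040
```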

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
# Map each output table to a readable name for the error messages
tables = {
    'ticket_fact_df': ticket_fact_df,
    'df_violation_location_table': df_violation_location_table,
    'df_registration_table': df_registration_table,
    'df_codes_joined_spark1': df_codes_joined_spark1,
    'df_vehicle_table': df_vehicle_table,
}

for name, table in tables.items():
    row_count = table.count()

    # Completeness check: every table should contain at least one row
    if row_count == 0:
        print(f"error: {name} is empty")

    # Duplicate check: removing exact duplicate rows should not shrink the table
    if row_count > table.dropDuplicates().count():
        print(f"error: {name} has duplicates")
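
The checks above test for empty tables and exact duplicate rows. The template also names two checks that are not implemented yet. The first is a primary-key uniqueness check: no key value may appear twice, even when the other columns differ. The second is a source/count check: the fact table should keep one row per source ticket. Below is a minimal plain-Python sketch of both on hypothetical rows. The helper names `check_primary_key` and `check_row_count` are hypothetical. In Spark, the same counts could come from `count()` and `select(key).distinct().count()`.

```python
from typing import Dict, List


def check_primary_key(rows: List[Dict[str, str]], key: str) -> List[str]:
    """Hypothetical helper: return key values that appear more than once."""
    seen = set()
    dupes: List[str] = []
    for row in rows:
        k = row[key]
        if k in seen and k not in dupes:
            dupes.append(k)
        seen.add(k)
    return dupes


def check_row_count(source_count: int, target_count: int) -> bool:
    """Hypothetical helper: source/count check, the target should keep every source row."""
    return source_count == target_count


# Hypothetical vehicle rows: same plate, different colors. These are not exact duplicates,
# so dropDuplicates() keeps both, but the primary-key check still flags the plate.
vehicles = [
    {"plate_id": "GLS6001", "vehicle_color": "BLUE"},
    {"plate_id": "GLS6001", "vehicle_color": "BL"},
    {"plate_id": "HH1842", "vehicle_color": "WHITE"},
]

print(check_primary_key(vehicles, "plate_id"))  # ['GLS6001']
print(check_row_count(5, 5))                    # True
print(check_row_count(5, 7))                    # False
```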

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

- Please see the ERD I attached.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

- I chose Spark because it is fast and scales well. For this project, I was able to get up and running with little configuration. If I were using Redshift, I would need to consider how many CPUs/cores I needed as my data scaled. Spark is also nice because it offers both SQL-style commands (Spark SQL) and a DataFrame API similar to Pandas. Whether you come from a SQL background or a Pandas background, you can use the skills you are strongest with; you can't do that with Redshift, Postgres, or Airflow. S3 is also great because I can control who has access to which datasets.

- This data only needs to be updated once a year, because the government only publishes a new dataset once a year.

- If the data increased by 100x, I would not change the approach, because it is the fastest option given that the data only needs to be processed in bulk once a year.

- If the data needed to be refreshed daily by 7am, I would probably use Airflow. Its scheduling functionality and DAG workflow would better accommodate the daily refresh, as well as backfilling the data if there was an issue. None of the other tools we covered in this class have such robust functionality.

- S3 is not really a database, but I would be able to add people to groups that only give them access to the buckets they were assigned to. If I needed a database that supported a large number of users, I would use Redshift because of the computing power it has compared to Postgres.