# ETL - Columbus City Parking Violations and Ticket Status 2013-2018

In [None]:
# Activate your python environment

# Make sure to install this before running:
# pip install pandas
# pip install sqlalchemy
# pip install psycopg2
# (datetime is part of the Python standard library - no install needed)

import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

#### Read 1 of 3 CSV Files

In [None]:
# Input 1 of 3: the parking data CSV.
# Source: https://discovery.smartcolumbusos.com/dataset/conduent/160c98a1_ad56_4658_8553_5ee8e7d0d953
# The file is large, so download it from the Google Drive link in the README.
# The path below is relative, so the file must be in the working directory.
data_csv = "Parking_data.csv"
data_df = pd.read_csv(data_csv)
# Preview the first rows to check that the file loaded
data_df.head()

#### Read 2 of 3 CSV Files

In [None]:
# Input 2 of 3: the parking meters CSV.
# Source: http://opendata.columbus.gov/datasets/parking-meters/data
# The path below is relative, so the file must be in the working directory.
meters_csv = "Parking_Meters.csv"
meters_df = pd.read_csv(meters_csv)
# Preview the first rows to check that the file loaded
meters_df.head()

#### Read 3 of 3 CSV Files

In [None]:
# Input 3 of 3: the metadata file used to decode the column names.
# Source: https://data.world/smartcolumbusos/040b5929-db26-4453-920a-ceb282c4359f/workspace/file?filename=geocoded-parking-violations-csv-5.csv
columns_csv = "metadata.csv"
# header=0 takes the column names from the first row of the file.
# NOTE(review): encoding='unicode_escape' is presumably needed because the
# file is not valid UTF-8 - confirm against the actual file.
columns_df = pd.read_csv(columns_csv, header=0, encoding = 'unicode_escape')
# Preview the first rows to check that the file loaded
columns_df.head()

In [None]:
# Rename the "viol" column to "violation_code" before the column selection
# in the next step. This was added because that step was failing with
# KeyError: "['viol'] not in index".
data_df = data_df.rename(columns={"viol": "violation_code"})

#### Select columns from both datasets

In [None]:
# Required columns from parking_data, mapped to the names used from here on.
# The keys (in this order) are the columns that are kept; the values are the
# final column names. Columns that map to themselves simply keep their name.
parking_columns = {
    "ticket": "ticket_id",
    "iss dt": "issue_date",
    "fine": "fine",
    "violation_code": "violation_code",
    "entity": "entity",
    "make": "car_make",
    "iss time": "issue_time",
    "lat": "latitude",
    "long": "longitude",
    "hold ct": "total_tickets",
    "badge": "officer_badge",
    "pay amt": "amount_paid",
    "location": "location",
    "meter": "meter_id",
}

# Select the columns first, then rename them in one pass
data_df = data_df[list(parking_columns)].rename(columns=parking_columns)
data_df

In [None]:
# Give the parking_meter columns lower-case names - easier to merge later
meters_df = meters_df.rename(
    columns={
        "METER_ID": "meter_id",
        "METER_STATUS": "meter_status",
        "RATE": "rate",
    }
)

# Discard the columns that are not needed, identified by position.
# NOTE(review): this depends on the column order of Parking_Meters.csv;
# presumably the columns left over are the three renamed above - confirm.
unwanted_positions = [0, 1, 2, 4, 5, 6, 8, 9, 10, 11, 12, 13, 15, 16, 17, 18]
meters_df = meters_df.drop(columns=meters_df.columns[unwanted_positions])
meters_df.head()

#### Merge the two dataframes

In [None]:
# Merge the two DataFrames (data_df and meters_df) together based on the
# meter ids they share (pd.merge defaults to an inner join).
merge_df = pd.merge(data_df, meters_df, on="meter_id")

# pandas matches null keys with each other during a merge, so rows without a
# meter id can still come through. dropna returns a new DataFrame, so the
# result has to be assigned back for the rows to actually be removed.
merge_df = merge_df.dropna(subset=['meter_id'])
merge_df

#### Clean the merge data further

 - Drop rows where latitude or longitude is 0
 - Add decimal points to both latitude and longitude
 - Set the values in the longitude column to all negative
 - Convert Julian dates to normal dates in the dataframe and delete the old Julian-date column
 - For loop to convert the military time into regular (12-hour) time and save it into a list
 - Convert object to float
 - Reset index and delete old index

In [None]:
# Drop rows whose latitude or longitude is 0.
# .copy() gives an independent frame so the column assignments below do not
# raise pandas' SettingWithCopyWarning.
has_coordinates = (merge_df['latitude'] != 0) & (merge_df['longitude'] != 0)
merge_df = merge_df[has_coordinates].copy()


def _insert_decimal_point(raw_value):
    """Scale *raw_value* so that two digits remain before the decimal point.

    NOTE(review): relies on str(raw_value) being a plain run of digits
    (e.g. 39961234 -> 39.961234); confirm the coordinate columns are read
    as positive whole numbers.
    """
    return raw_value / 10 ** (len(str(raw_value)) - 2)


# Add decimal points to both latitude and longitude
merge_df['latitude'] = merge_df['latitude'].apply(_insert_decimal_point)
merge_df['longitude'] = merge_df['longitude'].apply(_insert_decimal_point)

# Set the longitude column to be all negative. -abs() (rather than * -1)
# keeps a value negative even if it was already negative.
merge_df['longitude'] = -merge_df['longitude'].abs()

# Convert Julian dates (YYYYDDD) to normal dates.
# NOTE(review): assumes DDD is the 1-based day of the year (001 = January 1),
# so DDD - 1 days are added to January 1 of the year. Adding DDD itself
# would make every date one day late.
issue_year = pd.to_datetime((merge_df['issue_date'] // 1000).astype(str), format='%Y')
day_of_year = merge_df['issue_date'] % 1000
merge_df['date_issued'] = issue_year + pd.to_timedelta(day_of_year - 1, unit='D')

# Delete the old column with the Julian date now that it has been converted
merge_df = merge_df.drop(columns=['issue_date'])

# Convert the military time into regular 12-hour time.
# zfill(4) restores leading zeros that are lost when the time is stored as a
# number: without it '130' is parsed as 13:00 instead of 01:30.
merge_df['issue_time'] = [
    datetime.strptime(str(raw_time).zfill(4), '%H%M').strftime('%I:%M %p')
    for raw_time in merge_df['issue_time']
]

# Reset index and delete old index. reset_index returns a new DataFrame, so
# the result must be assigned back to take effect.
merge_df = merge_df.reset_index(drop=True)
merge_df

In [None]:
# Filter rows by condition: discard every row whose amount_paid is exactly
# the string '($50.00)' and keep all the others.
is_excluded_amount = merge_df['amount_paid'] == '($50.00)'
merge_df = merge_df[~is_excluded_amount]

In [None]:
# Convert the currency strings in fine and amount_paid to floats.
# regex=False makes '$' a literal character on every pandas version; as a
# regular expression '$' would mean "end of string" and the default for
# `regex` has changed between pandas releases.
merge_df['fine'] = merge_df['fine'].str.replace('$', '', regex=False).astype(float)
merge_df['amount_paid'] = merge_df['amount_paid'].str.replace('$', '', regex=False).astype(float)

In [None]:
# Show the dtype of every column in merge_df
merge_df.dtypes

In [None]:
# Reset index and delete old index. reset_index returns a new DataFrame
# rather than modifying merge_df, so the result is assigned back.
merge_df = merge_df.reset_index(drop=True)
merge_df

In [None]:
# Save the final DataFrame to final_data.csv (UTF-8 encoded).
# index=False leaves the DataFrame index out of the file.
merge_df.to_csv('final_data.csv', encoding='utf-8', index=False)

In [None]:
# Generate descriptive statistics for the fine column that summarize the
# central tendency, dispersion and shape of its distribution, excluding NaN
# values.
merge_df['fine'].describe()

In [None]:
# Confirm our primary key is unique.
# is_unique is True only when no ticket_id value occurs more than once;
# a bare nunique() count would still have to be compared with the number of
# rows by hand.
merge_df['ticket_id'].is_unique

# Connect to local database

In [None]:
# Connection details in the form user:password@host:port/database
rds_connection_string = "postgres:postgres@localhost:5432/parking_db"

# Prefix the PostgreSQL scheme and build the SQLAlchemy engine used by the
# rest of the notebook
engine = create_engine('postgresql://' + rds_connection_string)

#### Create new data (parking_fines) with select columns 

In [None]:
# Build the parking_fines table from the selected columns.
# Selecting with a list of columns returns a new DataFrame; reset_index also
# returns a new DataFrame, so it is chained into the assignment instead of
# being called on its own (where its result would be thrown away).
parking_fines_df = merge_df[
    ['ticket_id', 'fine', 'amount_paid', 'date_issued', 'issue_time']
].reset_index(drop=True)
parking_fines_df

#### Create new data (parking_cars) with select columns

In [None]:
# Build the parking_cars table from the selected columns.
# reset_index returns a new DataFrame, so it is chained into the assignment
# instead of being called on its own (where its result would be thrown away).
parking_cars_df = merge_df[
    ['ticket_id', 'car_make', 'total_tickets']
].reset_index(drop=True)
parking_cars_df

#### Create new data (parking_location) with select columns

In [None]:
# Build the parking_location table from the selected columns.
# reset_index returns a new DataFrame, so it is chained into the assignment
# instead of being called on its own (where its result would be thrown away).
parking_locations_df = merge_df[
    ['ticket_id', 'latitude', 'longitude']
].reset_index(drop=True)
parking_locations_df

#### Make sure the above three tables are created in postgres as well before you start the next steps

### Check for tables

In [None]:
# List the tables that exist in the connected database.
# Engine.table_names() is deprecated in SQLAlchemy 1.4 and removed in 2.0;
# the inspector API works on both old and new releases.
from sqlalchemy import inspect

inspect(engine).get_table_names()

### Use pandas to load csv converted DataFrame into database

In [None]:
# Write parking_fines_df to the parking_fines table.
# if_exists='append' adds the rows to the table if it already exists, so
# running this cell again inserts the same rows a second time.
# index=False leaves the DataFrame index out of the table.
parking_fines_df.to_sql(name='parking_fines', con=engine, if_exists='append', index=False)

In [None]:
# Write parking_cars_df to the parking_cars table.
# if_exists='append' adds the rows to the table if it already exists, so
# running this cell again inserts the same rows a second time.
# index=False leaves the DataFrame index out of the table.
parking_cars_df.to_sql(name='parking_cars', con=engine, if_exists='append', index=False)

In [None]:
# Write parking_locations_df to the parking_location table.
# if_exists='append' adds the rows to the table if it already exists, so
# running this cell again inserts the same rows a second time.
# index=False leaves the DataFrame index out of the table.
parking_locations_df.to_sql(name='parking_location', con=engine, if_exists='append', index=False)

### Confirm data has been added by querying the parking_fines, parking_cars and parking_location tables
* NOTE: can also check using pgAdmin

In [None]:
# Read the parking_fines table back from the database and show the first rows
pd.read_sql_query('select * from parking_fines', con=engine).head()

In [None]:
# Read the parking_cars table back from the database and show the first rows
pd.read_sql_query('select * from parking_cars', con=engine).head()

In [None]:
# Read the parking_location table back from the database and show the first rows
pd.read_sql_query('select * from parking_location', con=engine).head()