# ETL Process of Crime Data

## Importing the required packages

In [1]:
#!pip install google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client
#!pip3 install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
# Import necessary packages
import pandas as pd
import requests 
from pathlib import Path
import sys
import os
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
import io
from google.cloud import storage
 
# getting the name of the directory where this file is present.
current = os.path.dirname(os.path.realpath("__file__"))
# Getting the parent directory name where the current directory is present.
parent = os.path.dirname(current)

## Extract - Google Cloud Platform

### Extract using bucket on GCP

In [None]:
# read raw 'nypd_complaint.csv' located in the Google Cloud Storage bucket named 'safehouse_nyc_bucket'
crime_df = pd.read_csv('gs://safehouse_nyc_bucket/nypd_complaint.csv')

# Print the first row of the DataFrame
print(crime_df.head(1))

In [None]:
crime_df.shape

In [None]:
crime_df.head()

In [None]:
crime_df.isna().sum()

## Transform

In [None]:
# Select only relevant columns of the crime dataset
crime_df = crime_df[['OFNS_DESC','BORO_NM', 'PREM_TYP_DESC','SUSP_AGE_GROUP', 
                        'SUSP_RACE', 'SUSP_SEX', 'Latitude','Longitude']]

In [None]:
# drop all rows that have atleast 1 NA
crime_df = crime_df.dropna()

In [None]:
len(crime_df)

In [None]:
crime_df.isna().sum()

In [None]:
# Extracting the unique latitude & longitude key value pairs from transformed data
unique_count = crime_df.groupby(['Latitude', 'Longitude']).size().reset_index(name='count')

print(unique_count)

### Dividing the data into 3 batches for parallel processing on GCP

We divide the data into 3 batches and 3 csv files. The 3 csv files are written to the Cloud Storage bucket for parallel processing. The processing is necessary as we have 7 million records and we are using the Nominatim API provided by the OpenStreetMap project that enables us to convert geographic coordinates (latitude and longitude) into zipcodes. The zipcodes will be used later for neighborhood-wise crime analysis

In [None]:
# Set the GCS bucket name and file path
bucket_name = 'safehouse_nyc_bucket'
# Create a GCS client
client = storage.Client()
# Get the GCS bucket
bucket = client.bucket(bucket_name)

In [None]:
# batch 1
unique_count_1 = unique_count.iloc[:60000, ]

In [None]:
unique_count_1.info()

In [None]:
file_name = 'unique_count_1.csv'
# Convert the DataFrame to a CSV string
csv_1_string = unique_count_1.to_csv(index=False)
# Create a new blob and upload the CSV string on bucket
blob_1 = bucket.blob(file_name)
blob_1.upload_from_string(csv_1_string, content_type='text/csv')

In [None]:
# batch 2
unique_count_2 = unique_count.iloc[60000:120000, ]

In [None]:
unique_count_2.info()

In [None]:
file_name = 'unique_count_2.csv'
# Convert the DataFrame to a CSV string
csv_2_string = unique_count_2.to_csv(index=False)
# Create a new blob and upload the CSV string
blob_2 = bucket.blob(file_name)
blob_2.upload_from_string(csv_2_string, content_type='text/csv')

In [None]:
# batch 3
unique_count_3 = unique_count.iloc[120000:176964, ]

In [None]:
unique_count_3.info()

In [None]:
file_name = 'unique_count_3.csv'
# Convert the DataFrame to a CSV string
csv_3_string = unique_count_3.to_csv(index=False)
# Create a new blob and upload the CSV string
blob_3 = bucket.blob(file_name)
blob_3.upload_from_string(csv_3_string, content_type='text/csv')

The parallel processing is done on 3 different notebooks in 3 different environments on GCP.
The notebooks are in the doc folder and are named:
1.crime_analysis_1.ipynb
2.crime_analysis_2.ipynb
3.crime_analysis_3.ipynb

These notebooks call the safehouse_functions.py method and use the get_postcode() function that reads lat and lon data from the csv files and extracts zipcode data.
The zipcode data for each of these files is again stored in our GCP bucket.

All the generated csv files are in the output folder of the project.

### Combine the 3 csv files with zip code data generated from parallel processing above

In [None]:
# Read csv from gs bucket
zip1 = pd.read_csv('gs://safehouse_nyc_bucket/zip1.csv')
zip1.head()

In [None]:
zip1.info()

In [None]:
zip2 = pd.read_csv('gs://safehouse_nyc_bucket/zip2.csv')
zip2.head()

In [None]:
zip2.info()

In [None]:
zip3 = pd.read_csv('gs://safehouse_nyc_bucket/zip3.csv')
zip3.head()

In [None]:
zip3.info()

In [None]:
# Combine all 3 files
zip_unique = pd.concat([zip1, zip2, zip3], axis=0)

In [None]:
zip_unique

### Merge original transformed crime data with above zip code data

In [None]:
crime_with_zip = pd.merge(crime_df, zip_unique[['Latitude', 'Longitude', 'zipcode']], on = ['Latitude', 'Longitude'], how = "left")
crime_with_zip.head()

### Write this final transfomed data as csv to bucket

This final data generated is in the output folder named: crime_with_zip.csv

In [None]:
from google.cloud import storage
# Set the GCS bucket name and file path
bucket_name = 'safehouse_nyc_bucket'
# Create a GCS client
client = storage.Client()
# Get the GCS bucket
bucket = client.bucket(bucket_name)

file_name = 'crime_with_zip.csv'
# Convert the DataFrame to a CSV string
csv_1_string = crime_with_zip.to_csv(index=False)
# Create a new blob and upload the CSV string
blob_1 = bucket.blob(file_name)
blob_1.upload_from_string(csv_1_string, content_type='text/csv')

## Load Crime Data into MongoDb

Read the final transformed data crime_with_zip.csv and store in MongoDb

In [7]:
file_path = parent + "/output/crime_with_zip.csv"
final_crime_data = pd.read_csv(file_path)

/Users/srushtisanghavi/Documents/COLUMBIA_COURSEWORK/Spring_23/Managing_Data/Project/Untitled/SafehouseNYC


  final_crime_data = pd.read_csv(file_path)


In [8]:
final_crime_data.head()

Unnamed: 0,CMPLNT_NUM,CMPLNT_FR_DT,CMPLNT_FR_TM,CMPLNT_TO_DT,CMPLNT_TO_TM,ADDR_PCT_CD,RPT_DT,KY_CD,OFNS_DESC,PD_CD,...,TRANSIT_DISTRICT,Latitude,Longitude,Lat_Lon,PATROL_BORO,STATION_NAME,VIC_AGE_GROUP,VIC_RACE,VIC_SEX,zipcode
0,700381962,05/28/2015,15:00:00,,,46.0,06/01/2015,578,HARRASSMENT 2,638.0,...,,40.845868,-73.915888,"(40.84586773, -73.915888033)",PATROL BORO BRONX,,25-44,WHITE HISPANIC,F,10452.0
1,642234217,10/28/2013,13:50:00,10/28/2013,13:50:00,120.0,10/28/2013,351,CRIMINAL MISCHIEF & RELATED OF,259.0,...,,40.627061,-74.077149,"(40.627060894, -74.077149232)",PATROL BORO STATEN ISLAND,,45-64,WHITE,M,10304.0
2,242465164,05/09/2012,20:50:00,05/09/2012,21:00:00,24.0,05/09/2012,236,DANGEROUS WEAPONS,782.0,...,,40.800966,-73.969047,"(40.800965968, -73.969047272)",PATROL BORO MAN NORTH,,,UNKNOWN,E,10025.0
3,927207428,01/03/2014,13:30:00,01/03/2014,13:35:00,108.0,01/03/2014,109,GRAND LARCENY,409.0,...,,40.745242,-73.894253,"(40.745241809, -73.894253382)",PATROL BORO QUEENS NORTH,,45-64,ASIAN / PACIFIC ISLANDER,M,
4,492142357,04/13/2016,00:00:00,,,40.0,04/13/2016,351,CRIMINAL MISCHIEF & RELATED OF,258.0,...,,40.810352,-73.924942,"(40.810351863, -73.924942326)",PATROL BORO BRONX,,UNKNOWN,UNKNOWN,E,10454.0


In [12]:
final_crime_data = final_crime_data[['OFNS_DESC','BORO_NM', 'PREM_TYP_DESC','SUSP_AGE_GROUP', 
                        'SUSP_RACE', 'SUSP_SEX', 'Latitude','Longitude', 'zipcode']]

In [13]:
final_crime_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6989522 entries, 0 to 6989521
Data columns (total 9 columns):
 #   Column          Dtype  
---  ------          -----  
 0   OFNS_DESC       object 
 1   BORO_NM         object 
 2   PREM_TYP_DESC   object 
 3   SUSP_AGE_GROUP  object 
 4   SUSP_RACE       object 
 5   SUSP_SEX        object 
 6   Latitude        float64
 7   Longitude       float64
 8   zipcode         float64
dtypes: float64(3), object(6)
memory usage: 479.9+ MB


In [14]:
# Converting dataframe to dictionary
crime_dict = final_crime_data.to_dict('records') # orient = records to convert into list of dictionary
crime_dict[:5]

[{'OFNS_DESC': 'HARRASSMENT 2',
  'BORO_NM': 'BRONX',
  'PREM_TYP_DESC': 'RESIDENCE - APT. HOUSE',
  'SUSP_AGE_GROUP': '25-44',
  'SUSP_RACE': 'BLACK',
  'SUSP_SEX': 'M',
  'Latitude': 40.84586773,
  'Longitude': -73.915888033,
  'zipcode': 10452.0},
 {'OFNS_DESC': 'CRIMINAL MISCHIEF & RELATED OF',
  'BORO_NM': 'STATEN ISLAND',
  'PREM_TYP_DESC': 'RESIDENCE - APT. HOUSE',
  'SUSP_AGE_GROUP': nan,
  'SUSP_RACE': nan,
  'SUSP_SEX': nan,
  'Latitude': 40.627060894,
  'Longitude': -74.077149232,
  'zipcode': 10304.0},
 {'OFNS_DESC': 'DANGEROUS WEAPONS',
  'BORO_NM': 'MANHATTAN',
  'PREM_TYP_DESC': 'STREET',
  'SUSP_AGE_GROUP': nan,
  'SUSP_RACE': nan,
  'SUSP_SEX': nan,
  'Latitude': 40.800965968,
  'Longitude': -73.969047272,
  'zipcode': 10025.0},
 {'OFNS_DESC': 'GRAND LARCENY',
  'BORO_NM': 'QUEENS',
  'PREM_TYP_DESC': 'RESIDENCE - APT. HOUSE',
  'SUSP_AGE_GROUP': nan,
  'SUSP_RACE': 'UNKNOWN',
  'SUSP_SEX': 'M',
  'Latitude': 40.745241809,
  'Longitude': -73.894253382,
  'zipcode': nan

In [None]:
from pymongo import MongoClient

client = MongoClient('localhost', 27017)

# Creating or accessing database companies_db from mongodb
crime_db = client.crime_db

# Creating or accessing collections companies from mongodb
crime = crime_db.crime

# Check to see if database is created and if it exists
print(client.list_database_names())

In [None]:
# Inserting records from dictionary into mongodb collection
crime.insert_many(crime_dict)
print("Successfully inserted records in mongoDb")

## The EDA process and graphs are created in Safehouse_NYC_WebApp.ipynb

The graphs will be created from data fetched from Mongodb in real time based on user input of zipcode