# Part 1 - Initial Cleaning and Transforming of the Complaints Data

The following ETL section was done in VS Code using pandas.
Read in the file containing the complaints dataset from the NYC Open Data website:
https://data.cityofnewyork.us/Public-Safety/NYPD-Complaint-Data-Historic/qgea-i56i

In [0]:
import pandas as pd

crime = pd.read_csv('NYPD_Complaint_Data_Historic.csv', low_memory=False)

Split the date into month, day and year (CMPLNT_FR_DT is stored as MM/DD/YYYY) so that incorrectly entered years can be replaced

In [0]:
crime[['MONTH', 'DAY', 'YEAR']] = crime['CMPLNT_FR_DT'].str.split('/', n=2, expand=True)

Replace incorrectly entered years (e.g. 1018 -> 2018)

In [0]:
# Map each mistyped year to the year it should be
year_fixes = {
    '1028': '2018', '1017': '2017', '1018': '2018', '1016': '2016',
    '1027': '2017', '1015': '2015', '1025': '2015', '1021': '2021',
    '1019': '2019', '1029': '2019', '1010': '2020', '1020': '2020',
    '1026': '2016',
}
crime['YEAR'] = crime['YEAR'].replace(year_fixes)

Recombine month, day and year into a MM/DD/YYYY date

In [0]:
# Vectorized string concatenation; rows with a missing part become NaN
crime['DATE'] = crime['MONTH'].str.cat([crime['DAY'], crime['YEAR']], sep='/')

Drop the helper columns and the original column with incorrect dates

In [0]:
crime = crime.drop(['CMPLNT_FR_DT', 'MONTH', 'DAY', 'YEAR'], axis=1)
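As a quick check of the year repair, here is a minimal pandas sketch on a few made-up CMPLNT_FR_DT values. It assumes the MM/DD/YYYY layout (the format later passed to `pd.to_datetime`) and applies the same typo-to-year corrections as the cells above, collected into one dict; the names `raw`, `parts` and `YEAR_FIXES` are illustrative only.

```python
import pandas as pd

# Same typo -> year corrections as the notebook, collected into one dict
YEAR_FIXES = {
    '1028': '2018', '1017': '2017', '1018': '2018', '1016': '2016',
    '1027': '2017', '1015': '2015', '1025': '2015', '1021': '2021',
    '1019': '2019', '1029': '2019', '1010': '2020', '1020': '2020',
    '1026': '2016',
}

# Made-up sample values in MM/DD/YYYY form, two of them with mistyped years
raw = pd.DataFrame({'CMPLNT_FR_DT': ['03/15/1018', '11/02/2019', '07/04/1027']})

# Split into three string columns, fix the year, then join them back together
parts = raw['CMPLNT_FR_DT'].str.split('/', n=2, expand=True)
parts.columns = ['MONTH', 'DAY', 'YEAR']
parts['YEAR'] = parts['YEAR'].replace(YEAR_FIXES)
raw['DATE'] = parts['MONTH'].str.cat([parts['DAY'], parts['YEAR']], sep='/')

print(raw['DATE'].tolist())  # ['03/15/2018', '11/02/2019', '07/04/2017']
```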

Combine the date and time columns and convert the result to datetime. This will take a while to run.

In [0]:
crime = crime.astype({'DATE': str, 'CMPLNT_FR_TM': str})
# Unparseable date/time strings become NaT instead of raising an error
crime['DATE_AND_TIME'] = pd.to_datetime(crime['DATE'] + ' ' + crime['CMPLNT_FR_TM'], errors='coerce')

Convert the date column to datetime as well; dates that cannot be parsed become NaT

In [0]:
crime['DATE'] = pd.to_datetime(crime['DATE'], format='%m/%d/%Y', errors='coerce')

Keep only complaints from 2017 onward

In [0]:
crime = crime[crime['DATE_AND_TIME'] >= '2017/01/01']
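`errors='coerce'` matters here: malformed dates become NaT instead of stopping the run, and because any comparison with NaT is False, the 2017 filter also discards those rows. The sketch below shows this on made-up rows. Unlike the notebook, it passes an explicit `format`, which assumes CMPLNT_FR_TM is always HH:MM:SS (an assumption, not something the original states); a fixed format can also speed up parsing.

```python
import pandas as pd

# Made-up rows; the second has an impossible month (13), the third is from 2016
sample = pd.DataFrame({
    'DATE': ['03/15/2018', '13/02/2019', '07/04/2016'],
    'CMPLNT_FR_TM': ['17:30:00', '08:00:00', '23:59:00'],
})

# Assumed HH:MM:SS time format; unparseable strings become NaT
sample['DATE_AND_TIME'] = pd.to_datetime(
    sample['DATE'] + ' ' + sample['CMPLNT_FR_TM'],
    format='%m/%d/%Y %H:%M:%S',
    errors='coerce',
)

# NaT >= anything is False, so the bad row is dropped along with the 2016 row
recent = sample[sample['DATE_AND_TIME'] >= '2017-01-01']
print(recent['DATE'].tolist())  # ['03/15/2018']
```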

Remove rows with null coordinates

In [0]:
crime = crime.dropna(subset=['Latitude', 'Longitude'])

Export to a CSV file in a folder for exported data. The file was then uploaded to a blob in a storage container to be read in later.

In [0]:
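# index=False keeps the pandas row index out of the file, so it is not read back as an extra column
crime.to_csv(r'C:\Users\{file_path}\Exports\crime.csv', header=True, index=False)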

# Part 2 - Further Cleaning of the Complaints Data

In [0]:
import pandas as pd
import datetime
import re

# SAS token for the container; keep real tokens out of the notebook (e.g. in a secret store)
SAS_TOKEN = '<SAS_TOKEN>'
CONTAINER = 'fg4'
STOR_ACCT = 'cohort40storage'
ROOT_PATH = f'wasbs://{CONTAINER}@{STOR_ACCT}.blob.core.windows.net/'

spark.conf.set(f'fs.azure.sas.{CONTAINER}.{STOR_ACCT}.blob.core.windows.net', SAS_TOKEN)

read_path = ROOT_PATH + 'mta-nypd/crime.csv'
# df = spark.read.format('csv').option('header',True).load(read_path)
all_nyc_complaints_spark = spark.read.csv(
    read_path, 
    header=True, 
    mode="DROPMALFORMED", 
    multiLine = True
)
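ROOT_PATH already ends with a slash, so relative paths appended to it should not start with one, otherwise the URL ends up containing `//`. The small helpers below are hypothetical (not part of the original notebook); they rebuild the same SAS config key and `wasbs://` URL used above and normalize that slash.

```python
CONTAINER = 'fg4'
STOR_ACCT = 'cohort40storage'


def sas_conf_key(container, account):
    """Spark config key under which a container's SAS token is registered (as in spark.conf.set above)."""
    return f'fs.azure.sas.{container}.{account}.blob.core.windows.net'


def blob_path(container, account, relative):
    """Build a wasbs:// URL, stripping any leading '/' so root and relative parts join with one slash."""
    root = f'wasbs://{container}@{account}.blob.core.windows.net/'
    return root + relative.lstrip('/')


print(sas_conf_key(CONTAINER, STOR_ACCT))
print(blob_path(CONTAINER, STOR_ACCT, '/BX_complaints.csv'))
# wasbs://fg4@cohort40storage.blob.core.windows.net/BX_complaints.csv
```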


The code above gives access to the crime.csv file in the storage blob and reads it in as a PySpark DataFrame covering complaints from 2017 to 2021 in all five boroughs. Because this dataset is so large, we break it down by borough and year. First, we remove the columns deemed unnecessary, such as the park's name (if the crime occurred in a park), using the code below.

In [0]:
nyc_complaints_spark = all_nyc_complaints_spark.drop('CMPLNT_FR_TM', 'CMPLNT_TO_DT', 'CMPLNT_TO_TM', 'CRM_ATPT_CPTD_CD', 'JURISDICTION_CODE', 'PARKS_NM', 'HADEVELOPT', 'HOUSING_PSA', 'X_COORD_CD', 'Y_COORD_CD', 'TRANSIT_DISTRICT', 'Lat_Lon')


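The dataset was too large to convert to a single pandas DataFrame, so it was broken down into smaller subsets. randomSplit was used to distribute the rows into 10 subsets; each gets a weight of 0.1, but because rows are assigned randomly, the subsets are only approximately equal in size.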

In [0]:
df1, df2, df3, df4, df5, df6, df7, df8, df9, df10 = nyc_complaints_spark.randomSplit([0.1] * 10, seed=0)


In [0]:
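def only_borough(df):
    """Append the rows of pandas DataFrame df to the global per-borough DataFrames
    (rows with no BORO_NM go to NA)."""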
    global BK
    global MN
    global BX
    global QN
    global SI
    global NA
    complaint_BK = df[df['BORO_NM'] == 'BROOKLYN']
    BK = pd.concat([BK, complaint_BK])
    complaint_MN = df[df['BORO_NM'] == 'MANHATTAN']
    MN = pd.concat([MN, complaint_MN])
    complaint_QN = df[df['BORO_NM'] == 'QUEENS']
    QN = pd.concat([QN, complaint_QN])    
    complaint_SI = df[df['BORO_NM'] == 'STATEN ISLAND']
    SI = pd.concat([SI, complaint_SI])
    complaint_BX = df[df['BORO_NM'] == 'BRONX']
    BX = pd.concat([BX, complaint_BX])
    complaint_NA = df[pd.isnull(df['BORO_NM'])]
    NA = pd.concat([NA, complaint_NA])

BK = pd.DataFrame()
MN = pd.DataFrame()
QN = pd.DataFrame()
SI = pd.DataFrame()
BX = pd.DataFrame()
NA = pd.DataFrame()
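The global-variable approach works, but the same split can be expressed by concatenating the pandas chunks once and grouping by borough. This is a hypothetical alternative sketch, not what the notebook runs; `split_by_borough` and `BORO_CODES` are illustrative names, and unlike `only_borough` it also sends any unexpected borough name to the `'NA'` group.

```python
import pandas as pd

# Two-letter codes used for the saved files, keyed by the names only_borough filters on
BORO_CODES = {'BROOKLYN': 'BK', 'MANHATTAN': 'MN', 'QUEENS': 'QN',
              'STATEN ISLAND': 'SI', 'BRONX': 'BX'}


def split_by_borough(chunks):
    """Concatenate all pandas chunks once, then return {code: DataFrame}.

    Rows whose BORO_NM is missing, or is not one of the five names above,
    are collected under the key 'NA'.
    """
    combined = pd.concat(chunks, ignore_index=True)
    keys = combined['BORO_NM'].map(BORO_CODES).fillna('NA')
    return {code: group.reset_index(drop=True) for code, group in combined.groupby(keys)}


# Made-up chunks standing in for df1.toPandas(), df2.toPandas(), ...
chunk1 = pd.DataFrame({'CMPLNT_NUM': ['1', '2'], 'BORO_NM': ['BRONX', None]})
chunk2 = pd.DataFrame({'CMPLNT_NUM': ['3', '4'], 'BORO_NM': ['BRONX', 'QUEENS']})

by_borough = split_by_borough([chunk1, chunk2])
print(sorted(by_borough))                       # ['BX', 'NA', 'QN']
print(by_borough['BX']['CMPLNT_NUM'].tolist())  # ['1', '3']
```

Returning a dict avoids the stale-reference problem of building a list of globals before `pd.concat` rebinds them.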

Six empty DataFrames were created above, one for each borough and one for rows that do not name a borough. The helper function sorts each row into its respective borough. The cell below iterates through all 10 subsets, converting each to pandas before passing it to the helper. The resulting six DataFrames were then converted back into PySpark DataFrames and saved as CSVs.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType

data = [df1, df2, df3, df4, df5, df6, df7, df8, df9, df10]

# Convert each Spark subset to pandas and sort its rows into the borough DataFrames
for spark_df in data:
    pd_df = spark_df.toPandas()
    only_borough(pd_df)

# Build the list after the loop: only_borough rebinds the globals with pd.concat,
# so a list created earlier would still hold the original empty DataFrames
boroughs = [MN, QN, BK, SI, NA, BX]
bor = ['MN', 'QN', 'BK', 'SI', 'NA', 'BX']

# All 24 columns are kept as nullable strings, in the same order as the DataFrame columns
columns = ['CMPLNT_NUM', 'ADDR_PCT_CD', 'RPT_DT', 'KY_CD', 'OFNS_DESC', 'PD_CD', 'PD_DESC',
           'LAW_CAT_CD', 'BORO_NM', 'LOC_OF_OCCUR_DESC', 'PREM_TYP_DESC', 'JURIS_DESC',
           'SUSP_AGE_GROUP', 'SUSP_RACE', 'SUSP_SEX', 'Latitude', 'Longitude', 'PATROL_BORO',
           'STATION_NAME', 'VIC_AGE_GROUP', 'VIC_RACE', 'VIC_SEX', 'DATE', 'DATE_AND_TIME']
mySchema = StructType([StructField(name, StringType(), True) for name in columns])

# Save all six DataFrames; ROOT_PATH already ends with '/'
for borough_df, code in zip(boroughs, bor):
    borough_spark = spark.createDataFrame(borough_df, schema=mySchema)
    borough_spark.coalesce(1).write.mode('overwrite').csv(ROOT_PATH + f'{code}_complaints.csv', header=True)


# Part 3 - Transformations: Connect Complaints to Stations

In [0]:
spark.conf.set(f'fs.azure.sas.{CONTAINER}.{STOR_ACCT}.blob.core.windows.net', SAS_TOKEN)

read_path = ROOT_PATH + 'BX_complaints.csv'
BX_complaints_spark = spark.read.csv(
    read_path, 
    header=True, 
    mode="DROPMALFORMED", 
    inferSchema = True,
    multiLine = True
)


At this point, each of the saved CSVs contains complaints from only one borough. The cell above reads in the Bronx dataset, and it was rerun with the other boroughs' codes to read in their datasets as well. Note that with inferSchema enabled, the last column (DATE_AND_TIME) is read in as a timestamp, which comes in handy when the rows are further separated out by year. The first helper function below keeps the rows whose DATE falls strictly between two dates, and the second converts a pandas DataFrame to PySpark and saves it as a new CSV.

In [0]:
from pyspark.sql import functions as F

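def only_year(old_df, first_date, second_date):
    """Keep rows whose DATE is strictly after first_date and strictly before second_date."""
    return old_df.filter(
        (F.col('DATE') > F.lit(first_date)) & (F.col('DATE') < F.lit(second_date))
    )

def save(df, borough, year):
    """Convert a pandas DataFrame to PySpark and write it out as a single CSV file."""
    spark_df = spark.createDataFrame(df)
    # ROOT_PATH already ends with '/', so the relative path must not start with one
    spark_df.coalesce(1).write.mode('overwrite').csv(
        ROOT_PATH + f'Borough_Year_Complaints/{borough}_{year}.csv', header=True)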

The only_year function was used to separate the datasets of each borough into different years, ranging from 2017 to 2021. Below is an example of how the Manhattan crimes were separated out.

In [0]:
MN_2021 = only_year(MN_complaints_spark, '2020-12-31', '2022-01-01')
MN_2020 = only_year(MN_complaints_spark, '2019-12-31', '2021-01-01')
MN_2019 = only_year(MN_complaints_spark, '2018-12-31', '2020-01-01')
MN_2018 = only_year(MN_complaints_spark, '2017-12-31', '2019-01-01')
MN_2017 = only_year(MN_complaints_spark, '2016-12-31', '2018-01-01')
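Because `only_year` uses exclusive bounds, passing the last day of the previous year and the first day of the next year keeps exactly one calendar year, provided DATE holds dates without a time of day. The pandas sketch below checks this against a direct year comparison on made-up dates. In PySpark the analogous direct filter would be `F.year(F.col('DATE')) == 2021`; that is an alternative, not what the notebook uses.

```python
import pandas as pd

# Made-up dates straddling both 2021 boundaries
dates = pd.Series(pd.to_datetime(['2020-12-31', '2021-01-01', '2021-12-31', '2022-01-01']))

# Same exclusive bounds as only_year(df, '2020-12-31', '2022-01-01')
by_bounds = dates[(dates > '2020-12-31') & (dates < '2022-01-01')]

# Direct calendar-year comparison
by_year = dates[dates.dt.year == 2021]

print(by_bounds.equals(by_year))                 # True
print(by_year.dt.strftime('%Y-%m-%d').tolist())  # ['2021-01-01', '2021-12-31']
```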

The cell below imports a dataset of train station locations as a PySpark DataFrame. This dataset is used to find the closest train station to each complaint and the distance to it. Note that the train stations dataset does not include Staten Island train stations.

In [0]:
read_path = ROOT_PATH + 'mta-nypd/stopsNYCgrouped.csv'
stations = spark.read.csv(
    read_path, 
    header=True,
    inferSchema = True,
    mode="DROPMALFORMED", 
    multiLine = True)

In [0]:
stations = (stations
            .withColumnRenamed('Station Name', 'Station_Name')
            .withColumnRenamed('Station Latitude', 'Station_Latitude')
            .withColumnRenamed('Station Longitude', 'Station_Longitude')
            .withColumnRenamed('All Lines', 'All_Lines'))

s = stations.collect()

Two more helper functions are defined below. The distance function computes the great-circle (haversine) distance in kilometers between two latitude/longitude points; 12742 is Earth's mean diameter in kilometers. Using the distance function, the closest_station function loops through every station's location and returns the station closest to a complaint, along with the distance to it and the lines that stop there.

In [0]:
from math import cos, asin, sqrt, pi

def distance(lat1, lon1, lat2, lon2):
    p = pi/180    
    a = 0.5 - cos((lat2-lat1)*p)/2 + cos(lat1*p) * cos(lat2*p) * (1-cos((lon2-lon1)*p))/2    
    return 12742 * asin(sqrt(a))

def closest_station(lat, long, stations):
    """Return (station name, distance in km, lines) for the station nearest to (lat, long)."""
    min_distance = float('inf')
    closest = ''
    line = ''
    # Check every collected station rather than a fixed count
    for station in stations:
        tmp = distance(lat, long, station.Station_Latitude, station.Station_Longitude)
        if tmp < min_distance:
            min_distance = tmp
            closest = station.Station_Name
            line = station.All_Lines
    return closest, min_distance, line
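The expression in `distance` is the haversine formula in disguise: since 0.5 - cos(Δφ)/2 = sin²(Δφ/2) and (1 - cos Δλ)/2 = sin²(Δλ/2), `a` equals sin²(Δφ/2) + cos φ₁ cos φ₂ sin²(Δλ/2), and the result is 2R·asin(√a) with 2R = 12742 km. The sketch below checks this on one degree of latitude and shows a hypothetical `min()`-based variant of the station search. `Station` is a namedtuple standing in for the Row objects from `stations.collect()`, and the two stations are made up.

```python
from collections import namedtuple
from math import asin, cos, pi, sqrt

# Stand-in for the Row objects from stations.collect(); the stations below are made up
Station = namedtuple('Station', ['Station_Name', 'Station_Latitude', 'Station_Longitude', 'All_Lines'])


def distance(lat1, lon1, lat2, lon2):
    """Haversine distance in km; 12742 km is Earth's mean diameter (2 * 6371 km)."""
    p = pi / 180
    a = 0.5 - cos((lat2 - lat1) * p) / 2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    return 12742 * asin(sqrt(a))


def nearest_station(lat, lon, stations):
    """Hypothetical min()-based variant of closest_station.

    Returns (name, distance_km, lines), or (None, inf, None) if there are no stations.
    """
    if not stations:
        return None, float('inf'), None
    best = min(stations, key=lambda st: distance(lat, lon, st.Station_Latitude, st.Station_Longitude))
    return best.Station_Name, distance(lat, lon, best.Station_Latitude, best.Station_Longitude), best.All_Lines


# Sanity check: one degree of latitude is 6371 * pi / 180, about 111.19 km
print(round(distance(0.0, 0.0, 1.0, 0.0), 2))  # 111.19

stations = [
    Station('Station A', 40.75, -73.99, '1 2 3'),
    Station('Station B', 40.70, -74.01, '4 5'),
]
print(nearest_station(40.749, -73.991, stations))
```

Either version is a brute-force scan over every station per complaint, which is fine for a few hundred stations.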

The code below converts each PySpark DataFrame for a borough to a pandas DataFrame and adds three columns: the closest station, the distance between the complaint and that station, and all the train lines that stop there. The DataFrames are then saved to CSV files. As with the previous cells, this one was rerun for each borough by swapping in that borough's two-letter code.

In [0]:
datas = [SI_2021, SI_2020, SI_2019, SI_2018, SI_2017]
# Pair each DataFrame with its year, from 2021 down to 2017
for year, data in zip(range(2021, 2016, -1), datas):
    pd_data = data.drop('STATION_NAME').toPandas()
    # closest_station returns a (name, distance, lines) tuple for each row
    pd_data['closest_station'] = pd_data.apply(
        lambda row: closest_station(row['Latitude'], row['Longitude'], s),
        axis=1)
    # Split the tuples into three separate columns
    pd_data[['closest_station', 'station_distance', 'station_line']] = pd.DataFrame(
        pd_data['closest_station'].tolist(),
        index=pd_data.index)
    save(pd_data, 'Staten_Island', year)
    print(f'{year} for Staten Island data saved')

2021 for Staten Island data saved
2020 for Staten Island data saved
2019 for Staten Island data saved
2018 for Staten Island data saved
2017 for Staten Island data saved
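The two-step "store tuples, then split them" pattern above can also be written with `DataFrame.apply(..., result_type='expand')`, which turns each returned tuple directly into a row of a three-column frame. A minimal sketch on made-up coordinates; the `closest_station` here is a hypothetical placeholder that only mimics the real helper's `(name, distance, lines)` return shape.

```python
import pandas as pd


# Placeholder for the notebook's closest_station: same (name, distance, lines) shape, fake values
def closest_station(lat, lon, stations):
    return ('North', 1.5, 'A C') if lat > 40.7 else ('South', 0.5, 'R')


pd_data = pd.DataFrame({'Latitude': [40.75, 40.65], 'Longitude': [-73.99, -74.0]})

# result_type='expand' turns each returned tuple into columns 0, 1, 2
expanded = pd_data.apply(
    lambda row: closest_station(row['Latitude'], row['Longitude'], []),
    axis=1,
    result_type='expand',
)
expanded.columns = ['closest_station', 'station_distance', 'station_line']
pd_data = pd_data.join(expanded)

print(pd_data[['closest_station', 'station_line']].values.tolist())
# [['North', 'A C'], ['South', 'R']]
```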
