<a href="https://colab.research.google.com/github/shubhamgundawarNYU/Big-Data-Project-Group-16/blob/main/misc-datasets-notebooks/NYPD_Complaint_Data_Current_(Year_To_Date).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **BIG DATA PROJECT**

### NYPD Complaint Data Current (Year To Date)
[Link to Dataset](https://data.cityofnewyork.us/Public-Safety/NYPD-Complaint-Data-Current-Year-To-Date-/5uac-w243)

### DATA CLEANING AT SCALE

#### Mounting Google Drive to Google Colab Notebook to Load the Data Set

Make sure the dataset is in your Google Drive and that you mount the drive in Colab.

Once the drive is mounted, the file should be at the following path: `/content/gdrive/MyDrive/NYPD_Complaint_Data_Current_Year_To_Date.csv`


In [None]:
from google.colab import drive 
drive.mount('/content/gdrive')

Mounted at /content/gdrive


#### Importing the Libraries Required to Clean the Data Set

In [None]:
import numpy as np
import pandas as pd
import io

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=f8c4a49109dd1c28932f75cf435999f4e5a167acde54645954488746ba506b4e
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


#### Installing and importing openclean python library for data profiling and statistical inference

In [None]:
pip install openclean humanfriendly

Collecting openclean
  Downloading openclean-0.2.1-py3-none-any.whl (5.2 kB)
Collecting humanfriendly
  Downloading humanfriendly-10.0-py2.py3-none-any.whl (86 kB)
Collecting openclean-core==0.4.1
  Downloading openclean_core-0.4.1-py3-none-any.whl (267 kB)
Collecting refdata>=0.2.0
  Downloading refdata-0.2.0-py3-none-any.whl (37 kB)
Collecting histore>=0.4.0
  Downloading histore-0.4.1-py3-none-any.whl (109 kB)
Collecting flowserv-core>=0.8.0
  Downloading flowserv_core-0.9.2-py3-none-any.whl (260 kB)
Collecting jsonschema>=3.2.0
  Downloading jsonschema-4.2.1-py3-none-any.whl (69 kB)
Collecting jellyfish
  Downloading jellyfish-0.8.9.tar.gz (137 kB)


# **Running PySpark in Colab**

To run Spark in Colab, we first install its dependencies in the Colab environment: Apache Spark 3.2.0 prebuilt for Hadoop 3.2, Java 8, and findspark, which locates the Spark installation from Python. All of these can be installed from inside the Colab notebook. If you are new to Spark, note that some users have reported Python compatibility issues with Spark 2.4.0, so that version is best avoided.
Follow these steps to install the dependencies:

In [None]:
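# Install a headless Java 8 JDK (Spark runs on the JVM).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download and unpack Spark 3.2.0 prebuilt for Hadoop 3.2. Older releases are
# removed from dlcdn.apache.org over time; archive.apache.org keeps them.
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
# findspark adds the PySpark libraries under SPARK_HOME to sys.path.
!pip install -q findspark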

Now that Spark and Java are installed, set the `JAVA_HOME` and `SPARK_HOME` environment variables so that PySpark can find them in your Colab environment. Set the locations of Java and Spark by running the following code:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

Start a local Spark session to test your installation:

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()

#### Reading the Data Set CSV File using `spark.read.csv()` Function

In [None]:
df = spark.read.csv("/content/gdrive/MyDrive/NYPD_Complaint_Data_Current_Year_To_Date.csv", inferSchema=True, header=True)

### To find datasets similar to 'NYPD Complaint Data Historic', we use the openclean Python library's integration with the Socrata NYC Open Data API and list the datasets whose names mention complaints, the NYPD, or crime. These are the candidates for datasets with a column structure similar to our original dataset.

In [None]:
import openclean
from openclean.data.source.socrata import Socrata

for dataset in Socrata().catalog(domain='data.cityofnewyork.us'):
    # Use lower-case 'crime': the name is lower-cased before this check.
    if 'complaint' in dataset.name.lower() or 'NYPD' in dataset.name or 'crime' in dataset.name.lower():
        print(f'{dataset.identifier}\t{dataset.domain}\t{dataset.name}')

qgea-i56i	data.cityofnewyork.us	NYPD Complaint Data Historic
uip8-fykc	data.cityofnewyork.us	NYPD Arrest Data (Year to Date)
eabe-havv	data.cityofnewyork.us	DOB Complaints Received
5uac-w243	data.cityofnewyork.us	NYPD Complaint Data Current (Year To Date)
8h9b-rp9u	data.cityofnewyork.us	NYPD Arrests Data (Historic)
833y-fsy8	data.cityofnewyork.us	NYPD Shooting Incident Data (Historic)
5ucz-vwe8	data.cityofnewyork.us	NYPD Shooting Incident Data (Year To Date)
uwyv-629c	data.cityofnewyork.us	Housing Maintenance Code Complaints
a2nx-4u46	data.cityofnewyork.us	Complaint Problems
bqiq-cu78	data.cityofnewyork.us	NYPD Hate Crimes
sv2w-rv3k	data.cityofnewyork.us	NYPD Criminal Court Summons (Historic)
nre2-6m2s	data.cityofnewyork.us	Consumer Services Mediated Complaints
mv4k-y93f	data.cityofnewyork.us	NYPD Criminal Court Summons Incident Level Data (Year To Date)
6v9u-ndjg	data.cityofnewyork.us	Building Complaint Disposition Codes
9jgj-bmct	data.cityofnewyork.us	DOHMH Indoor Environmental Compl

#### From the above list, we select the 10 datasets whose columns overlap most with 'NYPD Complaint Data Historic'
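The notebook does not show how the column overlap was measured. One simple approach is to count how many column names each candidate shares with the reference dataset and keep the top 10. The sketch below assumes the column lists have already been fetched; the dataset names, the column sets, and the helper `rank_by_overlap` are hypothetical placeholders, not part of openclean.

```python
# Hypothetical column sets; in practice these would come from the metadata
# of the reference dataset and of each candidate dataset.
reference = {"CMPLNT_NUM", "BORO_NM", "CMPLNT_FR_DT", "LAW_CAT_CD", "OFNS_DESC"}
candidates = {
    "dataset_a": {"CMPLNT_NUM", "BORO_NM", "CMPLNT_FR_DT", "LAW_CAT_CD"},
    "dataset_b": {"ARREST_KEY", "ARREST_BORO", "LAW_CAT_CD"},
    "dataset_c": {"BORO_NM", "OFNS_DESC"},
}


def rank_by_overlap(reference, candidates, k=10):
    """Return up to k (name, shared_column_count) pairs, largest overlap first."""
    scored = [(name, len(reference & cols)) for name, cols in candidates.items()]
    # Sort by overlap (descending), breaking ties by name so the order is stable.
    scored.sort(key=lambda item: (-item[1], item[0]))
    return scored[:k]


print(rank_by_overlap(reference, candidates, k=2))
# [('dataset_a', 4), ('dataset_c', 2)]
```

Counting shared names is the simplest choice; a normalized score such as Jaccard similarity would instead favor candidates that have few extra columns.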

In [None]:
df.count()

323817

#### Get the Data Type of Each Column in the Data Set

In [None]:
df.printSchema()

root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- ADDR_PCT_CD: integer (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- CMPLNT_FR_TM: string (nullable = true)
 |-- CMPLNT_TO_DT: string (nullable = true)
 |-- CMPLNT_TO_TM: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- HOUSING_PSA: integer (nullable = true)
 |-- JURISDICTION_CODE: integer (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- PATROL_BORO: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- RPT_DT: string (nullable = true)
 |-- STATION_NAME: string (nullable = true)
 |-- SUSP_

#### Outputting the List of Columns in the Data Set

In [None]:
df.columns

['CMPLNT_NUM',
 'ADDR_PCT_CD',
 'BORO_NM',
 'CMPLNT_FR_DT',
 'CMPLNT_FR_TM',
 'CMPLNT_TO_DT',
 'CMPLNT_TO_TM',
 'CRM_ATPT_CPTD_CD',
 'HADEVELOPT',
 'HOUSING_PSA',
 'JURISDICTION_CODE',
 'JURIS_DESC',
 'KY_CD',
 'LAW_CAT_CD',
 'LOC_OF_OCCUR_DESC',
 'OFNS_DESC',
 'PARKS_NM',
 'PATROL_BORO',
 'PD_CD',
 'PD_DESC',
 'PREM_TYP_DESC',
 'RPT_DT',
 'STATION_NAME',
 'SUSP_AGE_GROUP',
 'SUSP_RACE',
 'SUSP_SEX',
 'TRANSIT_DISTRICT',
 'VIC_AGE_GROUP',
 'VIC_RACE',
 'VIC_SEX',
 'X_COORD_CD',
 'Y_COORD_CD',
 'Latitude',
 'Longitude',
 'Lat_Lon',
 'New Georeferenced Column']

#### Show the first 10 rows of the complaints DataFrame

In [None]:
df.show(n=10)

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+-------------------+-----+----------+-----------------+--------------------+--------+-----------+-----+-------+-------------+----------+------------+--------------+--------------+--------+----------------+-------------+--------------+-------+----------+----------+------------------+------------------+--------------------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|         JURIS_DESC|KY_CD|LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|     SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|      VIC_RACE|VIC_SEX|X_COORD_CD|Y_COORD_CD|          Latitude|         Longitude|             Lat_Lon|New Georeferenced Column|
+----------+------

## We see that the columns `X_COORD_CD`, `Y_COORD_CD`, `Latitude`, `Longitude` and `Lat_Lon` convey the same location data as `New Georeferenced Column`.

#### Hence, we drop those columns and keep only `New Georeferenced Column` in our cleaned dataset.

In [None]:
df = df.drop('X_COORD_CD','Y_COORD_CD','Latitude','Longitude','Lat_Lon')

In [None]:
df.columns

['CMPLNT_NUM',
 'ADDR_PCT_CD',
 'BORO_NM',
 'CMPLNT_FR_DT',
 'CMPLNT_FR_TM',
 'CMPLNT_TO_DT',
 'CMPLNT_TO_TM',
 'CRM_ATPT_CPTD_CD',
 'HADEVELOPT',
 'HOUSING_PSA',
 'JURISDICTION_CODE',
 'JURIS_DESC',
 'KY_CD',
 'LAW_CAT_CD',
 'LOC_OF_OCCUR_DESC',
 'OFNS_DESC',
 'PARKS_NM',
 'PATROL_BORO',
 'PD_CD',
 'PD_DESC',
 'PREM_TYP_DESC',
 'RPT_DT',
 'STATION_NAME',
 'SUSP_AGE_GROUP',
 'SUSP_RACE',
 'SUSP_SEX',
 'TRANSIT_DISTRICT',
 'VIC_AGE_GROUP',
 'VIC_RACE',
 'VIC_SEX',
 'New Georeferenced Column']

#### Removing all the **duplicate** entries

In [None]:
df = df.drop_duplicates()

In [None]:
df.count()

323817

In [None]:
df.distinct().count()

323817

In [None]:
# Download the full 'NYPD Complaint Data Current Year To Date' dataset.
# Note that the full dataset is large (the gzip-compressed download is about
# 24 MB). Use the alternative data file with 10,000 rows that is included in
# the repository if you do not want to download the full data file.

import gzip
import humanfriendly
import os

dataset = Socrata().dataset('5uac-w243')

# To use the small sample that is included in the 'data' subfolder of this
# repository instead, uncomment the following line:
#datafile = './data/5uac-w243.tsv.gz'

# This notebook uses the full dataset:
datafile = './5uac-w243.tsv.gz'


# Download file only if it does not exist already.
if not os.path.isfile(datafile):
    with gzip.open(datafile, 'wb') as f:
        print('Downloading ...\n')
        dataset.write(f)


fsize = humanfriendly.format_size(os.stat(datafile).st_size)
print("Using '{}' in file {} of size {}".format(dataset.name, datafile, fsize))

Downloading ...

Using 'NYPD Complaint Data Current (Year To Date)' in file ./5uac-w243.tsv.gz of size 24.41 MB


In [None]:
# Due to the size of the full dataset file, we use openclean's stream
# operator to avoid loading the whole dataset into main memory.

from openclean.pipeline import stream

ds = stream(datafile)

In [None]:
# Profile the resulting dataset view using the default data profiler.

from openclean.profiling.column import DefaultColumnProfiler

profiles = ds.profile(default_profiler=DefaultColumnProfiler)

In [None]:
# Print overview of profiling results.

profiles.stats()

| column | total | empty | distinct | uniqueness | entropy |
|---|---:|---:|---:|---:|---:|
| CMPLNT_NUM | 323817 | 0 | 323817 | 1.0 | 18.304819 |
| ADDR_PCT_CD | 323817 | 0 | 77 | 0.000238 | 6.156808 |
| BORO_NM | 323817 | 771 | 5 | 1.5e-05 | 2.159873 |
| CMPLNT_FR_DT | 323817 | 0 | 1686 | 0.005207 | 8.282486 |
| CMPLNT_FR_TM | 323817 | 0 | 1440 | 0.004447 | 8.59011 |
| CMPLNT_TO_DT | 323817 | 29197 | 1238 | 0.004202 | 8.198287 |
| CMPLNT_TO_TM | 323817 | 29084 | 1440 | 0.004886 | 9.234543 |
| CRM_ATPT_CPTD_CD | 323817 | 161 | 2 | 6e-06 | 0.109424 |
| HADEVELOPT | 323817 | 322640 | 24 | 0.020391 | 4.005595 |
| HOUSING_PSA | 323817 | 299580 | 353 | 0.014565 | 7.570633 |
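The `uniqueness` and `entropy` columns can be reproduced by hand. Judging from the numbers in the table (for `HADEVELOPT`, 24 / (323817 - 322640) ≈ 0.020391; for the all-distinct `CMPLNT_NUM`, log2(323817) ≈ 18.3048), uniqueness appears to be the number of distinct values divided by the number of non-empty values, and entropy appears to be the Shannon entropy in bits of the non-empty values. The sketch below assumes exactly that; the helper `column_stats` is hypothetical and is not openclean's own implementation.

```python
import math
from collections import Counter


def column_stats(values):
    """Return (total, empty, distinct, uniqueness, entropy) for one column.

    Assumptions: a value is "empty" if it is None or ''; uniqueness and
    entropy are computed over the non-empty values only; entropy is the
    Shannon entropy in bits.
    """
    total = len(values)
    non_empty = [v for v in values if v not in (None, "")]
    n = len(non_empty)
    counts = Counter(non_empty)
    distinct = len(counts)
    uniqueness = distinct / n if n else 0.0
    entropy = -sum((c / n) * math.log2(c / n) for c in counts.values()) if n else 0.0
    return total, total - n, distinct, uniqueness, entropy


print(column_stats(["QUEENS", "BRONX", "QUEENS", "", "BRONX", "QUEENS"]))
# total=6, empty=1, distinct=2, uniqueness=0.4, entropy ≈ 0.971
```

A column in which every value is distinct reaches the maximum entropy of log2(n) bits, which is why `CMPLNT_NUM` has by far the highest entropy in the table.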


#### **Checking** if the complaint number is unique or not

In [None]:
df.select('CMPLNT_NUM').distinct().count()

323817

#### The number of distinct `CMPLNT_NUM` values (323817) equals the total number of rows, so `CMPLNT_NUM` appears to be unique.
#### To confirm, let's look for complaint numbers that occur more than once.
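The `groupBy('CMPLNT_NUM').count().filter("count > 1")` query in the next cell is the distributed form of a simple counting idea: count how often each key occurs and keep the keys seen more than once. A plain-Python sketch of the same check, on hypothetical complaint numbers (the helper `duplicated_keys` is illustrative only):

```python
from collections import Counter


def duplicated_keys(keys):
    """Return {key: occurrences} for every key that appears more than once."""
    return {key: n for key, n in Counter(keys).items() if n > 1}


# Hypothetical complaint numbers: 101 appears twice.
print(duplicated_keys([101, 102, 101, 103]))  # {101: 2}
print(duplicated_keys([101, 102, 103]))       # {} -> every key is unique
```

An empty result means every key occurs exactly once, which is what the Spark query reports for this dataset.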

In [None]:
df1 = df.groupBy('CMPLNT_NUM').count().filter("count > 1")
df1.drop('count').count()

0

In [None]:
df1.sort('CMPLNT_NUM').show(n = 10)

+----------+-----+
|CMPLNT_NUM|count|
+----------+-----+
+----------+-----+



#### Check for complaint number `854455675`

In [None]:
df.filter('CMPLNT_NUM = 854455675').show()

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------------+-----+----------+-----------------+--------------------+--------+-----------+-----+-------+-------------+----------+------------+--------------+---------+--------+----------------+-------------+--------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|      JURIS_DESC|KY_CD|LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|      VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------------+-----+----------+-----------------+--

#### No complaint number occurs more than once, so the complaint number is unique.

#### Thus, the `CMPLNT_NUM` column can be used to uniquely identify a record in the complaints dataset.

## Find Count of Null, None, NaN of All DataFrame Columns

In [None]:
# For each column, count the rows whose value is null or NaN.
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c)
           for c in df.columns]).show()

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+------+------------+--------------+---------+--------+----------------+-------------+--------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|JURIS_DESC|KY_CD|LAW_CAT_CD|LOC_OF_OCCUR_DESC|OFNS_DESC|PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|PREM_TYP_DESC|RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+---

#### Show up to 5 rows where the complaint-from date (`CMPLNT_FR_DT`) is null

In [None]:
df.where(col('CMPLNT_FR_DT').isNull()).show(n=5)

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+------+------------+--------------+---------+--------+----------------+-------------+--------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|JURIS_DESC|KY_CD|LAW_CAT_CD|LOC_OF_OCCUR_DESC|OFNS_DESC|PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|PREM_TYP_DESC|RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+---

### Get rows where either complaint from date or complaint from time is null

In [None]:
df.filter(df.CMPLNT_FR_DT.isNull() | df.CMPLNT_FR_TM.isNull()).show(5)

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+------+------------+--------------+---------+--------+----------------+-------------+--------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|JURIS_DESC|KY_CD|LAW_CAT_CD|LOC_OF_OCCUR_DESC|OFNS_DESC|PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|PREM_TYP_DESC|RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+---

1. There may be null values in the `CMPLNT_TO_DT`, `CMPLNT_TO_TM`, `LOC_OF_OCCUR_DESC`, `PREM_TYP_DESC`, `PARKS_NM` and `HADEVELOPT` columns.

2. `LOC_OF_OCCUR_DESC`, `PREM_TYP_DESC`, `PARKS_NM` and `HADEVELOPT` do not apply to every complaint (for example, `PARKS_NM` is only filled in when the incident occurred in a park), so null values in these columns are not a reason to drop records.

3. However, the complaint-from date and complaint-from time columns must not be null. We drop the rows where either the complaint-from date or the complaint-from time is null.
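Dropping rows where either the date or the time is null is the same as keeping rows where both are present (De Morgan's law). In PySpark that keep-condition is `df.CMPLNT_FR_DT.isNotNull() & df.CMPLNT_FR_TM.isNotNull()`, or equivalently `df.na.drop(subset=['CMPLNT_FR_DT', 'CMPLNT_FR_TM'])`; combining the two `isNotNull()` tests with `|` would instead keep rows where only one of them is present. A plain-Python sketch of the three conditions on hypothetical rows:

```python
# Hypothetical (CMPLNT_FR_DT, CMPLNT_FR_TM) pairs; None stands for a Spark null.
rows = [
    ("01/05/2021", "10:30:00"),  # both present
    ("01/06/2021", None),        # time missing
    (None, "11:00:00"),          # date missing
    (None, None),                # both missing
]

# Drop rows where EITHER field is null ...
dropped_either_null = [r for r in rows if not (r[0] is None or r[1] is None)]
# ... which, by De Morgan's law, equals keeping rows where BOTH are present.
kept_both_present = [r for r in rows if r[0] is not None and r[1] is not None]
# Combining the "not null" tests with OR keeps partially missing rows too.
kept_either_present = [r for r in rows if r[0] is not None or r[1] is not None]

print(len(dropped_either_null), len(kept_both_present), len(kept_either_present))  # 1 1 3
```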

In [None]:
# Note: DataFrame.drop() removes columns, not rows; this call is a no-op and returns df unchanged.
df.drop(df.CMPLNT_FR_DT.isNull() | df.CMPLNT_FR_TM.isNull())

DataFrame[CMPLNT_NUM: int, ADDR_PCT_CD: int, BORO_NM: string, CMPLNT_FR_DT: string, CMPLNT_FR_TM: string, CMPLNT_TO_DT: string, CMPLNT_TO_TM: string, CRM_ATPT_CPTD_CD: string, HADEVELOPT: string, HOUSING_PSA: int, JURISDICTION_CODE: int, JURIS_DESC: string, KY_CD: int, LAW_CAT_CD: string, LOC_OF_OCCUR_DESC: string, OFNS_DESC: string, PARKS_NM: string, PATROL_BORO: string, PD_CD: int, PD_DESC: string, PREM_TYP_DESC: string, RPT_DT: string, STATION_NAME: string, SUSP_AGE_GROUP: string, SUSP_RACE: string, SUSP_SEX: string, TRANSIT_DISTRICT: int, VIC_AGE_GROUP: string, VIC_RACE: string, VIC_SEX: string, New Georeferenced Column: string]

In [None]:
# Keep only the rows where both the complaint-from date and time are present.
df = df.filter(df.CMPLNT_FR_DT.isNotNull() & df.CMPLNT_FR_TM.isNotNull())

In [None]:
df.show(100)

+----------+-----------+-------------+------------+------------+------------+------------+----------------+-------------+-----------+-----------------+-------------------+-----+-----------+-----------------+--------------------+----------------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+--------------------+--------+----------------+-------------+--------------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|      BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|   HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|         JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|        PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|           SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|            VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------------+------------+-----------

Check whether the minimum and maximum values of the date and time columns are valid. For example, a time value of 24:00:00 would be invalid.

In [None]:
## Minimum value of the column in pyspark
df.agg({'CMPLNT_FR_TM': 'min'}).show()

+-----------------+
|min(CMPLNT_FR_TM)|
+-----------------+
|         00:00:00|
+-----------------+



In [None]:
## Maximum value of the column in pyspark
df.agg({'CMPLNT_FR_TM': 'max'}).show()

+-----------------+
|max(CMPLNT_FR_TM)|
+-----------------+
|         23:59:00|
+-----------------+



In [None]:
df.agg({'CMPLNT_FR_DT': 'min'}).show()

+-----------------+
|min(CMPLNT_FR_DT)|
+-----------------+
|       01/01/1955|
+-----------------+



In [None]:
df.agg({'CMPLNT_FR_DT': 'max'}).show()

+-----------------+
|max(CMPLNT_FR_DT)|
+-----------------+
|       12/31/2020|
+-----------------+
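`CMPLNT_FR_DT` is stored as a string in MM/DD/YYYY format, so the `min` and `max` above compare strings character by character: the month is compared before the year, and the result is not necessarily the earliest or latest date. (The time column is not affected, since zero-padded HH:MM:SS strings sort chronologically.) The plain-Python sketch below, on hypothetical values, shows the difference. In Spark, the usual remedy is to convert the column with `to_date(col('CMPLNT_FR_DT'), 'MM/dd/yyyy')` before aggregating; that conversion has not been run against this dataset here.

```python
from datetime import datetime

# Hypothetical values in the MM/DD/YYYY format used by CMPLNT_FR_DT.
dates = ["12/31/2020", "01/15/2021", "09/30/2021", "01/01/1955"]

# String comparison: characters are compared left to right, so month comes first.
lexicographic_min = min(dates)
lexicographic_max = max(dates)

# Parsing to real dates first gives the chronological extremes.
parsed = [datetime.strptime(d, "%m/%d/%Y") for d in dates]
chronological_min = min(parsed).strftime("%m/%d/%Y")
chronological_max = max(parsed).strftime("%m/%d/%Y")

print(lexicographic_min, lexicographic_max)  # 01/01/1955 12/31/2020
print(chronological_min, chronological_max)  # 01/01/1955 09/30/2021
```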



**The basic data quality checks are:**
1. Check that there are no garbage values in the location of occurrence description column (`LOC_OF_OCCUR_DESC`). The valid values for this column are: 'FRONT OF', 'REAR OF', 'OUTSIDE', 'INSIDE', 'OPPOSITE OF'.
2. Check that there are no garbage values in the law category column (`LAW_CAT_CD`). The valid values are: 'FELONY', 'VIOLATION', 'MISDEMEANOR'.
3. Check that there are no misspellings in the borough name (`BORO_NM`). There should be 5 distinct boroughs: Manhattan, Bronx, Queens, Brooklyn, Staten Island. We list the distinct values; a misspelling would show up as an extra variant of a borough name.
4. Check that the `CRM_ATPT_CPTD_CD` column has no garbage values. The only acceptable values are COMPLETED and ATTEMPTED.
5. Ideally, the key code (`KY_CD`) should contain exactly 3 digits. The check below looks for any key code outside that range.
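Checks 1, 2 and 4 follow the same pattern: count the non-null values that fall outside an allowed set, treating nulls separately. A plain-Python sketch of that pattern, with hypothetical sample rows and a hypothetical helper `count_invalid`; in Spark, the same test for one column `c` would be `df.filter(col(c).isNotNull() & ~col(c).isin(allowed)).count()`.

```python
# Allowed values for checks 1, 2 and 4 above.
ALLOWED = {
    "LOC_OF_OCCUR_DESC": {"FRONT OF", "REAR OF", "OUTSIDE", "INSIDE", "OPPOSITE OF"},
    "LAW_CAT_CD": {"FELONY", "VIOLATION", "MISDEMEANOR"},
    "CRM_ATPT_CPTD_CD": {"COMPLETED", "ATTEMPTED"},
}


def count_invalid(rows, allowed=ALLOWED):
    """Count, per column, the non-null values that are not in the allowed set."""
    counts = {column: 0 for column in allowed}
    for row in rows:
        for column, valid in allowed.items():
            value = row.get(column)
            if value is not None and value not in valid:
                counts[column] += 1
    return counts


# Hypothetical rows: one misspelled law category, one unexpected location.
sample = [
    {"LOC_OF_OCCUR_DESC": "INSIDE", "LAW_CAT_CD": "FELONY", "CRM_ATPT_CPTD_CD": "COMPLETED"},
    {"LOC_OF_OCCUR_DESC": None, "LAW_CAT_CD": "FELONEY", "CRM_ATPT_CPTD_CD": "ATTEMPTED"},
    {"LOC_OF_OCCUR_DESC": "BEHIND", "LAW_CAT_CD": "VIOLATION", "CRM_ATPT_CPTD_CD": None},
]
print(count_invalid(sample))
# {'LOC_OF_OCCUR_DESC': 1, 'LAW_CAT_CD': 1, 'CRM_ATPT_CPTD_CD': 0}
```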

In [None]:
df.select('LOC_OF_OCCUR_DESC').distinct().show()

+-----------------+
|LOC_OF_OCCUR_DESC|
+-----------------+
|      OPPOSITE OF|
|          REAR OF|
|             null|
|           INSIDE|
|          OUTSIDE|
|         FRONT OF|
+-----------------+



In [None]:
df.select('LAW_CAT_CD').distinct().show()

+-----------+
| LAW_CAT_CD|
+-----------+
|     FELONY|
|MISDEMEANOR|
|  VIOLATION|
+-----------+



### Checks for Borough Name

In [None]:
df.select('BORO_NM').distinct().show()

+-------------+
|      BORO_NM|
+-------------+
|         null|
|       QUEENS|
|     BROOKLYN|
|        BRONX|
|    MANHATTAN|
|STATEN ISLAND|
+-------------+



There are no misspelled borough names, so apart from the null values this column needs no correction.

In [None]:
df.where(col('BORO_NM').isNull()).show()

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+-------------------+-----+-----------+-----------------+--------------------+--------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+---------+--------+----------------+-------------+--------------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|         JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|            VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+------

#### Dropping Rows where Borough Name is NULL

In [None]:
df = df.filter(df.BORO_NM.isNotNull())

In [None]:
df.count()

323046

In [None]:
df.filter(df.BORO_NM.isNull()).show()

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+------+------------+--------------+---------+--------+----------------+-------------+--------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|JURIS_DESC|KY_CD|LAW_CAT_CD|LOC_OF_OCCUR_DESC|OFNS_DESC|PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|PREM_TYP_DESC|RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+---

#### Check that all rows of the column `CRM_ATPT_CPTD_CD` have one of the expected values, COMPLETED or ATTEMPTED. The value counts below show that only 103 rows have a null value for this column. Since they make up a very small fraction of the data set (about 0.03%), we drop them.

In [None]:
df.select('CRM_ATPT_CPTD_CD').distinct().show()

+----------------+
|CRM_ATPT_CPTD_CD|
+----------------+
|       ATTEMPTED|
|       COMPLETED|
|            null|
+----------------+



In [None]:
# similar to pandas' value_counts
df.groupBy('CRM_ATPT_CPTD_CD').count().orderBy('count', ascending=False).show()

+----------------+------+
|CRM_ATPT_CPTD_CD| count|
+----------------+------+
|       COMPLETED|318245|
|       ATTEMPTED|  4698|
|            null|   103|
+----------------+------+



In [None]:
df.filter(df.CRM_ATPT_CPTD_CD.isNull()).show()

+----------+-----------+---------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------------+-----+-----------+-----------------+-------------+--------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+---------+--------+----------------+-------------+--------------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|  BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|      JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|    OFNS_DESC|PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|            VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+---------+------------+------------+------------+------------+----------------+----------+-----------+--------

In [None]:
df = df.filter(df.CRM_ATPT_CPTD_CD.isNotNull())

In [None]:
df.count()

322943

#### We can see that all the key codes are valid 3-digit numbers

In [None]:
df.filter((df.KY_CD < 100) | (df.KY_CD > 999)).count()

0

## Defining checks for outliers in age group

In [None]:
df.select('SUSP_AGE_GROUP').distinct().show()

+--------------+
|SUSP_AGE_GROUP|
+--------------+
|           940|
|           938|
|          1032|
|           <18|
|         25-44|
|          null|
|          -946|
|          -971|
|       UNKNOWN|
|           -69|
|           65+|
|         18-24|
|          2021|
|          -955|
|         45-64|
|          -941|
|          -966|
|          -969|
|          1017|
|          -973|
+--------------+
only showing top 20 rows



#### There are many invalid age groups, such as negative values and unrealistically high values.

#### Let's replace all the invalid age groups with `NaN`
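The rule applied in the next cell can be sketched in plain Python. This version maps invalid entries to `None` (a real missing value) instead of the string 'NaN'; in Spark, the equivalent would be `.otherwise(lit(None))`, which keeps a string column but stores a true null that `isNull()` and `na.drop()` recognize. This is a suggested alternative, not what the notebook does; `normalize_age_group` is a hypothetical helper.

```python
VALID_AGE_GROUPS = {"<18", "18-24", "25-44", "45-64", "65+"}


def normalize_age_group(value):
    """Return the value if it is a valid age bucket, otherwise None (missing)."""
    return value if value in VALID_AGE_GROUPS else None


# Sample values modeled on the distinct values shown above.
raw = ["25-44", "-946", "UNKNOWN", None, "2021", "65+"]
print([normalize_age_group(v) for v in raw])
# ['25-44', None, None, None, None, '65+']
```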

In [None]:
# Keep the five valid buckets; anything else (incl. null, 'UNKNOWN') becomes the string 'NaN'.
valid_age_groups = ['<18', '18-24', '25-44', '45-64', '65+']
df = df.withColumn('SUSP_AGE_GROUP', when(df.SUSP_AGE_GROUP.isin(valid_age_groups), df.SUSP_AGE_GROUP).otherwise(np.nan))
df.show()

+----------+-----------+---------+------------+------------+------------+------------+----------------+-------------+-----------+-----------------+-------------------+-----+-----------+-----------------+--------------------+--------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+--------------------+--------+----------------+-------------+--------------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|  BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|   HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|         JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|           SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|            VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+---------+------------+------------+------------+------------+

In [None]:
df.select('SUSP_AGE_GROUP').distinct().show()

+--------------+
|SUSP_AGE_GROUP|
+--------------+
|           <18|
|         25-44|
|           65+|
|           NaN|
|         18-24|
|         45-64|
+--------------+



In [None]:
df.show(n=5)

+----------+-----------+---------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------------+-----+-----------+-----------------+--------------------+--------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+---------+--------+----------------+-------------+--------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|  BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|      JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|      VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+---------+------------+------------+------------+------------+----------------+----------+-----------+------

In [None]:
df.select('VIC_AGE_GROUP').distinct().show()

+-------------+
|VIC_AGE_GROUP|
+-------------+
|           -1|
|         -960|
|         -921|
|          <18|
|        25-44|
|          -61|
|          936|
|      UNKNOWN|
|          -48|
|          65+|
|         -945|
|        18-24|
|           -3|
|        45-64|
|          970|
|         -935|
|          -51|
|          963|
|          945|
+-------------+



#### The victim age group column has the same kinds of invalid values: negative values, unrealistically high values, etc.

#### Let's replace these invalid age groups with `NaN` as well

In [None]:
# Same rule for the victim age group; invalid values become the string 'NaN'.
valid_age_groups = ['<18', '18-24', '25-44', '45-64', '65+']
df = df.withColumn('VIC_AGE_GROUP', when(df.VIC_AGE_GROUP.isin(valid_age_groups), df.VIC_AGE_GROUP).otherwise(np.nan))
df.show()

+----------+-----------+---------+------------+------------+------------+------------+----------------+-------------+-----------+-----------------+-------------------+-----+-----------+-----------------+--------------------+--------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+--------------------+--------+----------------+-------------+--------------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|  BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|   HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|         JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|           SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|            VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+---------+------------+------------+------------+------------+

In [None]:
df.select('VIC_AGE_GROUP').distinct().show()

+-------------+
|VIC_AGE_GROUP|
+-------------+
|          <18|
|        25-44|
|          65+|
|          NaN|
|        18-24|
|        45-64|
+-------------+



In [None]:
df.show(n=5)

+----------+-----------+---------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------------+-----+-----------+-----------------+--------------------+--------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+---------+--------+----------------+-------------+--------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|  BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|      JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|      VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+---------+------------+------------+------------+------------+----------------+----------+-----------+------

### Checks for Suspect & Victim Race

In [None]:
df.select('SUSP_RACE').distinct().show()

+--------------------+
|           SUSP_RACE|
+--------------------+
|               WHITE|
|               BLACK|
|AMERICAN INDIAN/A...|
|                null|
|      BLACK HISPANIC|
|      WHITE HISPANIC|
|             UNKNOWN|
|ASIAN / PACIFIC I...|
+--------------------+



#### Replace all `UNKNOWN` values with `NaN`

In [None]:
from pyspark.sql.functions import regexp_replace

# Replace the UNKNOWN placeholder in SUSP_RACE with the string "NaN"
df = df.withColumn("SUSP_RACE",
  regexp_replace("SUSP_RACE", "UNKNOWN", "NaN"))

In [None]:
df.show(100)

+----------+-----------+-------------+------------+------------+------------+------------+----------------+-------------+-----------+-----------------+-------------------+-----+-----------+-----------------+--------------------+----------------+--------------------+-----+--------------------+--------------------+----------+------------+--------------+--------------------+--------+----------------+-------------+--------------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|      BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|   HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|         JURIS_DESC|KY_CD| LAW_CAT_CD|LOC_OF_OCCUR_DESC|           OFNS_DESC|        PARKS_NM|         PATROL_BORO|PD_CD|             PD_DESC|       PREM_TYP_DESC|    RPT_DT|STATION_NAME|SUSP_AGE_GROUP|           SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|            VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------------+------------+-----------

In [None]:
df.select('SUSP_RACE').distinct().show()

+--------------------+
|           SUSP_RACE|
+--------------------+
|               WHITE|
|               BLACK|
|AMERICAN INDIAN/A...|
|                null|
|      BLACK HISPANIC|
|      WHITE HISPANIC|
|                 NaN|
|ASIAN / PACIFIC I...|
+--------------------+
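Note that `regexp_replace` rewrites every *substring* that matches the pattern, not only whole values. That is harmless here because `UNKNOWN` only ever appears as a complete value, but an exact-match replacement is the safer rule; in PySpark it could be written as `when(col("SUSP_RACE") == "UNKNOWN", "NaN").otherwise(col("SUSP_RACE"))`. The plain-Python sketch below contrasts the two behaviours; the value `"UNKNOWN ORIGIN"` is hypothetical and does not occur in this dataset.

```python
import re


def substring_replace(value, pattern="UNKNOWN", replacement="NaN"):
    """Mimic regexp_replace: every match inside the string is rewritten; null stays null."""
    return None if value is None else re.sub(pattern, replacement, value)


def exact_replace(value, target="UNKNOWN", replacement="NaN"):
    """Replace the value only when the whole string equals the target."""
    return replacement if value == target else value


values = ["WHITE", "UNKNOWN", None, "UNKNOWN ORIGIN"]  # last value is hypothetical
print([substring_replace(v) for v in values])  # ['WHITE', 'NaN', None, 'NaN ORIGIN']
print([exact_replace(v) for v in values])      # ['WHITE', 'NaN', None, 'UNKNOWN ORIGIN']
```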



In [None]:
df.select('VIC_RACE').distinct().show()

+--------------------+
|            VIC_RACE|
+--------------------+
|               WHITE|
|               BLACK|
|AMERICAN INDIAN/A...|
|                null|
|      BLACK HISPANIC|
|      WHITE HISPANIC|
|             UNKNOWN|
|ASIAN / PACIFIC I...|
+--------------------+



In [None]:
from pyspark.sql.functions import regexp_replace

# Replace the UNKNOWN placeholder in VIC_RACE with the string "NaN"
df = df.withColumn("VIC_RACE",
  regexp_replace("VIC_RACE", "UNKNOWN", "NaN"))

In [None]:
df.select('VIC_RACE').distinct().show()

+--------------------+
|            VIC_RACE|
+--------------------+
|               WHITE|
|               BLACK|
|AMERICAN INDIAN/A...|
|                null|
|      BLACK HISPANIC|
|      WHITE HISPANIC|
|                 NaN|
|ASIAN / PACIFIC I...|
+--------------------+



### Checks for Suspect & Victim Sex

In [None]:
df.select('SUSP_SEX').distinct().show()

+--------+
|SUSP_SEX|
+--------+
|       F|
|    null|
|       M|
|       U|
+--------+



#### Counting each suspect sex value

In [None]:
df.groupBy('SUSP_SEX').count().orderBy('count', ascending=False).show()

+--------+------+
|SUSP_SEX| count|
+--------+------+
|       M|152451|
|       U| 67077|
|    null| 62146|
|       F| 41269|
+--------+------+



In [None]:
df.select('VIC_SEX').distinct().show()

+-------+
|VIC_SEX|
+-------+
|      F|
|      E|
|      M|
|      D|
+-------+



In [None]:
df.groupBy('VIC_SEX').count().orderBy('count', ascending=False).show()

+-------+------+
|VIC_SEX| count|
+-------+------+
|      F|129594|
|      M|113275|
|      D| 51207|
|      E| 28867|
+-------+------+
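Turning the two count tables above into shares makes the skew easier to read. Checking that each table adds up to the row count also confirms that no rows fall outside the listed categories. The sketch below only copies the counts printed above (`None` stands for `null`); both tables sum to 322,943, the row count reported at the end of this notebook.

```python
# Counts copied from the two groupBy(...).count() outputs above; None stands for null.
susp_sex_counts = {"M": 152451, "U": 67077, None: 62146, "F": 41269}
vic_sex_counts = {"F": 129594, "M": 113275, "D": 51207, "E": 28867}


def shares(counts):
    """Return the total and each category's fraction of it, largest first."""
    total = sum(counts.values())
    ordered = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return total, [(key, round(n / total, 3)) for key, n in ordered]


for name, counts in [("SUSP_SEX", susp_sex_counts), ("VIC_SEX", vic_sex_counts)]:
    total, fractions = shares(counts)
    print(name, total, fractions)
```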



#### Get unique values of offense description in sorted order

In [None]:
df.select('OFNS_DESC').distinct().orderBy('OFNS_DESC', ascending=True).show()

+--------------------+
|           OFNS_DESC|
+--------------------+
|                null|
| ADMINISTRATIVE CODE|
|AGRICULTURE & MRK...|
|ALCOHOLIC BEVERAG...|
|ANTICIPATORY OFFE...|
|               ARSON|
|ASSAULT 3 & RELAT...|
|     BURGLAR'S TOOLS|
|            BURGLARY|
|CHILD ABANDONMENT...|
|CRIMINAL MISCHIEF...|
|   CRIMINAL TRESPASS|
|     DANGEROUS DRUGS|
|   DANGEROUS WEAPONS|
|  DISORDERLY CONDUCT|
|DISRUPTION OF A R...|
|ENDAN WELFARE INCOMP|
|            ESCAPE 3|
|      FELONY ASSAULT|
|   FELONY SEX CRIMES|
+--------------------+
only showing top 20 rows



#### Number of distinct offense descriptions and complaints per description

In [None]:
df.select('OFNS_DESC').distinct().count()

61

In [None]:
df.groupBy('OFNS_DESC').count().show()

+--------------------+-----+
|           OFNS_DESC|count|
+--------------------+-----+
|ANTICIPATORY OFFE...|   13|
|   FELONY SEX CRIMES|   14|
|NEW YORK CITY HEA...|    3|
|OTHER OFFENSES RE...|  466|
|VEHICLE AND TRAFF...| 6060|
|KIDNAPPING & RELA...|  100|
|HOMICIDE-NEGLIGEN...|    2|
|OFF. AGNST PUB OR...|12985|
|PETIT LARCENY OF ...|  105|
|OFFENSES AGAINST ...|    1|
|      FELONY ASSAULT|16743|
|ALCOHOLIC BEVERAG...|   52|
|OFFENSES RELATED ...|    4|
|CRIMINAL MISCHIEF...|33575|
|         THEFT-FRAUD| 3003|
|   THEFT OF SERVICES|   99|
|            JOSTLING|    4|
|MISCELLANEOUS PEN...|10748|
|LOITERING/GAMBLIN...|    1|
|               ARSON|  472|
+--------------------+-----+
only showing top 20 rows



**Map key codes to offense descriptions**

In [None]:
key_off_mapping = df.groupBy('KY_CD').agg(collect_set('OFNS_DESC').alias('OFNS_DESCS')).orderBy('KY_CD')
key_off_mapping.show()

+-----+--------------------+
|KY_CD|          OFNS_DESCS|
+-----+--------------------+
|  102|[HOMICIDE-NEGLIGE...|
|  103|[HOMICIDE-NEGLIGE...|
|  104|              [RAPE]|
|  105|           [ROBBERY]|
|  106|    [FELONY ASSAULT]|
|  107|          [BURGLARY]|
|  109|     [GRAND LARCENY]|
|  110|[GRAND LARCENY OF...|
|  111|[POSSESSION OF ST...|
|  112|       [THEFT-FRAUD]|
|  113|           [FORGERY]|
|  114|             [ARSON]|
|  115|[PROSTITUTION & R...|
|  116|[SEX CRIMES, FELO...|
|  117|   [DANGEROUS DRUGS]|
|  118| [DANGEROUS WEAPONS]|
|  119|[INTOXICATED/IMPA...|
|  120|[ENDAN WELFARE IN...|
|  121|[CRIMINAL MISCHIE...|
|  122|          [GAMBLING]|
+-----+--------------------+
only showing top 20 rows



In [None]:
key_off_mapping.count()

62

In [None]:
df.select('KY_CD').distinct().count()

62

### Functional Dependency Violations for KY_CD -> OFNS_DESC

In [None]:
ofns_key = ds.to_df()

In [None]:
from openclean.operator.map.violations import fd_violations
from openclean.operator.collector.count import distinct

fd2_violations = fd_violations(ofns_key, ['KY_CD'], ['OFNS_DESC'])

print('# of violations for FD(KY_CD -> OFNS_DESC) is {}\n'.format(len(fd2_violations)))
for key, gr in fd2_violations.items():
    print(gr[['KY_CD', 'OFNS_DESC']])

# of violations for FD(KY_CD -> OFNS_DESC) is 9

       KY_CD                OFNS_DESC
104      126  MISCELLANEOUS PENAL LAW
161      126  MISCELLANEOUS PENAL LAW
195      126  MISCELLANEOUS PENAL LAW
243      126  MISCELLANEOUS PENAL LAW
244      126  MISCELLANEOUS PENAL LAW
...      ...                      ...
323640   126  MISCELLANEOUS PENAL LAW
323643   126  MISCELLANEOUS PENAL LAW
323665   126  MISCELLANEOUS PENAL LAW
323723   126  MISCELLANEOUS PENAL LAW
323762   126  MISCELLANEOUS PENAL LAW

[10450 rows x 2 columns]
       KY_CD                       OFNS_DESC
119      343  OTHER OFFENSES RELATED TO THEF
629      343  OTHER OFFENSES RELATED TO THEF
1167     343  OTHER OFFENSES RELATED TO THEF
1877     343               THEFT OF SERVICES
2605     343  OTHER OFFENSES RELATED TO THEF
...      ...                             ...
320888   343  OTHER OFFENSES RELATED TO THEF
322746   343  OTHER OFFENSES RELATED TO THEF
323015   343  OTHER OFFENSES RELATED TO THEF
323021   343  OTHER
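Conceptually, a violation of the functional dependency KY_CD -> OFNS_DESC is a group of rows that share a `KY_CD` but carry more than one distinct `OFNS_DESC`; the KY_CD 343 group above, for example, mixes `OTHER OFFENSES RELATED TO THEF` and `THEFT OF SERVICES`. The plain-Python sketch below illustrates that idea. It is not openclean's implementation; the helper `find_fd_violations` and the rows are hypothetical, modelled on the output above.

```python
from collections import defaultdict


def find_fd_violations(rows, lhs, rhs):
    """Group rows by the LHS column and keep only groups with >1 distinct RHS value."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[lhs]].append(row)
    return {
        key: group
        for key, group in groups.items()
        if len({r[rhs] for r in group}) > 1
    }


# Hypothetical rows modelled on the KY_CD 343 group shown above.
rows = [
    {"KY_CD": "343", "OFNS_DESC": "OTHER OFFENSES RELATED TO THEF"},
    {"KY_CD": "343", "OFNS_DESC": "THEFT OF SERVICES"},
    {"KY_CD": "343", "OFNS_DESC": "OTHER OFFENSES RELATED TO THEF"},
    {"KY_CD": "105", "OFNS_DESC": "ROBBERY"},
    {"KY_CD": "105", "OFNS_DESC": "ROBBERY"},
]

violations = find_fd_violations(rows, "KY_CD", "OFNS_DESC")
print(sorted(violations))      # ['343']
print(len(violations["343"]))  # 3 (the whole group is reported, not just the odd row)
```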

In [None]:
from openclean.operator.collector.repair import Shortest, Vote, conflict_repair

# Define the conflict resolution strategy. We use a majority vote for the RHS attribute (KY_CD).
# fd1_violations must come from an earlier FD check; `resolved` is overwritten by the next cell.
strategy = {'KY_CD': Vote()}

# resolve the conflicts
resolved = conflict_repair(conflicts=fd1_violations, strategy=strategy, in_order=False)

In [None]:
# Resolve conflicts for FD(KY_CD -> OFNS_DESC)

from openclean.operator.collector.repair import Shortest, Vote, conflict_repair

# Majority vote on the RHS attribute (OFNS_DESC); ties go to the shortest value.
strategy = {'OFNS_DESC': Vote(tiebreaker=Shortest())}

# resolve the conflicts
resolved = conflict_repair(conflicts=fd2_violations, strategy=strategy, in_order=False)
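As we understand openclean's documentation, `Vote(tiebreaker=Shortest())` replaces the RHS value of every row in a violating group with the group's most frequent value, using the shortest value to break ties. A plain-Python sketch of that rule follows; it is written for illustration and is not openclean's source. The names `vote_repair` and `repair_group` are hypothetical, and the alphabetical fallback for values of equal count and length is an arbitrary choice made here for determinism.

```python
from collections import Counter


def vote_repair(values):
    """Return the most frequent value; break ties by shortest length.

    Remaining ties (same count and same length) are broken alphabetically,
    an arbitrary choice made here so the result is deterministic.
    """
    counts = Counter(values)
    best = max(counts.values())
    candidates = [v for v, n in counts.items() if n == best]
    return min(candidates, key=lambda v: (len(v), v))


def repair_group(rows, rhs):
    """Overwrite the RHS column of every row in a violating group with the vote winner."""
    winner = vote_repair([r[rhs] for r in rows])
    return [{**r, rhs: winner} for r in rows]


group = [
    {"KY_CD": "343", "OFNS_DESC": "OTHER OFFENSES RELATED TO THEF"},
    {"KY_CD": "343", "OFNS_DESC": "THEFT OF SERVICES"},
    {"KY_CD": "343", "OFNS_DESC": "OTHER OFFENSES RELATED TO THEF"},
]
repaired = repair_group(group, "OFNS_DESC")
print({r["OFNS_DESC"] for r in repaired})  # {'OTHER OFFENSES RELATED TO THEF'}
print(vote_repair(["ROBBERY", "ARSON"]))   # ARSON (tie broken by the shortest value)
```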

In [None]:
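trial = fd_violations(resolved, ['KY_CD'], ['OFNS_DESC'])

print('# of violations for FD(KY_CD -> OFNS_DESC) is {}\n'.format(len(trial)))
# NOTE: violation_group is not defined in this cell; the frame printed below depends on which cell last assigned it.
print(violation_group)

# of violations for FD(KY_CD -> OFNS_DESC) is 0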

       CMPLNT_NUM  ...                       New Georeferenced Column
13      276278658  ...  POINT (-73.94569741099998 40.786182447000044)
166     890663602  ...   POINT (-73.91843446099993 40.80893116500005)
180     728758084  ...  POINT (-73.91212195899993 40.747301667000045)
181     985806516  ...  POINT (-73.98411999099994 40.754849215000036)
268     842358698  ...   POINT (-73.85902567999993 40.88735603500004)
...           ...  ...                                            ...
323571  172047038  ...   POINT (-73.90410359499998 40.66646466200007)
323678  217581853  ...   POINT (-73.89515414499994 40.66132136900006)
323733  793541563  ...  POINT (-73.80633674899997 40.701804167000034)
323738  306924169  ...   POINT (-73.89690223799995 40.66833311800008)
323806  476968757  ...   POINT (-73.87565666399996 40.75686097700003)

[4058 rows x 36 columns]


In [None]:
violation_group = resolved[(resolved['KY_CD']=='125')]

fd3_violations = fd_violations(resolved, ['KY_CD'], ['OFNS_DESC'])

print('# of violations for FD(KY_CD -> OFNS_DESC) is {}\n'.format(len(fd3_violations)))
print(violation_group)

# of violations for FD(KY_CD -> OFNS_DESC) is 0

       CMPLNT_NUM  ...                       New Georeferenced Column
1307    243028990  ...   POINT (-74.00125319299997 40.71634415200003)
2550    726416926  ...   POINT (-73.77623390699993 40.67998073800004)
3208    533129014  ...   POINT (-73.89776780399995 40.86326000900005)
3579    657165465  ...  POINT (-73.99398802999998 40.716193040000064)
5803    822041811  ...   POINT (-73.92894705999998 40.61544337400005)
...           ...  ...                                            ...
317623  926822345  ...   POINT (-74.00125319299997 40.71634415200003)
319353  486010364  ...   POINT (-73.91354311599997 40.75347568500007)
320550  398377413  ...   POINT (-74.13004566699995 40.63852488600002)
320592  777788721  ...   POINT (-74.00125319299997 40.71634415200003)
322429  999058929  ...   POINT (-73.94184447299993 40.75709455700007)

[307 rows x 36 columns]


In [None]:
resolved.groupby(['OFNS_DESC', 'KY_CD']).size()

OFNS_DESC                       KY_CD
ADMINISTRATIVE CODE             365       528
                                675        34
ALCOHOLIC BEVERAGE CONTROL LAW  346        52
ANTICIPATORY OFFENSES           354        13
ARSON                           114       472
                                         ... 
SEX CRIMES                      116      1117
                                233      4046
THEFT-FRAUD                     112      3003
UNAUTHORIZED USE OF A VEHICLE   353       936
VEHICLE AND TRAFFIC LAWS        348      6060
Length: 63, dtype: int64

In [None]:
keymapping = resolved.groupby('KY_CD').apply(lambda x: x['OFNS_DESC'].unique())
print(keymapping)

KY_CD
101    [MURDER & NON-NEGL. MANSLAUGHTER]
102         [HOMICIDE-NEGLIGENT-VEHICLE]
103     [HOMICIDE-NEGLIGENT,UNCLASSIFIE]
104                               [RAPE]
105                            [ROBBERY]
                     ...                
572                 [DISORDERLY CONDUCT]
578                      [HARRASSMENT 2]
675                [ADMINISTRATIVE CODE]
677    [NYS LAWS-UNCLASSIFIED VIOLATION]
678            [MISCELLANEOUS PENAL LAW]
Length: 63, dtype: object


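#### After the repair, each key code maps to exactly one offense description, so the functional dependency KY_CD -> OFNS_DESC holds. The mapping is not one-to-one, however: some descriptions are shared by several key codes (e.g. `ADMINISTRATIVE CODE` for 365 and 675, `SEX CRIMES` for 116 and 233). Because the key code determines the description, we use the key code instead of the offense description for further analysis.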
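Whether the key-code mapping is one-to-one or only many-to-one can be checked directly. KY_CD -> OFNS_DESC holds when every key code has a single description, and the mapping is one-to-one only if the reverse dependency holds as well. The sketch below uses a few (OFNS_DESC, KY_CD) pairs copied from the `groupby(['OFNS_DESC', 'KY_CD']).size()` output above; the helper `holds_fd` is hypothetical.

```python
from collections import defaultdict

# A subset of (OFNS_DESC, KY_CD) pairs from the grouped output above.
pairs = [
    ("ADMINISTRATIVE CODE", "365"),
    ("ADMINISTRATIVE CODE", "675"),
    ("ARSON", "114"),
    ("SEX CRIMES", "116"),
    ("SEX CRIMES", "233"),
    ("THEFT-FRAUD", "112"),
]


def holds_fd(pairs, lhs, rhs):
    """True if each value at tuple position lhs maps to exactly one value at position rhs."""
    seen = defaultdict(set)
    for pair in pairs:
        seen[pair[lhs]].add(pair[rhs])
    return all(len(targets) == 1 for targets in seen.values())


print(holds_fd(pairs, lhs=1, rhs=0))  # True:  KY_CD -> OFNS_DESC holds
print(holds_fd(pairs, lhs=0, rhs=1))  # False: OFNS_DESC -> KY_CD does not
```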

#### Counting null (or `NaN`) values per column

In [None]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+------+------------+--------------+---------+--------+----------------+-------------+--------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|HADEVELOPT|HOUSING_PSA|JURISDICTION_CODE|JURIS_DESC|KY_CD|LAW_CAT_CD|LOC_OF_OCCUR_DESC|OFNS_DESC|PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|PREM_TYP_DESC|RPT_DT|STATION_NAME|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|VIC_AGE_GROUP|VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+------------+------------+------------+------------+----------------+----------+-----------+-----------------+----------+-----+----------+-----------------+---------+--------+-----------+-----+-------+-------------+---

In [None]:
amount_missing_df = df.select([(count(when(isnan(c) | col(c).isNull(), c))/count(lit(1))).alias(c) for c in df.columns])
amount_missing_df.show()

+----------+-----------+-------+------------+------------+-------------------+-------------------+----------------+------------------+------------------+-----------------+----------+-----+----------+-------------------+--------------------+-----------------+-----------+-----+-------+--------------------+------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+-------+------------------------+
|CMPLNT_NUM|ADDR_PCT_CD|BORO_NM|CMPLNT_FR_DT|CMPLNT_FR_TM|       CMPLNT_TO_DT|       CMPLNT_TO_TM|CRM_ATPT_CPTD_CD|        HADEVELOPT|       HOUSING_PSA|JURISDICTION_CODE|JURIS_DESC|KY_CD|LAW_CAT_CD|  LOC_OF_OCCUR_DESC|           OFNS_DESC|         PARKS_NM|PATROL_BORO|PD_CD|PD_DESC|       PREM_TYP_DESC|RPT_DT|      STATION_NAME|   SUSP_AGE_GROUP|          SUSP_RACE|          SUSP_SEX|  TRANSIT_DISTRICT|     VIC_AGE_GROUP|          VIC_RACE|VIC_SEX|New Georeferenced Column|
+----------+-----------+-------+----------

#### Thus, the fraction of null values per column has gone down considerably after cleaning. Some columns, such as `PARKS_NM` and `HADEVELOPT`, are legitimately null for many complaints, as established above.
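The Spark expressions above count a cell as missing when it is null or `isnan` is true, then divide by the row count. A plain-Python sketch of the same per-column fraction follows. The rows are hypothetical, and treating the string `"NaN"` as missing is an assumption meant to mirror the `NaN` placeholders written earlier in this notebook.

```python
import math


def is_missing(value):
    """Null, float NaN, or the "NaN" placeholder string count as missing (assumption)."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return value == "NaN"


def missing_fraction(rows, columns):
    """Fraction of rows in which each column is missing."""
    n = len(rows)
    return {c: sum(is_missing(r.get(c)) for r in rows) / n for c in columns}


rows = [  # hypothetical rows
    {"BORO_NM": "BRONX", "PARKS_NM": None, "VIC_AGE_GROUP": "25-44"},
    {"BORO_NM": "QUEENS", "PARKS_NM": None, "VIC_AGE_GROUP": "NaN"},
    {"BORO_NM": "BROOKLYN", "PARKS_NM": "PROSPECT PARK", "VIC_AGE_GROUP": float("nan")},
    {"BORO_NM": None, "PARKS_NM": None, "VIC_AGE_GROUP": "<18"},
]
print(missing_fraction(rows, ["BORO_NM", "PARKS_NM", "VIC_AGE_GROUP"]))
# {'BORO_NM': 0.25, 'PARKS_NM': 0.75, 'VIC_AGE_GROUP': 0.5}
```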

#### Complaint count per jurisdiction

In [None]:
df.groupBy('JURIS_DESC').count().show()

+--------------------+------+
|          JURIS_DESC| count|
+--------------------+------+
|    N.Y. POLICE DEPT|290317|
|      PORT AUTHORITY|  1089|
| N.Y. TRANSIT POLICE|  6295|
|  HEALTH & HOSP CORP|   148|
|    U.S. PARK POLICE|     8|
|   N.Y. STATE POLICE|    35|
| N.Y. HOUSING POLICE| 23926|
| TRI-BORO BRDG TUNNL|    72|
|         METRO NORTH|    13|
|    N.Y. STATE PARKS|     6|
|NEW YORK CITY SHE...|    11|
| DEPT OF CORRECTIONS|   205|
|NYS DEPT TAX AND ...|     2|
|               OTHER|   653|
|  LONG ISLAND RAILRD|     9|
|             AMTRACK|     5|
|STATN IS RAPID TRANS|     5|
|           NYC PARKS|   144|
+--------------------+------+



### Number of columns in Clean Data

In [None]:
len(df.columns)

31

### Number of rows in Clean Data

In [None]:
df.count()

322943

In [None]:
df.printSchema()

root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- ADDR_PCT_CD: integer (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- CMPLNT_FR_TM: string (nullable = true)
 |-- CMPLNT_TO_DT: string (nullable = true)
 |-- CMPLNT_TO_TM: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- HOUSING_PSA: integer (nullable = true)
 |-- JURISDICTION_CODE: integer (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- PATROL_BORO: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- RPT_DT: string (nullable = true)
 |-- STATION_NAME: string (nullable = true)
 |-- SUSP_

### **Exporting Clean Data in CSV**

The cleaned dataset will be saved as `NYPD_Complaint_Data_Current_Year_To_Date_Cleaned_Spark.csv`

In [None]:
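# Collect the cleaned Spark DataFrame to the driver (feasible here for ~323k rows) and write it as CSV
pd_df = df.toPandas()
pd_df.to_csv("NYPD_Complaint_Data_Current_Year_To_Date_Cleaned_Spark.csv")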