## Uploading the data to Hadoop

In [4]:
!pip install pandas requests hdfs



In [40]:
import requests
import pandas as pd
from hdfs import InsecureClient
from io import StringIO
import time

# HDFS client configuration
hdfs_client = InsecureClient('http://localhost:9870', user='khaleed_mammad')
hdfs_path = '/input/nypd_data/'

# API endpoint
api_url = "https://data.cityofnewyork.us/resource/qgea-i56i.csv"

# Parameters for pagination
limit = 50000  # Maximum rows per request
offset = 0
rows_to_fetch = 8900000 // 8  
rows_fetched = 0


max_retries = 5
retry_delay = 5

chunk_size = 100000  # Rows per chunk

print("Starting data fetch and upload process...\n")

# Fetch and upload the dataset in chunks
while rows_fetched < rows_to_fetch:
    params = {"$limit": limit, "$offset": offset}
    retries = 0

    while retries < max_retries:
        try:
            print(f"Fetching data: Offset={offset}, Limit={limit} (Attempt {retries + 1})...")
            response = requests.get(api_url, params=params, timeout=30)

            if response.status_code != 200:
                print(f"Error {response.status_code}: {response.text}")
                retries += 1
                time.sleep(retry_delay)
                continue

            # Read the chunk into a DataFrame
            chunk = pd.read_csv(StringIO(response.text))
            if chunk.empty:
                print("No more data to fetch. Exiting.")
                break

            rows_fetched += len(chunk)
            offset += limit

            # Upload the chunk to HDFS
            chunk_path = f'/input/nypd_data/NYPD_Complaint_Data_Chunk_{offset//limit}.csv'
            with StringIO() as csv_buffer:
                chunk.to_csv(csv_buffer, index=False)
                hdfs_client.write(chunk_path, csv_buffer.getvalue(), overwrite=True)
            print(f"Uploaded chunk to HDFS: {chunk_path} (Total rows fetched: {rows_fetched})\n")
            break

        except requests.exceptions.Timeout:
            print("Request timed out. Retrying...")
            retries += 1
            time.sleep(retry_delay)

        except Exception as e:
            print(f"An error occurred: {e}")
            retries += 1
            time.sleep(retry_delay)

    if retries == max_retries:
        print(f"Max retries reached for Offset={offset}. Exiting.")
        break

print(f"Process complete. Total rows fetched: {rows_fetched}")

Starting data fetch and upload process...

Fetching data: Offset=0, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_1.csv (Total rows fetched: 50000)

Fetching data: Offset=50000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_2.csv (Total rows fetched: 100000)

Fetching data: Offset=100000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_3.csv (Total rows fetched: 150000)

Fetching data: Offset=150000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_4.csv (Total rows fetched: 200000)

Fetching data: Offset=200000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_5.csv (Total rows fetched: 250000)

Fetching data: Offset=250000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_6.csv (Total rows fetched: 300000)

Fetching data: Offset=300000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_7.csv (Total rows fetched: 350000)

Fetching data: Offset=350000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_8.csv (Total rows fetched: 400000)

Fetching data: Offset=400000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_9.csv (Total rows fetched: 450000)

Fetching data: Offset=450000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_10.csv (Total rows fetched: 500000)

Fetching data: Offset=500000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_11.csv (Total rows fetched: 550000)

Fetching data: Offset=550000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_12.csv (Total rows fetched: 600000)

Fetching data: Offset=600000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_13.csv (Total rows fetched: 650000)

Fetching data: Offset=650000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_14.csv (Total rows fetched: 700000)

Fetching data: Offset=700000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_15.csv (Total rows fetched: 750000)

Fetching data: Offset=750000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_16.csv (Total rows fetched: 800000)

Fetching data: Offset=800000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_17.csv (Total rows fetched: 850000)

Fetching data: Offset=850000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_18.csv (Total rows fetched: 900000)

Fetching data: Offset=900000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_19.csv (Total rows fetched: 950000)

Fetching data: Offset=950000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_20.csv (Total rows fetched: 1000000)

Fetching data: Offset=1000000, Limit=50000 (Attempt 1)...
Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_21.csv (Total rows fetched: 1050000)

Fetching data: Offset=1050000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_22.csv (Total rows fetched: 1100000)

Fetching data: Offset=1100000, Limit=50000 (Attempt 1)...


  chunk = pd.read_csv(StringIO(response.text))


Uploaded chunk to HDFS: /input/nypd_data/NYPD_Complaint_Data_Chunk_23.csv (Total rows fetched: 1150000)

Process complete. Total rows fetched: 1150000


## Reading data and analyzing

In [1]:
from csv import reader
from pyspark.sql import Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np

import warnings
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import to_date 
from pyspark.sql.functions import year, month 
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import *
import os
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

In [3]:
import os
import pandas as pd

# Define the directory containing the CSV files
directory = '/Users/khaleed_mammad/Desktop/Khaleed/ADA_Semesters/Fall_2024/Big_Data/Project'

# Output file path
output_file = os.path.join(directory, 'NYPD_Complaint_Data_Combined.csv')

# Get a list of all CSV files in the directory
csv_files = [file for file in os.listdir(directory) if file.endswith('.csv')]

# Check if there are CSV files
if not csv_files:
    print("No CSV files found in the directory.")
else:
    print(f"Found {len(csv_files)} CSV files. Merging now...")

    # Initialize an empty list to hold DataFrames
    dataframes = []

    # Loop through each CSV file and append its DataFrame to the list
    for file in csv_files:
        file_path = os.path.join(directory, file)
        print(f"Reading file: {file}")
        try:
            df = pd.read_csv(file_path)
            dataframes.append(df)
        except Exception as e:
            print(f"Error reading {file}: {e}")

    # Concatenate all DataFrames into one
    combined_df = pd.concat(dataframes, ignore_index=True)

    # Save the combined DataFrame to a new CSV file
    combined_df.to_csv(output_file, index=False)
    print(f"All files merged successfully into: {output_file}")

Found 23 CSV files. Merging now...
Reading file: NYPD_Complaint_Data_Chunk_10.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_11.csv
Reading file: NYPD_Complaint_Data_Chunk_13.csv
Reading file: NYPD_Complaint_Data_Chunk_12.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_16.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_17.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_15.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_14.csv
Reading file: NYPD_Complaint_Data_Chunk_5.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_4.csv
Reading file: NYPD_Complaint_Data_Chunk_6.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_7.csv
Reading file: NYPD_Complaint_Data_Chunk_3.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_2.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_1.csv
Reading file: NYPD_Complaint_Data_Chunk_9.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_8.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_19.csv
Reading file: NYPD_Complaint_Data_Chunk_18.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_23.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_22.csv


  df = pd.read_csv(file_path)


Reading file: NYPD_Complaint_Data_Chunk_20.csv
Reading file: NYPD_Complaint_Data_Chunk_21.csv
All files merged successfully into: /Users/khaleed_mammad/Desktop/Khaleed/ADA_Semesters/Fall_2024/Big_Data/Project/NYPD_Complaint_Data_Combined.csv


In [30]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("NYPD Data Analysis") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Path to the local combined CSV file
local_csv_path = output_file

# Read the CSV file into a DataFrame
df = spark.read.option("header", "true").option("inferSchema", "true").csv(local_csv_path)

                                                                                

In [31]:
# Show the first 5 rows
df.show(5)

+----------+-------------------+-------------------+-------------------+------------+-----------+-------------------+-----+--------------------+-----+--------------------+----------------+-----------+-------------+-----------------+--------------------+-------------------+-----------------+--------+----------+-----------+----------+----------+--------------+--------------+--------+----------------+---------+----------+--------------------+--------------------+------------+-------------+--------+-------+
|cmplnt_num|       cmplnt_fr_dt|       cmplnt_fr_tm|       cmplnt_to_dt|cmplnt_to_tm|addr_pct_cd|             rpt_dt|ky_cd|           ofns_desc|pd_cd|             pd_desc|crm_atpt_cptd_cd| law_cat_cd|      boro_nm|loc_of_occur_desc|       prem_typ_desc|         juris_desc|jurisdiction_code|parks_nm|hadevelopt|housing_psa|x_coord_cd|y_coord_cd|susp_age_group|     susp_race|susp_sex|transit_district| latitude| longitude|             lat_lon|         patrol_boro|station_name|vic_age_group|

In [33]:
df.first()

Row(cmplnt_num='265088253', cmplnt_fr_dt=datetime.datetime(2023, 2, 9, 0, 0), cmplnt_fr_tm=datetime.datetime(2024, 11, 22, 15, 0), cmplnt_to_dt=datetime.datetime(2023, 3, 9, 0, 0), cmplnt_to_tm='15:00:00', addr_pct_cd=26.0, rpt_dt=datetime.datetime(2023, 3, 14, 0, 0), ky_cd=109, ofns_desc='GRAND LARCENY', pd_cd=439.0, pd_desc='LARCENY,GRAND FROM OPEN AREAS, UNATTENDED', crm_atpt_cptd_cd='COMPLETED', law_cat_cd='FELONY', boro_nm='MANHATTAN', loc_of_occur_desc='FRONT OF', prem_typ_desc='MAILBOX INSIDE', juris_desc='N.Y. POLICE DEPT', jurisdiction_code=0, parks_nm='(null)', hadevelopt='(null)', housing_psa='(null)', x_coord_cd=995804.0, y_coord_cd=235548.0, susp_age_group='UNKNOWN', susp_race='UNKNOWN', susp_sex='U', transit_district=None, latitude=40.813196, longitude=-73.958257, lat_lon='(40.813196, -73.958257)', patrol_boro='PATROL BORO MAN NORTH', station_name='(null)', vic_age_group='65+', vic_race='WHITE', vic_sex='M')

In [34]:
# Print the schema
df.printSchema()

root
 |-- cmplnt_num: string (nullable = true)
 |-- cmplnt_fr_dt: timestamp (nullable = true)
 |-- cmplnt_fr_tm: timestamp (nullable = true)
 |-- cmplnt_to_dt: timestamp (nullable = true)
 |-- cmplnt_to_tm: string (nullable = true)
 |-- addr_pct_cd: double (nullable = true)
 |-- rpt_dt: timestamp (nullable = true)
 |-- ky_cd: integer (nullable = true)
 |-- ofns_desc: string (nullable = true)
 |-- pd_cd: double (nullable = true)
 |-- pd_desc: string (nullable = true)
 |-- crm_atpt_cptd_cd: string (nullable = true)
 |-- law_cat_cd: string (nullable = true)
 |-- boro_nm: string (nullable = true)
 |-- loc_of_occur_desc: string (nullable = true)
 |-- prem_typ_desc: string (nullable = true)
 |-- juris_desc: string (nullable = true)
 |-- jurisdiction_code: integer (nullable = true)
 |-- parks_nm: string (nullable = true)
 |-- hadevelopt: string (nullable = true)
 |-- housing_psa: string (nullable = true)
 |-- x_coord_cd: double (nullable = true)
 |-- y_coord_cd: double (nullable = true)
 |-- 

In [35]:
df.describe()

DataFrame[summary: string, cmplnt_num: string, cmplnt_to_tm: string, addr_pct_cd: string, ky_cd: string, ofns_desc: string, pd_cd: string, pd_desc: string, crm_atpt_cptd_cd: string, law_cat_cd: string, boro_nm: string, loc_of_occur_desc: string, prem_typ_desc: string, juris_desc: string, jurisdiction_code: string, parks_nm: string, hadevelopt: string, housing_psa: string, x_coord_cd: string, y_coord_cd: string, susp_age_group: string, susp_race: string, susp_sex: string, transit_district: string, latitude: string, longitude: string, lat_lon: string, patrol_boro: string, station_name: string, vic_age_group: string, vic_race: string, vic_sex: string]

In [36]:
df.columns

['cmplnt_num',
 'cmplnt_fr_dt',
 'cmplnt_fr_tm',
 'cmplnt_to_dt',
 'cmplnt_to_tm',
 'addr_pct_cd',
 'rpt_dt',
 'ky_cd',
 'ofns_desc',
 'pd_cd',
 'pd_desc',
 'crm_atpt_cptd_cd',
 'law_cat_cd',
 'boro_nm',
 'loc_of_occur_desc',
 'prem_typ_desc',
 'juris_desc',
 'jurisdiction_code',
 'parks_nm',
 'hadevelopt',
 'housing_psa',
 'x_coord_cd',
 'y_coord_cd',
 'susp_age_group',
 'susp_race',
 'susp_sex',
 'transit_district',
 'latitude',
 'longitude',
 'lat_lon',
 'patrol_boro',
 'station_name',
 'vic_age_group',
 'vic_race',
 'vic_sex']

## Data Cleaning & Preprocessing

In [39]:
df.select("cmplnt_fr_dt").show(5)

+-------------------+
|       cmplnt_fr_dt|
+-------------------+
|2023-02-09 00:00:00|
|2023-03-14 00:00:00|
|2023-03-14 00:00:00|
|2023-03-14 00:00:00|
|2023-03-14 00:00:00|
+-------------------+
only showing top 5 rows



In [40]:
# Firstly, we create columns to store 'Year, month, hour and day of the week' of start date of complaint
df = df.withColumn('Start_Date', to_timestamp('cmplnt_fr_dt', 'MM/dd/yyyy'))
df = df.withColumn('Date', to_date('Start_Date'))
df = df.withColumn('Hour', hour(df['cmplnt_fr_tm']))
df = df.withColumn('YEAR', year('Date')) 
df = df.withColumn('MONTH', month('Date'))
df = df.withColumn("DAY_of_WEEK", date_format("Date", "E"))

In [47]:
df.groupBy('vic_age_group').count().show()

[Stage 28:>                                                         (0 + 8) / 8]

+-------------+------+
|vic_age_group| count|
+-------------+------+
|          <18| 43353|
|        25-44|409647|
|      UNKNOWN|326170|
|         1018|     2|
|          -49|     1|
|          949|     1|
|          65+| 58291|
|         -968|     1|
|        18-24| 95680|
|          -33|     1|
|           -3|     2|
|        45-64|216793|
|           -2|     3|
|          -30|     3|
|           -6|     2|
|          -57|     1|
|          950|     1|
|         -945|     1|
|         -963|     1|
|          -27|     1|
+-------------+------+
only showing top 20 rows





In [None]:
# We can categorize age groups --> Teenager, Young Adult, Middle Age, Mid Old
df1 = df1.withColumn('VIC_AGE_Cat',
    when(df1.VIC_AGE_GROUP == '<18', 'Teenager')
    .when(df1.VIC_AGE_GROUP == '18-25', 'Young Adult')
    .when(df1.VIC_AGE_GROUP == '25-44', 'Middle Age')
    .when(df1.VIC_AGE_GROUP == '44-64', 'Mid Old')
    .otherwise('Senior'))

In [48]:
from pyspark.sql.functions import when

# We can categorize age groups --> Teenager, Young Adult, Middle Age, Mid Old and Senior

# As UNKNOWN occurs 326170 times, we replace it with mode instead of instead of deleting (but we delete invalid ages)

mode_value = (
    df.filter(df["vic_age_group"] != "UNKNOWN")
    .groupBy("vic_age_group")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .first()["vic_age_group"]  # Get the value of the most frequent category
)

df = df.withColumn(
    "vic_age_group",
    when(df["vic_age_group"] == "UNKNOWN", mode_value)  # Replace UNKNOWN with mode
    .when(df["vic_age_group"].isin("<18", "18-24", "25-44", "45-64", "65+"), df["vic_age_group"])  # Keep valid categories
    .otherwise(None)  # Mark invalid values as NULL for removal
)

df = df.filter(df["vic_age_group"].isNotNull())

# Now, we can categorize
df = df.withColumn(
    "vic_age_category",
    when(df["vic_age_group"] == "<18", "Teenager")
    .when(df["vic_age_group"] == "18-24", "Young Adult")
    .when(df["vic_age_group"] == "25-44", "Middle Age")
    .when(df["vic_age_group"] == "45-64", "Mid Old")
    .when(df["vic_age_group"] == "65+", "Senior")
)

                                                                                

In [49]:
df.groupBy('vic_age_group').count().show()

[Stage 34:>                                                         (0 + 8) / 8]

+-------------+------+
|vic_age_group| count|
+-------------+------+
|          <18| 43353|
|        25-44|735817|
|          65+| 58291|
|        18-24| 95680|
|        45-64|216793|
+-------------+------+





In [50]:
df.groupBy('vic_age_category').count().show()

[Stage 37:>                                                         (0 + 8) / 8]

+----------------+------+
|vic_age_category| count|
+----------------+------+
|          Senior| 58291|
|        Teenager| 43353|
|      Middle Age|735817|
|     Young Adult| 95680|
|         Mid Old|216793|
+----------------+------+



                                                                                

In [73]:
# We need to drop some irrelevant and redundant columns (some are too specific and some of them have too many null values)
dataset = df.drop('cmplnt_num','cmplnt_fr_dt', 'cmplnt_fr_tm', 'cmplnt_to_dt', 'cmplnt_to_tm','pd_cd','rpt_dt',
                 'pd_desc','loc_of_occur_desc', 'prem_typ_desc', 'juris_desc', 'jurisdiction_code', 'parks_nm',
                  'hadevelopt', 'housing_psa','x_coord_cd','y_coord_cd','susp_age_group','susp_race', 
                  'susp_sex', 'transit_district','patrol_boro','station_name','vic_age_group','vic_race', 'Start_Date')

In [74]:
dataset.show(1)

+-----------+-----+-------------+----------------+----------+---------+---------+----------+--------------------+-------+----------+----+----+-----+-----------+----------------+
|addr_pct_cd|ky_cd|    ofns_desc|crm_atpt_cptd_cd|law_cat_cd|  boro_nm| latitude| longitude|             lat_lon|vic_sex|      Date|Hour|YEAR|MONTH|DAY_of_WEEK|vic_age_category|
+-----------+-----+-------------+----------------+----------+---------+---------+----------+--------------------+-------+----------+----+----+-----+-----------+----------------+
|       26.0|  109|GRAND LARCENY|       COMPLETED|    FELONY|MANHATTAN|40.813196|-73.958257|(40.813196, -73.9...|      M|2023-02-09|  15|2023|    2|        Thu|          Senior|
+-----------+-----+-------------+----------------+----------+---------+---------+----------+--------------------+-------+----------+----+----+-----+-----------+----------------+
only showing top 1 row



In [75]:
#Renaming the columns for better readabiliy

dataframe = dataset.toDF('Neighborhood', 'Offence_Code', 'Offence_Type',
        'Status','Offence_Level','Borough','Latitude','Longitude','Lat_Lon', 'Victim_Sex', 'Date', 'Hour', 'Year', 'Month', 'Day_of_Week','Victim_Agegroup')



In [76]:
# We also handle with null values

mean_long = dataframe.agg({'Longitude': 'mean'}).collect()[0][0] 
dataframe = dataframe.fillna(mean_long, subset=['Longitude'])

mean_lat = dataframe.agg({'Latitude': 'mean'}).collect()[0][0] 
dataframe = dataframe.fillna(mean_lat, subset=['Latitude'])

dataframe = dataframe.fillna(-1, subset=['Borough','Neighborhood','Offence_Code','Offence_Type','Status','Offence_level','Victim_Sex'])
dataframe = dataframe.na.drop('any')

                                                                                

In [77]:
print(f"Previous dataset: {df.count()} rows \nNew dataset: {dataframe.count()} rows")

[Stage 98:>                                                         (0 + 8) / 8]

Previous dataset: 1149934 rows 
New dataset: 1149919 rows




In [90]:
# We save the dataset
dataframe.write.csv("cleaned_nypd_data.csv", header=True, mode='overwrite')

                                                                                

## EDA

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("NYPD Data Analysis") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

24/11/24 17:38:15 WARN Utils: Your hostname, Khaleeds-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.31.31 instead (on interface en0)
24/11/24 17:38:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/24 17:38:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
cleaned_data = spark.read.csv("cleaned_nypd_data.csv", header=True, inferSchema=True)

                                                                                

In [4]:
spark.sql("create database nypddatabase");

In [6]:
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

cleaned_data.write.mode("overwrite").saveAsTable("nypddatabase.df")

                                                                                

In [8]:
# Execute the SQL query
result_df = spark.sql("""
    SELECT Borough, COUNT(*) AS Count
    FROM nypddatabase.df
    GROUP BY Borough
    ORDER BY Count DESC
    LIMIT 5
""")

# Display the result
result_df.show()

+-------------+------+
|      Borough| Count|
+-------------+------+
|     BROOKLYN|319820|
|    MANHATTAN|280934|
|       QUEENS|251879|
|        BRONX|244984|
|STATEN ISLAND| 50043|
+-------------+------+



In [11]:
result_df = spark.sql(("""
    SELECT Offence_Level, COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Offence_Level
    """))
                      
result_df.show()

+-------------+------+
|Offence_Level| Count|
+-------------+------+
|       FELONY|388294|
|  MISDEMEANOR|581826|
|    VIOLATION|179799|
+-------------+------+



In [12]:
result_df = spark.sql(("""
    SELECT Offence_Level,Victim_Sex,  COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Offence_Level,Victim_Sex
    """))
                      
result_df.show()

+-------------+----------+------+
|Offence_Level|Victim_Sex| Count|
+-------------+----------+------+
|  MISDEMEANOR|         D|138826|
|  MISDEMEANOR|         M|180814|
|       FELONY|         M|154413|
|       FELONY|         L|  4248|
|       FELONY|         D| 55350|
|    VIOLATION|         D|  1650|
|       FELONY|         F|133428|
|  MISDEMEANOR|         L|  1613|
|    VIOLATION|         F|112429|
|       FELONY|         E| 40854|
|    VIOLATION|         L|   197|
|  MISDEMEANOR|         F|195483|
|  MISDEMEANOR|         E| 65090|
|    VIOLATION|         E|  2061|
|    VIOLATION|         M| 63462|
|       FELONY|         U|     1|
+-------------+----------+------+



In [26]:
# Seems like we have some errors in sex column

In [28]:
result_df = spark.sql(("""
    SELECT Victim_Sex,  COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Victim_Sex
    ORDER BY Count
    """))
                      
result_df.show()

+----------+------+
|Victim_Sex| Count|
+----------+------+
|         U|     1|
|         L|  6058|
|         E|108005|
|         D|195826|
|         M|398689|
|         F|441340|
+----------+------+



In [29]:
# Dropping U, L, E , D would cause a problem as they contain big protion of our data. Therefore, 
# we decide to keep it since our purpose is not studying crime patterns by gender

In [16]:
result_df = spark.sql(("""
    SELECT Offence_Level,Year,  COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Offence_Level,Year
    """))
                      
result_df.show()

+-------------+----+------+
|Offence_Level|Year| Count|
+-------------+----+------+
|    VIOLATION|2021| 10648|
|       FELONY|2009|    33|
|       FELONY|2022|177146|
|       FELONY|1989|     3|
|       FELONY|1988|     6|
|  MISDEMEANOR|2021| 33976|
|       FELONY|1966|     1|
|       FELONY|1022|     3|
|  MISDEMEANOR|2019|   226|
|       FELONY|2021| 27312|
|    VIOLATION|1013|     1|
|       FELONY|2007|    34|
|  MISDEMEANOR|2022|268722|
|       FELONY|2012|    85|
|       FELONY|1995|     6|
|  MISDEMEANOR|2013|    76|
|  MISDEMEANOR|1982|     1|
|  MISDEMEANOR|1973|     6|
|    VIOLATION|2016|     6|
|  MISDEMEANOR|1998|     4|
+-------------+----+------+
only showing top 20 rows



In [14]:
result_df = spark.sql(("""
    SELECT Offence_Type, COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Offence_Type
    ORDER BY Count DESC
    LIMIT 10
    """))
                      
result_df.show()

+--------------------+------+
|        Offence_Type| Count|
+--------------------+------+
|       PETIT LARCENY|238813|
|       HARRASSMENT 2|176697|
|ASSAULT 3 & RELAT...|119694|
|       GRAND LARCENY|110416|
|CRIMINAL MISCHIEF...| 92441|
|      FELONY ASSAULT| 56920|
|OFF. AGNST PUB OR...| 38688|
|VEHICLE AND TRAFF...| 37391|
|             ROBBERY| 36442|
|MISCELLANEOUS PEN...| 35142|
+--------------------+------+



In [17]:
result_df = spark.sql(("""
    SELECT Victim_Agegroup, COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Victim_Agegroup
    """))
                      
result_df.show()

+---------------+------+
|Victim_Agegroup| Count|
+---------------+------+
|         Senior| 58289|
|       Teenager| 43349|
|     Middle Age|735812|
|    Young Adult| 95679|
|        Mid Old|216790|
+---------------+------+



In [30]:
result_df = spark.sql(("""
    SELECT Year, COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Year
    ORDER BY Year
    """))
                      
result_df.show()

+----+-----+
|Year|Count|
+----+-----+
|1011|    3|
|1012|    6|
|1013|    2|
|1018|    1|
|1021|    2|
|1022|   11|
|1023|   23|
|1900|    1|
|1921|    2|
|1922|    7|
|1923|    5|
|1949|    1|
|1955|    1|
|1961|    1|
|1964|    1|
|1966|    1|
|1967|    1|
|1969|    1|
|1970|    1|
|1971|    2|
+----+-----+
only showing top 20 rows



In [31]:
# Seems like we have errors in year column


In [32]:
result_df = spark.sql(("""
    SELECT Year, COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Year
    ORDER BY Year DESC
    """))
                      
result_df.show()

+----+------+
|Year| Count|
+----+------+
|2023|543278|
|2022|529812|
|2021| 71936|
|2020|  1547|
|2019|   686|
|2018|   439|
|2017|   304|
|2016|   243|
|2015|   215|
|2014|   186|
|2013|   189|
|2012|   140|
|2011|   132|
|2010|    68|
|2009|    47|
|2008|    41|
|2007|    52|
|2006|    41|
|2005|    28|
|2004|    28|
+----+------+
only showing top 20 rows



In [34]:
# Since most of data smaples belong to the years "2023", "2022", "2021" and "2020", we only keep those.
cleaned_data = cleaned_data.filter(cleaned_data["Year"].isin(2023, 2022, 2021, 2020))

In [21]:
result_df = spark.sql(("""
    SELECT Month, COUNT(*) AS Count 
    FROM nypddatabase.df 
    GROUP BY Month
    ORDER BY Month DESC
    """))
                      
result_df.show()

+-----+------+
|Month| Count|
+-----+------+
|   12|125448|
|   11|113254|
|   10| 96635|
|    9| 91761|
|    8| 95835|
|    7| 97012|
|    6| 95485|
|    5| 95251|
|    4| 87400|
|    3| 89348|
|    2| 77779|
|    1| 84711|
+-----+------+



In [73]:
cleaned_data.write.csv("ready_data.csv", header=True, mode='overwrite')

                                                                                

# Machine Learning

In [74]:
final_data = spark.read.csv("ready_data.csv", header=True, inferSchema=True)

                                                                                

In [77]:
final_data.count()

1146573

In [64]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, ChiSqSelector
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics

In [35]:
str_featues = []
num_features = []

for feature in cleaned_data.dtypes:
    
    # print unique value and data type for each feature
    feature_name = str(feature[0])
    feature_type = str(feature[1])
    uni = cleaned_data.select(feature_name).distinct().count()
    print("unique number of {} ({}):".format(feature_name, feature_type),uni)
    
    # find out features with integer type
    if feature_type == "int" or feature_type == "decimal(65,0)":
        
        num_features.append(feature_name)
        
    # find out string feature and it's unique value less than 70
    elif feature_type == "string" and uni < 70:
        
        str_featues.append(feature_name)

                                                                                

unique number of Neighborhood (double): 78


                                                                                

unique number of Offence_Code (int): 69


                                                                                

unique number of Offence_Type (string): 66




unique number of Status (string): 2


                                                                                

unique number of Offence_Level (string): 3


[Stage 175:>                                                        (0 + 8) / 8]                                                                                

unique number of Borough (string): 6


                                                                                

unique number of Latitude (double): 96433


                                                                                

unique number of Longitude (double): 97259


                                                                                

unique number of Lat_Lon (string): 102112


                                                                                

unique number of Victim_Sex (string): 5


                                                                                

unique number of Date (date): 1442




unique number of Hour (int): 24


                                                                                

unique number of Year (int): 4


                                                                                

unique number of Month (int): 12


                                                                                

unique number of Day_of_Week (string): 7
unique number of Victim_Agegroup (string): 5




In [25]:
print("Total {} numerical features:{}".format(len(num_features),num_features)) 
print("Total {} string features:{}".format(len(str_featues),str_featues)) 

Total 4 numerical features:['Offence_Code', 'Hour', 'Year', 'Month']
Total 7 string features:['Offence_Type', 'Status', 'Offence_Level', 'Borough', 'Victim_Sex', 'Day_of_Week', 'Victim_Agegroup']


In [38]:
# We convert categorical string features into numerical indices:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in str_featues]

In [39]:
# We need to prepare a feature vector for machine learning in PySpark
inputs = ["{}_index".format(x) for x in str_featues] + num_features
feature_vector = VectorAssembler(inputCols= list(set(inputs) - set('Offence_Level_index')), outputCol="features")

In [43]:
# Then, we create a PySpark ML Pipeline to preprocess the data and apply transformations systematically
pipeline = Pipeline(stages=indexers + [feature_vector])
data = pipeline.fit(cleaned_data).transform(cleaned_data)

                                                                                

In [41]:
# We can now choose input data 
input_data = data.select(["features","Offence_Level_index"])
input_data = input_data.withColumnRenamed("Offence_Level_index","label")

In [46]:
# And, we divide our data into 0.7 train, 0.3 test
train_data, test_data = input_data.randomSplit([.7,.3])

In [47]:
# Initialize evaluators for different metrics
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
precision_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='weightedPrecision')
recall_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='weightedRecall')
f1_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')

## Logistic Regression

In [54]:
lg_model = LogisticRegression(labelCol='label', maxIter=10, elasticNetParam=1, regParam=0.5,threshold=0.3)
lg_model = lg_model.fit(train_data)
lg_pred = lg_model.transform(test_data)
acc = accuracy_evaluator.evaluate(lg_pred)
prec = precision_evaluator.evaluate(lg_pred)
recall = recall_evaluator.evaluate(lg_pred)
f1 = recall_evaluator.evaluate(lg_pred)

                                                                                

In [55]:
acc

0.5068581449908248

In [56]:
recall

0.5068581449908248

In [57]:
prec

0.25690517914354

In [58]:
f1

0.5068581449908248

In [59]:
rf_model = RandomForestClassifier(labelCol='label',maxBins=75,numTrees=3,maxDepth=3)
rf_model = rf_model.fit(train_data)
rf_pred = rf_model.transform(test_data)
acc = accuracy_evaluator.evaluate(rf_pred)
prec = precision_evaluator.evaluate(rf_pred)
recall = recall_evaluator.evaluate(rf_pred)
f1 = recall_evaluator.evaluate(rf_pred)

                                                                                

In [70]:
print(f"Accuracy: {acc}")
print(f"Precision: {prec}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

Accuracy: 0.9976945947829319
Precision: 0.9977049637297467
Recall: 0.9976945947829319
F1 Score: 0.9976945947829319


In [72]:
dt_model = DecisionTreeClassifier(labelCol='label', maxDepth=5, maxBins=70)
dt_model = dt_model.fit(train_data)
dt_pred = dt_model.transform(test_data)
acc = accuracy_evaluator.evaluate(dt_pred)
prec = precision_evaluator.evaluate(dt_pred)
recall = recall_evaluator.evaluate(dt_pred)
f1 = recall_evaluator.evaluate(dt_pred)
print(f"Accuracy: {acc}")
print(f"Precision: {prec}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

[Stage 359:>                                                        (0 + 8) / 8]

Accuracy: 1.0
Precision: 1.0
Recall: 1.0
F1 Score: 1.0


