In [1]:
# spark session
import os
import pyspark

conf = pyspark.SparkConf()
conf.set('spark.driver.memory','8g')
conf.set('spark.network.timeout', '6000s')
conf.set('spark.executor.heartbeatInterval', '240s')

sc = pyspark.SparkContext(conf=conf)
sc.setLogLevel("ERROR")
spark = pyspark.SQLContext.getOrCreate(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/11 16:10:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
file_path = "data/sampled_data.csv"
df = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv(file_path)

                                                                                

In [3]:
for column in df.columns:
    df = df.withColumnRenamed(column, column.upper())
df.printSchema()

root
 |-- _ID: string (nullable = true)
 |-- CAD_EVNT_ID: integer (nullable = true)
 |-- CREATE_DATE: timestamp (nullable = true)
 |-- INCIDENT_DATE: timestamp (nullable = true)
 |-- INCIDENT_TIME: timestamp (nullable = true)
 |-- NYPD_PCT_CD: integer (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- PATRL_BORO_NM: string (nullable = true)
 |-- GEO_CD_X: integer (nullable = true)
 |-- GEO_CD_Y: integer (nullable = true)
 |-- RADIO_CODE: string (nullable = true)
 |-- TYP_DESC: string (nullable = true)
 |-- CIP_JOBS: string (nullable = true)
 |-- ADD_TS: timestamp (nullable = true)
 |-- DISP_TS: timestamp (nullable = true)
 |-- CLOSNG_TS: timestamp (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- INCIDENT_YEAR: integer (nullable = true)
 |-- INCIDENT_MONTH: integer (nullable = true)
 |-- ARRIVD_TS: timestamp (nullable = true)



In [4]:
df.limit(5).toPandas()

Unnamed: 0,_ID,CAD_EVNT_ID,CREATE_DATE,INCIDENT_DATE,INCIDENT_TIME,NYPD_PCT_CD,BORO_NM,PATRL_BORO_NM,GEO_CD_X,GEO_CD_Y,...,CIP_JOBS,ADD_TS,DISP_TS,CLOSNG_TS,LATITUDE,LONGITUDE,LOCATION,INCIDENT_YEAR,INCIDENT_MONTH,ARRIVD_TS
0,66b6112f63f4a3726cf562a2,87454918,2022-07-31,2022-07-31,2024-08-11 16:17:52,83,BROOKLYN,PATROL BORO BKLYN NORTH,1008430,193926,...,Non CIP,2022-07-31 16:17:52,2022-07-31 16:18:29,2022-07-31 16:51:01,40.698934,-73.912799,"{'type': 'Point', 'coordinates': [-73.91279879...",2022,7,NaT
1,66b6113b63f4a3726cf5c934,87422499,2022-07-30,2022-07-30,2024-08-11 08:34:39,77,BROOKLYN,PATROL BORO BKLYN NORTH,1005328,183150,...,Non CIP,2022-07-30 08:34:39,2022-07-30 08:36:02,2022-07-30 09:13:16,40.669364,-73.92402,"{'type': 'Point', 'coordinates': [-73.92401953...",2022,7,NaT
2,66b6112f63f4a3726cf57d26,87446402,2022-07-31,2022-07-31,2024-08-11 07:06:31,105,QUEENS,PATROL BORO QUEENS SOUTH,1053427,182517,...,Non CIP,2022-07-31 07:06:37,2022-07-31 07:57:35,2022-07-31 10:00:36,40.667382,-73.75064,"{'type': 'Point', 'coordinates': [-73.75063953...",2022,7,NaT
3,66b6112463f4a3726cf5545f,87459303,2022-07-31,2022-07-31,2024-08-11 19:48:12,46,BRONX,PATROL BORO BRONX,1010708,247154,...,Non CIP,2022-07-31 19:48:12,2022-08-01 02:07:50,2022-08-01 04:57:24,40.845024,-73.904374,"{'type': 'Point', 'coordinates': [-73.90437388...",2022,7,NaT
4,66b6113b63f4a3726cf5dcf5,87416409,2022-07-30,2022-07-30,2024-08-11 00:31:54,45,BRONX,PATROL BORO BRONX,1029570,246794,...,Non CIP,2022-07-30 00:31:54,2022-07-30 00:31:54,2022-07-30 00:44:36,40.84396,-73.836203,"{'type': 'Point', 'coordinates': [-73.83620321...",2022,7,NaT


In [5]:
DROP_COLUMNS = [
    '_id',
    'cad_evnt_id',
    'patrl_boro_nm',
    'geo_cd_x',
    'geo_cd_y',
    'closing_ts',
    'location',
    'create_date',
    'closng_ts'
]
df = df.drop(*DROP_COLUMNS)

In [6]:
df = df.dropna()

In [7]:
#get the timestamp concattenated
from pyspark.sql.functions import substring, col, concat, lit, to_timestamp, coalesce

df = df.withColumn("time", substring(col("INCIDENT_TIME"), 12, 8))\
    .withColumn("INCIDENT_DATE", substring(col("INCIDENT_DATE"), 1, 10))\
    .withColumn("INCIDENT_DATE", concat(col("INCIDENT_DATE"), lit(" "), col("time")))\
    .withColumn("INCIDENT_DATE", to_timestamp(col("INCIDENT_DATE"), "yyyy-MM-dd HH:mm:ss"))\
    .withColumn("INCIDENT_DATE", coalesce(col("INCIDENT_DATE"), col("ADD_TS")))\
    .drop(col("INCIDENT_TIME"), col("ADD_TS"), col('time'))

In [8]:
df.printSchema()

root
 |-- INCIDENT_DATE: timestamp (nullable = true)
 |-- NYPD_PCT_CD: integer (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- RADIO_CODE: string (nullable = true)
 |-- TYP_DESC: string (nullable = true)
 |-- CIP_JOBS: string (nullable = true)
 |-- DISP_TS: timestamp (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- INCIDENT_YEAR: integer (nullable = true)
 |-- INCIDENT_MONTH: integer (nullable = true)
 |-- ARRIVD_TS: timestamp (nullable = true)



In [9]:
# get the typ_desc to a tokenizer to make boolean flag columns
from pyspark.sql.functions import explode, upper, regexp_replace
from pyspark.ml.feature import Tokenizer

df = df.withColumn("cleaned_text", upper(regexp_replace(col("TYP_DESC"), "[^a-zA-Z0-9\\s]", " ")))

# Replace multiple spaces with a single space
df = df.withColumn("cleaned_text", regexp_replace(col("cleaned_text"), "\\s+", " "))

In [10]:
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
df_tokenized = tokenizer.transform(df)

# Explode the words array into individual rows
df_exploded = df_tokenized.withColumn("word", explode(col("words")))

# Group by word and count
df_word_count = df_exploded.groupBy("word").count().orderBy(col("count").desc())
df_word_count.show(100)

                                                                                

+-----------+-----+
|       word|count|
+-----------+-----+
|    transit| 3660|
|     patrol| 3491|
|     inside| 3364|
|      other| 3225|
| visibility| 2742|
|         by| 2454|
|     bureau| 2405|
|  personnel| 2405|
| inspection| 2405|
|   directed| 1904|
|    outside| 1822|
|   possible| 1679|
|    station| 1656|
|      crime| 1649|
|investigate| 1645|
|complainant| 1348|
|        see| 1348|
|   progress| 1096|
|         in| 1096|
|      order| 1024|
|      train|  977|
|maintenance|  975|
|      sweep|  975|
|    vehicle|  964|
|     family|  905|
|       susp|  820|
|  ambulance|  793|
|       case|  793|
|        non|  753|
|     crimes|  733|
|    dispute|  719|
|     mobile|  706|
|        run|  706|
|     person|  667|
|      visit|  636|
|       past|  589|
|       home|  489|
|    larceny|  483|
|    serious|  475|
|   accident|  417|
| harassment|  414|
|        edp|  405|
|   burglary|  382|
|        for|  381|
|      calls|  381|
|       help|  381|
| disorderly|  372|


In [11]:
from pyspark.sql.functions import when
wordsToFlag = [
    "transit",
    "crime",
    "knife",
    "chase",
    "prowler",
    "train",
    "larceny",
    "burglary",
    "harassment",
    "traffic",
    "assault",
    "fire"
]

for word in wordsToFlag:
    df = df.withColumn(f"TYP_DESC_HAS_{word.upper()}",
                                       when(col("cleaned_text")\
                                            .contains(word.upper()), 1)\
                                       .otherwise(0))

In [12]:
df = df.drop("TYP_DESC", "cleaned_text")
df.printSchema()

root
 |-- INCIDENT_DATE: timestamp (nullable = true)
 |-- NYPD_PCT_CD: integer (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- RADIO_CODE: string (nullable = true)
 |-- CIP_JOBS: string (nullable = true)
 |-- DISP_TS: timestamp (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- INCIDENT_YEAR: integer (nullable = true)
 |-- INCIDENT_MONTH: integer (nullable = true)
 |-- ARRIVD_TS: timestamp (nullable = true)
 |-- TYP_DESC_HAS_TRANSIT: integer (nullable = false)
 |-- TYP_DESC_HAS_CRIME: integer (nullable = false)
 |-- TYP_DESC_HAS_KNIFE: integer (nullable = false)
 |-- TYP_DESC_HAS_CHASE: integer (nullable = false)
 |-- TYP_DESC_HAS_PROWLER: integer (nullable = false)
 |-- TYP_DESC_HAS_TRAIN: integer (nullable = false)
 |-- TYP_DESC_HAS_LARCENY: integer (nullable = false)
 |-- TYP_DESC_HAS_BURGLARY: integer (nullable = false)
 |-- TYP_DESC_HAS_HARASSMENT: integer (nullable = false)
 |-- TYP_DESC_HAS_TRAFFIC: integer (null

In [13]:
#get hour weekday
from pyspark.sql.functions import hour, weekday
df = df\
    .withColumn("HOUR", hour(col("INCIDENT_DATE")))\
    .withColumn("WEEKDAY", weekday(col("INCIDENT_DATE")))\
    .drop(col("INCIDENT_DATE"))

In [14]:
#get the time difference
from pyspark.sql.functions import unix_timestamp  #unix timestamp in seconds. convert to minutes
df = df\
    .withColumn("TIME_TO_ARRIVE_FROM_DISPATCH",
                (unix_timestamp(col("ARRIVD_TS")) - unix_timestamp(col("DISP_TS"))) / 60.0)

# Get the minimum and maximum values of the column
min_val = df.agg({"TIME_TO_ARRIVE_FROM_DISPATCH": "min"}).collect()[0][0]
max_val = df.agg({"TIME_TO_ARRIVE_FROM_DISPATCH": "max"}).collect()[0][0]

print("Minimum dispatch to arrival time: " , min_val)
print("Maximum dispatch to arrival time: " , max_val)

df = df\
    .filter(col("TIME_TO_ARRIVE_FROM_DISPATCH") >= 1)\
    .drop(col("ARRIVD_TS"), col("DISP_TS"))



Minimum dispatch to arrival time:  0.0
Maximum dispatch to arrival time:  1567.9666666666667


                                                                                

In [15]:
df.printSchema()

root
 |-- NYPD_PCT_CD: integer (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- RADIO_CODE: string (nullable = true)
 |-- CIP_JOBS: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- INCIDENT_YEAR: integer (nullable = true)
 |-- INCIDENT_MONTH: integer (nullable = true)
 |-- TYP_DESC_HAS_TRANSIT: integer (nullable = false)
 |-- TYP_DESC_HAS_CRIME: integer (nullable = false)
 |-- TYP_DESC_HAS_KNIFE: integer (nullable = false)
 |-- TYP_DESC_HAS_CHASE: integer (nullable = false)
 |-- TYP_DESC_HAS_PROWLER: integer (nullable = false)
 |-- TYP_DESC_HAS_TRAIN: integer (nullable = false)
 |-- TYP_DESC_HAS_LARCENY: integer (nullable = false)
 |-- TYP_DESC_HAS_BURGLARY: integer (nullable = false)
 |-- TYP_DESC_HAS_HARASSMENT: integer (nullable = false)
 |-- TYP_DESC_HAS_TRAFFIC: integer (nullable = false)
 |-- TYP_DESC_HAS_ASSAULT: integer (nullable = false)
 |-- TYP_DESC_HAS_FIRE: integer (nullable = false)
 |-- HOUR: inte

In [16]:
import geopandas as gpd
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import udf, broadcast
from shapely.wkt import loads as load_wkt, dumps as dump_wkt
from shapely.geometry import Point
# Load the GeoJSON file with NYC neighborhood boundaries
neighborhoods_gdf = gpd.read_file('data/nyc_neighborhoods.geojson')

# Convert Polygon objects to WKT strings
neighborhoods_gdf['geometry'] = neighborhoods_gdf['geometry'].apply(lambda x: dump_wkt(x))

# Convert GeoDataFrame to a list for broadcasting
neighborhoods_list = neighborhoods_gdf[['neighborhood', 'geometry']].values.tolist()

# Broadcast the neighborhoods list for efficient join
neighborhoods_broadcast = sc.broadcast(neighborhoods_list)

# Define a UDF to check if a point is within a neighborhood polygon
def point_in_neighborhood(lat, lon, neighborhoods_list):
    point = Point(lon, lat)
    for neighborhood, polygon_wkt in neighborhoods_list:
        polygon = load_wkt(polygon_wkt)
        if polygon.contains(point):
            return neighborhood
    return None

# Register the UDF
point_in_neighborhood_udf = udf(
    lambda lat, lon: point_in_neighborhood(lat, lon, neighborhoods_broadcast.value),
    StringType()
)

# Add the neighborhood information
df = df.withColumn("NEIGHBORHOOD",
                   point_in_neighborhood_udf(col("LATITUDE"), col("LONGITUDE")))\
    .drop(col("LATITUDE"), col("LONGITUDE"))\
    .dropna() #to ensure we only include neighborhoods that arent NA


  neighborhoods_gdf['geometry'] = neighborhoods_gdf['geometry'].apply(lambda x: dump_wkt(x))


In [17]:
df_pandas = df.toPandas()

                                                                                

In [18]:
df.printSchema()

root
 |-- NYPD_PCT_CD: integer (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- RADIO_CODE: string (nullable = true)
 |-- CIP_JOBS: string (nullable = true)
 |-- INCIDENT_YEAR: integer (nullable = true)
 |-- INCIDENT_MONTH: integer (nullable = true)
 |-- TYP_DESC_HAS_TRANSIT: integer (nullable = false)
 |-- TYP_DESC_HAS_CRIME: integer (nullable = false)
 |-- TYP_DESC_HAS_KNIFE: integer (nullable = false)
 |-- TYP_DESC_HAS_CHASE: integer (nullable = false)
 |-- TYP_DESC_HAS_PROWLER: integer (nullable = false)
 |-- TYP_DESC_HAS_TRAIN: integer (nullable = false)
 |-- TYP_DESC_HAS_LARCENY: integer (nullable = false)
 |-- TYP_DESC_HAS_BURGLARY: integer (nullable = false)
 |-- TYP_DESC_HAS_HARASSMENT: integer (nullable = false)
 |-- TYP_DESC_HAS_TRAFFIC: integer (nullable = false)
 |-- TYP_DESC_HAS_ASSAULT: integer (nullable = false)
 |-- TYP_DESC_HAS_FIRE: integer (nullable = false)
 |-- HOUR: integer (nullable = true)
 |-- WEEKDAY: integer (nullable = true)
 |-- TIME_TO_ARRIVE

In [19]:
df_pandas.to_csv('data/cleaned_data.csv', index=False)