# NYPD-Complaint-Data to DB

<span style="color: red;">**WARNING:** Only execute this notebook if your machine has sufficient memory (RAM >= 24 GB preferred)</span>

### Loading raw data

* [NYPD Complaint Data Historic](https://data.cityofnewyork.us/Public-Safety/NYPD-Complaint-Data-Historic/qgea-i56i/about_data) is downloaded locally as a `.csv` file and stored in the `data` directory.

In [1]:
import sys
import os

sys.path.append(f"..{os.path.sep}")

data_dir = 'data'
fname = 'NYPD_Complaint_Data_Historic_20241027.csv'
fpath = os.path.join(sys.path[-1], data_dir, fname)

assert os.path.exists(fpath), f'{os.path.abspath(fpath)} does not exist!'

In [2]:
import polars as pl

complaint_lf = pl.scan_csv(fpath, try_parse_dates=True)
complaint_lf.collect_schema()

Schema([('CMPLNT_NUM', String),
        ('CMPLNT_FR_DT', String),
        ('CMPLNT_FR_TM', String),
        ('CMPLNT_TO_DT', String),
        ('CMPLNT_TO_TM', String),
        ('ADDR_PCT_CD', Int64),
        ('RPT_DT', String),
        ('KY_CD', Int64),
        ('OFNS_DESC', String),
        ('PD_CD', Int64),
        ('PD_DESC', String),
        ('CRM_ATPT_CPTD_CD', String),
        ('LAW_CAT_CD', String),
        ('BORO_NM', String),
        ('LOC_OF_OCCUR_DESC', String),
        ('PREM_TYP_DESC', String),
        ('JURIS_DESC', String),
        ('JURISDICTION_CODE', Int64),
        ('PARKS_NM', String),
        ('HADEVELOPT', String),
        ('HOUSING_PSA', String),
        ('X_COORD_CD', Int64),
        ('Y_COORD_CD', Int64),
        ('SUSP_AGE_GROUP', String),
        ('SUSP_RACE', String),
        ('SUSP_SEX', String),
        ('TRANSIT_DISTRICT', String),
        ('Latitude', Float64),
        ('Longitude', Float64),
        ('Lat_Lon', String),
        ('PATROL_BORO', String),


### Handling datetime variables

In [3]:
# Parse MM/DD/YYYY dates and replace the literal '(null)' time placeholder with a real null
complaint_lf = complaint_lf.with_columns(
    pl.col('CMPLNT_FR_DT').str.to_date("%m/%d/%Y"),
    pl.col('CMPLNT_TO_DT').str.to_date("%m/%d/%Y"),
    pl.col('RPT_DT').str.to_date("%m/%d/%Y").alias('report_date'),
    pl.when(pl.col("CMPLNT_TO_TM").str.contains('(null)', literal=True))
    .then(None).otherwise(pl.col('CMPLNT_TO_TM')).str.to_time("%H:%M:%S").name.keep(),
).drop('RPT_DT')

# Combine each date with its time into a single datetime column, then drop the raw parts
complaint_lf = complaint_lf.with_columns(
    pl.col('CMPLNT_FR_DT').dt.combine(pl.col('CMPLNT_FR_TM')).alias('cmplnt_from_date'),
    pl.col('CMPLNT_TO_DT').dt.combine(pl.col('CMPLNT_TO_TM')).alias('cmplnt_to_date'),
).drop('CMPLNT_FR_DT', 'CMPLNT_FR_TM', 'CMPLNT_TO_DT', 'CMPLNT_TO_TM')

#### Checking the `cmplnt_from_date` variable for incorrectly parsed dates and bad data

In [4]:
cmplnt_from_datetime = complaint_lf.select('cmplnt_from_date').group_by('cmplnt_from_date').len('count').sort('cmplnt_from_date').collect()
cmplnt_from_datetime

| cmplnt_from_date (datetime[μs]) | count (u32) |
|---|---|
| null | 702 |
| 1010-05-14 20:00:00 | 1 |
| 1010-08-05 13:00:00 | 1 |
| 1010-08-22 13:35:00 | 1 |
| 1010-08-29 16:20:00 | 1 |
| … | … |
| 2023-12-31 23:34:00 | 1 |
| 2023-12-31 23:35:00 | 1 |
| 2023-12-31 23:37:00 | 1 |
| 2023-12-31 23:40:00 | 3 |


- In Renda's reference notebook [PrepCrimeDataForDB.ipynb](../reference/PrepCrimeDataForDB.ipynb), years below 25 are converted into 21st-century years, whereas the data actually contains genuine 20th-century years.
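To see why a fixed pivot misplaces such rows, here is a minimal stdlib sketch. The helper `expand_two_digit_year` is hypothetical: it only assumes a rule of the form "two-digit years below 25 belong to the 2000s" and is not the reference notebook's code. Python's own `%y` directive uses a different pivot (00 to 68 map to the 2000s), but it has the same blind spot.

```python
from datetime import datetime


# Hypothetical helper mimicking a "two-digit years below 25 belong to the 2000s" rule.
def expand_two_digit_year(yy: int, pivot: int = 25) -> int:
    return 2000 + yy if yy < pivot else 1900 + yy


# A genuine 1912 date, once reduced to two digits, is pushed into 2012.
pivot_year = expand_two_digit_year(12)

# Python's %y directive also maps "12" to 2012.
parsed_short = datetime.strptime("04/12/12", "%m/%d/%y")

# Parsing the four-digit year with %Y, as this notebook does, keeps 1912.
parsed_full = datetime.strptime("04/12/1912", "%m/%d/%Y")

print(pivot_year, parsed_short.year, parsed_full.year)  # 2012 2012 1912
```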

In [5]:
cmplnt_from_datetime.filter(pl.col('cmplnt_from_date').dt.year().is_between(1800, 1925))

| cmplnt_from_date (datetime[μs]) | count (u32) |
|---|---|
| 1900-03-10 19:00:00 | 1 |
| 1900-05-08 21:00:00 | 1 |
| 1900-06-02 19:00:00 | 1 |
| 1900-08-06 09:00:00 | 1 |
| 1900-08-07 08:30:00 | 1 |
| … | … |
| 1923-01-25 10:00:00 | 1 |
| 1923-04-05 17:00:00 | 1 |
| 1923-07-19 18:30:00 | 1 |
| 1923-09-15 15:00:00 | 1 |


- The dataset does contain some bad data: here, a few dates fall in the 11th century. Given the overall size of the dataset, these rows are negligible and can be either dropped or set to `null`.

In [6]:
cmplnt_from_datetime.filter(pl.col('cmplnt_from_date').dt.year() < 1900)

| cmplnt_from_date (datetime[μs]) | count (u32) |
|---|---|
| 1010-05-14 20:00:00 | 1 |
| 1010-08-05 13:00:00 | 1 |
| 1010-08-22 13:35:00 | 1 |
| 1010-08-29 16:20:00 | 1 |
| 1010-10-28 20:09:00 | 1 |
| … | … |
| 1028-12-07 09:00:00 | 1 |
| 1029-02-07 23:00:00 | 1 |
| 1029-07-04 08:30:00 | 1 |
| 1029-10-29 20:40:00 | 1 |


#### Checking the `cmplnt_to_date` variable for incorrectly parsed dates and bad data

In [7]:
cmplnt_to_datetime = complaint_lf.select('cmplnt_to_date').group_by('cmplnt_to_date').len('count').sort('cmplnt_to_date').collect()
cmplnt_to_datetime

| cmplnt_to_date (datetime[μs]) | count (u32) |
|---|---|
| null | 1820391 |
| 1017-12-08 12:30:00 | 1 |
| 1018-09-28 12:56:00 | 1 |
| 1023-06-28 19:15:00 | 1 |
| 1023-08-29 12:19:00 | 1 |
| … | … |
| 2023-12-31 23:50:00 | 1 |
| 2023-12-31 23:53:00 | 1 |
| 2023-12-31 23:56:00 | 2 |
| 2024-01-01 00:03:00 | 1 |


- As with `cmplnt_from_date`, Renda's reference notebook [PrepCrimeDataForDB.ipynb](../reference/PrepCrimeDataForDB.ipynb) converts years below 25 into 21st-century years, whereas `cmplnt_to_date` also contains genuine 20th-century years.

In [8]:
cmplnt_to_datetime.filter(pl.col('cmplnt_to_date').dt.year().is_between(1800, 1925))

| cmplnt_to_date (datetime[μs]) | count (u32) |
|---|---|
| 1912-04-12 11:30:00 | 1 |
| 1912-05-10 18:29:00 | 1 |
| 1920-04-21 14:15:00 | 1 |
| 1920-12-30 18:07:00 | 1 |


- `cmplnt_to_date` likewise contains a few 11th-century dates. Given the overall size of the dataset, these are negligible and can be either dropped or set to `null`.

In [9]:
cmplnt_to_datetime.filter(pl.col('cmplnt_to_date').dt.year() < 1900)

| cmplnt_to_date (datetime[μs]) | count (u32) |
|---|---|
| 1017-12-08 12:30:00 | 1 |
| 1018-09-28 12:56:00 | 1 |
| 1023-06-28 19:15:00 | 1 |
| 1023-08-29 12:19:00 | 1 |


- Setting the bad dates (years before 1900) to `null`

In [10]:
complaint_lf = complaint_lf.with_columns(pl.when(pl.col('cmplnt_from_date').dt.year() < 1900).then(None).otherwise(pl.col('cmplnt_from_date')).name.keep(),
                                         pl.when(pl.col('cmplnt_to_date').dt.year() < 1900).then(None).otherwise(pl.col('cmplnt_to_date')).name.keep())

### Handling duplicate values in `CMPLNT_NUM`

In [11]:
complaint_lf.select(pl.len()).collect().item()

8914838

In [12]:
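# keep='first' retains the first occurrence of each CMPLNT_NUM (the default keep='any' is not deterministic)
complaint_lf = complaint_lf.unique('CMPLNT_NUM', keep='first')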
complaint_lf.select(pl.len()).collect().item()

8913734

### Handling Latitude and Longitude variables

In [13]:
# Take the distinct Lat_Lon strings once, then parse "(lat, lon)" so each pair stays aligned with its source string
unique_lat_lon = (
    complaint_lf.select(pl.col('Lat_Lon').unique())
    .with_columns(
        pl.col('Lat_Lon')
        .str.strip_chars('()')
        .str.split_exact(",", 1)
        .struct.rename_fields(['lat', 'lon'])
        .alias("fields")
    )
    .unnest("fields")
    .with_columns(
        pl.col('lat').str.strip_chars(' ').cast(pl.Decimal),
        pl.col('lon').str.strip_chars(' ').cast(pl.Decimal),
    )
)

In [14]:
unique_lat_lon_df = unique_lat_lon.collect().to_pandas()
unique_lat_lon_df.head()

|  | Lat_Lon | lat | lon |
|---|---|---|---|
| 0 | (40.762134, -73.798082) | 40.762134 | -73.798082 |
| 1 | (40.71028, -74.004114) | 40.71028 | -74.004114 |
| 2 | (40.69468745, -73.955197) | 40.69468745 | -73.955197 |
| 3 | (40.743217, -73.952835) | 40.743217 | -73.952835 |
| 4 | (40.800016, -73.969734) | 40.800016 | -73.969734 |


### Loading NYC Zip Code Data

* [Modified Zip Code Tabulation Areas (MODZCTA)](https://data.cityofnewyork.us/Health/Modified-Zip-Code-Tabulation-Areas-MODZCTA-/pri4-ifjk/about_data) is downloaded locally as a `.geojson` file and stored in the `data` directory.

In [15]:
import geopandas

unique_lat_lon_gdf = geopandas.GeoDataFrame(unique_lat_lon_df,
                             geometry=geopandas.points_from_xy(unique_lat_lon_df.lon, unique_lat_lon_df.lat),
                             crs="EPSG:4326")

fname = 'MODZCTA.geojson'
fpath = os.path.join(sys.path[-1], data_dir, fname)

assert os.path.exists(fpath), f'{os.path.abspath(fpath)} does not exist!'
geo_df = geopandas.read_file(fpath)

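Extracting zip codes: each unique point is tagged with the `modzcta` value of the MODZCTA polygon it lies within, or with an empty string if it lies within none.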

In [16]:
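import numpy as np
import pandas as pd

# Point-in-polygon lookup: for each MODZCTA polygon, tag every point that lies within it
zips = np.empty(unique_lat_lon_gdf.shape[0], dtype=object)
for geom, modzcta in zip(geo_df.geometry, geo_df.modzcta):
    zips[unique_lat_lon_gdf.within(geom).to_numpy()] = modzcta
# Points outside every polygon get an empty zip code
zips[pd.isna(zips)] = ''
unique_lat_lon_gdf['zipcode'] = zips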
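The loop above tests every unique point against every MODZCTA polygon, one polygon at a time. An alternative worth trying is a single spatial join with `geopandas.sjoin`, which uses a spatial index. The polygons and points below are made-up squares and coordinates, not real zip code shapes. The column names mirror the notebook's `modzcta`, `Lat_Lon` and `zipcode`.

```python
import geopandas
from shapely.geometry import Point, box

# Made-up polygons standing in for MODZCTA areas.
zip_polys = geopandas.GeoDataFrame(
    {"modzcta": ["10001", "10002"]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)
# Made-up points; Point(x=lon, y=lat), the last one falls outside both polygons.
points = geopandas.GeoDataFrame(
    {"Lat_Lon": ["(0.5, 0.5)", "(0.5, 1.5)", "(5.0, 5.0)"]},
    geometry=[Point(0.5, 0.5), Point(1.5, 0.5), Point(5, 5)],
    crs="EPSG:4326",
)

# One left spatial join replaces the per-polygon loop.
joined = geopandas.sjoin(points, zip_polys[["modzcta", "geometry"]], how="left", predicate="within")
joined = joined.sort_index()
# Unmatched points get NaN; map them to '' as the loop does.
joined["zipcode"] = joined["modzcta"].fillna("")
print(joined[["Lat_Lon", "zipcode"]])
```

If the MODZCTA polygons ever overlapped, a point could match more than one polygon and appear more than once in the result. The loop would instead silently keep the last match.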

In [17]:
lat_lon_zip_lf = pl.from_pandas(unique_lat_lon_gdf[['Lat_Lon','zipcode']]).lazy()
lat_lon_zip_lf.collect_schema()

Schema([('Lat_Lon', String), ('zipcode', String)])

Joining the complaint data with the extracted zip code data

In [18]:
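# Default inner join: rows whose Lat_Lon is null find no match (null keys do not join by default) and are dropped
complaint_lf = complaint_lf.join(lat_lon_zip_lf, on='Lat_Lon')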
complaint_lf.collect_schema()

Schema([('CMPLNT_NUM', String),
        ('ADDR_PCT_CD', Int64),
        ('KY_CD', Int64),
        ('OFNS_DESC', String),
        ('PD_CD', Int64),
        ('PD_DESC', String),
        ('CRM_ATPT_CPTD_CD', String),
        ('LAW_CAT_CD', String),
        ('BORO_NM', String),
        ('LOC_OF_OCCUR_DESC', String),
        ('PREM_TYP_DESC', String),
        ('JURIS_DESC', String),
        ('JURISDICTION_CODE', Int64),
        ('PARKS_NM', String),
        ('HADEVELOPT', String),
        ('HOUSING_PSA', String),
        ('X_COORD_CD', Int64),
        ('Y_COORD_CD', Int64),
        ('SUSP_AGE_GROUP', String),
        ('SUSP_RACE', String),
        ('SUSP_SEX', String),
        ('TRANSIT_DISTRICT', String),
        ('Latitude', Float64),
        ('Longitude', Float64),
        ('Lat_Lon', String),
        ('PATROL_BORO', String),
        ('STATION_NAME', String),
        ('VIC_AGE_GROUP', String),
        ('VIC_RACE', String),
        ('VIC_SEX', String),
        ('report_date', Date),
     

- Converting string values stored as the literal placeholder `'(null)'` into actual `null` values.

In [19]:
import polars.selectors as cs

# Replace the literal '(null)' placeholder in every String column with a real null
complaint_lf = complaint_lf.with_columns(pl.when(cs.by_dtype(pl.String).str.contains('(null)', literal=True)).then(None).otherwise(cs.by_dtype(pl.String)).name.keep())

- Casting all `Int64` columns to `Int32` in polars, which halves their memory footprint.

In [20]:
complaint_lf = complaint_lf.with_columns(cs.by_dtype(pl.Int64).cast(pl.Int32).name.keep())

### Schema adjustments

* The existing schema of the `nyc_crime` table in the `ai4sg` DB is given below:
```sql
CREATE TABLE `nyc_crime` (
  `id` int NOT NULL,
  `CMPLNT_NUM` text,
  `ADDR_PCT_CD` double DEFAULT NULL,
  `BORO_NM` text,
  `CRM_ATPT_CPTD_CD` text,
  `HADEVELOPT` text,
  `HOUSING_PSA` text,
  `JURISDICTION_CODE` int DEFAULT NULL,
  `JURIS_DESC` text,
  `KY_CD` int DEFAULT NULL,
  `LAW_CAT_CD` text,
  `LOC_OF_OCCUR_DESC` text,
  `OFNS_DESC` text,
  `PARKS_NM` text,
  `PATROL_BORO` text,
  `PD_CD` double DEFAULT NULL,
  `PD_DESC` text,
  `PREM_TYP_DESC` text,
  `report_date` datetime DEFAULT NULL,
  `STATION_NAME` text,
  `SUSP_AGE_GROUP` text,
  `SUSP_RACE` text,
  `SUSP_SEX` text,
  `TRANSIT_DISTRICT` text,
  `VIC_AGE_GROUP` text,
  `VIC_RACE` text,
  `VIC_SEX` text,
  `X_COORD_CD` int DEFAULT NULL,
  `Y_COORD_CD` int DEFAULT NULL,
  `Latitude` double DEFAULT NULL,
  `Longitude` double DEFAULT NULL,
  `Lat_Lon` text,
  `New Georeferenced Column` text,
  `zipcode` varchar(10) DEFAULT NULL,
  `cmplnt_from_date` datetime DEFAULT NULL,
  `cmplnt_to_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
);
```

In [21]:
complaint_lf.collect_schema()

Schema([('CMPLNT_NUM', String),
        ('ADDR_PCT_CD', Int32),
        ('KY_CD', Int32),
        ('OFNS_DESC', String),
        ('PD_CD', Int32),
        ('PD_DESC', String),
        ('CRM_ATPT_CPTD_CD', String),
        ('LAW_CAT_CD', String),
        ('BORO_NM', String),
        ('LOC_OF_OCCUR_DESC', String),
        ('PREM_TYP_DESC', String),
        ('JURIS_DESC', String),
        ('JURISDICTION_CODE', Int32),
        ('PARKS_NM', String),
        ('HADEVELOPT', String),
        ('HOUSING_PSA', String),
        ('X_COORD_CD', Int32),
        ('Y_COORD_CD', Int32),
        ('SUSP_AGE_GROUP', String),
        ('SUSP_RACE', String),
        ('SUSP_SEX', String),
        ('TRANSIT_DISTRICT', String),
        ('Latitude', Float64),
        ('Longitude', Float64),
        ('Lat_Lon', String),
        ('PATROL_BORO', String),
        ('STATION_NAME', String),
        ('VIC_AGE_GROUP', String),
        ('VIC_RACE', String),
        ('VIC_SEX', String),
        ('report_date', Date),
     

### Cleaned data

Calling `.collect()` on the LazyFrame executes the whole query plan (all transformations and joins) in sequence; `.explain()` prints the optimized plan without executing it.

In [22]:
print(complaint_lf.explain(format='plain'))

 WITH_COLUMNS:
 [when(col("CMPLNT_NUM").str.contains([String((null))])).then(null.strict_cast(String)).otherwise(col("CMPLNT_NUM")).alias("CMPLNT_NUM"), when(col("OFNS_DESC").str.contains([String((null))])).then(null.strict_cast(String)).otherwise(col("OFNS_DESC")).alias("OFNS_DESC"), when(col("PD_DESC").str.contains([String((null))])).then(null.strict_cast(String)).otherwise(col("PD_DESC")).alias("PD_DESC"), when(col("CRM_ATPT_CPTD_CD").str.contains([String((null))])).then(null.strict_cast(String)).otherwise(col("CRM_ATPT_CPTD_CD")).alias("CRM_ATPT_CPTD_CD"), when(col("LAW_CAT_CD").str.contains([String((null))])).then(null.strict_cast(String)).otherwise(col("LAW_CAT_CD")).alias("LAW_CAT_CD"), when(col("BORO_NM").str.contains([String((null))])).then(null.strict_cast(String)).otherwise(col("BORO_NM")).alias("BORO_NM"), when(col("LOC_OF_OCCUR_DESC").str.contains([String((null))])).then(null.strict_cast(String)).otherwise(col("LOC_OF_OCCUR_DESC")).alias("LOC_OF_OCCUR_DESC"), when(col("PRE

In [23]:
complaint_lf = complaint_lf.sort('cmplnt_from_date')
complaint_lf.head().collect()

CMPLNT_NUM,ADDR_PCT_CD,KY_CD,OFNS_DESC,PD_CD,PD_DESC,CRM_ATPT_CPTD_CD,LAW_CAT_CD,BORO_NM,LOC_OF_OCCUR_DESC,PREM_TYP_DESC,JURIS_DESC,JURISDICTION_CODE,PARKS_NM,HADEVELOPT,HOUSING_PSA,X_COORD_CD,Y_COORD_CD,SUSP_AGE_GROUP,SUSP_RACE,SUSP_SEX,TRANSIT_DISTRICT,Latitude,Longitude,Lat_Lon,PATROL_BORO,STATION_NAME,VIC_AGE_GROUP,VIC_RACE,VIC_SEX,report_date,cmplnt_from_date,cmplnt_to_date,zipcode
str,i32,i32,str,i32,str,str,str,str,str,str,str,i32,str,str,str,i32,i32,str,str,str,str,f64,f64,str,str,str,str,str,str,date,datetime[μs],datetime[μs],str
"""24943522""",33,112,"""THEFT-FRAUD""",739,"""FRAUD,UNCLASSIFIED-FELONY""","""COMPLETED""","""FELONY""","""MANHATTAN""","""INSIDE""","""OTHER""","""N.Y. POLICE DEPT""",0,,,,1000252,246654,,,,,40.843669,-73.942165,"""(40.843669, -73.942165)""","""PATROL BORO MAN NORTH""",,"""25-44""","""BLACK""","""F""",2006-10-23,,2006-10-23 11:30:00,"""10032"""
"""73887209""",115,109,"""GRAND LARCENY""",421,"""LARCENY,GRAND FROM VEHICLE/MOT…","""COMPLETED""","""FELONY""","""QUEENS""",,"""STREET""","""N.Y. POLICE DEPT""",0,,,,1014826,211611,,,,,40.747447,-73.889651,"""(40.74744698, -73.88965055)""","""PATROL BORO QUEENS NORTH""",,"""25-44""","""ASIAN / PACIFIC ISLANDER""","""F""",2010-07-28,,2010-07-28 21:35:00,"""11372"""
"""183889918""",100,112,"""THEFT-FRAUD""",739,"""FRAUD,UNCLASSIFIED-FELONY""","""COMPLETED""","""FELONY""","""QUEENS""","""INSIDE""","""RESIDENCE - APT. HOUSE""","""N.Y. POLICE DEPT""",0,,,,1036753,152447,,,,,40.584953,-73.810975,"""(40.584953, -73.810975)""","""PATROL BORO QUEENS SOUTH""",,"""65+""","""BLACK""","""M""",2018-06-15,,2018-05-28 10:00:00,"""11693"""
"""141704704""",72,341,"""PETIT LARCENY""",338,"""LARCENY,PETIT FROM BUILDING,UN""","""COMPLETED""","""MISDEMEANOR""","""BROOKLYN""","""INSIDE""","""RESIDENCE - APT. HOUSE""","""N.Y. POLICE DEPT""",0,,,,983984,175071,,,,,40.647207,-74.000955,"""(40.647207, -74.000955)""","""PATROL BORO BKLYN SOUTH""",,"""25-44""","""WHITE""","""F""",2015-03-27,,2015-03-27 19:00:00,"""99999"""
"""157284894""",40,109,"""GRAND LARCENY""",439,"""LARCENY,GRAND FROM OPEN AREAS,…","""COMPLETED""","""FELONY""","""BRONX""",,"""TRANSIT - NYC SUBWAY""","""N.Y. TRANSIT POLICE""",1,,,,1004926,234532,,,,"""12""",40.810396,-73.925311,"""(40.81039618, -73.92531074)""","""PATROL BORO BRONX""","""JACKSON AVENUE""","""18-24""","""WHITE HISPANIC""","""F""",2016-10-04,,,"""10454"""


In [24]:
complaint_lf_len = complaint_lf.select(pl.len()).collect().item()
complaint_lf_len

8913268

- The LazyFrame is split into chunks of `chunk_size` rows, and each chunk is loaded directly into the MySQL DB. A larger chunk size gives faster transfers but needs more memory, so adjust it to the RAM available.

**Note:** The target table in the DB should be empty before this operation, since the rows are appended.

In [None]:
from sqlalchemy import create_engine

# Create a SQLAlchemy engine
engine = create_engine("mysql+pymysql://admin:password@localhost:3306/vinay")

offset = 0
chunk_size = 1000000

while offset < complaint_lf_len:
    # Each collect() re-executes the lazy query plan to materialize this slice
    complaint_df = complaint_lf.slice(offset, chunk_size).collect()
    # Append the chunk to the MySQL table using polars DataFrame.write_database
    complaint_df.write_database('nyc_crime', connection=engine, if_table_exists='append')
    offset += chunk_size

### Exporting to `.csv`