In [None]:
from sodapy import Socrata
import polars as pl
import pandas as pd
import os
import re
from datetime import datetime
from dateutil.relativedelta import relativedelta


import warnings
warnings.filterwarnings("ignore")

## Ingest NYC 311 Service Request Data

Ingesting history from 2020-01-01

In [None]:
# get the API token 
ENV = pd.read_json("env.json", typ="series")
APP_TOKEN = ENV["APP_TOKEN"]


In [None]:
client = Socrata("data.cityofnewyork.us", APP_TOKEN, timeout=120)

# file path to save the cached raw data 
raw_file_path = "data/cache_raw_data"
os.makedirs(raw_file_path, exist_ok=True)

# all cached raw files have the same naming convention as "nyc_311_data_YYYY_MM_DD_to_YYYY_MM_DD.parquet"
# the first YYYY_MM_DD is the data start date and the second YYYY_MM_DD is the data through date
# for example, "nyc_311_data_2020_01_01_to_2020_01_31.parquet" contains requests submitted from 2020-01-01 to 2020-01-31
# in the backfill process, I pull data one month at a time
# I want the ingestion code to be rerunable and efficient. I don't want to download the same data again if it has already been downloaded
# to do that, I check the maximum data through date in the cached raw data directory
# if the maximum date exists, I use the next day as the beginning date for the next API call
# if no files exist in the cached raw data directory, I set the start date to 2020-01-01

# find the maximum data through date from the downloaded files
downloaded_files = set(os.listdir(raw_file_path))
thru_date_str_list = [re.search(r'to_(\d{4}_\d{2}_\d{2})\.parquet', file).group(1) for file in downloaded_files]
thru_date_str_list = [datetime.strptime(date_str, '%Y_%m_%d') for date_str in thru_date_str_list]

# if there are already downloaded files, then start date is the day after the latest thru_date
# if there are no downloaded files, then set start date to 2020-01-01
if thru_date_str_list:
    start_date = max(thru_date_str_list) + relativedelta(days=1)
else:
    start_date = datetime(2020, 1, 1)

# set end date to today
end_date = datetime.today()

# set the initial start date for the loop to be the start date we just defined 
date = start_date

while date < end_date:
    # if the next month is not fully covered, set the thru_date to end date (today)
    if date + relativedelta(months=1) - relativedelta(days=1) < end_date:
        thru_date = (date + relativedelta(months=1)) - relativedelta(days=1)
    else:
        thru_date = end_date

    # save the data to a file named "nyc_311_data_YYYY_MM_DD_to_YYYY_MM_DD.parquet"
    file_name = f"nyc_311_data_{date.strftime('%Y_%m_%d')}_to_{thru_date.strftime('%Y_%m_%d')}.parquet"
    # get the full file path by joining the directory and file name
    output_path = os.path.join(raw_file_path, file_name)

    # define the where clause for the API call
    where_clause = (
            f"created_date >= '{date.strftime('%Y-%m-%dT00:00:00')}' "
            f"AND created_date <= '{thru_date.strftime('%Y-%m-%dT23:59:59')}'"
        )
    print(f"Pulling data from {date.strftime('%Y-%m-%d')} to {thru_date.strftime('%Y-%m-%d')}")
    
    # make the API call to get the data
    json_data = client.get("erm2-nwe9", where=where_clause, limit=20000000)
    # save the data to a polar dataframe first, then write it to a parquet file
    df = pl.from_dicts(json_data)
    df.write_parquet(output_path)

    # repeat the process for the next month
    date = thru_date + relativedelta(days=1)


Pulling data from 2020-02-01 to 2020-02-29
Pulling data from 2020-03-01 to 2020-03-31
Pulling data from 2020-04-01 to 2020-04-30
Pulling data from 2020-05-01 to 2020-05-31
Pulling data from 2020-06-01 to 2020-06-30
Pulling data from 2020-07-01 to 2020-07-31
Pulling data from 2020-08-01 to 2020-08-31
Pulling data from 2020-09-01 to 2020-09-30
Pulling data from 2020-10-01 to 2020-10-31
Pulling data from 2020-11-01 to 2020-11-30
Pulling data from 2020-12-01 to 2020-12-31
Pulling data from 2021-01-01 to 2021-01-31
Pulling data from 2021-02-01 to 2021-02-28
Pulling data from 2021-03-01 to 2021-03-31
Pulling data from 2021-04-01 to 2021-04-30
Pulling data from 2021-05-01 to 2021-05-31
Pulling data from 2021-06-01 to 2021-06-30
Pulling data from 2021-07-01 to 2021-07-31
Pulling data from 2021-08-01 to 2021-08-31
Pulling data from 2021-09-01 to 2021-09-30
Pulling data from 2021-10-01 to 2021-10-31
Pulling data from 2021-11-01 to 2021-11-30
Pulling data from 2021-12-01 to 2021-12-31
Pulling dat

In [135]:
# Load data from parquet files to pandas DF

all_parquet_files = [os.path.join(raw_file_path, f) for f in os.listdir(raw_file_path)]

dfs = []
for file in all_parquet_files:
    df = pd.read_parquet(file)
    dfs.append(df)

final_df = pd.concat(dfs, ignore_index=True)

print(final_df.shape)
display(final_df.head())

(17899464, 41)


Unnamed: 0,unique_key,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,incident_zip,incident_address,...,landmark,facility_type,vehicle_type,bridge_highway_name,bridge_highway_segment,taxi_pick_up_location,bridge_highway_direction,road_ramp,taxi_company_borough,due_date
0,63248362,2024-12-01T00:00:03.000,2024-12-02T12:01:57.000,HPD,Department of Housing Preservation and Develop...,HEAT/HOT WATER,ENTIRE BUILDING,RESIDENTIAL BUILDING,10462,764 BRADY AVENUE,...,,,,,,,,,,
1,63249651,2024-12-01T00:00:07.000,2024-12-01T02:00:18.000,NYPD,New York City Police Department,Noise - Residential,Loud Music/Party,Residential Building/House,10466,655 EAST 230 STREET,...,EAST 230 STREET,,,,,,,,,
2,63258990,2024-12-01T00:01:04.000,2025-04-29T00:00:00.000,DOB,Department of Buildings,Building/Use,Illegal Conversion Of Residential Building/Space,,11419,107-23 115 STREET,...,,,,,,,,,,
3,63248965,2024-12-01T00:01:05.000,2024-12-01T05:52:17.000,NYPD,New York City Police Department,Blocked Driveway,No Access,Street/Sidewalk,11377,32-28 68 STREET,...,68 STREET,,,,,,,,,
4,63250662,2024-12-01T00:01:19.000,2024-12-01T00:01:19.000,DOHMH,Department of Health and Mental Hygiene,Rodent,Condition Attracting Rodents,3+ Family Apt. Building,11379,66-29 74 STREET,...,74 STREET,,,,,,,,,


In [136]:
# clean the raw data

# select columns of interest
cols_of_interest = [
    "unique_key",
    "created_date",
    "closed_date",
    "agency",
    "agency_name",
    "complaint_type",
    "descriptor",
    "incident_zip",
    "city",
    "facility_type",
    "status",
    "due_date",
    "resolution_action_updated_date",
    "borough",
    "x_coordinate_state_plane",
    "y_coordinate_state_plane",
    "open_data_channel_type",
    "latitude",
    "longitude"
]
df_cleaned = final_df[cols_of_interest]

# change column types
for col in df_cleaned.columns:
    if 'date' in col or 'time' in col:
        df_cleaned[col] = pd.to_datetime(df_cleaned[col], errors='coerce')
    elif 'coordinate' in col or 'latitude' in col or 'longitude' in col:
        df_cleaned[col] = pd.to_numeric(df_cleaned[col], errors='coerce')

# create some derived columns
df_cleaned['days_to_close'] = (df_cleaned['closed_date'] - df_cleaned['created_date']).dt.days
df_cleaned['created_month'] = pd.to_datetime(df_cleaned['created_date']).dt.to_period('M').dt.to_timestamp()
df_cleaned['closed_month'] = pd.to_datetime(df_cleaned['closed_date']).dt.to_period('M').dt.to_timestamp()



In [140]:
# save cleaned data to a parquet file
os.makedirs('data/cache_clean_data', exist_ok=True)
df_cleaned.to_parquet("data/cache_clean_data/cleaned_data.parquet", index=False)

## Ingest NYC Population by Zipcode Data

sourced from https://data.census.gov/

In [141]:
nyc_population = pd.read_csv("data/nyc_population.csv", skiprows=1)
nyc_population.rename(columns={"Geographic Area Name": "zip", "Estimate!!Total": "population", "Margin of Error!!Total": "population_margin_of_error"}, inplace=True)
nyc_population.drop(columns=['Unnamed: 4'], inplace=True)
nyc_population['zip'] = nyc_population['zip'].str.replace('ZCTA5 ', '', regex=False)

In [142]:
display(nyc_population.head())

Unnamed: 0,Geography,zip,population,population_margin_of_error
0,860Z200US06390,6390,53,34
1,860Z200US10001,10001,29079,1790
2,860Z200US10002,10002,75517,3196
3,860Z200US10003,10003,53825,2771
4,860Z200US10004,10004,3875,914


In [143]:
nyc_population.to_csv("data/cache_clean_data/nyc_population.csv", index=False)