In [1]:
import polars as pl             # faster and more efficient than pandas
import pyarrow as pa
import pyarrow.parquet as pq
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
import urllib.parse
# base_url = 'https://data.cityofnewyork.us/api/v3/views/p937-wjvj/query.csv'
block_size = 100_000  # Number of rows per block
#output_parquet = 'rodent_inspection_data.parquet'
output_parquet = '311_service_requests_data.parquet'

base_url = 'https://data.cityofnewyork.us/resource/erm2-nwe9.csv'
max_workers = 4 # Number of threads for parallel processing

columns = [ # channel_type --> open_data_channel_type
    "unique_key", "created_date", "closed_date", "agency", "agency_name",
    "complaint_type", "descriptor", "location_type", "incident_zip",
    "city", "status", "resolution_action_updated_date", "borough", "open_data_channel_type"
]

def download_block (offset):
    # Socrata API in SQL -- pending to test SODA 3
    soql = (
        f"SELECT {', '.join(columns)} "
        f"LIMIT {block_size} OFFSET {offset}"  
    )
    
    encoded_query = urllib.parse.quote(soql, safe='')
    url = f"{base_url}?$query={encoded_query}"
    
    try: 
        df_block = pl.read_csv(url, 
                               columns=columns, 
                               schema_overrides={"incident_zip": pl.Utf8}
                               )
        if df_block.height == 0:
            return None
        table = df_block.to_arrow() # File type transformation to arrow
        return (offset, table)
    except Exception as e:
        print(f"Error downloading block at offset {offset}: {e}")
        return None

In [None]:
# download loop and offset counter

offset = 0
first_block = True
writer = None

while True:
    # Use ThreadPoolExecutor to download multiple blocks in parallel
    offsets = [offset + i*block_size for i in range(max_workers)]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(download_block, o): o for o in offsets}
        
        all_done = False
        for future in as_completed(futures):
            result = future.result()
            if result is None:
                all_done = True
                continue
            
            block_offset, table = result
            
            # Initialize ParquetWriter on first block
            if first_block:
                writer = pq.ParquetWriter('311_service_requests_data.parquet', table.schema, compression='snappy')
                first_block = False
            
            writer.write_table(table)
            print(f"Downloaded and wrote block at offset {block_offset}")
    if all_done:
        break
    offset += max_workers * block_size
    
# Closing file writer
if writer:
    writer.close()

print("All data downloaded and saved to Parquet.")

In [4]:
# building dimensional models --> modify application, create single build

import urllib.parse
base_url = 'https://data.cityofnewyork.us/resource/p937-wjvj.csv'
block_size = 100_000  # Number of rows per block
output_parquet = 'rodent_inspection_data.parquet'
#output_parquet = '311_service_requests_data.parquet'

#base_url = 'https://data.cityofnewyork.us/resource/erm2-nwe9.csv'
max_workers = 4 # Number of threads for parallel processing

columns = [ # inspection dimension
    "job_ticket_or_work_order_id", "job_id", "job_progress", "inspection_date", "result",
    "borough", "inspection_type", "zip_code", "nta" # added attributes for analysis
]

def download_block (offset):
    # Socrata API in SQL -- pending to test SODA 3
    soql = (
        f"SELECT {', '.join(columns)} "
        f"LIMIT {block_size} OFFSET {offset}"  
    )
    
    encoded_query = urllib.parse.quote(soql, safe='')
    url = f"{base_url}?$query={encoded_query}"
    
    try: 
        df_block = pl.read_csv(url, 
                               columns=columns, 
                               schema_overrides={"zip_code": pl.Utf8}
                               )
        if df_block.height == 0:
            return None
        table = df_block.to_arrow() # File type transformation to arrow
        return (offset, table)
    except Exception as e:
        print(f"Error downloading block at offset {offset}: {e}")
        return None

In [5]:
# download loop and offset counter

offset = 0
first_block = True
writer = None

while True:
    # Use ThreadPoolExecutor to download multiple blocks in parallel
    offsets = [offset + i*block_size for i in range(max_workers)]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(download_block, o): o for o in offsets}
        
        all_done = False
        for future in as_completed(futures):
            result = future.result()
            if result is None:
                all_done = True
                continue
            
            block_offset, table = result
            
            # Initialize ParquetWriter on first block
            if first_block:
                writer = pq.ParquetWriter('rodent_inspection_data.parquet', table.schema, compression='snappy')
                first_block = False
            
            writer.write_table(table)
            print(f"Downloaded and wrote block at offset {block_offset}")
    if all_done:
        break
    offset += max_workers * block_size
    
# Closing file writer
if writer:
    writer.close()

print("All data downloaded and saved to Parquet.")

Downloaded and wrote block at offset 200000
Downloaded and wrote block at offset 0
Downloaded and wrote block at offset 300000
Downloaded and wrote block at offset 100000
Downloaded and wrote block at offset 700000
Downloaded and wrote block at offset 500000
Downloaded and wrote block at offset 400000
Downloaded and wrote block at offset 600000
Downloaded and wrote block at offset 800000
Downloaded and wrote block at offset 1000000
Downloaded and wrote block at offset 1100000
Downloaded and wrote block at offset 900000
Downloaded and wrote block at offset 1400000
Downloaded and wrote block at offset 1200000
Downloaded and wrote block at offset 1300000
Downloaded and wrote block at offset 1500000
Downloaded and wrote block at offset 1600000
Downloaded and wrote block at offset 1900000
Downloaded and wrote block at offset 1800000
Downloaded and wrote block at offset 1700000
Downloaded and wrote block at offset 2000000
Downloaded and wrote block at offset 2300000
Downloaded and wrote bloc

In [6]:
import polars as pl

stg = pl.read_parquet("311_service_requests_data.parquet")

# converting created_date from string to datetime (ISO-ish: 2025-03-31T17:43:18.000)
stg = stg.with_columns(
    pl.col("created_date").str.strptime(
        pl.Datetime,
        format="%Y-%m-%dT%H:%M:%S%.3f",
        strict=False   # empty or invalid values will return null
    )
)

# converting closed_date and resolution_action_updated_date to datetime
stg = stg.with_columns(
    pl.col("closed_date").str.strptime(
        pl.Datetime,
        format="%Y-%m-%dT%H:%M:%S%.3f",
        strict=False
    ),
    pl.col("resolution_action_updated_date").str.strptime(
        pl.Datetime,
        format="%Y-%m-%dT%H:%M:%S%.3f",
        strict=False
    )
)

In [10]:

# location dimension build | select query
dim_location = (
    stg.select([
        pl.lit("NY").alias("state"),
        pl.lit("USA").alias("country"),
        "city",
        "borough",
        pl.col("incident_zip").alias("zip_code")
    ]).unique().with_row_index("location_dim_id") # surrogate key
)
# creating parquet file
dim_location.write_parquet("dim_location.parquet")

# time dimension [ created_date ] 
dim_time = (
    stg.select([
        pl.col("created_date").dt.date().alias("date"),
        pl.col("created_date").dt.year().alias("year"),
        pl.col("created_date").dt.month().alias("month"),
        pl.col("created_date").dt.strftime("%Y-%m").alias("YYYY-MM"),
        pl.col("created_date").dt.strftime("%b").alias("month_name"), # abbreviated month name
        pl.col("created_date").dt.day().alias("day"),
        pl.col("created_date").dt.week().alias("week"),
        pl.col("created_date").dt.quarter().alias("quarter"),
    ]).unique().with_row_index("time_dim_id")
)
# creating parquet file
dim_time.write_parquet("dim_time.parquet")
print("dim_time\n",
      "status: created\n")

# agency dimension
dim_agency = (
    stg.select([
        "agency",
        "agency_name"
    ]).unique().with_row_index("agency_dim_id")
)
# creating parquet file
dim_agency.write_parquet("dim_agency.parquet")
print("dim_agency\n",
      "status: created\n")

# status dimension
dim_status = (
    stg.select(pl.col("status").alias("status_type"))
    .unique().with_row_index("status_dim_id")
)
# creating parquet file
dim_status.write_parquet("dim_status.parquet")
print("dim_status\n",
      "status: created\n")

# channel type dimension
dim_channel = (
    stg.select(pl.col("open_data_channel_type").alias("channel_type"))
    .unique().with_row_index("channel_dim_id")
)
# creating parquet file
dim_channel.write_parquet("dim_channel_type.parquet")
print("dim_channel\n",
      "status: created\n")

# complaint type dimension
dim_complaint = (
    stg.select(pl.col("complaint_type").alias("complaint_name"))
    .unique().with_row_index("complaint_type_dim_id")
)
# creating parquet file
dim_complaint.write_parquet("dim_complaint_type.parquet")
print("dim_complaint\n",
      "status: created\n")


dim_time
 status: created

dim_agency
 status: created

dim_status
 status: created

dim_channel
 status: created

dim_complaint
 status: created



In [11]:
print(stg.select(pl.col("created_date").tail()))
print(stg.schema)

shape: (10, 1)
┌─────────────────────┐
│ created_date        │
│ ---                 │
│ datetime[ms]        │
╞═════════════════════╡
│ 2022-01-17 08:17:24 │
│ 2022-01-17 07:00:12 │
│ 2022-01-18 08:32:10 │
│ 2022-01-17 07:14:00 │
│ 2022-01-18 10:16:45 │
│ 2022-01-18 14:20:16 │
│ 2022-01-18 13:03:16 │
│ 2022-01-19 00:47:42 │
│ 2022-01-18 03:10:02 │
│ 2022-01-18 11:30:24 │
└─────────────────────┘
Schema({'unique_key': Int64, 'created_date': Datetime(time_unit='ms', time_zone=None), 'closed_date': Datetime(time_unit='ms', time_zone=None), 'agency': String, 'agency_name': String, 'complaint_type': String, 'descriptor': String, 'location_type': String, 'incident_zip': String, 'city': String, 'status': String, 'resolution_action_updated_date': Datetime(time_unit='ms', time_zone=None), 'borough': String, 'open_data_channel_type': String})


In [13]:
# creating 311 service requests fact table
fact = (
    stg
    # location join
    .join(dim_location, left_on=["city", "borough", "incident_zip"], 
                        right_on=["city", "borough", "zip_code"], how="left")
    
    # time join
    .join(dim_time.select(["time_dim_id", "date"]), left_on=pl.col("created_date").dt.date(), right_on=pl.col("date"),how="left")
    
    # agency join
    .join(dim_agency, on=["agency", "agency_name"], how="left")
    
    # status join
    .join(dim_status, left_on="status", right_on="status_type", how="left")
    
    # channel type join
    .join(dim_channel, left_on="open_data_channel_type", right_on="channel_type", how="left")
    
    # complaint type join
    .join(dim_complaint, left_on="complaint_type", right_on="complaint_name", how="left")
    
    # aggregate to match the fact grain
    .group_by([
        "location_dim_id",
        "time_dim_id",
        "channel_dim_id",
        "complaint_type_dim_id",
        "agency_dim_id",
        "status_dim_id"
    ]).agg(
        pl.count("unique_key").alias("complaint_count")
    )
)

fact.write_parquet("fact_311_service_requests.parquet")

In [15]:
print(fact.tail())

shape: (5, 7)
┌──────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ location_dim ┆ time_dim_id ┆ channel_dim ┆ complaint_t ┆ agency_dim_ ┆ status_dim_ ┆ complaint_c │
│ _id          ┆ ---         ┆ _id         ┆ ype_dim_id  ┆ id          ┆ id          ┆ ount        │
│ ---          ┆ u32         ┆ ---         ┆ ---         ┆ ---         ┆ ---         ┆ ---         │
│ u32          ┆             ┆ u32         ┆ u32         ┆ u32         ┆ u32         ┆ u32         │
╞══════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╡
│ 18           ┆ 4818        ┆ 3           ┆ 126         ┆ 11          ┆ 9           ┆ 3           │
│ 1989         ┆ 2972        ┆ 0           ┆ 90          ┆ 624         ┆ 9           ┆ 1           │
│ 2094         ┆ 3557        ┆ 0           ┆ 83          ┆ 159         ┆ 1           ┆ 1           │
│ 2993         ┆ 233         ┆ 3           ┆ 123         ┆ 1039        ┆ 9   

In [16]:
# creating rodent inspection data dimensional model

rodent_stg = pl.read_parquet("rodent_inspection_data.parquet")

# Convert created_date from string → datetime (ISO-ish: 2025-03-31T17:43:18.000)
rodent_stg = rodent_stg.with_columns(
    pl.col("inspection_date").str.strptime(
        pl.Datetime,
        format="%Y-%m-%dT%H:%M:%S%.3f",
        strict=False   # invalid/empty → null
    )
)

In [17]:
'''
# inspection dimension
    "job_ticket_or_work_order_id", "job_id", "job_progress", "inspection_date", "result",
    "borough", "inspection_type", "zip_code" # added attributes for analysis
    
    '''

# creating location | no 'city' column, renaming nta to neighborhood
dim_rodent_location = (
    rodent_stg.select([
    #    pl.lit("NY").alias("state"),
    #    pl.lit("USA").alias("country"),
        pl.col("nta").alias("neighborhood"),
        "borough",
        "zip_code"
    ]).unique().with_row_index("location_dim_id") # surrogate key
)
# creating parquet file
dim_rodent_location.write_parquet("dim_rodent_location.parquet")

# time dimension [ created_date ] 
dim_rodent_time = (
    rodent_stg.select([
        pl.col("inspection_date").dt.date().alias("date"),
        pl.col("inspection_date").dt.year().alias("year"),
        pl.col("inspection_date").dt.month().alias("month"),
        pl.col("inspection_date").dt.strftime("%Y-%m").alias("YYYY-MM"),
        pl.col("inspection_date").dt.strftime("%b").alias("month_name"), # abbreviated month name
        pl.col("inspection_date").dt.day().alias("day"),
        pl.col("inspection_date").dt.week().alias("week"),
        pl.col("inspection_date").dt.quarter().alias("quarter"),
    ]).unique().with_row_index("time_dim_id")
)
# creating parquet file
dim_rodent_time.write_parquet("dim_rodent_time.parquet")
print("dim_rodent_time\n",
      "status: created\n")

# inspection 'status' dimension
dim_rodent_inspection = (
    rodent_stg.select([
        "job_ticket_or_work_order_id",
        "job_id",
        "inspection_type", # added, not included in documentation
        "job_progress"
    ]).unique().with_row_index("inspection_dim_id")
)
# creating parquet file
dim_rodent_inspection.write_parquet("dim_rodent_inspection.parquet")
print("dim_rodent_inspection\n",
      "status: created\n")

# inspection dimension
dim_rodent_result = (
    rodent_stg.select(
        pl.col("result").alias("inspection_result").unique())
    .unique()
    .with_row_index("inspection_result_dim_id")
)
# creating parquet file
dim_rodent_result.write_parquet("dim_rodent_result.parquet")
print("dim_rodent_result\n",
      "status: created\n")


dim_rodent_time
 status: created

dim_rodent_inspection
 status: created

dim_rodent_result
 status: created



In [18]:
# creating rodent inspection fact table

fact_rodent = (
    rodent_stg
    # location join
    .join(
        dim_rodent_location,
        left_on=["borough", "zip_code", "nta"],
        right_on=["borough", "zip_code", "neighborhood"],
        how="left"
    )
    
    # time join
    .join(
        dim_rodent_time.select(["time_dim_id", "date"]),
        left_on=pl.col("inspection_date").dt.date(),
        right_on=pl.col("date"),
        how="left"
    )
    
    # inspection join
    .join(
        dim_rodent_inspection,
        on=["job_ticket_or_work_order_id", "job_id", "inspection_type", "job_progress"],
        how="left"
    )
    
    # result join
    .join(
        dim_rodent_result,
        left_on="result",
        right_on="inspection_result",
        how="left"
    )
    .group_by([
        "location_dim_id",
        "time_dim_id",
        "inspection_dim_id",
        "inspection_result_dim_id"
    ]).agg(
        pl.count().alias("inspection_count")
    )
)

fact_rodent.write_parquet("fact_rodent_inspection.parquet")

(Deprecated in version 0.20.5)
  pl.count().alias("inspection_count")


In [20]:
print(fact_rodent.head())

shape: (5, 5)
┌─────────────────┬─────────────┬───────────────────┬──────────────────────────┬──────────────────┐
│ location_dim_id ┆ time_dim_id ┆ inspection_dim_id ┆ inspection_result_dim_id ┆ inspection_count │
│ ---             ┆ ---         ┆ ---               ┆ ---                      ┆ ---              │
│ u32             ┆ u32         ┆ u32               ┆ u32                      ┆ u32              │
╞═════════════════╪═════════════╪═══════════════════╪══════════════════════════╪══════════════════╡
│ 249             ┆ 802         ┆ 176277            ┆ 0                        ┆ 1                │
│ 463             ┆ 2919        ┆ 2251705           ┆ 1                        ┆ 1                │
│ 249             ┆ 1981        ┆ 1350765           ┆ 0                        ┆ 2                │
│ 1061            ┆ 3204        ┆ 2357081           ┆ 5                        ┆ 1                │
│ 977             ┆ 573         ┆ 949585            ┆ 1                        ┆ 1    