## Imports

In [36]:
from datetime import datetime

In [37]:
import dask.dataframe as dd
import pandas as pd

## Load data

In [38]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, memory_limit="4GB")
client  = Client(cluster)       # opens a dashboard at http://127.0.0.1:8787

Perhaps you already have a cluster running?
Hosting the HTTP server on port 51351 instead


In [39]:
from dask_mongo import read_mongo

# Connection
mongo_uri   = "mongodb://localhost:27017"
database    = "streamingDB"
traffic_coll  = "traffic_speeds"
collision_coll = "collisions_ts"


tr_bag = read_mongo(
    connection_kwargs={"host": mongo_uri},
    database=database,
    collection=traffic_coll,
    chunksize=5000,
)

col_bag = read_mongo(
    connection_kwargs={"host": mongo_uri},
    database=database,
    collection=collision_coll,
    chunksize=5000,
)

# # Pull everything (you can pass a query / projection to cut size)
# tr_df = read_mongo(
#         connection_string=mongo_uri,
#         database=database,
#         collection=traffic_coll,
#         partition_field="_id",     # how data are chunked; any indexed field works
#         partition_size=2000      # ≈ docs per partition
#       ).persist()

# col_df = read_mongo(
#         connection_string=mongo_uri,
#         database=database,
#         collection=collision_coll,
#         partition_field="_id",     # how data are chunked; any indexed field works
#         partition_size=2000      # ≈ docs per partition
#       ).persist()


In [5]:
col_df = col_bag.to_dataframe().persist()

In [34]:
tr_df = tr_bag.to_dataframe().persist()

2025-05-12 21:40:35,894 - distributed.worker - ERROR - Compute Failed
Key:       ('getitem-fused-dropna-9edc90a0c10136aa45c954ddbfbecce7', 280)
State:     executing
Function:  execute_task
args:      ((<function Fused._execute_task at 0x14c277b80>, {'getitem-fused-dropna-9edc90a0c10136aa45c954ddbfbecce7': ('dropna-019bfb88fe41b3757b94538738914395', 280), ('dropna-019bfb88fe41b3757b94538738914395', 280): (<function apply at 0x1049c5700>, <methodcaller: dropna>, [('getitem-a7b3ac0c6babf67c5e30ef729654d48c', 280)], {'subset': None}), ('getitem-a7b3ac0c6babf67c5e30ef729654d48c', 280): (<built-in function getitem>, ('assign-407565b7035aa249ba1b54af9751bf2d', 280), ['timestamp', 'street', 'speed_mph', 'c_lat', 'c_long']), ('assign-407565b7035aa249ba1b54af9751bf2d', 280): (<function assign at 0x14c250a60>, ('getitem-d18a172fc264dabe67b303d4f184de16', 280), 'c_lat', ('getitem-e34f35ad2e9c8757016bb4c4c37f1890', 280), 'c_long', ('getitem-dad594a220585f147cb0bacb9d4a2f46', 280)), ('getitem-e34f35

#### Viewing data

In [4]:
tr_df.head()

Unnamed: 0,_id,timestamp,street,coordinates,speed_mph
0,68226c3f361b9d779d8a98da,2024-01-01T00:03:03.000,WSE S BLLOMINGDALE ROAD - TYRELLAN AVENUE,"40.52581,-74.23039 40.52593,-74.228371 40.5261...",0.0
1,68226c5b361b9d779d8a98dc,2024-01-01T00:03:03.000,WSE N VICTORY BLVD - SOUTH AVENUE,"40.6020904,-74.1877 40.600331,-74.18943 40.597...",64.62
2,68226c5b361b9d779d8a98dd,2024-01-01T00:03:03.000,WSE N ARDEN AVENUE - VICTORY BLVD,"40.5902,-74.19332 40.57748,-74.19046 40.57623,...",62.75
3,68226c5b361b9d779d8a98de,2024-01-01T00:03:03.000,WSE N BLOOMUINGDALE ROAD - ARDEN AVENUE,"40.56042,-74.199391 40.55924,-74.20076 40.5585...",62.75
4,68226c5b361b9d779d8a98df,2024-01-01T00:03:03.000,WSE N TYRELLAN AVENUE - BLOOMINGDALE ROAD,"40.52561,-74.23039 40.5258705,-74.22618 40.526...",0.0


In [5]:
tr_df.tail()

Unnamed: 0,_id,timestamp,street,coordinates,speed_mph
4832,68226ea2361b9d779dabb745,2024-03-15T00:49:09.000,SIE E SOUTH AVENUE - RICHMOND AVENUE,"40.6210105,-74.168861 40.6207604,-74.168 40.61...",55.3
4833,68226ea2361b9d779dabb746,2024-03-15T00:49:09.000,SIE W BRADLEY AVENUE - WOOLEY AVENUE,"40.6077805,-74.14091 40.60826,-74.132101",62.13
4834,68226ea2361b9d779dabb747,2024-03-15T00:49:09.000,SIE W WOOLEY AVENUE - RICHMOND AVENUE,"40.6152105,-74.157401 40.61231,-74.15362 40.60...",62.13
4835,68226ea2361b9d779dabb748,2024-03-15T00:49:09.000,MLK S - SIE W WALKER STREET - RICHMOND AVENUE,"40.63092,-74.14592 40.62975,-74.14593 40.62877...",50.33
4836,68226ea2361b9d779dabb749,2024-03-15T00:49:09.000,SIE W RICHMOND AVENUE - SOUTH AVENUE,"40.6151706,-74.15738 40.61739,-74.16056 40.620...",62.13


In [6]:
tr_df.nunique('data_as_of').compute()

_id            2059244
timestamp        97650
street             123
coordinates        121
speed_mph          156
dtype: int64

In [8]:
tr_df.min().compute()

_id                             682134542721cb0278424a06
timestamp                        2025-05-11T12:19:03.000
street       11th ave n ganservoort - 12th ave @ 40th st
speed_mph                                            0.0
dtype: object

In [9]:
tr_df.max().compute()

_id                                   68213f23ba47b72ddd0c1bfb
timestamp                              2025-05-11T20:14:11.000
street       Whitestone Expwy S Exit 14 (Linden Pl) - VWE S...
speed_mph                                                66.48
dtype: object

In [10]:
print(tr_bag.take(1))

({'_id': ObjectId('682134542721cb0278424a06'), 'timestamp': '2025-05-11T19:14:03.000', 'street': 'CIP N Hempstead Tpk - LIE', 'speed_mph': 24.23},)


In [11]:
col_df.min().compute()

_id                             6821332d2721cb02783aa28f
collision_id                                     4063247
timestamp                            2020-01-01T00:07:00
lat                                                  0.0
lon                                            -74.25496
borough                                            BRONX
vehicle_types    ["''lime mope", None, None, None, None]
dtype: object

In [13]:
col_df.max().compute()

_id                    68213f95ba47b72ddd0c1cc3
collision_id                            4811371
timestamp                   2025-05-06T23:57:00
lat                                   40.912884
lon                                         0.0
borough                           STATEN ISLAND
vehicle_types    [None, None, None, None, None]
dtype: object

In [7]:
col_df.head()

Unnamed: 0,_id,collision_id,timestamp,lat,lon,borough,injured,killed,vehicle_types
0,68226c41361b9d779d8a98db,4691709,2024-01-01T01:00:00,40.81108,-73.9273,,1,0,"['Station Wagon/Sport Utility Vehicle', None, ..."
1,68226c5b361b9d779d8a98e6,4691881,2024-01-01T10:00:00,40.730442,-73.91367,,1,0,"['Station Wagon/Sport Utility Vehicle', 'Sedan..."
2,68226c5b361b9d779d8a98e8,4691988,2024-01-01T10:05:00,40.66684,-73.78941,,5,0,"['Van', 'Sedan', None, None, None]"
3,68226c5b361b9d779d8a98ea,4692305,2024-01-01T10:09:00,40.704594,-73.90826,QUEENS,2,0,"['Sedan', 'Station Wagon/Sport Utility Vehicle..."
4,68226c5b361b9d779d8a98ec,4691840,2024-01-01T10:15:00,40.679283,-73.83263,QUEENS,1,0,"['Sedan', 'Sedan', None, None, None]"


### Pandas part

In [3]:
import pandas as pd

In [41]:
tr_df = pd.read_json("../../../dummy_data/traffic_data.jsonl", lines=True)
tr_df.head()

Unnamed: 0,id,speed,travel_time,status,data_as_of,link_id,link_points,encoded_poly_line,encoded_poly_line_lvls,owner,transcom_id,borough,link_name
0,159,44.11,125,0,2025-04-28T21:29:03.000,4616252,"40.8563506,-73.87233 40.85219,-73.871371 40.85...",ewjxF`e{aM~X_EfLs@pRFbE^fUlClPlC`TdE`Gb@\\|HMt...,BBBBBBBBBBBBB,NYC_DOT_LIC,4616252,Bronx,BRP N WATSON AVENUE - FORDHAM ROAD
1,3,0.0,0,-101,2025-04-28T21:29:03.000,4616324,"40.76375,-73.999191 40.763521,-73.99935 40.762...",mtxwF\\|}sbMl@^~GpK\\|LrIbLlH??lK~G\\|FtD`C~@}...,BBBBBBBBBBBBBBB,NYC_DOT_LIC,4616324,Manhattan,12th ave @ 45th - 11 ave ganservoort st
2,450,0.0,0,-101,2025-04-28T21:29:03.000,4616346,"40.8500304,-73.944831 40.8492,-73.945241 40.84...",uoixFdjibMdDpAhDDnWk@rDJvD^hG\\|AzEpB~BzAbEfD\...,BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB...,NYC_DOT_LIC,4616346,Manhattan,Westside Hwy S GWB - 57th St
3,2,0.0,0,-101,2025-04-28T21:29:03.000,4616325,"40.73933,-74.01004 40.73895,-74.01012 40.7376,...",y{swFvavbMjANlGSvQn@fa@fBhQdA,BBBBBB,NYC_DOT_LIC,4616325,Manhattan,11th ave s ganservoort - west st @ spring st
4,433,0.0,0,-101,2025-04-28T21:29:03.000,4616215,"40.52561,-74.23039 40.5258705,-74.22618 40.526...",adjvF\\|badMs@iYaBsPcEB_\\|@vDyLWeHg@mUeF}L}EuIaF,BBBBBBBBBB,NYC_DOT_LIC,4616215,Staten Island,WSE N TYRELLAN AVENUE - BLOOMINGDALE ROAD


In [42]:
tr_df

Unnamed: 0,id,speed,travel_time,status,data_as_of,link_id,link_points,encoded_poly_line,encoded_poly_line_lvls,owner,transcom_id,borough,link_name
0,159,44.11,125,0,2025-04-28T21:29:03.000,4616252,"40.8563506,-73.87233 40.85219,-73.871371 40.85...",ewjxF`e{aM~X_EfLs@pRFbE^fUlClPlC`TdE`Gb@\\|HMt...,BBBBBBBBBBBBB,NYC_DOT_LIC,4616252,Bronx,BRP N WATSON AVENUE - FORDHAM ROAD
1,3,0.00,0,-101,2025-04-28T21:29:03.000,4616324,"40.76375,-73.999191 40.763521,-73.99935 40.762...",mtxwF\\|}sbMl@^~GpK\\|LrIbLlH??lK~G\\|FtD`C~@}...,BBBBBBBBBBBBBBB,NYC_DOT_LIC,4616324,Manhattan,12th ave @ 45th - 11 ave ganservoort st
2,450,0.00,0,-101,2025-04-28T21:29:03.000,4616346,"40.8500304,-73.944831 40.8492,-73.945241 40.84...",uoixFdjibMdDpAhDDnWk@rDJvD^hG\\|AzEpB~BzAbEfD\...,BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB...,NYC_DOT_LIC,4616346,Manhattan,Westside Hwy S GWB - 57th St
3,2,0.00,0,-101,2025-04-28T21:29:03.000,4616325,"40.73933,-74.01004 40.73895,-74.01012 40.7376,...",y{swFvavbMjANlGSvQn@fa@fBhQdA,BBBBBB,NYC_DOT_LIC,4616325,Manhattan,11th ave s ganservoort - west st @ spring st
4,433,0.00,0,-101,2025-04-28T21:29:03.000,4616215,"40.52561,-74.23039 40.5258705,-74.22618 40.526...",adjvF\\|badMs@iYaBsPcEB_\\|@vDyLWeHg@mUeF}L}EuIaF,BBBBBBBBBB,NYC_DOT_LIC,4616215,Staten Island,WSE N TYRELLAN AVENUE - BLOOMINGDALE ROAD
...,...,...,...,...,...,...,...,...,...,...,...,...,...
5995,213,9.94,228,0,2025-04-28T17:23:02.000,4456450,"40.80069,-73.92878 40.8013005,-73.930181 40.80...",i{_xFzefbMyBvGUlACt@Rj@d@f@z@@`@W\g@bA_DTk@b@i...,BBBBBBBBBBBBBBBBBBBBBBB,MTA Bridges & Tunnels,4456450,Manhattan,FDR N - TBB E 116TH STREET - MANHATTAN TRUSS
5996,141,43.49,160,0,2025-04-28T17:23:02.000,4456478,"40.772251,-73.919891 40.77391,-73.9222 40.7747...",qizwFhndbMkIlMeD`DyIbGyJ`HsOnK{OzKcBf@mBPoCKkB...,BBBBBBBBBBBBBBBBBBBBBBBBB,MTA Bridges & Tunnels,4456478,Queens,BE S TBB EXIT RAMP - QUEENS ANCHORAGE
5997,140,38.52,72,0,2025-04-28T17:23:02.000,4456479,"40.79789,-73.91988 40.79771,-73.92004 40.79758...",yi_xFfndbMb@^Xb@ThAEbB_@nByAbEm@fAkAbDiAlDo@nB...,BBBBBBBBBBBBBBB,MTA Bridges & Tunnels,4456479,Queens,BE S TBB EXIT RAMP - MANHATTAN LIFT SPAN
5998,202,46.60,47,0,2025-04-28T17:23:02.000,4456483,"40.789536,-73.78631 40.7894,-73.78765 40.78897...",qu}wFlkjaMXjGtAzJ@nB_@tC]~@s@lAuAlAuCbB??mAh@m...,BBBBBBBBBBBBBBBB,NYC_DOT_LIC,4456483,Queens,CIP N ramp to TNB - TNB Queens Anchorage


In [5]:
col_df = pd.read_json("../../../dummy_data/collision_data.jsonl", lines=True)
col_df.tail()

Unnamed: 0,crash_date,crash_time,on_street_name,off_street_name,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,...,latitude,longitude,location,contributing_factor_vehicle_3,vehicle_type_code_3,cross_street_name,contributing_factor_vehicle_4,vehicle_type_code_4,contributing_factor_vehicle_5,vehicle_type_code_5
5995,2021-04-24T00:00:00.000,2025-05-11 14:40:00,MADISON AVENUE,EAST 83 STREET,0,0,0,0,0,0,...,40.778904,-73.96024,"{'latitude': '40.778904', 'longitude': '-73.96...",,,,,,,
5996,2021-04-23T00:00:00.000,2025-05-11 10:19:00,SHEFFIELD AVENUE,BLAKE AVENUE,0,0,0,0,0,0,...,,,,,,,,,,
5997,2021-04-24T00:00:00.000,2025-05-11 23:10:00,SPENCER AVENUE,218 STREET,4,0,0,0,0,0,...,40.729355,-73.747665,"{'latitude': '40.729355', 'longitude': '-73.74...",Unspecified,Sedan,,,,,
5998,2021-04-24T00:00:00.000,2025-05-11 01:27:00,CROSS BRONX EXPY,,0,0,0,0,0,0,...,,,,Unspecified,Sedan,,,,,
5999,2021-04-24T00:00:00.000,2025-05-11 03:18:00,ASHFORD STREET,ARLINGTON AVENUE,0,0,0,0,0,0,...,40.68054,-73.88674,"{'latitude': '40.68054', 'longitude': '-73.886...",,,,,,,


## Data Preprocessing

### Collision dataframe

In [7]:
col_df.head()

Unnamed: 0,_id,collision_id,timestamp,lat,lon,borough,injured,killed,vehicle_types
0,68226c41361b9d779d8a98db,4691709,2024-01-01T01:00:00,40.81108,-73.9273,,1,0,"['Station Wagon/Sport Utility Vehicle', None, ..."
1,68226c5b361b9d779d8a98e6,4691881,2024-01-01T10:00:00,40.730442,-73.91367,,1,0,"['Station Wagon/Sport Utility Vehicle', 'Sedan..."
2,68226c5b361b9d779d8a98e8,4691988,2024-01-01T10:05:00,40.66684,-73.78941,,5,0,"['Van', 'Sedan', None, None, None]"
3,68226c5b361b9d779d8a98ea,4692305,2024-01-01T10:09:00,40.704594,-73.90826,QUEENS,2,0,"['Sedan', 'Station Wagon/Sport Utility Vehicle..."
4,68226c5b361b9d779d8a98ec,4691840,2024-01-01T10:15:00,40.679283,-73.83263,QUEENS,1,0,"['Sedan', 'Sedan', None, None, None]"


In [8]:
col_df = col_df[col_df['timestamp'].str[:4] == '2025']

In [9]:
col_df['crash_score'] = (col_df['injured'].astype(int) + 3*(col_df['killed'].astype(int)))
col_df['crash_score']

Dask Series Structure:
npartitions=23
    int64
      ...
    ...  
      ...
      ...
Dask Name: getitem, 13 expressions
Expr=(Assign(frame=Filter(frame=FromGraph(5717809), predicate=FunctionMap(frame=FromGraph(5717809)['timestamp'], accessor='str', attr='__getitem__', args=(slice(None, 4, None),), kwargs={}) == 2025)))['crash_score']

In [10]:
col_columns = ['timestamp','lat','lon','crash_score']

In [11]:
col_df = col_df[col_columns]

In [12]:
col_df = col_df.dropna().reset_index(drop=True)

In [13]:
col_df = col_df.map_partitions(
    lambda df: df.assign(timestamp=df['timestamp'].apply(lambda x: x.split(":")[0] + ":00:00")),
    meta=col_df
)

In [14]:
col_df

Unnamed: 0_level_0,timestamp,lat,lon,crash_score
npartitions=23,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,string,float64,float64,int64
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [15]:
col_df = col_df.assign(
    timestamp = dd.to_datetime(col_df["timestamp"], errors="coerce")   # if you haven’t yet
)

In [16]:
col_df

Unnamed: 0_level_0,timestamp,lat,lon,crash_score
npartitions=23,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,datetime64[ns],float64,float64,int64
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [17]:
col_df = col_df.persist()

In [18]:
# col_df.to_csv("./local_files/collision.csv", index = False)

### Traffic dataframe

In [19]:
tr_df.head()

Unnamed: 0,_id,timestamp,street,coordinates,speed_mph
0,68226c3f361b9d779d8a98da,2024-01-01T00:03:03.000,WSE S BLLOMINGDALE ROAD - TYRELLAN AVENUE,"40.52581,-74.23039 40.52593,-74.228371 40.5261...",0.0
1,68226c5b361b9d779d8a98dc,2024-01-01T00:03:03.000,WSE N VICTORY BLVD - SOUTH AVENUE,"40.6020904,-74.1877 40.600331,-74.18943 40.597...",64.62
2,68226c5b361b9d779d8a98dd,2024-01-01T00:03:03.000,WSE N ARDEN AVENUE - VICTORY BLVD,"40.5902,-74.19332 40.57748,-74.19046 40.57623,...",62.75
3,68226c5b361b9d779d8a98de,2024-01-01T00:03:03.000,WSE N BLOOMUINGDALE ROAD - ARDEN AVENUE,"40.56042,-74.199391 40.55924,-74.20076 40.5585...",62.75
4,68226c5b361b9d779d8a98df,2024-01-01T00:03:03.000,WSE N TYRELLAN AVENUE - BLOOMINGDALE ROAD,"40.52561,-74.23039 40.5258705,-74.22618 40.526...",0.0


In [20]:
tr_df

Unnamed: 0_level_0,_id,timestamp,street,coordinates,speed_mph
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,string,string,string,string,float64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [21]:
tr_df = tr_df[tr_df['timestamp'].str[:4] == '2025']

In [22]:
tr_df = tr_df.map_partitions(
    lambda df: df.assign(timestamp=df['timestamp'].apply(lambda x: x.split(":")[0] + ":00:00")),
    meta=tr_df
)

In [23]:
# tr_df = tr_df.assign(
#     timestamp = dd.to_datetime(tr_df["timestamp"], errors="coerce")   # if you haven’t yet
# )

In [24]:
tr_df = tr_df.persist()

In [25]:
tr_df.loc[0,'coordinates']

Dask Series Structure:
npartitions=3310
    string
       ...
     ...  
       ...
       ...
Dask Name: try_loc, 2 expressions
Expr=LocUnknown(frame=FromGraph(1a93548), iindexer=slice(0, 0, None), cindexer='coordinates')

In [26]:
def compute_centroid(link_points_str):
    try:
        points = [tuple(map(float, pair.split(',')))
                  for pair in link_points_str.strip().split()]
        if not points:
            return None, None
        lats, lons = zip(*points)
        return sum(lats) / len(lats), sum(lons) / len(lons)
    except Exception:
        return None, None

# wrap it so we return a named Series
def centroid_series(s):
    lat, lon = compute_centroid(s)
    return pd.Series({"c_lat": lat, "c_long": lon})

In [27]:
# Change for Dask

tr_df[["c_lat", "c_long"]] = tr_df["coordinates"].apply(
    centroid_series,
    meta={"c_lat": "f8", "c_long": "f8"}
)

In [28]:
tr_df

Unnamed: 0_level_0,_id,timestamp,street,coordinates,speed_mph,c_lat,c_long
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
,string,string,string,string,float64,float64,float64
,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...
,...,...,...,...,...,...,...


In [29]:
tr_cols = ['timestamp','street','speed_mph','c_lat','c_long']

In [30]:
tr_df = tr_df[tr_cols]

In [31]:
tr_df

Unnamed: 0_level_0,timestamp,street,speed_mph,c_lat,c_long
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,string,string,float64,float64,float64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [32]:
tr_df = tr_df.dropna()
tr_df

Unnamed: 0_level_0,timestamp,street,speed_mph,c_lat,c_long
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,string,string,float64,float64,float64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [33]:
tr_df = tr_df.persist()

2025-05-12 21:39:23,396 - distributed.worker - ERROR - Compute Failed
Key:       ('getitem-fused-dropna-9edc90a0c10136aa45c954ddbfbecce7', 995)
State:     executing
Function:  execute_task
args:      ((<function Fused._execute_task at 0x13d8005e0>, {'getitem-fused-dropna-9edc90a0c10136aa45c954ddbfbecce7': ('dropna-019bfb88fe41b3757b94538738914395', 995), ('dropna-019bfb88fe41b3757b94538738914395', 995): (<function apply at 0x106215700>, <methodcaller: dropna>, [('getitem-a7b3ac0c6babf67c5e30ef729654d48c', 995)], {'subset': None}), ('getitem-a7b3ac0c6babf67c5e30ef729654d48c', 995): (<built-in function getitem>, ('assign-407565b7035aa249ba1b54af9751bf2d', 995), ['timestamp', 'street', 'speed_mph', 'c_lat', 'c_long']), ('assign-407565b7035aa249ba1b54af9751bf2d', 995): (<function assign at 0x13d7d84c0>, ('getitem-d18a172fc264dabe67b303d4f184de16', 995), 'c_lat', ('getitem-e34f35ad2e9c8757016bb4c4c37f1890', 995), 'c_long', ('getitem-dad594a220585f147cb0bacb9d4a2f46', 995)), ('getitem-e34f35

In [55]:
# tr_df.to_csv("./local_files/traffic.csv", index = False)

2025-05-12 21:35:46,977 - distributed.worker - ERROR - Compute Failed
Key:       ('getitem-fused-dropna-122bcd0de3ad76b7e9d2e76b88360674', 310)
State:     executing
Function:  execute_task
args:      ((<function Fused._execute_task at 0x126ad8040>, {'getitem-fused-dropna-122bcd0de3ad76b7e9d2e76b88360674': ('dropna-2f76d7a7cfd5a04dc70cdf91ca33aee3', 310), ('dropna-2f76d7a7cfd5a04dc70cdf91ca33aee3', 310): (<function apply at 0x1032c5700>, <methodcaller: dropna>, [('getitem-7051085895216949ba5952785ec89575', 310)], {'subset': None}), ('getitem-7051085895216949ba5952785ec89575', 310): (<built-in function getitem>, ('assign-9aa5c010ac19aa872f9d84ad69a3d92f', 310), ['timestamp', 'street', 'speed_mph', 'c_lat', 'c_long']), ('assign-9aa5c010ac19aa872f9d84ad69a3d92f', 310): (<function assign at 0x125708ee0>, ('getitem-87b80841e15f6f43df6c54740761459f', 310), 'c_lat', ('getitem-42765c960ae7b5d5684876986d82017c', 310), 'c_long', ('getitem-11039a055e3118d09660e32b1c8b9a92', 310)), ('getitem-87b808

#### vehicle count

In [32]:
tr_df["timestamp"] = dd.to_datetime(tr_df["timestamp"], errors="coerce")

In [33]:
tr_df

Unnamed: 0_level_0,timestamp,street,speed_mph,c_lat,c_long
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,datetime64[ns],string,float64,float64,float64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [34]:
veh_per_tick = (
    tr_df
      .groupby(["street", "timestamp"])
      .size()                        # Series with name “size”
      .to_frame("veh_count")        # DataFrame with column “veh_count”
      .reset_index()                # back out street_name, ts as columns
)

In [35]:
veh_per_tick

Unnamed: 0_level_0,street,timestamp,veh_count
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,string,datetime64[ns],int64
,...,...,...


In [36]:
veh_per_tick = veh_per_tick.set_index("street", shuffle="tasks")

In [37]:
veh_per_tick

Unnamed: 0_level_0,timestamp,veh_count
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,datetime64[ns],int64
,...,...


In [38]:
def add_prev_timestamp(df):
    df = df.sort_values("timestamp")
    df["prev_ts"] = df["timestamp"].shift()            # previous reading
    return df

veh_sorted = veh_per_tick.map_partitions(
    add_prev_timestamp,
    meta={
        "timestamp":        "datetime64[ns]",
        "veh_count": "int64",
        "prev_ts":   "datetime64[ns]"
    }
)

In [39]:
veh_sorted["delta_min"] = (
    (veh_sorted["timestamp"] - veh_sorted["prev_ts"]).dt.total_seconds()/60
)

In [40]:
veh_sorted["veh_per_min"] = (
    veh_sorted["veh_count"] / veh_sorted["delta_min"]
)

In [207]:
veh_sorted = veh_sorted.persist()

In [208]:
veh_sorted

Unnamed: 0_level_0,timestamp,veh_count,prev_ts,delta_min,veh_per_min
npartitions=41,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
11th ave n ganservoort - 12th ave @ 40th st,datetime64[ns],int64,datetime64[ns],float64,float64
11th ave s ganservoort - west st @ spring st,...,...,...,...,...
...,...,...,...,...,...
West St S Spring St - BBT Manhattan Portal outbound,...,...,...,...,...
Whitestone Expwy S Exit 14 (Linden Pl) - VWE S MP8.65 (Exit 13 Northern Blvd),...,...,...,...,...


In [209]:
# original raw data
raw = tr_df                  # already converted data_as_of → ts in previous step
# raw = raw.assign(
#     ts = dd.to_datetime(raw["timestamp"], errors="coerce")   # if you haven’t yet
# )

# rate table from previous answer
if "street" not in veh_sorted.columns:
    veh_rate = veh_sorted.reset_index()
veh_rate = veh_rate[["street", "timestamp", "veh_per_min"]]   # keep only what you need
# 'street' is the index after our earlier set_index; turn it into a column:

In [210]:
veh_rate

Unnamed: 0_level_0,street,timestamp,veh_per_min
npartitions=41,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,string,datetime64[ns],float64
,...,...,...
...,...,...,...
,...,...,...
,...,...,...


In [41]:
# # Put the same index on both; this lets Dask do a hash‑based join partition‑wise
# raw      = raw.set_index("street", shuffle="tasks")
# veh_rate = veh_rate.set_index("street", shuffle="tasks")

In [211]:
veh_rate

Unnamed: 0_level_0,street,timestamp,veh_per_min
npartitions=41,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,string,datetime64[ns],float64
,...,...,...
...,...,...,...
,...,...,...
,...,...,...


In [212]:
raw

Unnamed: 0_level_0,timestamp,street,speed_mph,c_lat,c_long
npartitions=410,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,datetime64[ns],string,float64,float64,float64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [213]:
# now merge on both keys:
merged = dd.merge(
    raw,
    veh_rate,
    on=["street", "timestamp"],
    how="left"
)

In [214]:
merged = merged.persist()

In [215]:
merged.head()

Unnamed: 0,timestamp,street,speed_mph,c_lat,c_long,veh_per_min
0,2024-01-01 03:00:00,CIP N ramp to TNB - TNB Queens Anchorage,0.0,40.790168,-68.65306,0.2
1,2024-01-01 03:00:00,CIP N ramp to TNB - TNB Queens Anchorage,0.0,40.790168,-68.65306,0.2
2,2024-01-01 03:00:00,CIP N ramp to TNB - TNB Queens Anchorage,0.0,40.790168,-68.65306,0.2
3,2024-01-01 03:00:00,CIP N ramp to TNB - TNB Queens Anchorage,0.0,40.790168,-68.65306,0.2
4,2024-01-01 03:00:00,CIP N ramp to TNB - TNB Queens Anchorage,0.0,40.790168,-68.65306,0.2


In [216]:
merged = merged[
  (merged["veh_per_min"] > 0) &
  ~merged["veh_per_min"].isin([float('inf'), float('-inf')])
].dropna(subset=["veh_per_min"])

### Joining

In [42]:
import numpy as np
import pandas as pd
from scipy.spatial import cKDTree

In [43]:
tr_df = tr_df.persist()

In [44]:
f_col_df = col_df[col_df['timestamp'].dt.date == pd.Timestamp('2025-01-01').date()]
f_col_df

Unnamed: 0_level_0,timestamp,lat,lon,crash_score
npartitions=23,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,datetime64[ns],float64,float64,int64
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [45]:
f_col_df.shape[0].compute()

183

In [62]:
tr_df.head()

Unnamed: 0,timestamp,street,speed_mph,c_lat,c_long
0,2024-01-01,WSE S BLLOMINGDALE ROAD - TYRELLAN AVENUE,0.0,40.533127,-74.225573
1,2024-01-01,WSE N VICTORY BLVD - SOUTH AVENUE,64.62,40.595544,-74.191317
2,2024-01-01,WSE N ARDEN AVENUE - VICTORY BLVD,62.75,40.573416,-74.191708
4,2024-01-01,WSE N TYRELLAN AVENUE - BLOOMINGDALE ROAD,0.0,40.536165,-74.224091
5,2024-01-01,GOW N 92ND STREET - 7TH AVENUE,52.19,40.625985,-74.01929


In [63]:
tr_df

Unnamed: 0_level_0,timestamp,street,speed_mph,c_lat,c_long
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,datetime64[ns],string,float64,float64,float64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [46]:
f_tr_df = tr_df[tr_df['timestamp'].dt.date == pd.Timestamp('2025-01-01').date()]
f_tr_df

Unnamed: 0_level_0,timestamp,street,speed_mph,c_lat,c_long
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,datetime64[ns],string,float64,float64,float64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [60]:
f_tr_df.shape[0].compute()

25344

In [61]:
f_tr_df.head()



Unnamed: 0,timestamp,street,speed_mph,c_lat,c_long


In [48]:
# 1a. Materialize only the columns we need
# col_pdf = col_df.compute()
f_col_pdf = f_col_df.compute()

# 1b. For each timestamp, build a KDTree on (lat, lon) and store scores
trees = {}
for ts, grp in f_col_pdf.groupby("timestamp"):
    coords = grp[["lat", "lon"]].to_numpy()
    scores = grp["crash_score"].to_numpy()
    # cKDTree for fast nearest-neighbor queries
    trees[ts] = {
        "tree":  cKDTree(coords),
        "scores": scores
    }


In [54]:
def attach_crash_score(df_part: pd.DataFrame) -> pd.DataFrame:
    # start with a default 0 column
    df_part["crash_score"] = 0.0

    for idx, row in df_part.iterrows():
        ts = row["timestamp"]
        if ts in trees:
            tree = trees[ts]["tree"]
            scores = trees[ts]["scores"]
            # query the nearest collision to (c_lat, c_long)
            dist, loc = tree.query([row["c_lat"], row["c_long"]])
            df_part.loc[idx, "crash_score"] = scores[loc]
        # else leave crash_score = 0.0
    return df_part

In [55]:
meta = dict(tr_df._meta.dtypes.to_dict())
meta["crash_score"] = "float64"

# Apply the mapper
tr_with_scores = f_tr_df.map_partitions(
    attach_crash_score,
    meta=meta
)

In [56]:
tr_with_scores = tr_with_scores.persist()

In [57]:
tr_with_scores

Unnamed: 0_level_0,timestamp,street,speed_mph,c_lat,c_long,crash_score
npartitions=3310,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,datetime64[ns],string,float64,float64,float64,float64
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


In [58]:
tr_with_scores.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


Unnamed: 0,timestamp,street,speed_mph,c_lat,c_long,crash_score


## Training

In [None]:
import dask.dataframe as dd
import dask
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDRegressor
from dask_ml.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import pickle
from pymongo import MongoClient

In [None]:
# ── 1. Balance your tr_with_scores (assumes it already exists) ────────────────
zero_df    = tr_with_scores[tr_with_scores.crash_score == 0]
nonzero_df = tr_with_scores[tr_with_scores.crash_score != 0]

n_nonzero = nonzero_df.crash_score.count().compute()
n_zero    = zero_df.crash_score.count().compute()
frac      = min(1, n_nonzero / n_zero)

balanced = dd.concat([
    zero_df.sample(frac=frac, random_state=42),
    nonzero_df
]).sample(frac=1, random_state=42).persist()

In [None]:
# ── 2. Split features / label ──────────────────────────────────────────────────
features = ["c_lat", "c_long", "veh_per_sec", "speed_mph"]
X        = balanced[features]
y        = balanced["crash_score"]

from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [None]:
# ── 3. Wrap an SGDRegressor for incremental fitting ───────────────────────────
sgd = SGDRegressor(
    penalty="l2",
    max_iter=1,         # one pass per call
    tol=None,           # disable internal stopping
    learning_rate="constant",
    eta0=0.01,
    random_state=42
)
inc = Incremental(sgd)

In [None]:
# ── 4. Train for multiple epochs, recording metrics ───────────────────────────
n_epochs = 10
train_mse, test_mse = [], []
train_r2,  test_r2  = [], []

for epoch in range(1, n_epochs + 1):
    # one pass over the training set
    inc.fit(X_train, y_train)

    # predictions (lazy)
    y_train_pred = inc.predict(X_train)
    y_test_pred  = inc.predict(X_test)

    # compute metrics
    tmse = mean_squared_error(y_train, y_train_pred).compute()
    t2   = r2_score(y_train, y_train_pred).compute()
    vmse = mean_squared_error(y_test,  y_test_pred).compute()
    v2   = r2_score(y_test,  y_test_pred).compute()

    train_mse.append(tmse)
    train_r2.append(t2)
    test_mse.append(vmse)
    test_r2.append(v2)

    print(f"Epoch {epoch:2d} — train MSE: {tmse:.3f}, R²: {t2:.3f} | "
          f"val MSE: {vmse:.3f}, R²: {v2:.3f}")

In [None]:
# ── 5. Plot loss (MSE) ────────────────────────────────────────────────────────
plt.figure()
plt.plot(range(1, n_epochs+1), train_mse, label="Train MSE")
plt.plot(range(1, n_epochs+1), test_mse,  label="Test MSE")
plt.xlabel("Epoch")
plt.ylabel("Mean Squared Error")
plt.title("Training vs. Test Loss")
plt.legend()
plt.show()

In [None]:
# ── 6. Plot accuracy (R²) ─────────────────────────────────────────────────────
plt.figure()
plt.plot(range(1, n_epochs+1), train_r2, label="Train R²")
plt.plot(range(1, n_epochs+1), test_r2,  label="Test R²")
plt.xlabel("Epoch")
plt.ylabel("R² Score")
plt.title("Training vs. Test R²")
plt.legend()
plt.show()

In [None]:
# ── 7. Save the trained model into MongoDB ────────────────────────────────────
client = MongoClient("mongodb://localhost:27017/")
db     = client["mongo_db"]
models = db["models"]

# pickle the Incremental wrapper (it contains the fitted SGDRegressor)
model_bin = pickle.dumps(inc)
models.insert_one({
    "name": "crash_score_model",
    "epoch": n_epochs,
    "model": model_bin
})

print("Model saved to MongoDB collection 'models'.")