In [2]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import dask.dataframe as dd
from shapely import wkt
import warnings
warnings.filterwarnings("ignore")

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [2]:
# Zip-code tabulation areas (MODZCTA), loaded eagerly with pandas.
zipcodeData = pd.read_csv('data/Modified_Zip_Code_Tabulation_Areas__MODZCTA_.csv')

# Traffic speed data with ~12 million rows, read lazily with dask in 16MB blocks;
# lines that cannot be parsed are skipped.
trafficData = dd.read_csv(
    "data/DOT_Traffic_Speeds_NBE_20240430.csv",
    on_bad_lines = 'skip',
    blocksize = '16MB',
)

In [3]:
import dask.config

# Turn off dask's automatic string conversion for dataframes. The public
# dask.config.set API is used instead of writing into the internal
# dask.config.config dict, which would raise KeyError if the 'dataframe'
# section were missing.
# NOTE(review): trafficData is created by dd.read_csv in the previous cell, i.e.
# before this option is set - confirm whether the option has to be set before
# reading for it to apply to that frame.
dask.config.set({"dataframe.convert-string": False})

def getCleanData(trafficDf):
    '''
        Return the traffic frame without its identifier / encoded-polyline columns.
        None of the dropped columns is referenced by the other functions in this notebook.
        Raises KeyError (from drop) if one of the columns is not present.
    '''
    unusedColumns = [
        "ID", "LINK_ID",
        "ENCODED_POLY_LINE", "ENCODED_POLY_LINE_LVLS",
        "OWNER", "TRANSCOM_ID", "LINK_NAME",
    ]
    return trafficDf.drop(columns = unusedColumns)

def getExplodedDataLink(trafficDf):
    '''
        Explode LINK_POINTS to one point per row and add a WKT string for each point.

        The LINK_POINTS column of the original data is a space-separated string of
        "latitude,longitude" pairs. Each string is split into its pairs (the last
        space-separated token is discarded), the frame is exploded to one pair per
        row, and a 'pointWKT' column of the form 'Point(<longitude> <latitude>)' is
        added so each pair can later be mapped to a zipcode polygon.

        Rows are dropped when the pair is missing or does not consist of exactly
        two comma-separated parts.

        The frame passed in is not modified; a new frame is returned.
    '''
    def splitPoints(linkPoints):
        # Missing / non-string values become an empty list instead of raising.
        if not isinstance(linkPoints, str):
            return []
        return linkPoints.split(' ')[:-1]

    def toPointWKT(latLong):
        # explode() yields NaN for empty lists, so non-strings must be tolerated here.
        if not isinstance(latLong, str):
            return np.nan
        parts = latLong.split(",")
        if len(parts) != 2:
            return np.nan
        latitude, longitude = parts
        return f'Point({longitude} {latitude})'

    # assign() builds a new frame instead of writing into the caller's frame.
    exploded = trafficDf.assign(
        LINK_POINTS = trafficDf['LINK_POINTS'].apply(splitPoints, meta = ('LINK_POINTS', 'object'))
    ).explode('LINK_POINTS')
    # Explicit meta: the WKT values are strings, so the column is object dtype.
    exploded = exploded.assign(
        pointWKT = exploded['LINK_POINTS'].apply(toPointWKT, meta = ('pointWKT', 'object'))
    )
    return exploded.dropna(subset=['pointWKT'])

def getExplodedDataTraffic(trafficDf):
    '''
        Explode LINK_POINTS to one "latitude,longitude" pair per row.

        Each space-separated LINK_POINTS string is split into its pairs (the last
        space-separated token is discarded) and the frame is exploded so that every
        pair gets its own row. Missing / non-string values become an empty list,
        which explode() turns into a single row with a missing LINK_POINTS.

        The frame passed in is not modified, so calling this twice on the same
        frame gives the same result; a new frame is returned.
    '''
    def splitPoints(linkPoints):
        if not isinstance(linkPoints, str):
            return []
        return linkPoints.split(' ')[:-1]

    # assign() builds a new frame instead of overwriting the caller's LINK_POINTS column.
    return trafficDf.assign(
        LINK_POINTS = trafficDf['LINK_POINTS'].apply(splitPoints, meta = ('LINK_POINTS', 'object'))
    ).explode('LINK_POINTS')

def getZipcodeCol(trafficDf):
    '''
        Build a lookup table that maps each distinct LINK_POINTS pair to a zipcode.

        After exploding, the data scales up a lot. The link points are therefore
        extracted into a separate frame, de-duplicated, and mapped to a zipcode there;
        the result is meant to be joined back onto the traffic data. This saves a lot
        of time because the polygon test only runs on unique lat/long pairs and does
        not have to go through all the rows of the traffic data.

        Reads the module-level `zipcodeData` frame (columns `the_geom` and `MODZCTA`).
        Returns a frame with the columns LINK_POINTS and Zipcode; points that fall
        into none of the polygons are dropped.
    '''
    linkPointsCol = trafficDf[['LINK_POINTS']]
    linkPointsCol = linkPointsCol.drop_duplicates()
    linkPointsCol = getExplodedDataLink(linkPointsCol)
    linkPointsCol = linkPointsCol.drop_duplicates()

    # Parse every zipcode polygon once up front; parsing inside the per-point
    # loop would repeat the same WKT parsing for every single point.
    zipcodePolygons = [
        (wkt.loads(geom), zipcode)
        for geom, zipcode in zip(zipcodeData['the_geom'], zipcodeData['MODZCTA'])
    ]

    def getZipcode(pointWKT):
        # Return the zipcode of the first polygon containing the point, else NaN.
        point = wkt.loads(pointWKT)
        for polygon, zipcode in zipcodePolygons:
            if polygon.contains(point):
                return zipcode
        return np.nan

    # Declared as float64 because unmatched points yield NaN.
    # NOTE(review): assumes MODZCTA is numeric - the saved output shows values such as 10035.0.
    linkPointsCol['Zipcode'] = linkPointsCol['pointWKT'].apply(getZipcode, meta = ('Zipcode', 'float64'))
    linkPointsCol = linkPointsCol.drop(columns = ['pointWKT'])
    linkPointsCol = linkPointsCol.dropna(subset = ['Zipcode'])
    return linkPointsCol
    
def mapTrafficZipcode(trafficDf, linkPointsCol):
    '''
        Explode the traffic frame to one LINK_POINTS pair per row and join it with
        the LINK_POINTS -> Zipcode lookup frame on the LINK_POINTS column.
    '''
    explodedTraffic = getExplodedDataTraffic(trafficDf)
    return explodedTraffic.merge(linkPointsCol, on = 'LINK_POINTS')

def getSplitDate(trafficDf):
    '''
        Add year, month, date (day of month) and hour columns parsed from DATA_AS_OF.

        DATA_AS_OF is parsed as 'MM/DD/YYYY hh:mm:ss AM/PM'. The hour is read with
        %I (12-hour clock) because the AM/PM marker (%p) is only honoured together
        with %I; with %H every PM timestamp would be recorded as its AM hour and
        12 AM would be recorded as hour 12.

        Works on dask frames as well as plain pandas frames. The frame passed in is
        not modified; a new frame with the four extra columns is returned.
    '''
    # Vectorised parsing; dask's parser is only looked up for non-pandas frames.
    toDatetime = pd.to_datetime if isinstance(trafficDf, pd.DataFrame) else dd.to_datetime
    dateTime = toDatetime(trafficDf['DATA_AS_OF'], format = '%m/%d/%Y %I:%M:%S %p')
    return trafficDf.assign(
        year = dateTime.dt.year,
        month = dateTime.dt.month,
        date = dateTime.dt.day,
        hour = dateTime.dt.hour,
    )

def groupbyAvgHour(trafficDf):
    '''
        Average SPEED for every combination of year, month, date, hour,
        LINK_POINTS, Zipcode and BOROUGH.
        The result is indexed by the grouping keys and has a single SPEED column.
        NOTE: the original author reports this step as the bottleneck - on the
        full data it keeps running for days.
    '''
    groupKeys = ['year', 'month', 'date', 'hour', 'LINK_POINTS', 'Zipcode', 'BOROUGH']
    hourlyMean = trafficDf.groupby(groupKeys).agg({'SPEED': 'mean'})
    return hourlyMean

In [4]:
# Drop the unused columns from a copy of the raw traffic frame.
cleanData = getCleanData(trafficData.copy())

# Lookup table: LINK_POINTS pair -> Zipcode.
PointZipcode = getZipcodeCol(cleanData)

# Explode the traffic rows, attach the zipcode, then add the year/month/date/hour columns.
finalTrafficDf = getSplitDate(mapTrafficZipcode(cleanData, PointZipcode))

In [5]:
# Write the joined traffic frame to CSV without the index. The `*` in the path is
# a placeholder filled in per output file; the recorded output lists
# History-000.csv, History-001.csv, ...
finalTrafficDf.to_csv("data/output/History-*.csv", index = False)

['/Users/neelgandhi/Big Dat/Final Project/output/History-000.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-001.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-002.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-003.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-004.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-005.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-006.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-007.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-008.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-009.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-010.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-011.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-012.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-013.csv',
 '/Users/neelgandhi/Big Dat/Final Project/output/History-014.c

In [49]:
# NOTE(review): this cell's execution count (49) is out of sequence, and its result is
# not used by the cells below - the next cell recomputes the grouping from the saved
# CSV files. It looks like a leftover from an earlier session; confirm before removing.
finalTrafficDf = groupbyAvgHour(finalTrafficDf)

In [6]:
# Re-load the CSV files written earlier, average SPEED per group and
# turn the grouping keys back into ordinary columns.
savedData = groupbyAvgHour(dd.read_csv("data/output/History-*.csv")).reset_index()
savedData.head(20)

Unnamed: 0,year,month,date,hour,LINK_POINTS,Zipcode,BOROUGH,SPEED
0,2023,12,31,11,"40.80151,-73.93066",10035.0,Manhattan,26.417619
1,2023,12,31,11,"40.8014,-73.93111",10035.0,Manhattan,26.417619
2,2023,12,31,11,"40.8012304,-73.93129",10035.0,Manhattan,26.417619
3,2023,12,31,11,"40.80096,-73.93141",10035.0,Manhattan,26.417619
4,2023,12,31,11,"40.8007405,-73.93133",10035.0,Manhattan,26.417619
5,2023,12,31,11,"40.800501,-73.93106",10035.0,Manhattan,26.417619
6,2023,12,31,11,"40.80026,-73.93057",10035.0,Manhattan,26.417619
7,2023,12,31,11,"40.81,-73.93002",10451.0,Manhattan,26.417619
8,2023,12,31,11,"40.7997606,-73.92972",10035.0,Manhattan,26.417619
9,2023,12,31,11,"40.7994205,-73.929511",10035.0,Manhattan,26.417619


In [7]:
# Write the grouped averages to CSV without the index. The `*` in the path is a
# placeholder filled in per output file; the recorded output lists a single file, 0.csv.
savedData.to_csv("data/groupedOutput/*.csv", index = False)

['/Users/neelgandhi/Big Dat/Final Project/groupedOutput/0.csv']