# Benchmarking Elasticsearch: NYC Congestion Pricing Traffic Analysis
### By Group 3: Joshua Pasaye, Kelsey Kwon, and Anna Prunty-Burkart

In [1]:
# Optional: install the pinned Elasticsearch client once per environment.
# Prefer `%pip` over `!pip` so the package is installed into this kernel's environment.
#!pip install elasticsearch==8.6.0

In [2]:
# Launch Elasticsearch in a Docker container in detached mode (`-d`), using the
# compose file docker-compose-elasticsearch.yml from the current working directory.
!docker compose -f docker-compose-elasticsearch.yml up -d

[1A[1B[0G[?25l[+] Running 1/1
 [32m✔[0m Container finalproject-elasticsearch-1  [32mRunning[0m                         [34m0.0s [0m
[?25h

In [3]:
import time, json
import pandas as pd
import numpy as np
from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers import parallel_bulk, BulkIndexError, scan
from datetime import datetime

In [4]:
# Client for the local Elasticsearch node on port 9200
# (presumably the container launched above — confirm the port in the compose file).
es = Elasticsearch("http://localhost:9200")

# Connectivity check: ping() reports True when the cluster answers
is_connected = es.ping()
print(is_connected)

True


In [5]:
# For daily_ridership: estimate average JSON document size, row count, and a rough index size.
# NOTE: this is raw JSON size only. The real on-disk index size also depends on the
# mappings, index structures and _source compression, so treat it as an order-of-magnitude guide.
daily_ridership_csv = "UPDATED MTA_Daily_Ridership_and_Traffic__Beginning_2020_20250823_final.csv"

# Load a small sample (first 10k rows) to measure a typical document size
df_sample = pd.read_csv(daily_ridership_csv, nrows=10000)

# Serialize each row to JSON; json.dumps escapes non-ASCII by default, so len() == bytes
docs = df_sample.to_dict(orient="records")
json_bytes = sum(len(json.dumps(doc)) for doc in docs)

# Guard against a header-only CSV to avoid ZeroDivisionError
avg_doc_size = json_bytes / len(docs) if docs else 0.0
print(f"Average doc size ≈ {avg_doc_size:.1f} bytes")

# Count data rows (physical lines minus the header). `with` guarantees the handle is closed.
# NOTE: quoted fields containing newlines would inflate this count.
with open(daily_ridership_csv) as csv_file:
    row_count = sum(1 for _ in csv_file) - 1
estimated_index_size_bytes = avg_doc_size * row_count

print(f"Rows: {row_count:,}")
# Also report MB so small indexes do not display as 0.00 GB
print(f"Estimated index size: {estimated_index_size_bytes / (1024**3):.2f} GB "
      f"({estimated_index_size_bytes / (1024**2):,.1f} MB)")

Average doc size ≈ 56.4 bytes
Rows: 14,119
Estimated index size: 0.00 GB


In [6]:
# For crz: estimate average JSON document size, row count, and a rough index size.
# NOTE: this is raw JSON size only. The real on-disk index size also depends on the
# mappings, index structures and _source compression, so treat it as an order-of-magnitude guide.
crz_csv = "UPDATED MTA_Congestion_Relief_Zone_Vehicle_Entries__Beginning_2025_20250823.csv"

# Load a small sample (first 10k rows) to measure a typical document size
df_sample = pd.read_csv(crz_csv, nrows=10000)

# Serialize each row to JSON; json.dumps escapes non-ASCII by default, so len() == bytes
docs = df_sample.to_dict(orient="records")
json_bytes = sum(len(json.dumps(doc)) for doc in docs)

# Guard against a header-only CSV to avoid ZeroDivisionError
avg_doc_size = json_bytes / len(docs) if docs else 0.0
print(f"Average doc size ≈ {avg_doc_size:.1f} bytes")

# Count data rows (physical lines minus the header). `with` guarantees the handle is closed.
# NOTE: quoted fields containing newlines would inflate this count.
with open(crz_csv) as csv_file:
    row_count = sum(1 for _ in csv_file) - 1
estimated_index_size_bytes = avg_doc_size * row_count

print(f"Rows: {row_count:,}")
# Also report MB so small indexes do not display as 0.00 GB
print(f"Estimated index size: {estimated_index_size_bytes / (1024**3):.2f} GB "
      f"({estimated_index_size_bytes / (1024**2):,.1f} MB)")

Average doc size ≈ 419.9 bytes
Rows: 2,322,432
Estimated index size: 0.91 GB


In [7]:
# For tunnel_bridge_crossing: estimate average JSON document size, row count, and a rough index size.
# NOTE: this is raw JSON size only. The real on-disk index size also depends on the
# mappings, index structures and _source compression, so treat it as an order-of-magnitude guide.
crossings_csv = "UPDATED MTA_Bridges_and_Tunnels_Hourly_Crossings__Beginning_2019_20250823.csv"

# Load a small sample (first 10k rows) to measure a typical document size
df_sample = pd.read_csv(crossings_csv, nrows=10000)

# Serialize each row to JSON; json.dumps escapes non-ASCII by default, so len() == bytes
docs = df_sample.to_dict(orient="records")
json_bytes = sum(len(json.dumps(doc)) for doc in docs)

# Guard against a header-only CSV to avoid ZeroDivisionError
avg_doc_size = json_bytes / len(docs) if docs else 0.0
print(f"Average doc size ≈ {avg_doc_size:.1f} bytes")

# Count data rows (physical lines minus the header). `with` guarantees the handle is closed.
# NOTE: quoted fields containing newlines would inflate this count.
with open(crossings_csv) as csv_file:
    row_count = sum(1 for _ in csv_file) - 1
estimated_index_size_bytes = avg_doc_size * row_count

print(f"Rows: {row_count:,}")
# Also report MB so small indexes do not display as 0.00 GB
print(f"Estimated index size: {estimated_index_size_bytes / (1024**3):.2f} GB "
      f"({estimated_index_size_bytes / (1024**2):,.1f} MB)")

Average doc size ≈ 335.4 bytes
Rows: 11,608,864
Estimated index size: 3.63 GB


In [8]:
# Per-dataset indexing config. Each entry is read by the indexing loop
# through its "csv_path", "index_name" and "columns" keys.
with open("nyc_congestion_datasets.json", "r") as config_file:
    nyc_congestion_datasets_select_col = json.load(config_file)

In [9]:
# Prep data to ensure ES compatibility
def actions_from_chunk(index_name, df):
    """Yield Elasticsearch bulk "index" actions, one per DataFrame row.

    NaN, +inf and -inf are converted to ``None`` so every document serializes
    to strict JSON (JSON has no NaN/Infinity literals) and is stored as null.

    Parameters
    ----------
    index_name : str
        Target index written into each action's ``_index``.
    df : pandas.DataFrame
        Rows to index. The caller's frame is not modified.

    Yields
    ------
    dict
        ``{"_index": index_name, "_source": {column: value, ...}}``
    """
    # Normalise infinities to NaN first so a single null-mask covers both.
    clean = df.replace([np.inf, -np.inf], np.nan)
    # Cast to object before `where`: on float64 columns pandas may keep NaN
    # (depending on version) instead of storing None. Those values would then
    # serialize as NaN and Elasticsearch would reject the document.
    clean = clean.astype(object).where(clean.notna(), None)
    for doc in clean.to_dict(orient="records"):
        yield {"_index": index_name, "_source": doc}

In [10]:
# Index CSV into Elasticsearch
def index_csv(es, csv_path, index_name, columns, chunk_size=100000, bulk_size=20000):
    """Load selected CSV columns into a freshly (re)created Elasticsearch index.

    Any existing index named ``index_name`` is deleted first. Automatic refresh
    is disabled during the load and restored to the default afterwards.

    Parameters
    ----------
    es : Elasticsearch
        Connected client.
    csv_path : str
        CSV file to load.
    index_name : str
        Target index (recreated from scratch).
    columns : list of str
        Columns to read, passed to ``pd.read_csv(usecols=...)``.
    chunk_size : int
        Rows read from the CSV per pandas chunk.
    bulk_size : int
        Documents per bulk request sent by ``parallel_bulk``.

    Returns
    -------
    int
        Number of documents Elasticsearch rejected (0 on full success).
    """
    # Delete index if it exists, then create a fresh one
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
    es.indices.create(
        index=index_name,
        settings={
            "refresh_interval": "-1"  # disable auto-refresh while bulk loading
        },
    )

    failed_count = 0
    first_error = None

    # Read CSV in chunks and send to Elasticsearch
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size, usecols=columns):
        actions = actions_from_chunk(index_name, chunk)
        # raise_on_error=False keeps the load going past bad documents. Inspect the
        # per-document (ok, info) results instead of discarding them, so rejections are visible.
        for ok, info in parallel_bulk(es, actions, chunk_size=bulk_size, raise_on_error=False):
            if not ok:
                failed_count += 1
                if first_error is None:
                    first_error = info

    # Restore the default refresh interval (null resets it); otherwise the index
    # would stay non-refreshing and later writes would never become searchable.
    es.indices.put_settings(index=index_name, settings={"index": {"refresh_interval": None}})
    # Refresh once now so the bulk-loaded data is searchable immediately
    es.indices.refresh(index=index_name)

    if failed_count:
        print(f"{index_name} indexed with {failed_count:,} failed documents; first error: {first_error}")
    else:
        print(f"{index_name} indexed successfully.")
    return failed_count

In [11]:
# Index every dataset listed in the JSON config, then report total wall-clock time
indexing_started_at = time.time()

for dataset_cfg in nyc_congestion_datasets_select_col:
    index_csv(
        es,
        csv_path=dataset_cfg["csv_path"],
        index_name=dataset_cfg["index_name"],
        columns=dataset_cfg["columns"],
    )

total_indexing_seconds = time.time() - indexing_started_at
print(f"\n Total indexing time: {total_indexing_seconds:.1f} seconds")

daily_ridership indexed successfully.
crz indexed successfully.
tunnel_bridge_crossing indexed successfully.

 Total indexing time: 210.6 seconds


##  Is NYC removing cars from Lower Manhattan?

### Traffic in the 8 months before the toll: May–Dec 2024 vs. the same period in prior years

In [12]:
# Pre-toll baseline: total bridge/tunnel traffic heading into Manhattan,
# May 1 – Dec 31 of every year except 2025.

# Measure time
start_time = time.time()

# Keep only crossings whose Direction contains "to Manhattan"
query = {
    "bool": {
        "must": [
            {"wildcard": {"Direction.keyword": "*to Manhattan*"}}
        ]
    }
}

# Stream every matching document with the scroll API. Only the two fields used
# below are requested, so less data is transferred and deserialized.
hits = [
    doc["_source"]
    for doc in helpers.scan(
        es,
        index="tunnel_bridge_crossing",
        query={"query": query, "_source": ["Date", "Traffic Count"]},
        size=10000,  # batch size per scroll
    )
]

# Convert to DataFrame
df = pd.DataFrame(hits)

# Create 'year' column from date
df['Date'] = pd.to_datetime(df['Date'])
df['year'] = df['Date'].dt.year

# May 1 – Dec 31 is simply month >= 5; drop 2025 (the post-toll year)
df = df[(df['Date'].dt.month >= 5) & (df['year'] != 2025)]

# Aggregate sum of Traffic Count by year
daily_traffic_pre_tax_df = df.groupby('year')['Traffic Count'].sum().reset_index()
daily_traffic_pre_tax_df = daily_traffic_pre_tax_df.sort_values('year')

daily_traffic_pre_tax_df_elapsed_time = time.time() - start_time

# Output
print(f"Query took {daily_traffic_pre_tax_df_elapsed_time:.1f} seconds")
print(daily_traffic_pre_tax_df)

Query took 42.0 seconds
   year  Traffic Count
0  2019       62753970
1  2020       46481171
2  2021       60860738
3  2022       63150116
4  2023       64291873
5  2024       64740227


### Traffic in the 7 months since the toll: Jan 5–July 31, 2025 vs. the same period in prior years

In [13]:
# Post-toll comparison: total bridge/tunnel traffic heading into Manhattan
# between Jan 5 and Jul 31 of every year (the window this notebook uses for the toll period).

# Measure time
start_time = time.time()

# Keep only crossings whose Direction contains "to Manhattan"
query = {
    "bool": {
        "must": [
            {"wildcard": {"Direction.keyword": "*to Manhattan*"}}
        ]
    }
}

# Stream every matching document with the scroll API. Only the two fields used
# below are requested, so less data is transferred and deserialized.
hits = [
    doc["_source"]
    for doc in helpers.scan(
        es,
        index="tunnel_bridge_crossing",
        query={"query": query, "_source": ["Date", "Traffic Count"]},
        size=10000,  # batch size per scroll
    )
]

# Convert to DataFrame
df = pd.DataFrame(hits)

# Create 'year' column from date
df['Date'] = pd.to_datetime(df['Date'])
df['year'] = df['Date'].dt.year

# Jan 5 – Jul 31: the tail of January plus all of February through July
crossing_month = df['Date'].dt.month
df = df[((crossing_month == 1) & (df['Date'].dt.day >= 5)) | crossing_month.between(2, 7)]

# Aggregate sum of Traffic Count by year
daily_traffic_post_tax_df = df.groupby('year')['Traffic Count'].sum().reset_index()
daily_traffic_post_tax_df = daily_traffic_post_tax_df.sort_values('year')

daily_traffic_post_tax_df_elapsed_time = time.time() - start_time

# Output
print(f"Query took {daily_traffic_post_tax_df_elapsed_time:.1f} seconds")
print(daily_traffic_post_tax_df)

Query took 43.0 seconds
   year  Traffic Count
0  2019       51934669
1  2020       35336442
2  2021       46547415
3  2022       51466478
4  2023       53105894
5  2024       53412048
6  2025       52462105


## Did NYC subway ridership increase?  

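### Compare average daily ridership across all modes in the dataset since the start of the toll program (Jan 5 – Jul 31, 2025)
Note: the "modes" include vehicle-entry series (e.g. CRZ Entries and CBD Entries), not only public transit.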

In [14]:
# Average daily ridership per Mode over Jan 5 – Jul 31, 2025, highest first.
start_time = time.time()

# Half-open date window: includes Jan 5, excludes Aug 1
query = {
    "range": {
        "Date": {
            "gte": "2025-01-05T00:00:00",
            "lt": "2025-08-01T00:00:00"
        }
    }
}

# Collect the _source of every matching document via the scroll API
hits = [
    doc["_source"]
    for doc in helpers.scan(es, index="daily_ridership", query={"query": query})
]
df = pd.DataFrame(hits)

# Mean Count per Mode, sorted descending
ridership_mode_df = (
    df.groupby('Mode')['Count']
    .mean()
    .reset_index()
    .sort_values('Count', ascending=False)
)

ridership_mode_df_elapsed_time = time.time() - start_time

print(f"Query took {ridership_mode_df_elapsed_time:.1f} seconds")
print(ridership_mode_df)

Query took 0.6 seconds
          Mode         Count
8       Subway  3.488259e+06
2          Bus  1.235365e+06
1           BT  9.223840e+05
3  CBD Entries  5.563550e+05
4  CRZ Entries  4.912280e+05
5         LIRR  2.176223e+05
6          MNR  1.862816e+05
0          AAR  3.644817e+04
7          SIR  5.808284e+03


### Average daily Subway ridership pre-toll by year

In [15]:
# Average daily subway ridership per calendar year before the toll program.

# Measure time
start_time = time.time()

# Subway rows dated before 2025. An explicit upper bound (instead of excluding
# only Jan 1 – Jul 31, 2025) keeps any later 2025 rows out of the pre-toll series.
# The pre-toll days Jan 1–4, 2025 are also left out, so 2025 never shows up as a 4-day bucket.
# `filter` context is used because relevance scores are irrelevant here.
query = {
    "bool": {
        "filter": [
            {"term": {"Mode.keyword": "Subway"}},
            {
                "range": {
                    "Date": {
                        "lt": "2025-01-01T00:00:00"
                    }
                }
            }
        ]
    }
}

# Use helpers.scan to retrieve all matching documents
hits = [
    doc["_source"]
    for doc in helpers.scan(
        es,
        index="daily_ridership",
        query={"query": query},
    )
]

# Convert to DataFrame
df = pd.DataFrame(hits)

# Create 'year' column from date
df['Date'] = pd.to_datetime(df['Date'])
df['year'] = df['Date'].dt.year

# Average of the daily subway Count within each year
subway_yearly_ridership_df = df.groupby('year')['Count'].mean().reset_index()

subway_yearly_ridership_df_elapsed_time = time.time() - start_time

# Output
print(f"Query took {subway_yearly_ridership_df_elapsed_time:.1f} seconds")
print(subway_yearly_ridership_df)

Query took 0.1 seconds
   year         Count
0  2020  1.209467e+06
1  2021  2.081672e+06
2  2022  2.773989e+06
3  2023  3.151280e+06
4  2024  3.262847e+06


### Average daily Subway ridership post-toll by month
Compare average daily subway ridership for each month since the start of the toll program (Jan 5 – Jul 31, 2025).

In [16]:
# Average daily subway ridership for each month of Jan 5 – Jul 31, 2025
# (so the January figure averages only Jan 5–31).
start_time = time.time()

# Two conditions: Mode is exactly "Subway", and Date falls in the half-open window
subway_filter = {
    "term": {
        "Mode.keyword": "Subway"
    }
}
post_toll_range = {
    "range": {
        "Date": {
            "gte": "2025-01-05T00:00:00",
            "lt": "2025-08-01T00:00:00"
        }
    }
}
query = {"bool": {"must": [subway_filter, post_toll_range]}}

# Collect the _source of every matching document via the scroll API
hits = [
    doc["_source"]
    for doc in helpers.scan(
        es,
        index="daily_ridership",
        query={"query": query},
        size=10000,  # batch size per scroll
    )
]
df = pd.DataFrame(hits)

# Month number per row; every row is from 2025, so the month alone identifies the bucket
df['Date'] = pd.to_datetime(df['Date'])
df['month'] = df['Date'].dt.month

# Mean subway Count within each month
subway_monthly_ridership_df = df.groupby('month')['Count'].mean().reset_index()

subway_monthly_ridership_df_elapsed_time = time.time() - start_time

print(f"Query took {subway_monthly_ridership_df_elapsed_time:.1f} seconds")
print(subway_monthly_ridership_df)

Query took 0.0 seconds
   month         Count
0      1  3.257162e+06
1      2  3.361130e+06
2      3  3.494515e+06
3      4  3.666954e+06
4      5  3.627925e+06
5      6  3.551575e+06
6      7  3.424240e+06


##  Is NYC Adding Revenue? 

### Estimated revenue by month

In [17]:
# Estimated congestion-toll revenue per month, computed client-side from the crz index.

# Measure time
start_time = time.time()

# Pull all documents from the crz index (10k per scroll batch instead of scan's 1,000 default)
hits = [doc["_source"] for doc in scan(es, index="crz", size=10000)]

df = pd.DataFrame(hits)

# Assumed toll per entry, keyed by (Vehicle Class, Time Period).
# Pairs not listed here, other than TLC Taxi/FHV (handled separately), earn 0.0.
CRZ_TOLL_RATES = {
    ('1 - Cars, Pickups and Vans', 'Peak'): 9.00,
    ('1 - Cars, Pickups and Vans', 'Overnight'): 2.25,
    ('5 - Motorcycles', 'Peak'): 4.50,
    ('5 - Motorcycles', 'Overnight'): 1.05,
    ('2 - Single-Unit Trucks', 'Peak'): 14.40,
    ('2 - Single-Unit Trucks', 'Overnight'): 3.60,
    ('4 - Buses', 'Peak'): 14.40,
    ('4 - Buses', 'Overnight'): 3.60,
    ('3 - Multi-Unit Trucks', 'Peak'): 21.60,
    ('3 - Multi-Unit Trucks', 'Overnight'): 5.40,
}
# TLC Taxi/FHV entries use one flat rate regardless of Time Period.
# NOTE(review): 1.125 looks like a blended taxi/FHV figure — confirm its source.
TLC_TAXI_FHV_RATE = 1.125


def toll_rate(vehicle_class, time_period):
    """Return the assumed toll for one entry of `vehicle_class` during `time_period`."""
    if vehicle_class == 'TLC Taxi/FHV':
        return TLC_TAXI_FHV_RATE
    return CRZ_TOLL_RATES.get((vehicle_class, time_period), 0.0)


def calculate_revenue(row):
    """Estimated revenue for one row: `CRZ Entries` times the applicable toll rate.

    Kept for row-wise use; the est_revenue column below uses the vectorised form.
    """
    return row['CRZ Entries'] * toll_rate(row['Vehicle Class'], row['Time Period'])


# Vectorised equivalent of df.apply(calculate_revenue, axis=1): one rate lookup per
# row, then a single column multiply. This avoids building a Series for each of
# the millions of rows.
entry_rates = pd.Series(
    [toll_rate(vc, tp) for vc, tp in zip(df['Vehicle Class'], df['Time Period'])],
    index=df.index,
    dtype=float,
)
df['est_revenue'] = df['CRZ Entries'] * entry_rates

# Ensure Toll Date is datetime and extract the calendar month
df['Toll Date'] = pd.to_datetime(df['Toll Date'])
df['month'] = df['Toll Date'].dt.month

# Keep January–July. Months are not split by year, so this assumes the index holds a
# single calendar year (the CRZ dataset begins in 2025). TODO: add a year filter if
# later years are loaded.
df_before_august = df[df['month'] < 8]

# Group by month and sum revenue
monthly_revenue_df = df_before_august.groupby('month')['est_revenue'].sum().reset_index()

monthly_revenue_df_elapsed_time = time.time() - start_time

print(f"Query took {monthly_revenue_df_elapsed_time:.1f} seconds")
monthly_revenue_df

Query took 27.2 seconds


Unnamed: 0,month,est_revenue
0,1,71620380.0
1,2,73823960.0
2,3,84320420.0
3,4,83696480.0
4,5,87983630.0
5,6,83504490.0
6,7,84338660.0


### Estimated revenue by vehicle class

In [18]:
# Total estimated revenue per Vehicle Class over df_before_august (Toll Date month < 8).
# This only regroups data already loaded, so the timing below is pandas-only.
start_time = time.time()

revenue_vehicle_class_df = (
    df_before_august
    .groupby('Vehicle Class')['est_revenue']
    .sum()
    .reset_index()
)

revenue_vehicle_class_df_elapsed_time = time.time() - start_time

print(f"Query took {revenue_vehicle_class_df_elapsed_time:.1f} seconds")
revenue_vehicle_class_df

Query took 0.2 seconds


Unnamed: 0,Vehicle Class,est_revenue
0,"1 - Cars, Pickups and Vans",449727900.0
1,2 - Single-Unit Trucks,48113580.0
2,3 - Multi-Unit Trucks,4861388.0
3,4 - Buses,22672710.0
4,5 - Motorcycles,1516298.0
5,TLC Taxi/FHV,42396140.0


In [19]:
# Summary of the wall-clock time recorded by each analysis cell above.
# Each timing covers that cell's Elasticsearch scan plus its pandas post-processing,
# except revenue_vehicle_class_df, which only regroups the already-loaded df_before_august.
query_timings = {
    'daily_traffic_pre_tax_df': daily_traffic_pre_tax_df_elapsed_time,
    'daily_traffic_post_tax_df': daily_traffic_post_tax_df_elapsed_time,
    'ridership_mode_df': ridership_mode_df_elapsed_time,
    'subway_yearly_ridership_df': subway_yearly_ridership_df_elapsed_time,
    'subway_monthly_ridership_df': subway_monthly_ridership_df_elapsed_time,
    'monthly_revenue_df': monthly_revenue_df_elapsed_time,
    'revenue_vehicle_class_df': revenue_vehicle_class_df_elapsed_time,
}

# Each label matches the variable it reports, printed with uniform precision
for label, seconds in query_timings.items():
    print(f'{label} query took {seconds:.2f} seconds \n')


ridership_mode_df query took 42.0 seconds 

daily_traffic_post_tax_df query took 43.0 seconds 

ridership_mode_df query took 0.64 seconds 

subway_yearly_ridership_df query took 0.08 seconds 

subway_monthly_ridership_df query took 0.03 seconds 

monthly_revenue_df query took 27.2 seconds 

revenue_vehicle_class_df query took 0.18 seconds 

