In [None]:
#!/usr/bin/env python

import pandas as pd
from sodapy import Socrata
import time
import os
from dotenv import load_dotenv
import requests
from requests.exceptions import RequestException
from backoff import on_exception, expo

# Load environment variables from ../.api_key
load_dotenv(dotenv_path='../.api_key')

# Get the MTA_API_KEY from environment variables
mta_api_key = os.getenv('MTA_API_KEY')

# Fail fast if the API key was not loaded; every request below needs it.
if not mta_api_key:
    raise ValueError("MTA_API_KEY not found in environment variables. Please check your .api_key file.")


# Initialize the Socrata client for data.ny.gov with the API key and a
# 60-second request timeout.
client = Socrata("data.ny.gov", mta_api_key, timeout=60)

# Dataset identifier for MTA Subway Data
dataset_identifier = "wujg-7c2s"

# Dataset identifier for MTA Bus Data
# dataset_identifier = "kv7t-n8in"

# Filter data from January 1st, 2020 onward using the transit_timestamp field.
# NOTE: the per-day fetch loop builds its own day-bounded filter and row limit,
# so `where_clause` and `limit` here only matter for ad-hoc queries.
where_clause = "transit_timestamp >= '2020-01-01T00:00:00'"

# Set the number of records to fetch per batch
limit = 50000  # Fetch 50,000 rows per batch


In [None]:
#!/usr/bin/env python

import pandas as pd
from sodapy import Socrata
import time
import os
from dotenv import load_dotenv
import json
from datetime import datetime, timedelta
import requests

# --- Configuration and resumable state for the per-day fetch ---

# Read MTA_API_KEY from ../.api_key and stop early if it is missing.
load_dotenv(dotenv_path='../.api_key')
mta_api_key = os.getenv('MTA_API_KEY')
if not mta_api_key:
    raise ValueError("MTA_API_KEY not found in environment variables. Please check your .api_key file.")

# Authenticated client for the data.ny.gov Socrata portal (timeout=60).
client = Socrata("data.ny.gov", mta_api_key, timeout=60)

# Inclusive range of days to download, and the width of each request window.
start_date, end_date = datetime(2020, 1, 1), datetime(2024, 10, 24)  # Adjust end_date as needed
delta = timedelta(days=1)

# JSON checkpoint holding the next day to fetch, so an interrupted run resumes.
state_file = 'fetch_state.json'

if not os.path.exists(state_file):
    # Fresh run: begin at start_date with an empty checkpoint.
    current_date = start_date
    state = {}
else:
    with open(state_file, 'r') as fh:
        state = json.load(fh)
    saved_date_str = state.get('current_date')
    # Fall back to start_date when the checkpoint has no (or an empty) date.
    current_date = datetime.fromisoformat(saved_date_str) if saved_date_str else start_date
    print(f"Resuming from date {current_date.date()}")

# Directory receiving one CSV per fetched day; switch when fetching bus data.
output_dir = 'mta_subway_data'
# output_dir = 'mta_bus_data'
os.makedirs(output_dir, exist_ok=True)

import sys


def _save_state(path, data):
    """Write `data` as JSON to `path` atomically.

    The JSON goes to a sibling temporary file which is then moved over `path`
    with os.replace. An interruption mid-write therefore leaves the previous
    checkpoint intact. Without this, a truncated file would make json.load
    fail on the next (resuming) run.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w') as fh:
        json.dump(data, fh)
    os.replace(tmp_path, path)


# Maximum attempts per day when a request times out.
max_retries = 3
# Rows requested per day. A response of exactly this many rows means the day
# was probably cut off, so it is reported below.
day_row_limit = 1000000  # Increase limit as needed

# Fetch one day per request, save each non-empty day to its own CSV, and
# checkpoint the next day to fetch after every successful day.
# The try/finally closes the Socrata client however the loop ends: completion,
# sys.exit, KeyboardInterrupt, or an unexpected error.
try:
    while current_date <= end_date:
        retry_count = 0
        while retry_count < max_retries:
            try:
                next_date = current_date + delta

                # Half-open window [current_date, next_date) for the WHERE clause.
                current_date_str = current_date.strftime('%Y-%m-%dT%H:%M:%S')
                next_date_str = next_date.strftime('%Y-%m-%dT%H:%M:%S')
                where_clause = f"transit_timestamp >= '{current_date_str}' AND transit_timestamp < '{next_date_str}'"

                # Fetch data for the current date range
                results = client.get(
                    dataset_identifier,
                    where=where_clause,
                    limit=day_row_limit,
                )

                if not results:
                    print(f"No data returned for date {current_date.date()}.")
                else:
                    if len(results) >= day_row_limit:
                        # A full page: more rows likely exist for this day.
                        print(f"Warning: {len(results)} rows returned for {current_date.date()}; "
                              f"data may be truncated. Increase day_row_limit.")
                    results_df = pd.DataFrame.from_records(results)

                    if not results_df.empty:
                        # One CSV per day, named <output_dir>_<YYYY-MM-DD>.csv
                        output_file = os.path.join(output_dir, f'{output_dir}_{current_date.date()}.csv')
                        results_df.to_csv(output_file, index=False)
                        print(f"Data for {current_date.date()} saved to {output_file}")

                # Advance and checkpoint only after the day was fully handled.
                current_date = next_date
                state['current_date'] = current_date.isoformat()
                _save_state(state_file, state)

                # Optional: Sleep to respect API rate limits
                time.sleep(1)

                # If successful, break out of the retry loop
                break

            except requests.exceptions.Timeout:
                retry_count += 1
                if retry_count < max_retries:
                    print(f"Timeout error occurred. Retrying in 60 seconds... (Attempt {retry_count} of {max_retries})")
                    time.sleep(60)
                else:
                    print("Max retries reached. Stopping the process.")
                    # Use sys.exit: the builtin exit() is a site-module helper
                    # intended for the interactive shell only.
                    sys.exit(1)
            except Exception as e:
                print(f"An error occurred: {e}")
                sys.exit(1)
finally:
    # Close the Socrata client connection
    client.close()


In [None]:
import pandas as pd
import os
from datetime import datetime, timedelta
import pytz

def process_daily_csvs_for_hourly_ridership(input_dir):
    """Aggregate per-day ridership CSVs into hourly totals.

    Every file in ``input_dir`` ending in ``.csv`` is read in file-name order.
    Each file must have a ``transit_timestamp`` column parseable by
    ``pd.to_datetime`` and a numeric ``ridership`` column. Timestamps are
    treated as UTC, floored to the hour, and converted to US/Eastern.
    Ridership is then summed per hour across all files. Per-file hourly sums
    outside ``[0, 1_000_000]`` are discarded as implausible.

    NOTE(review): the UTC assumption for ``transit_timestamp`` is kept from
    the original code. If the column already holds local New York time, every
    hour is shifted by 4-5 h. Confirm against the dataset's data dictionary.

    Args:
        input_dir: Directory containing the per-day CSV files.

    Returns:
        DataFrame with a tz-aware (US/Eastern) index named ``hour`` and a
        single ``total_ridership`` column, sorted by hour.
    """
    hourly_ridership = {}

    csv_files = sorted(f for f in os.listdir(input_dir) if f.endswith('.csv'))

    for csv_file in csv_files:
        print(f"Processing {csv_file}")
        df = pd.read_csv(os.path.join(input_dir, csv_file))

        # Floor to the hour in UTC *before* converting to Eastern time. UTC
        # has no DST, so every hour stays distinct. Flooring after conversion
        # (with ambiguous='NaT') turned the repeated 1 AM hour of the
        # November DST change into NaT, and groupby silently dropped it.
        utc_times = pd.to_datetime(df['transit_timestamp']).dt.tz_localize('UTC')
        df['hour_key'] = utc_times.dt.floor('h').dt.tz_convert('US/Eastern')

        hourly_sum = df.groupby('hour_key')['ridership'].sum()

        for hour, ridership in hourly_sum.items():
            # Sanity check: assume at most 1,000,000 riders per hour.
            if 0 <= ridership <= 1000000:
                hourly_ridership[hour] = hourly_ridership.get(hour, 0) + ridership

    df_hourly = pd.DataFrame.from_dict(hourly_ridership, orient='index', columns=['total_ridership'])
    df_hourly.index.name = 'hour'
    return df_hourly.sort_index()

# Process the CSV files and get hourly ridership
print("Processing CSV files for hourly ridership:")
hourly_ridership_df = process_daily_csvs_for_hourly_ridership(output_dir)

print("Hourly ridership calculation complete.")
print(f"Total hours processed: {len(hourly_ridership_df)}")

# Display the first few rows of the result
print("\nFirst few rows of hourly ridership:")
print(hourly_ridership_df.head())

# Name the output after the dataset that was actually processed:
# 'mta_subway_data' -> 'hourly_subway_ridership.csv',
# 'mta_bus_data'    -> 'hourly_bus_ridership.csv'.
output_file = f"hourly_{output_dir.replace('mta_', '', 1).replace('_data', '')}_ridership.csv"
hourly_ridership_df.to_csv(output_file)
print(f"\nHourly ridership data saved to {output_file}")
