In [1]:
import os
from datetime import datetime

import nest_asyncio
import pandas as pd
from dateutil.relativedelta import relativedelta
from pymongo import MongoClient
from sodapy import Socrata

In [2]:
# Connection settings shared by the cells below.
#
# Secrets are read from the environment so they never live in the notebook
# (or in its version-control history). Export these before starting Jupyter:
#   SOCRATA_APP_TOKEN, SOCRATA_USERNAME, SOCRATA_PASSWORD
# NOTE(review): the credentials that were previously committed in this cell
# should be treated as compromised and rotated.
APP_TOKEN = os.environ.get("SOCRATA_APP_TOKEN")
USERNAME = os.environ.get("SOCRATA_USERNAME")
PASSWORD = os.environ.get("SOCRATA_PASSWORD")
DATA_SOURCE = "data.ny.gov"

# Not a secret, but overridable so the notebook can target another MongoDB host.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://my-mongo-db:27017/")

# Surface missing credentials right away instead of failing (or being
# throttled) deep inside a long-running fetch cell.
_missing_settings = [
    env_name
    for env_name, value in (
        ("SOCRATA_APP_TOKEN", APP_TOKEN),
        ("SOCRATA_USERNAME", USERNAME),
        ("SOCRATA_PASSWORD", PASSWORD),
    )
    if not value
]
if _missing_settings:
    print(
        "Warning: environment variable(s) not set: "
        + ", ".join(_missing_settings)
        + ". Socrata requests may be unauthenticated and rate-limited."
    )

### Getting the Ridership data

In [7]:
# Socrata dataset to pull, and the MongoDB database/collection it is loaded into.
DATASET_ID, DATABASE, DB_COLLECTION = "wujg-7c2s", "ny_transit_data", "ridership"

# Upper bound on the rows requested per monthly query; raise it if a single
# month ever holds more records than this.
RECORD_LIMIT_PER_MONTH = 5_000_000

# --- Date Range for Data Fetching ---
# Inclusive (year, month) window: the fetch loop also processes END_MONTH.
START_YEAR, START_MONTH = 2024, 12
END_YEAR, END_MONTH = 2024, 12

In [8]:
"""
Fetches data from the Socrata API for a specified date range,
one month at a time.
"""
try:
    # Authenticated client to connect to Socrata API
    client = Socrata(DATA_SOURCE,
                     APP_TOKEN,
                     username=USERNAME,
                     password=PASSWORD,
                     timeout=3600) # Increased timeout for large queries

    print(f"Successfully authenticated with {DATA_SOURCE}")    

    mongo_client = MongoClient(MONGO_URI)

    # this is needed to make it async, otherwise it throws an error withou finishing the call
    #nest_asyncio.apply()

    # check if the DB is up
    try:
        mongo_client.admin.command('ping')
        print("DB connected!")
    except Exception as e:
        print("DB connection error!")
        print(e)
        
    # Select database
    db = mongo_client[DATABASE]
    
    # Select collection
    collection = db[DB_COLLECTION]
    
    # Before inserting, it's good practice to clear old data to avoid duplicates
    collection.delete_many({})
    
    # List to hold all the monthly dataframes
    #all_monthly_data = []

    # Generate the start and end dates for the loop
    start_date = datetime(START_YEAR, START_MONTH, 1)
    
    # The end_date for the loop condition is the first day of the month *after* the specified end month.
    end_date = datetime(END_YEAR, END_MONTH, 1) + relativedelta(months=1)

    current_date = start_date
    while current_date < end_date:
        # Format the start and end of the month for the SoQL query
        start_of_month = current_date.strftime('%Y-%m-01T00:00:00.000')
        # Calculate the end of the month
        end_of_month_date = current_date + relativedelta(months=1) - relativedelta(days=1)
        end_of_month = end_of_month_date.strftime('%Y-%m-%dT23:59:59.999')

        # Build the SoQL query to get all records within the current month
        soql_query = f"transit_timestamp between '{start_of_month}' and '{end_of_month}'"

        print(f"Fetching data for: {current_date.strftime('%Y-%m')}...")

        # Make the API call with the 'where' filter for the current month
        results = client.get(DATASET_ID, where=soql_query, limit=RECORD_LIMIT_PER_MONTH)

        if results:
            # Convert the list of dictionaries to a pandas DataFrame
            results_df = pd.DataFrame.from_records(results)
            print(f"Successfully fetched {len(results_df)} records for {current_date.strftime('%Y-%m')}.")
            #all_monthly_data.append(results_df)
            
            # Columns to convert to numbers
            numeric_cols = ['ridership', 'transfers', 'latitude', 'longitude']
            for col in numeric_cols:
                results_df[col] = pd.to_numeric(results_df[col], errors='coerce')
            
            # Column to convert to a datetime object
            results_df['transit_timestamp'] = pd.to_datetime(results_df['transit_timestamp'], errors='coerce')
            
            # Text columns are usually loaded as 'object' dtype by pandas, which is fine.
            # If you want to be explicit, you can convert them to the modern 'string' dtype.
            string_cols = [
                'transit_mode', 'station_complex_id', 'station_complex',
                'borough', 'payment_method', 'fare_class_category'
            ]
            
            for col in string_cols:
                results_df[col] = results_df[col].astype('string')

            print("\n--- Data Types After Conversion ---")
            print(results_df.info())
            
            # Now you can work with your data with the correct types
            print("\n--- First 5 Rows of Cleaned Data ---")
            print(results_df.head())

            # Convert the DataFrame to a list of dictionaries
            records_to_insert = results_df.to_dict('records')

            # Insert the records into the collection
            collection.insert_many(records_to_insert)
            
            print(f"Mongo DB : Successfully inserted {len(records_to_insert)} records into the '{collection.name}' collection.")
        else:
            print(f"No records found for {current_date.strftime('%Y-%m')}.")

        # Move to the next month
        current_date += relativedelta(months=1)

    """
    # Concatenate all the monthly dataframes into a single dataframe
    if all_monthly_data:
        final_df = pd.concat(all_monthly_data, ignore_index=True)
        print("\n--- Data Fetching Complete ---")
        print(f"Total records fetched: {len(final_df)}")
        print("--- First 5 rows of the combined data: ---")
        print(final_df.head())
    else:
        print("\nNo data was fetched for the specified date range.")
    """
except Exception as e:
    print(f"An error occurred: {e}")

finally:
    if 'client' in locals():
        client.close()
        print("Socrata client connection closed.")
    if 'mongo_client' in locals():
        mongo_client.close()
        print("Mongo DB client connection closed.")

Successfully authenticated with data.ny.gov
DB connected!
Fetching data for: 2024-12...
Successfully fetched 2446236 records for 2024-12.
Mongo DB : Successfully inserted 2446236 records into the 'ridership' collection.
Socrata client connection closed.
Mongo DB client connection closed.


### Getting the Ridership data for ML

In [9]:
# Ridership extract for the ML workflow: same Socrata dataset as the main
# ridership load, but a wider month range and a separate MongoDB collection.
DATASET_ID = "wujg-7c2s"
DATABASE = 'ny_transit_data'
DB_COLLECTION = 'ridershipML'
RECORD_LIMIT_PER_MONTH = 5000000  # Adjust if a month's data exceeds this

# --- Date Range for Data Fetching ---
# Inclusive (year, month) window; the loop below also processes END_MONTH.
START_YEAR = 2024
START_MONTH = 9
END_YEAR = 2024
END_MONTH = 12

# Rows converted and sent to MongoDB per insert_many call; bounds the size of
# the temporary list of dicts built from each monthly DataFrame.
INSERT_BATCH_SIZE = 100_000

# Pre-bind both handles so the `finally` block only closes clients created by
# THIS run. Checking `'client' in locals()` would also match a stale client
# left in the kernel namespace by an earlier cell execution.
client = None
mongo_client = None

# Fetch the data from the Socrata API one month at a time, clean the column
# types, and load each month into MongoDB.
try:
    # Authenticated client to connect to Socrata API
    client = Socrata(DATA_SOURCE,
                     APP_TOKEN,
                     username=USERNAME,
                     password=PASSWORD,
                     timeout=3600)  # Increased timeout for large queries

    print(f"Successfully authenticated with {DATA_SOURCE}")

    mongo_client = MongoClient(MONGO_URI)

    # Check that the DB is up; abort early if it is not, rather than
    # continuing into the delete/fetch steps with a dead connection.
    try:
        mongo_client.admin.command('ping')
        print("DB connected!")
    except Exception as e:
        print("DB connection error!")
        print(e)
        raise

    collection = mongo_client[DATABASE][DB_COLLECTION]

    # Clear old data first so re-running the cell does not create duplicates.
    collection.delete_many({})

    # Columns to convert to numbers / text. Conversions are skipped for any
    # column missing from a response instead of raising KeyError.
    numeric_cols = ['ridership', 'transfers', 'latitude', 'longitude']
    string_cols = [
        'transit_mode', 'station_complex_id', 'station_complex',
        'borough', 'payment_method', 'fare_class_category'
    ]

    start_date = datetime(START_YEAR, START_MONTH, 1)
    # Loop bound: first day of the month *after* the configured end month.
    end_date = datetime(END_YEAR, END_MONTH, 1) + relativedelta(months=1)

    current_date = start_date
    while current_date < end_date:
        month_label = current_date.strftime('%Y-%m')

        # First and last instant of the current month for the SoQL filter.
        start_of_month = current_date.strftime('%Y-%m-01T00:00:00.000')
        end_of_month_date = current_date + relativedelta(months=1) - relativedelta(days=1)
        end_of_month = end_of_month_date.strftime('%Y-%m-%dT23:59:59.999')
        soql_query = f"transit_timestamp between '{start_of_month}' and '{end_of_month}'"

        print(f"Fetching data for: {month_label}...")
        results = client.get(DATASET_ID, where=soql_query, limit=RECORD_LIMIT_PER_MONTH)

        if results:
            if len(results) >= RECORD_LIMIT_PER_MONTH:
                # Hitting the limit means the month was very likely truncated.
                print(f"WARNING: {month_label} returned {len(results)} records, which reaches "
                      f"RECORD_LIMIT_PER_MONTH; the month may be incomplete.")

            results_df = pd.DataFrame.from_records(results)
            print(f"Successfully fetched {len(results_df)} records for {month_label}.")

            for col in numeric_cols:
                if col in results_df.columns:
                    results_df[col] = pd.to_numeric(results_df[col], errors='coerce')

            if 'transit_timestamp' in results_df.columns:
                results_df['transit_timestamp'] = pd.to_datetime(
                    results_df['transit_timestamp'], errors='coerce')

            for col in string_cols:
                if col in results_df.columns:
                    results_df[col] = results_df[col].astype('string')

            print("\n--- Data Types After Conversion ---")
            # DataFrame.info() prints its report itself and returns None, so
            # wrapping it in print() only adds a stray "None" line.
            results_df.info()

            print("\n--- First 5 Rows of Cleaned Data ---")
            print(results_df.head())

            # Insert in batches. Missing values (NaN / NaT / pd.NA) are sent
            # as None so they are stored as BSON null; pd.NA and NaT are not
            # encodable by pymongo and would abort the insert.
            inserted_count = 0
            for offset in range(0, len(results_df), INSERT_BATCH_SIZE):
                chunk = results_df.iloc[offset:offset + INSERT_BATCH_SIZE]
                records_to_insert = (
                    chunk.astype(object).where(chunk.notna(), None).to_dict('records')
                )
                collection.insert_many(records_to_insert)
                inserted_count += len(records_to_insert)

            print(f"Mongo DB : Successfully inserted {inserted_count} records into the '{collection.name}' collection.")
        else:
            print(f"No records found for {month_label}.")

        # Move to the next month
        current_date += relativedelta(months=1)

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    if client is not None:
        client.close()
        print("Socrata client connection closed.")
    if mongo_client is not None:
        mongo_client.close()
        print("Mongo DB client connection closed.")

Successfully authenticated with data.ny.gov
DB connected!
Fetching data for: 2024-09...
Successfully fetched 2342789 records for 2024-09.

--- Data Types After Conversion ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2342789 entries, 0 to 2342788
Data columns (total 12 columns):
 #   Column               Dtype         
---  ------               -----         
 0   transit_timestamp    datetime64[ns]
 1   transit_mode         string        
 2   station_complex_id   string        
 3   station_complex      string        
 4   borough              string        
 5   payment_method       string        
 6   fare_class_category  string        
 7   ridership            float64       
 8   transfers            float64       
 9   latitude             float64       
 10  longitude            float64       
 11  georeference         object        
dtypes: datetime64[ns](1), float64(4), object(1), string(6)
memory usage: 214.5+ MB
None

--- First 5 Rows of Cleaned Data ---
  transit_t

### Getting the Station Data

In [15]:
# Socrata dataset holding the station records, and the MongoDB
# database/collection they are loaded into.
DATASET_ID, DATABASE, DB_COLLECTION = "39hk-dx4f", "ny_transit_data", "station"

# Maximum number of rows requested from the API in the single station query.
RECORD_LIMIT = 5_000

In [18]:
# Download the station dataset from Socrata, normalise a few column types and
# replace the contents of the MongoDB station collection with the result.
#
# Reads DATA_SOURCE / APP_TOKEN / USERNAME / PASSWORD / MONGO_URI and
# DATASET_ID / DATABASE / DB_COLLECTION / RECORD_LIMIT from earlier cells.

# Pre-bind both handles so the `finally` block only closes clients created by
# THIS run. Checking `'client' in locals()` would also match a stale client
# left in the kernel namespace by an earlier cell execution.
client = None
mongo_client = None

try:
    # Authenticated client to connect to Socrata API
    client = Socrata(DATA_SOURCE,
                     APP_TOKEN,
                     username=USERNAME,
                     password=PASSWORD,
                     timeout=3600)  # Increased timeout for large queries

    print(f"Successfully authenticated with {DATA_SOURCE}")

    # Returned as JSON from the API / converted to a list of dicts by sodapy.
    results = client.get(DATASET_ID, limit=RECORD_LIMIT)

    if not results:
        # An empty response must not wipe the collection: insert_many rejects
        # an empty document list, so the old data would be lost for nothing.
        print("No station records returned; leaving the existing collection untouched.")
    else:
        results_df = pd.DataFrame.from_records(results)

        # Columns to convert to numbers; skipped when absent from the response
        # instead of raising KeyError.
        numeric_cols = ['station_id', 'complex_id', 'gtfs_latitude', 'gtfs_longitude',
                        'ada', 'ada_northbound', 'ada_southbound']
        for col in numeric_cols:
            if col in results_df.columns:
                results_df[col] = pd.to_numeric(results_df[col], errors='coerce')

        # Boolean 'cbd' column. Normalising through str() accepts "TRUE",
        # "true" and a real boolean True alike; anything else (including a
        # missing value) maps to False, as before.
        if 'cbd' in results_df.columns:
            results_df['cbd'] = results_df['cbd'].map(
                lambda value: str(value).strip().upper() == 'TRUE')

        mongo_client = MongoClient(MONGO_URI)

        # Check that the DB is up; abort before deleting anything if not.
        try:
            mongo_client.admin.command('ping')
            print("DB connected!")
        except Exception as e:
            print("DB connection error!")
            print(e)
            raise

        collection = mongo_client[DATABASE][DB_COLLECTION]

        # Clear old data first so re-running the cell does not create duplicates.
        collection.delete_many({})

        # Convert the DataFrame to a list of dictionaries and insert them.
        records_to_insert = results_df.to_dict('records')
        collection.insert_many(records_to_insert)

        print(f"Mongo DB : Successfully inserted {len(records_to_insert)} records into the '{collection.name}' collection.")

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    if client is not None:
        client.close()
        print("Socrata client connection closed.")
    if mongo_client is not None:
        mongo_client.close()
        print("Mongo DB client connection closed.")

Successfully authenticated with data.ny.gov
DB connected!
Mongo DB : Successfully inserted 496 records into the 'station' collection.
Socrata client connection closed.
Mongo DB client connection closed.


### Getting the Weather Data

In [7]:
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from pymongo import MongoClient
import time

# Fetch hourly historical weather for New York City from the OpenWeatherMap
# history API, one day per request, then replace the contents of the MongoDB
# weather collection with the combined result.

# Local import so this cell stays runnable on its own, like the imports
# directly above it.
import os

# --- Configuration ---
# The API key is read from the environment so it is never stored in the
# notebook. NOTE(review): the key previously committed in this cell should be
# treated as compromised and rotated.
API_KEY = os.environ.get("OPENWEATHER_API_KEY")
if not API_KEY:
    raise RuntimeError("Set the OPENWEATHER_API_KEY environment variable before running this cell.")

MONGO_URI = os.environ.get("MONGO_URI", "mongodb://my-mongo-db:27017/")
DATABASE = 'ny_transit_data'
DB_COLLECTION = 'weather_data'

NYC_CITY_ID = 5128581
BASE_URL = "https://history.openweathermap.org/data/2.5/history/city"

# Without a timeout a stalled connection would block the cell indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# --- Define Date Range for Iteration ---
start_date_str = "2024-09-01"
end_date_str = "2024-12-31"


def _redact_key(error):
    """Return the error text with the API key masked.

    requests error messages embed the request URL, which carries the
    `appid` query parameter, so printing them verbatim leaks the key
    into the cell output.
    """
    return str(error).replace(API_KEY, "***")


# Use pandas to create a range of dates to loop through
try:
    date_range = pd.date_range(start=start_date_str, end=end_date_str, freq='D')
except ValueError as e:
    print(f"Error creating date range. Please check your date strings: {e}")
    # Re-raise to stop the cell; exit() inside a notebook would ask the
    # kernel to shut down.
    raise

# List to store the DataFrame for each day
all_daily_dfs = []

# Define the NYC timezone once
nyc_tz = ZoneInfo("America/New_York")

print(f"Starting data fetch for {len(date_range)} days...")

# Loop through each day in the specified range
for day in date_range:
    day_label = day.strftime('%Y-%m-%d')
    print(f"Fetching data for: {day_label}")

    # Local-midnight to 23:59:59 window for the current day
    start_of_day_naive = day.to_pydatetime()  # 00:00:00
    end_of_day_naive = start_of_day_naive + timedelta(days=1) - timedelta(seconds=1)  # 23:59:59

    # Attach the NYC timezone, then convert to Unix timestamps for the API
    start_timestamp_utc = int(start_of_day_naive.replace(tzinfo=nyc_tz).timestamp())
    end_timestamp_utc = int(end_of_day_naive.replace(tzinfo=nyc_tz).timestamp())

    # Query parameters are passed separately so requests handles URL encoding.
    request_params = {
        "id": NYC_CITY_ID,
        "type": "hour",
        "start": start_timestamp_utc,
        "end": end_timestamp_utc,
        "appid": API_KEY,
    }

    try:
        time.sleep(5)
        response = requests.get(BASE_URL, params=request_params, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()  # Raises an HTTPError for bad responses (4xx or 5xx)

        data = response.json()

        # Check if the API returned any data points
        if data.get('cnt', 0) > 0 and 'list' in data:
            # Flatten each hourly entry: one row per item of its 'weather'
            # list, with the timestamp and main/wind readings carried along.
            daily_df = pd.json_normalize(
                data['list'],
                record_path='weather',
                meta=['dt',
                      ['main', 'temp'],
                      ['main', 'feels_like'],
                      ['main', 'pressure'],
                      ['main', 'humidity'],
                      ['wind', 'speed'],
                      ['wind', 'deg'],
                      ['wind', 'gust']],
                errors='ignore'  # tolerate entries lacking a meta key (e.g. wind.gust)
            )
            all_daily_dfs.append(daily_df)
            print(f"Successfully fetched {len(data['list'])} hourly records.")
        else:
            print("No data returned from API for this day.")

    except requests.exceptions.HTTPError as err:
        print(f"HTTP Error for {day_label}: {_redact_key(err)}")
    except requests.exceptions.RequestException as e:
        print(f"An error occurred for {day_label}: {_redact_key(e)}")

    # Be a good API citizen: wait a moment before the next request
    time.sleep(1)


# --- Combine, Process, and Insert Data ---

if not all_daily_dfs:
    print("\nNo data was fetched. Exiting without updating database.")
else:
    print("\nData fetching complete. Combining daily data into a single DataFrame...")
    # Concatenate all the daily DataFrames into one
    hourly_weather_df = pd.concat(all_daily_dfs, ignore_index=True)

    # --- Post-processing on the final DataFrame ---

    # Convert the 'dt' column from Unix timestamp to a human-readable NYC local time string
    hourly_weather_df['dt_nyc'] = hourly_weather_df['dt'].apply(
        lambda x: datetime.fromtimestamp(x, tz=timezone.utc).astimezone(nyc_tz).strftime('%Y-%m-%d %H:%M:%S %Z')
    )

    # Reorder columns to have the new datetime at the beginning
    all_columns = ['dt_nyc', 'dt', 'id', 'main', 'description', 'icon', 'main.temp', 'main.feels_like',
                   'main.pressure', 'main.humidity', 'wind.speed', 'wind.deg', 'wind.gust']
    # Keep only columns that actually exist in the DataFrame to avoid errors
    existing_columns = [col for col in all_columns if col in hourly_weather_df.columns]
    hourly_weather_df = hourly_weather_df.reindex(columns=existing_columns)

    print(f"Total hourly records compiled: {len(hourly_weather_df)}")
    print("DataFrame columns:", hourly_weather_df.columns.tolist())
    print("Sample of final data:\n", hourly_weather_df.head())

    # --- Database Insertion ---
    # Pre-bind so `finally` only closes a client created by THIS run; a
    # `'mongo_client' in locals()` check would also match a stale client left
    # in the kernel namespace by an earlier cell execution.
    mongo_client = None
    try:
        print("\nConnecting to MongoDB...")
        mongo_client = MongoClient(MONGO_URI)
        mongo_client.admin.command('ping')
        print("MongoDB connection successful!")

        db = mongo_client[DATABASE]
        collection = db[DB_COLLECTION]

        # Clear old data from the collection before inserting new data
        print(f"Clearing existing data from collection '{DB_COLLECTION}'...")
        delete_result = collection.delete_many({})
        print(f"Cleared {delete_result.deleted_count} documents.")

        # Convert the final DataFrame to a list of dictionaries for insertion
        records_to_insert = hourly_weather_df.to_dict('records')

        # Insert the new, combined records into the collection
        print(f"Inserting {len(records_to_insert)} new records...")
        result = collection.insert_many(records_to_insert)

        print(f"Successfully inserted {len(result.inserted_ids)} records into the '{collection.name}' collection in the '{DATABASE}' database.")

    except Exception as e:
        print(f"An error occurred during the database operation: {e}")
    finally:
        if mongo_client is not None:
            mongo_client.close()
            print("MongoDB connection closed.")

Starting data fetch for 122 days...
Fetching data for: 2024-09-01
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-02
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-03
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-04
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-05
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-06
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-07
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-08
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-09
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-10
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-11
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-12
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-13
Successfully fetched 24 hourly records.
Fetching data for: 2024-09-14
Successfully fetched 24 