# Data Collection

Electricity usage data downloaded from PG&E public datasets: https://pge-energydatarequest.com/public_datasets/

Weather data scraping from Weather Underground: https://www.wunderground.com/history/monthly/us/ca

Spend tens of hours on scraping weather data:

1. scraping weather station list from Weather Underground Interactive Map: 
    1. understand logic: data is requested by tile number, which is determined by scaling level
    2. search for PG&E service area and set the bounding tiles
    3. scrape station ids, 11141 in total.
2. scrape weather data by station id. The data is too big so that this plan was abandoned at first: 11141 stations * 5 years * 12 months * 1 second/request 669,000 seconds = 7.7 days
3. explore other 2 apis: NOAA and Open Weather Map
    1. NOAA has hundreds of datatypes but only 2 to 3 datatypes are available for the zipcodes, no temperature.
    2. Open Weather Map is limited.
4. back to Weather Underground again, but use multi-threading to speed up the process. With 100 async concurrent requests, it takes about 3 hours to scrape 11141 stations.

After getting json files, the next step is to merge into the main file (all codes have been written):
1. combine json files (daily observation) into dataframes (monthly average) and save to local. One dataframe file for one station. (Ongoing: 4 more hours needed)
2. convert zipcodes from main file into latitude and longitude.
3. find the nearest weather station data for each zipcode for each month.
4. for missing data, fill wth 1) same month of nearby years 2) second nearest weather station data

Next step: modeling

This is a time series forecasting problem, with monthly data, external variables and large dataset.

1. baseline model: SARIMA
   - ARIMA model is classical time series model, but it is not suitable for seasonal changes.
   - Among its variations, SARIMA is the most suitable one for 1) obvious seasonal changes 2) external variables.
2. feature engineering based on baseline model result
3. main model: LightGBM
    - LightGBM is fast and suitable for large dataset.
4. model evaluation metrics: MAE and RMSE

# Weather data collection

## 1 Scrape station ids

Manually set tile numbers (x_min=316, x_max=352, y_min=764, y_max=815), according to PG&E serving area.

In [None]:
from time import sleep
import requests
import json

def generate_tile_list(x_min=316, x_max=352, y_min=764, y_max=815):
    tile_list = []
    for x in range(x_min, x_max + 1):
        for y in range(y_min, y_max + 1):
            tile_list.append((x, y))
    return tile_list


tiles = generate_tile_list()
tile_count = len(tiles)
print(f"Tile count: {tile_count}")

def extract_ids(data):
    lis = []
    for time_period in data.values():
        if isinstance(time_period, dict) and 'features' in time_period:
            for feature in time_period['features']:
                if 'id' in feature:
                    lis.append(feature['id'])
    return lis

for i, tile in enumerate(tiles):
    x, y = tile
    url = (f"https://api3.weather.com/v2/vector-api/products/614/features?x={x}&y={y}"
           f"&lod=12"
           f"&apiKey=e1f10a1e78da46f5b10a1e78da96f525"
           f"&tile-size=512"
           f"&time=1745128800000-1745129700000:26&time=1745127900000-1745128800000:41&time=1745127000000-1745127900000:56&time=1745126100000-1745127000000:71"
           f"&stepped=true")
    try:
        r = requests.get(url)
    except requests.exceptions.RequestException as e:
        print("index: ", i)
        print("Error in tile ", tile)
        print(f"Request failed: {e}")
        break

    if r.status_code != 200:
        print("index: ", i)
        print("Error in tile ", tile)
        print(f"Request failed with status code: {r.status_code}")
        break

    try:
        text = json.loads(r.text)
    except json.JSONDecodeError as e:
        print("index: ", i)
        print("Error in tile ", tile)
        print(f"JSON decode failed: {e}")
        print("Status code: ", r.status_code)
        print(r.text)
        break
    id_list = extract_ids(text)
    print(url)
    print(id_list)
    # write ids to file
    with open("data/Weather Station IDs.txt", "a") as f:
        for s_id in id_list:
            f.write(f"{s_id}\n")
    print(f"Tile {tile} done ({i}/{tile_count})")
    sleep(0.5)

Scrape station locations (latitude, longitude)

In [None]:
import csv
from time import sleep
import requests

def load_station_ids(file_path):
    """
    Load and deduplicate weather station IDs from a text file.

    Parameters:
    - file_path: str, path to the text file containing station IDs

    Returns:
    - ids: list of str, unique sorted station IDs

    Sample Usage:
    ids = load_station_ids("Weather Station IDs.txt")
    """
    with open(file_path, "r") as f:
        ids = f.readlines()
    ids = [s.strip() for s in ids]
    ids = sorted(list(set(ids)))
    print(f"Total loaded station IDs: {len(ids)}")
    return ids

def load_completed_stations(file_path):
    """
    Load the set of completed station IDs from an existing CSV file.

    Parameters:
    - file_path: str, path to the CSV file with completed stations

    Returns:
    - completed: set of str, completed station IDs

    Sample Usage:
    completed = load_completed_stations("station_location.csv")
    """
    completed = set()
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                completed.add(row["s_id"])
    except FileNotFoundError as e:
        print(f"Error reading completed file: {e}")
    return completed

def fetch_station_data(s_id):
    """
    Fetch station data from the weather API.

    Parameters:
    - s_id: str, station ID

    Returns:
    - data: dict or None, JSON response parsed as a dictionary or None if failed

    Sample Usage:
    data = fetch_station_data("STATION123")
    """
    url = (f"https://api.weather.com/v2/pws/observations/current?"
           f"apiKey=e1f10a1e78da46f5b10a1e78da96f525"
           f"&stationId={s_id}"
           f"&numericPrecision=decimal&format=json&units=e")
    try:
        response = requests.get(url)
        data = json.loads(response.text)
        return data
    except requests.exceptions.RequestException as e:
        print(f"{s_id} Request failed: {e}")
        print(f"URL: {url}")
    except json.JSONDecodeError as e:
        print(f"{s_id} JSON decode failed: {e}")
        print(response.text)
    return None

def save_station_location(s_id, latitude, longitude, icaoCode, postalCode, file_path):
    """
    Save station location data to a CSV file.

    Parameters:
    - s_id: str, station ID
    - latitude: float, latitude value
    - longitude: float, longitude value
    - icaoCode: str or None, ICAO code
    - postalCode: str or None, postal code
    - file_path: str, path to the CSV file

    Sample Usage:
    save_station_location("STATION123", 40.0, -75.0, "KABC", "12345", "station_location.csv")
    """
    with open(file_path, "a") as f:
        f.write(f"{s_id},{latitude},{longitude},{icaoCode},{postalCode}\n")

def scrape_station_data():
    """
    Main function to process weather station IDs and fetch their location data.
    """
    ids = load_station_ids("data/Weather Station IDs.txt")
    completed = load_completed_stations("data/station_location.csv")
    tasks = [s_id for s_id in ids if s_id not in completed]
    total_tasks = len(ids)
    if tasks:
        print(f"[INFO] Total tasks: {total_tasks}, completed: {len(completed)}, remaining: {len(tasks)}")
    
    for i, s_id in enumerate(ids):
        print(f"Processing station: {s_id} ({i + 1}/{total_tasks})")
        data = fetch_station_data(s_id)
        if data:
            for observation in data.get('observations', []):
                latitude = observation.get('lat')
                longitude = observation.get('lon')
                icaoCode = observation.get('icaoCode')
                postalCode = observation.get('postalCode')
                if latitude is not None and longitude is not None:
                    save_station_location(s_id, latitude, longitude, icaoCode, postalCode, "station_location.csv")
        sleep(0.3)

scrape_station_data()

## 2 Scrape weather data by station id

In [None]:
import asyncio
import aiohttp
import time
from datetime import datetime, timedelta
import pandas as pd

async def fetch_and_save_weather_data_async(session, year, month, api_key, station_id):
    """
    Asynchronously fetch weather data for a given year and month and save it as a local JSON file.
    Skips the request if the file already exists.

    Parameters:
    - session: aiohttp.ClientSession, aiohttp client session
    - year: int, year to request data for
    - month: int, month to request data for
    - api_key: str, your Weather API key
    - station_id: str, weather station ID

    Returns:
    - None

    Sample Usage:
    await fetch_and_save_weather_data_async(session, 2024, 5, "APIKEY", "STATION123")
    """
    filename = f"{station_id}_{year}{month:02d}.json"
    filepath = os.path.join("data/weather_json", filename)
    if os.path.exists(filepath):
        print(f"[{station_id}] {filepath} already exists, skipping request.")
        return

    start_date_str = f"{year}{month:02d}01"
    if month == 12:
        end_year = year
        end_month = month
        end_day = 31
    else:
        end_year = year
        end_month = month + 1
        end_day = 1
    end_date = datetime(end_year, end_month, end_day) - timedelta(days=1)
    end_date_str = end_date.strftime("%Y%m%d")

    api_url = f"https://api.weather.com/v2/pws/history/daily?stationId={station_id}&format=json&units=e&startDate={start_date_str}&endDate={end_date_str}&numericPrecision=decimal&apiKey={api_key}"

    try:
        async with session.get(api_url) as response:
            response.raise_for_status()  # Raise HTTPError if request fails
            data = await response.json()

            os.makedirs(os.path.dirname(filepath), exist_ok=True)
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=4)
            print(f"[{station_id}] Successfully saved data for {year}-{month:02d} to {filepath}")

    except aiohttp.ClientError as e:
        print(f"[{station_id}] aiohttp error while requesting data for {year}-{month:02d}: {e}")
    except json.JSONDecodeError as e:
        print(f"[{station_id}] JSON decode error while parsing response for {year}-{month:02d}: {e}")
    except Exception as e:
        print(f"[{station_id}] Unknown error while processing data for {year}-{month:02d}: {e}")

async def process_station(session, api_key, station_id, start_year, end_year):
    """
    Request weather data for all months in the given year range for a single weather station.

    Parameters:
    - session: aiohttp.ClientSession, aiohttp client session
    - api_key: str, your Weather API key
    - station_id: str, weather station ID
    - start_year: int, start year
    - end_year: int, end year

    Returns:
    - None

    Sample Usage:
    await process_station(session, "APIKEY", "STATION123", 2020, 2024)
    """
    tasks = []
    for year in range(start_year, end_year + 1):
        for month in range(1, 13):
            task = asyncio.create_task(fetch_and_save_weather_data_async(session, year, month, api_key, station_id))
            tasks.append(task)
    await asyncio.gather(*tasks)
    print(f"[{station_id}] Completed all year requests.")

async def main(api_key, station_ids, start_year, end_year, concurrency_limit=10):
    """
    Concurrently request weather data for all provided weather stations.

    Parameters:
    - api_key: str, your Weather API key
    - station_ids: list of str, list of weather station IDs
    - start_year: int, start year
    - end_year: int, end year
    - concurrency_limit: int, maximum number of concurrent requests

    Returns:
    - None

    Sample Usage:
    await main("APIKEY", ["STATION1", "STATION2"], 2020, 2024, concurrency_limit=10)
    """
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(concurrency_limit)  # Limit concurrency

        async def limited_process_station(station_id):
            async with semaphore:
                await process_station(session, api_key, station_id, start_year, end_year)

        station_tasks = [limited_process_station(station_id) for station_id in station_ids]
        await asyncio.gather(*station_tasks)

if __name__ == "__main__":
    api_key = "YOUR API HERE"  # Replace with your actual API key
    start_year = 2020
    end_year = 2024
    concurrency_limit = 100  # Adjust concurrency based on network and API limits

    # Read station IDs from CSV file
    ids = pd.read_csv("data/station_ids.csv")
    station_ids = ids["station_id"].tolist()
    # station_ids = ["KCAANGEL3"]  # Replace with your list of 11,141 station IDs

    # Measure execution time
    start_time = time.time()

    asyncio.run(main(api_key, station_ids, start_year, end_year, concurrency_limit))

    end_time = time.time()
    elapsed_time = end_time - start_time

    print(f"All weather station data requests and saves completed, total elapsed time: {elapsed_time:.2f} seconds")

## 3 Format Weather data: Daily to Month

In [None]:
import os
import json
import pandas as pd
import threading
import glob
import pyarrow.feather as feather

def optimized_json_formatter(data: dict) -> pd.DataFrame:
    """
    Optimized JSON formatter function to convert API response into a DataFrame.

    Parameters:
    - data: dict, JSON response from the API

    Returns:
    - df: pd.DataFrame, formatted DataFrame containing observations

    Sample Usage:
    df = optimized_json_formatter(json_data)
    """
    observations = data.get("observations", [])
    if not observations:
        return pd.DataFrame()

    records = []
    for obs in observations:
        record = {
            "stationID": obs.get("stationID"),
            "obsTimeLocal": pd.to_datetime(obs.get("obsTimeLocal")),
            "lat": obs.get("lat"),
            "lon": obs.get("lon"),
            "precipRate": obs.get("imperial", {}).get("precipRate"),
        }

        # Include fields containing "avg" in their names
        for key, value in obs.items():
            if "avg" in key.lower():
                record[key] = value
        for key, value in obs.get("imperial", {}).items():
            if "avg" in key.lower():
                record[key] = value
        records.append(record)

    return pd.DataFrame(records)

def process_station_data(json_file_pattern, output_dir):
    """
    Process all JSON files for a single weather station, merge them into a DataFrame,
    and save as a Feather file.

    Parameters:
    - json_file_pattern: str, glob pattern matching all JSON files of the station
    - output_dir: str, directory to save the Feather file

    Returns:
    - None

    Sample Usage:
    process_station_data("data/weather_json/STATION123_*.json", "data/weather_month")
    """
    all_records = []
    json_files = glob.glob(json_file_pattern)
    station_id = None

    for json_file in json_files:
        try:
            with open(json_file, 'r') as f:
                data = json.load(f)
            df = optimized_json_formatter(data)
            if not df.empty:
                all_records.append(df)
            if station_id is None and not df.empty:
                station_id = df['stationID'].iloc[0]
        except FileNotFoundError:
            print(f"Error: File not found {json_file}")
        except json.JSONDecodeError:
            print(f"Error: Failed to decode JSON file {json_file}")
        except Exception as e:
            print(f"Error while processing file {json_file}: {e}")

    if all_records and station_id:
        combined_df = pd.concat(all_records, ignore_index=True)
        output_feather_path = os.path.join(output_dir, f"{station_id}.feather")
        try:
            feather.write_feather(combined_df, output_feather_path)
            print(f"Successfully saved {station_id} data to {output_feather_path}")
        except Exception as e:
            print(f"Error while saving Feather file for {station_id}: {e}")

def daily_to_month_main(json_dir, output_feather_dir, num_threads=5):
    """
    Main function to traverse the JSON file directory and create threads for each weather station.

    Parameters:
    - json_dir: str, directory containing all JSON files
    - output_feather_dir: str, directory to save Feather files
    - num_threads: int, number of concurrent threads to process data

    Returns:
    - None

    Sample Usage:
    daily_to_month_main("data/weather_json", "data/weather_month", 10)
    """
    os.makedirs(output_feather_dir, exist_ok=True)
    station_ids = set()
    json_files = glob.glob(os.path.join(json_dir, "*.json"))
    for file in json_files:
        try:
            station_id = file.split(os.sep)[-1].split('_')[0]
            station_ids.add(station_id)
        except Exception:
            print(f"Failed to parse filename: {file}")

    threads = []
    for station_id in sorted(list(station_ids)):
        output_feather_path = os.path.join(output_feather_dir, f"{station_id}.feather")
        if os.path.exists(output_feather_path):
            print(f"{output_feather_path} already exists.")
            continue

        json_file_pattern = os.path.join(json_dir, f"{station_id}_*.json")
        thread = threading.Thread(target=process_station_data, args=(json_file_pattern, output_feather_dir))
        threads.append(thread)
        thread.start()
        if len(threads) >= num_threads:
            for thread in threads:
                thread.join()
            threads = []

    # Wait for any remaining threads to finish
    for thread in threads:
        thread.join()

    print("All weather station JSON files have been converted to Feather files.")

json_data_dir = "data/weather_json"  # Replace with your JSON file directory
output_feather_dir = "data/weather_month"  # Replace with your desired Feather file directory
num_processing_threads = 50  # Adjust the number of threads based on your CPU cores

daily_to_month_main(json_data_dir, output_feather_dir, num_processing_threads)

## 4 Abstract zipcode list

In [None]:
import pandas as pd
import pgeocode
from tqdm import tqdm
import pickle

# Load raw data
raw1 = pd.read_csv("PGE_2020-2024_Data/PGE_Combined_2020_2024.csv")  # Assume your raw data is in this CSV file
raw = raw1.copy()

# Get unique list of ZIP codes
zipcodes = raw['ZIPCODE'].unique()

def zipcode_to_latlon(zipcode, country='US'):
    """
    Convert a ZIP code to its latitude and longitude using pgeocode.

    Parameters:
    - zipcode: str or int, the ZIP code to convert
    - country: str, country code (default: 'US')

    Returns:
    - latitude: float or None, the latitude of the ZIP code
    - longitude: float or None, the longitude of the ZIP code

    Sample Usage:
    lat, lon = zipcode_to_latlon('94103')
    """
    nomi = pgeocode.Nominatim(country)
    location = nomi.query_postal_code(zipcode)
    if location.empty or pd.isnull(location.latitude):
        return None, None
    return location.latitude, location.longitude

def convert_zipcode_to_latlon(zipcodes):
    """
    Convert a list of ZIP codes to a list of (latitude, longitude) tuples.

    Parameters:
    - zipcodes: list of str or int, ZIP codes to convert

    Returns:
    - results: list of tuples, each tuple contains (latitude, longitude)

    Sample Usage:
    results = convert_zipcode_to_latlon(['94103', '10001'])
    """
    results = []
    for zipcode in tqdm(zipcodes, desc="Processing ZIPCODE"):
        lat, lon = zipcode_to_latlon(zipcode)
        results.append((lat, lon))
    return results

# Create a dictionary mapping ZIP codes to (latitude, longitude)
zipcode_lat_lon_dict = {}
for zipcode in tqdm(zipcodes, desc="Processing ZIPCODE"):
    lat, lon = zipcode_to_latlon(zipcode)
    if lat is not None and lon is not None:
        zipcode_lat_lon_dict[zipcode] = (lat, lon)
    else:
        print(f"ZIPCODE {zipcode} could not be converted to latitude and longitude.")

# Save the dictionary to a pickle file
with open("data/zipcode_lat_lon_dict.pkl", "wb") as f:
    pickle.dump(zipcode_lat_lon_dict, f)

print("ZIP code to latitude/longitude conversion completed and saved to data/zipcode_lat_lon_dict.pkl.")

## 5 Merge monthly weather data and pivot

In [None]:
import os
import glob
import pandas as pd
import pyarrow.feather as feather

def load_and_merge_feather(input_dir="data/weather_month"):
    """
    Load all Feather files from the specified directory and merge them into a single DataFrame.

    Parameters:
    - input_dir: str, path to the directory containing Feather files

    Returns:
    - merged_df: pd.DataFrame, combined DataFrame of all Feather files; empty DataFrame if none found

    Sample Usage:
    weather_raw = load_and_merge_feather("data/weather_month")
    """
    all_dfs = []
    feather_files = glob.glob(os.path.join(input_dir, "*.feather"))
    print(f"Found {len(feather_files)} Feather files.")
    for file in feather_files:
        try:
            df = feather.read_feather(file)
            all_dfs.append(df)
        except Exception as e:
            print(f"Error loading {file}: {e}")

    if all_dfs:
        merged_df = pd.concat(all_dfs, ignore_index=True)
        return merged_df
    else:
        return pd.DataFrame()

# Load and merge weather data
weather_raw = load_and_merge_feather()

if weather_raw.empty:
    print("No weather data found.")
else:
    print("Successfully loaded and merged weather data.")
    print(f"Merged weather data shape: {weather_raw.shape}")

weather = weather_raw.copy()

# Prepare monthly weather summary
weather_monthly = weather.copy()
weather_monthly['year_month'] = weather_monthly['obsTimeLocal'].dt.strftime('%Y-%m')
weather_monthly = weather_monthly.drop(columns=['obsTimeLocal'])
print(weather_monthly['year_month'].head())

# Group by station and month, calculate monthly averages
weather_monthly = weather_monthly.groupby(['stationID', 'year_month']).mean(numeric_only=True).reset_index()
weather_monthly = weather_monthly[['stationID', 'year_month', 'lat', 'lon'] + [col for col in weather_monthly.columns if col not in ['stationID', 'year_month', 'lat', 'lon']]]
print(f"Weather data after pivoting shape: {weather_monthly.shape}")
print(weather_monthly.head())

# Split year_month into separate YEAR and MONTH columns
weather_monthly[['YEAR', 'MONTH']] = weather_monthly['year_month'].str.split('-', expand=True)
weather_monthly['MONTH'] = weather_monthly['MONTH'].astype(int)
weather_monthly = weather_monthly.drop(columns=['year_month'])
print("Created YEAR and MONTH columns and removed year_month column.")
print(f"Final weather data shape: {weather_monthly.shape}")
print(weather_monthly.head())

# Save the processed monthly weather data to Feather format
weather_monthly.to_feather("data/weather.feather")
print("Saved final monthly weather data to data/weather.feather.")

## 6 map stationID to lat, lon

In [None]:
import pickle

# Get unique list of stationID, lat, lon
station_loc = weather_monthly[['stationID', 'lat', 'lon']].drop_duplicates()

# Create a mapping from stationID to (lat, lon)
station_loc_mapping = {}
for _, row in station_loc.iterrows():
    station_id = row['stationID']
    lat = row['lat']
    lon = row['lon']
    station_loc_mapping[station_id] = (lat, lon)

# Save the mapping to a pickle file
with open("data/station_loc_mapping.pkl", "wb") as f:
    pickle.dump(station_loc_mapping, f)

print("Saved stationID to (lat, lon) mapping to data/station_loc_mapping.pkl.")

## 7 Map zipcode to top 10 nearest stationIDs

In [None]:
import pickle
from geopy.distance import geodesic
import concurrent.futures
from tqdm import tqdm

# Load ZIP code to (lat, lon) mapping
with open("data/zipcode_lat_lon_dict.pkl", "rb") as f:
    zipcode_lat_lon_dict = pickle.load(f)

# Load stationID to (lat, lon) mapping
with open("data/station_loc_mapping.pkl", "rb") as f:
    station_loc_mapping = pickle.load(f)

def find_top_nearest(lat_lon_pair, station_loc_mapping, top_n):
    """
    Find the top N nearest weather stations to a given (latitude, longitude) pair.

    Parameters:
    - lat_lon_pair: tuple, (latitude, longitude)
    - station_loc_mapping: dict, mapping from stationID to (latitude, longitude)
    - top_n: int, number of nearest stations to return

    Returns:
    - nearest_stations: tuple, top N nearest station IDs

    Sample Usage:
    nearest = find_top_nearest((37.7749, -122.4194), station_loc_mapping, 5)
    """
    lat, lon = lat_lon_pair
    distances = [
        (geodesic((lat, lon), coords).km, station_id)  # (distance, station_id)
        for station_id, coords in station_loc_mapping.items()
    ]
    # Get the top N nearest station IDs
    return tuple(station_id for _, station_id in sorted(distances, key=lambda x: x[0])[:top_n])

def lat_lon_map_to_weather_multithread(zip_lat_lon_mapping,
                                       station_loc_mapping,
                                       top_n,
                                       max_workers=32):
    """
    Map ZIP codes to their nearest weather stations using multithreading.

    Parameters:
    - zip_lat_lon_mapping: dict, mapping from ZIP code to (latitude, longitude)
    - station_loc_mapping: dict, mapping from stationID to (latitude, longitude)
    - top_n: int, number of nearest stations to find
    - max_workers: int, maximum number of threads

    Returns:
    - results: dict, mapping from ZIP code to tuple of nearest station IDs

    Sample Usage:
    results = lat_lon_map_to_weather_multithread(zipcode_lat_lon_dict, station_loc_mapping, 10, 50)
    """
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(find_top_nearest, lat_lon, station_loc_mapping, top_n): zipcode
            for zipcode, lat_lon in zip_lat_lon_mapping.items()
        }
        for future in tqdm(concurrent.futures.as_completed(futures),
                           total=len(futures), desc="Processing ZIPCODE"):
            zipcode = futures[future]
            results[zipcode] = future.result()  # tuple of nearest station IDs
    return results

# Run the mapping process
near_lat_lon = lat_lon_map_to_weather_multithread(
    zipcode_lat_lon_dict, station_loc_mapping, top_n=10, max_workers=50)

# Save the results to a pickle file
with open("data/near_lat_lon.pkl", "wb") as f:
    pickle.dump(near_lat_lon, f)

print("Saved ZIP code to nearest station mapping to data/near_lat_lon.pkl.")


## 8 Merge weather data with main data

In [None]:
import pickle
import pandas as pd
import pyarrow.feather as feather

with open("data/near_lat_lon.pkl", "rb") as f:
    near_lat_lon = pickle.load(f)
weather = pd.read_feather("data/weather.feather")
raw = pd.read_csv("PGE_2020-2024_Data/PGE_Combined_2020_2024.csv")

import pandas as pd
import numpy as np
from typing import Tuple, Dict, List
from tqdm import tqdm
import os

def fill_weather_to_raw(weather: pd.DataFrame, raw: pd.DataFrame, near_lat_lon: Dict[str, Tuple[str]], save_path: str = "data/merged.feather") -> Tuple[pd.DataFrame, List[str]]:
    """
    Fills weather data into raw DataFrame based on nearest weather stations, matching by year and month.
    Supports breakpoint continuation by saving progress to a Feather file.
    
    Args:
        weather: DataFrame with weather data including stationID, YEAR, MONTH, and weather metrics
        raw: DataFrame with ZIPCODE, YEAR, MONTH, and other columns
        near_lat_lon: Dictionary mapping ZIPCODE to tuple of nearest stationIDs (ordered nearest to farthest)
        save_path: Path to save the merged DataFrame for breakpoint continuation (default: 'data/merged.feather')
    
    Returns:
        merged_df: Raw DataFrame with added weather columns
        zipcodes_missing_weather: List of ZIPCODEs where no weather data could be matched
    """
    # Ensure the save directory exists
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    
    # Weather columns to merge (excluding stationID, lat, lon, YEAR, MONTH)
    weather_cols = weather.columns.difference(['stationID', 'lat', 'lon', 'YEAR', 'MONTH']).tolist()
    
    # Convert YEAR in raw to string for consistency with weather
    raw = raw.copy()
    raw['YEAR'] = raw['YEAR'].astype(str)
    
    # Check if a saved Feather file exists for continuation
    if os.path.exists(save_path):
        print(f"Resuming from saved file: {save_path}")
        merged_df = pd.read_feather(save_path)
        # Ensure all weather columns exist in the loaded DataFrame
        for col in weather_cols:
            if col not in merged_df.columns:
                merged_df[col] = np.nan
    else:
        # Initialize output DataFrame
        merged_df = raw.copy()
        for col in weather_cols:
            merged_df[col] = np.nan
    
    # Track ZIPCODEs with no weather data
    zipcodes_missing_weather = []
    
    # Group weather data by stationID, YEAR, MONTH for efficient lookup
    weather_grouped = weather.groupby(['stationID', 'YEAR', 'MONTH'])[weather_cols].mean().reset_index()
    
    # Identify rows that still need processing (where all weather columns are NaN)
    rows_to_process = merged_df[merged_df[weather_cols].isna().all(axis=1)]
    
    # For each row that needs processing with progress bar
    for idx, row in tqdm(rows_to_process.iterrows(), total=rows_to_process.shape[0], desc="Processing rows"):
        zipcode = str(row['ZIPCODE'])
        year = row['YEAR']
        month = row['MONTH']
        
        if zipcode not in near_lat_lon:
            zipcodes_missing_weather.append(zipcode)
            continue
        
        # Get the list of nearest stations
        stations = near_lat_lon[zipcode]
        weather_found = False
        
        # Try strategies in order
        for station in stations:
            # Strategy 1: Same station, same year, same month
            match = weather_grouped[
                (weather_grouped['stationID'] == station) &
                (weather_grouped['YEAR'] == year) &
                (weather_grouped['MONTH'] == month)
            ]
            if not match.empty:
                for col in weather_cols:
                    merged_df.at[idx, col] = match.iloc[0][col]
                weather_found = True
                break
        
        if weather_found:
            continue
            
        # Strategy 2: Same station, near years, same month
        for station in stations:
            near_years = [str(int(year) + i) for i in [-1, 1, -2, 2]]  # Try ±1, ±2 years
            for near_year in near_years:
                match = weather_grouped[
                    (weather_grouped['stationID'] == station) &
                    (weather_grouped['YEAR'] == near_year) &
                    (weather_grouped['MONTH'] == month)
                ]
                if not match.empty:
                    for col in weather_cols:
                        merged_df.at[idx, col] = match.iloc[0][col]
                    weather_found = True
                    break
            if weather_found:
                break
        
        if weather_found:
            continue
            
        # Strategy 3: Next nearest station, same year, same month, etc.
        for station in stations[1:]:  # Skip the first station
            match = weather_grouped[
                (weather_grouped['stationID'] == station) &
                (weather_grouped['YEAR'] == year) &
                (weather_grouped['MONTH'] == month)
            ]
            if not match.empty:
                for col in weather_cols:
                    merged_df.at[idx, col] = match.iloc[0][col]
                weather_found = True
                break
            if weather_found:
                break
        
        if not weather_found:
            zipcodes_missing_weather.append(zipcode)
        
        # Save progress to Feather file every 100 rows
        if idx % 100 == 0:
            merged_df.to_feather(save_path)
    
    # Save final merged DataFrame
    merged_df.to_feather(save_path)
    
    # Remove duplicates from zipcodes_missing_weather
    zipcodes_missing_weather = list(set(zipcodes_missing_weather))
    
    return merged_df, zipcodes_missing_weather

merged_df, zipcodes_missing_weather = fill_weather_to_raw(
    weather=weather,
    # try 5 rows first
    raw=raw,
    near_lat_lon=near_lat_lon,
    save_path="data/merged.feather"
)
merged_df.head()