In [1]:
import pandas as pd
import numpy as np
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
from tqdm import tqdm
from collections import defaultdict
import requests
import ipyparallel as ipp
import geopandas as gpd
from shapely.geometry import Point
import matplotlib.pyplot as plt
import math
import time
from sklearn.model_selection import KFold
import seaborn as sns
from sklearn.metrics import r2_score
import logging
logging.getLogger("cmdstanpy").setLevel(logging.ERROR)
import itertools

In [5]:
def fetch_counts_for_year(year, limit=10000, timeout=60):
    """Count, per site, the datastore records matching ``year`` that have a ``gse_gwe`` value.

    Pages through the CNRA CKAN ``datastore_search`` endpoint using ``year`` as
    the full-text ``q`` search term. Records whose ``gse_gwe`` is missing, or
    that have no ``site_code``, are not counted.

    Parameters
    ----------
    year : int
        Full-text search term sent as ``q``. Note that this matches the year
        anywhere in a record, not only in the measurement date.
    limit : int, optional
        Page size requested from the API.
    timeout : float, optional
        Seconds to wait for each HTTP response before treating it as a failure.

    Returns
    -------
    collections.defaultdict(int)
        Mapping ``site_code -> number of counted records``. If a request fails
        (network error or non-200 status), a message is printed and the counts
        gathered so far are returned.
    """
    counts_for_year = defaultdict(int)
    base_params = {
        'resource_id': 'bfa9f262-24a1-45bd-8dc8-138bc8107266',
        'q': year,
        'limit': limit,
        # With a full-text `q` and no explicit sort, rows are ordered by rank,
        # which has ties; offset paging over an unstable order can repeat or
        # skip records. `_id` gives a deterministic order across pages.
        'sort': '_id',
    }
    offset = 0
    while True:
        try:
            response = requests.get(
                "https://data.cnra.ca.gov/api/3/action/datastore_search",
                params={**base_params, 'offset': offset},
                timeout=timeout,
            )
        except requests.RequestException:
            # Treat network errors like a bad status: report and stop paging.
            response = None
        if response is None or response.status_code != 200:
            print(f"Failed to fetch data for year {year}")
            break
        records = response.json()['result']['records']
        for record in records:
            site_code = record.get('site_code')
            if record.get('gse_gwe') is None:
                continue
            if site_code:
                counts_for_year[site_code] += 1
        # A short page means this was the last one.
        if len(records) < limit:
            break
        offset += limit
    return counts_for_year

# Count, per site and year, the records that have a non-null `gse_gwe`,
# fetching one year per engine in parallel.
rc = ipp.Client()
dview = rc[:]

with dview.sync_imports():
    import requests
    from collections import defaultdict

dview.push({'fetch_counts_for_year': fetch_counts_for_year})

years = list(range(2000, 2025))

async_results = dview.map_async(fetch_counts_for_year, years)
results = async_results.get()

counts = defaultdict(lambda: defaultdict(int))
for year, year_counts in zip(years, results):
    for site_code, count in year_counts.items():
        counts[site_code][year] = count

# One row per site, one column per requested year. Reindexing guarantees every
# year is present and in chronological order: a year whose fetch failed or
# returned nothing becomes a column of zeros instead of silently vanishing
# (which would let the later "every year" threshold filter skip that year).
obs_df = (
    pd.DataFrame.from_dict(counts, orient='index')
    .reindex(columns=years)
    .fillna(0)
    .astype(int)
)
obs_df.columns = [f'observations_{year}' for year in obs_df.columns]
obs_df = obs_df.reset_index().rename(columns={'index': 'site_code'})

importing requests on engine(s)
importing defaultdict from collections on engine(s)


In [6]:
# Retain the site identifier plus only those yearly observation-count columns
# whose year is 2008 or later.
columns_to_keep = ['site_code'] + list(filter(
    lambda name: name.startswith('observations_') and int(name.split('_')[1]) >= 2008,
    obs_df.columns,
))

ov25 = obs_df[columns_to_keep]

In [7]:
# Keep only sites whose count is at least 25 in every `observations_*` column.
observation_columns = ov25.columns[ov25.columns.str.startswith('observations_')].tolist()
ov25 = ov25.loc[ov25[observation_columns].ge(25).all(axis=1)]

In [8]:
def fetch_all_records(site_code, year, limit=1000, timeout=60):
    """Return every datastore record matching the full-text query ``"<site_code> <year>"``.

    Pages through the CNRA CKAN ``datastore_search`` endpoint until a short
    page is returned.

    Parameters
    ----------
    site_code : str
        First full-text search term.
    year : int
        Second full-text search term.
    limit : int, optional
        Page size requested from the API.
    timeout : float, optional
        Seconds to wait for each HTTP response before treating it as a failure.

    Returns
    -------
    list of dict
        Raw record dicts as returned in ``result.records``. If a request fails
        (network error or non-200 status), a message is printed and the
        records gathered so far are returned.
    """
    all_records = []
    base_params = {
        'resource_id': 'bfa9f262-24a1-45bd-8dc8-138bc8107266',
        # Passed via `params` so requests URL-encodes the space and any
        # special characters in site_code.
        'q': f"{site_code} {year}",
        'limit': limit,
        # Deterministic order so offset pages neither overlap nor skip rows
        # (full-text results are otherwise ordered by rank, which has ties).
        'sort': '_id',
    }
    offset = 0
    while True:
        try:
            response = requests.get(
                "https://data.cnra.ca.gov/api/3/action/datastore_search",
                params={**base_params, 'offset': offset},
                timeout=timeout,
            )
        except requests.RequestException:
            # Treat network errors like a bad status: report and stop paging.
            response = None
        if response is None or response.status_code != 200:
            print(f"Failed to fetch data for {site_code} in {year}")
            break
        records = response.json()['result']['records']
        all_records.extend(records)
        if len(records) < limit:
            break
        offset += limit
    return all_records

def get_readings_for_site(site_code, years):
    """Collect one site's readings for each year in ``years`` into a DataFrame.

    Calls ``fetch_all_records(site_code, year)`` per year and keeps a fixed
    subset of fields from each record (missing fields become ``None``).

    Parameters
    ----------
    site_code : str
        Site to query.
    years : iterable of int
        Years to query, in order.

    Returns
    -------
    pandas.DataFrame
        Columns ``site_code, msmt_date, wlm_rpe, wlm_gse, gwe``, one row per
        record. The columns are present even when no records were returned,
        so callers always see the same schema.
    """
    fields = ('site_code', 'msmt_date', 'wlm_rpe', 'wlm_gse', 'gwe')
    all_readings = [
        {field: record.get(field) for field in fields}
        for year in years
        for record in fetch_all_records(site_code, year)
    ]
    return pd.DataFrame(all_readings, columns=list(fields))

In [None]:
rc = ipp.Client(timeout=1200)
dview = rc[:]

dview.push({
    'fetch_all_records': fetch_all_records,
    'get_readings_for_site': get_readings_for_site
})

# block=True so an import failure on any engine raises here, instead of every
# later task failing with an opaque NameError.
dview.execute("""
import pandas as pd
import numpy as np
import requests
import itertools
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
from tqdm import tqdm
""", block=True)

years = list(range(2008, 2025))
site_codes = ov25['site_code'].unique()

def load_site_data(site_code, years):
    """Fetch all readings for one site; executed on an engine via imap."""
    return get_readings_for_site(site_code, years)

# Fetching is slow (one paginated API query per site per year), so the result
# is cached on disk and reused only if it was built for exactly the same sites
# and years. This lets the notebook be restarted without re-downloading.
# Pickle is used because site_data is a list of DataFrames; only ever load a
# cache file this notebook wrote itself.
SITE_DATA_CACHE = 'site_data_cache.pkl'
try:
    site_data_cache = pd.read_pickle(SITE_DATA_CACHE)
except FileNotFoundError:
    site_data_cache = None

if (site_data_cache is not None
        and list(site_data_cache['site_codes']) == list(site_codes)
        and list(site_data_cache['years']) == years):
    site_data = site_data_cache['site_data']
else:
    site_data = list(tqdm(
        dview.imap(load_site_data, site_codes, [years]*len(site_codes)),
        total=len(site_codes),
        desc="Loading site data"
    ))
    pd.to_pickle(
        {'site_codes': list(site_codes), 'years': years, 'site_data': site_data},
        SITE_DATA_CACHE,
    )

In [2]:
# Map each site to its readings and copy the mapping to every engine, where
# process_site looks it up as the global `data_dict`.
# zip() silently truncates, so check the two inputs are still in step.
assert len(site_data) == len(site_codes), (
    "site_data and site_codes are out of sync; re-run the data-loading cell"
)
data_dict = dict(zip(site_codes, site_data))
dview.push({'data_dict': data_dict})

def process_site(site_code, years, cps, sps):
    """Fit Prophet to one site's series and score it by rolling cross-validation.

    Meant to run on an ipyparallel engine: reads the site's readings from the
    engine-side global ``data_dict`` and uses the engine's ``pd``, ``np``,
    ``Prophet``, ``cross_validation`` and ``performance_metrics``.

    Parameters
    ----------
    site_code : str
        Key into ``data_dict``.
    years : list of int
        Unused; kept so the signature matches the ``map_async`` call.
    cps : float
        ``changepoint_prior_scale`` passed to Prophet.
    sps : float
        ``seasonality_prior_scale`` passed to Prophet.

    Returns
    -------
    dict or None
        The ``performance_metrics`` columns averaged over all horizons, plus
        ``'mase'`` and ``'rmsse'`` (NaN when the naive-forecast scale is zero).
        ``None`` when the site has no data, fewer than two usable readings, or
        any step raises.
    """
    try:
        raw = data_dict.get(site_code)
        if raw is None or raw.empty:
            return None

        # Build a new frame instead of mutating `raw`: it is the engine's copy
        # in data_dict and is reused for every grid combination.
        # Values come straight from the JSON API and may be None (or strings,
        # depending on the datastore field type). Coerce `y` to float and drop
        # rows lacking a usable date or value; otherwise np.diff below fails on
        # an object array and the whole site is silently discarded.
        # NOTE(review): the target is `wlm_rpe`, whereas the site filter counted
        # `gse_gwe` and `gwe` is also fetched -- confirm this is the intended
        # series to forecast.
        df = (
            raw[['msmt_date', 'wlm_rpe']]
            .rename(columns={'msmt_date': 'ds', 'wlm_rpe': 'y'})
            .assign(ds=lambda d: pd.to_datetime(d['ds']),
                    y=lambda d: pd.to_numeric(d['y'], errors='coerce'))
            .dropna(subset=['ds', 'y'])
            .sort_values('ds')
        )
        if len(df) < 2:
            return None

        model = Prophet(changepoint_prior_scale=cps,
                        seasonality_prior_scale=sps,
                        seasonality_mode='additive',
                        yearly_seasonality=True)
        model.fit(df)

        # NOTE(review): parallel="processes" spawns worker processes inside an
        # engine that is itself one of several parallel workers -- check for
        # CPU oversubscription.
        cv_results = cross_validation(model,
                                      initial='730 days',
                                      period='90 days',
                                      horizon='90 days',
                                      parallel="processes")
        metrics_df = performance_metrics(cv_results)
        avg_metrics = metrics_df.mean()

        # Naive-forecast scales for MASE/RMSSE from successive readings over the
        # full series. Readings need not be evenly spaced, so this is a
        # "previous reading" baseline rather than a fixed-lag one.
        y_full = df['y'].values
        scale_mase = np.mean(np.abs(np.diff(y_full)))
        scale_rmsse = np.mean((np.diff(y_full))**2)

        mae_cv = np.mean(np.abs(cv_results['y'] - cv_results['yhat']))
        mse_cv = np.mean((cv_results['y'] - cv_results['yhat'])**2)

        # A constant series has zero scale; report NaN rather than divide by 0.
        mase_val = mae_cv / scale_mase if scale_mase != 0 else np.nan
        rmsse_val = np.sqrt(mse_cv / scale_rmsse) if scale_rmsse != 0 else np.nan

        avg_metrics_dict = avg_metrics.to_dict()
        avg_metrics_dict['mase'] = mase_val
        avg_metrics_dict['rmsse'] = rmsse_val

        return avg_metrics_dict
    except Exception:
        # Best-effort: a site whose fit or CV fails is skipped; the grid-search
        # loop only aggregates non-None results.
        return None

dview.push({'process_site': process_site})

param_grid = {
    'changepoint_prior_scale': [0.001, 0.005, 0.01, 0.05, 0.1, 0.5],
    'seasonality_prior_scale': [0.01, 0.1, 0.5, 1.0, 10.0]
}

aggregated_results = []
grid = list(itertools.product(param_grid['changepoint_prior_scale'],
                              param_grid['seasonality_prior_scale']))

for cps, sps in tqdm(grid, total=len(grid), desc="Hyperparameter grid search"):
    async_results = dview.map_async(process_site, site_codes,
                                    [years]*len(site_codes),
                                    [cps]*len(site_codes),
                                    [sps]*len(site_codes))
    site_results = async_results.get()

    # Gather each metric across the sites that produced results.
    collected = {}
    n_valid = 0
    for res in site_results:
        if res is not None:
            n_valid += 1
            for key, value in res.items():
                collected.setdefault(key, []).append(value)

    if n_valid > 0:
        # pd.Series.mean skips NaN. process_site returns NaN MASE/RMSSE for
        # sites whose naive-forecast scale is zero, and that scale depends only
        # on the data, so with np.mean such a site would make the average NaN
        # for essentially every combination, leaving the MASE ranking arbitrary.
        # Series.mean also averages the Timedelta 'horizon' entries correctly.
        avg_metrics = {key: pd.Series(vals).mean() for key, vals in collected.items()}
        avg_metrics.update({
            'changepoint_prior_scale': cps,
            'seasonality_prior_scale': sps,
            'seasonality_mode': 'additive',
            'n_sites': n_valid
        })
        aggregated_results.append(avg_metrics)
    else:
        print(f"No valid results for parameters: cps={cps}, sps={sps}")

results_df = pd.DataFrame(aggregated_results)
if results_df.empty:
    # Nothing to rank; sort_values('mase') would raise KeyError here.
    print("No parameter combination produced valid results.")
else:
    results_df = results_df.sort_values('mase')

    print("Aggregated Cross Validation Grid Search Results (averaged over sites):")
    print(results_df)
    print("\nBest Parameters based on aggregated MASE:")
    print(results_df.iloc[0])

NameError: name 'site_codes' is not defined