# Skill profiling

This notebook will do some general skill profiling of the analog forecast.  

In [1]:
import time
from itertools import product
from multiprocessing.pool import Pool
import numpy as np
import pandas as pd
import xarray as xr
# import dask
# from dask.distributed import Client

# local
from analog_forecast import make_forecast, spatial_subset, run_rmse_over_time
from config import data_dir
import luts

### Goal

The goal is to compute the error for the analog forecast method and a naive forecast method, for 50 dates. For now, those dates will be randomly chosen, but this workflow may be adapted to accept user supplied dates. The product here should be a table of results - errors between the forecast and "observed" values.

Some facts:

* forecasts will be made for the 14 days post-reference date
* forecasts will use 5 analogs
* a forecast for a given date is the mean of the corresponding subsequent dates across all analogs
* we are not mixing variables or spatial domains or weighting

### Processing strategy

We have some large data files - daily data for the northern hemisphere for our variables of interest - that will end up being read completely into memory because of the search of analogs over the entire time series. Additionally, the naive forecasting will be sampling many of the time steps. Being ~45GB (or ~23GB for the raw (i.e. non-anomaly-based) files), it will make sense to read the dataset completely into memory and then iterate over the possible groups. So we will iterate over the data files at the lowest level, which are grouped by variable and data type (raw vs anomaly).

### Naive profiling

I believe we only need to simulate the naive forecasts for each domain and variable, not for every reference date. This assumes that the distribution of "skill" (RMSE for now) for the naive forecast is the same for every day of the year. For each spatial domain and variable, we are attempting to simulate the distribution of a naive forecast skill based on selecting uniformly random analogs from the complete historical time series. 

So, we can create a table of naive forecast skill for all combinations of spatial domain and variable, which can then be joined with a table of analog forecast results for useful comparisons. Define some functions to do this that can be used to iterate over all combinations:

In [15]:
def forecast_and_error(sub_da, times, ref_date):
    forecast = make_forecast(sub_da, times, ref_date)
    err = sub_da.sel(time=forecast.time.values) - forecast
    return (err ** 2).mean(axis=(1, 2)).drop("time")


def _find_analogs(da, ref_date):
    """Differs from module implementation of same name to operate on already loaded dataset"""
    # compute RMSE between ref_date and all preceding dates 
    #  for the specified variable and spatial domain
    rmse_da = run_rmse_over_time(da, ref_date)
    # sort before dropping duplicated years
    rmse_da = rmse_da.sortby(rmse_da)
    # drop duplicated years.
    # This is being done because analogs might occur in the same year as the
    #  reference date and notes from meetings with collaborators indicate that
    #  there should only be one analog per year, as was the case for the
    #  previous iteration of the algorithm.
    keep_indices = ~pd.Series(rmse_da.time.dt.year).duplicated()
    analogs = rmse_da.isel(time=keep_indices)
    # subset to first 5 analogs for now
    analogs = analogs.isel(time=slice(5))

    return analogs


def run_analog_forecast(da, ref_date):
    analogs = _find_analogs(da, ref_date)
    analog_error = forecast_and_error(da, analogs.time.values, ref_date)
    
    # start a table entry
    err_df = pd.DataFrame({
        "reference_date": ref_date,
        "forecast_day_number": np.arange(14) + 1,
        "forecast_error": analog_error.values,
    })
    
    return err_df


def profile_analog_forecast(da, ref_dates, ncpus=2):
    """Profile the analog forecast for a given data array at the different ref_dates.
    Return a dataframe of results.
    """
    results = [run_analog_forecast(sub_da, date) for date in ref_dates]
    err_df = pd.concat(results)
    
    # these attributes are constant for all reference dates
    err_df["variable"] = varname
    err_df["spatial_domain"] = spatial_domain
    err_df["anomaly_search"] = use_anom
    # reorder columns
    err_df = err_df[[
        "variable", 
        "spatial_domain", 
        "anomaly_search", 
        "reference_date", 
        "forecast_day_number", 
        "forecast_error"
    ]]
    
    return err_df


def get_naive_sample_dates(all_times, naive_ref_date):
    """Constructs list of all dates to be queried in some fashion for an instance of the naive forecast"""
    analog_times = list(np.random.choice(all_times, 5, replace=False))
    naive_ref_date = pd.to_datetime(naive_ref_date + " 12:00:00").to_numpy()
    # if any sample times are closer than 15 days, re-sample
    while np.any((np.diff(sorted(analog_times + [naive_ref_date])) / (10**9 * 60 * 60 * 24)).astype(int) <= 14):
        analog_times = list(np.random.choice(all_times, 5, replace=False))
    
    all_dates = []
    for t in analog_times + [naive_ref_date]:
        all_dates.extend(pd.date_range(t, t + pd.to_timedelta(14, unit="d")))
    
    return all_dates, analog_times


def profile_naive_forecast(da, n=1000, ncpus=16):
    """Profiles the naive forecast method using a single data array with time, latitude, and longitude dimensions.
    Return a dataframe of results.
    """
    results = []
    for i in range(n):
        # significant speed-up in pooling achieved by first sub-selecting the times of interest from in-memory datarray
        #  times of interest will be the naive analog dates, the reference date, and the 14 days after all of them.
        # (not sure if the above really applies with non-Pool-based method now, but it shouldn't hurt)
        all_naive_dates, naive_analog_dates = get_naive_sample_dates(sub_da.time.values[:-15], naive_ref_date)
        results.append(forecast_and_error(sub_da.sel(time=all_naive_dates), naive_analog_dates, naive_ref_date))
    
    sim_rmse = xr.concat(results, pd.Index(range(n), name="sim"))

    err_df = pd.DataFrame({
        "variable": da.name,
        "spatial_domain": spatial_domain,
        "anomaly_search": use_anom,
        "forecast_day_number": np.arange(14) + 1,
        "naive_2.5": sim_rmse.reduce(np.percentile, dim="sim", q=2.5),
        "naive_50": sim_rmse.reduce(np.percentile, dim="sim", q=50),
        "naive_97.5": sim_rmse.reduce(np.percentile, dim="sim", q=97.5),
    })
    
    return err_df

Set the reference dates for the analog forecasting, and take the first one to be the reference date for the naive forecast simulation.

In [3]:
# open arbitrary dataset to get a constant set of reference dates for analog profiling
with xr.open_dataset(data_dir.joinpath("era5_2m_temperature_hour12_1959_2021.nc")) as ds:
    np.random.seed(907)
    ref_dates = np.random.choice(ds.time.values[:-15], 10, replace=False)
    ref_dates = [pd.to_datetime(date).strftime("%Y-%m-%d") for date in ref_dates]
    
# arbitrary reference date for naive forecasts
naive_ref_date = ref_dates[0]

Iterate over the variable/data type combinations, load the data, then iterate over spatial domains for both forecasts, iterating over reference dates only for the analog forecasts.

In [18]:
%%time
# accumulators for results
naive_results = []
analog_results = []

tic = time.perf_counter()
for varname, use_anom in product(luts.varnames_lu.keys(), [True, False]):
    fp_lu_key = {True: "anom_filename", False: "filename"}[use_anom]
    fp = data_dir.joinpath(luts.varnames_lu[varname][fp_lu_key])
    ds = xr.load_dataset(fp)
    
    for spatial_domain in luts.spatial_domains:
        bbox = luts.spatial_domains[spatial_domain]["bbox"]
        sub_da = spatial_subset(ds[varname], bbox)
        
        # profile the analog forecast by computing for all dates
        analog_results.append(profile_analog_forecast(sub_da, ref_dates))
        # profile the naive forecast
        naive_results.append(profile_naive_forecast(sub_da))
    print(f"{fp} done, {round((time.perf_counter() - tic) / 60)}m elapsed")

/atlas_scratch/kmredilla/analog_forecast/era5_2m_temperature_anom_1959_2021.nc done, 20m elapsed
/atlas_scratch/kmredilla/analog_forecast/era5_2m_temperature_hour12_1959_2021.nc done, 43m elapsed
/atlas_scratch/kmredilla/analog_forecast/era5_mean_sea_level_pressure_anom_1959_2021.nc done, 63m elapsed
/atlas_scratch/kmredilla/analog_forecast/era5_mean_sea_level_pressure_hour12_1959_2021.nc done, 91m elapsed
/atlas_scratch/kmredilla/analog_forecast/era5_sea_surface_temperature_anom_1959_2021.nc done, 114m elapsed
/atlas_scratch/kmredilla/analog_forecast/era5_sea_surface_temperature_hour12_1959_2021.nc done, 140m elapsed
/atlas_scratch/kmredilla/analog_forecast/era5_geopotential_anom_1959_2021.nc done, 162m elapsed
/atlas_scratch/kmredilla/analog_forecast/era5_geopotential_hour12_1959_2021.nc done, 187m elapsed
CPU times: user 1h 31min 34s, sys: 1h 34min 17s, total: 3h 5min 51s
Wall time: 3h 7min 12s


In [57]:
naive_df = pd.concat(naive_results)
analog_df = pd.concat(analog_results)

In [58]:
analog_df.round(3).to_csv("analog_profiling_results.csv", index=False)
naive_df.round(3).to_csv("naive_profiling_results.csv", index=False)