# ECMWF++

Ensembling and debiasing of ECMWF forecasts

In [None]:
import os, sys
# Ensure notebook is being run from base repository directory
import os, sys
try:
    os.chdir("/home/{}/forecast_rodeo_ii".format(os.environ["USER"]))
except Exception as err:
    print(f"Warning: unable to change directory; {repr(err)}")
from subseasonal_toolkit.utils.notebook_util import isnotebook
if isnotebook():
    # Autoreload packages that are modified
    %load_ext autoreload
    %autoreload 2
else:
    from argparse import ArgumentParser
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from subseasonal_data.utils import get_measurement_variable
from subseasonal_toolkit.utils.general_util import printf, tic, toc
from subseasonal_toolkit.utils.experiments_util import (get_first_year, get_start_delta,
                                                        get_forecast_delta)
from subseasonal_toolkit.utils.models_util import (get_submodel_name, start_logger, log_params, get_forecast_filename,
                                                   save_forecasts)
from subseasonal_toolkit.utils.eval_util import get_target_dates, mean_rmse_to_score, save_metric
from subseasonal_toolkit.models.deb_ecmwf.ecmwf_utils import geometric_median, ssm

from subseasonal_data import data_loaders

import pdb

In [None]:
#
# Specify model parameters
#
model_name = "ecmwfpp"
if not isnotebook():
    # If notebook run as a script, parse command-line arguments
    parser = ArgumentParser()
    parser.add_argument("pos_vars", nargs="*")  # gt_id and horizon
    parser.add_argument('--target_dates', '-t', default="std_test")
    # Debias (fit an intercept) only when this option is set to "True"
    parser.add_argument('--fit_intercept', '-i', default="False",
                        choices=['True', 'False'],
                        help="Fit intercept parameter if \"True\"; do not if \"False\"")
    parser.add_argument('--train_years', '-y', default=20,
                        help="Number of years to use in debiasing (integer or 'all')")
    parser.add_argument('--margin_in_days', '-m', default="None",
                        help="number of month-day combinations on either side of the target combination "
                             "to include when training; set to 0 to include only target month-day combo; "
                             "set to None to include entire year")
    parser.add_argument('--first_day', '-fd', default=1,
                        help="first available daily ecmwf forecast (1 or greater) to average")
    parser.add_argument('--last_day', '-ld', default=1,
                        help="last available daily ecmwf forecast (first_day or greater) to average")
    parser.add_argument('--loss', '-l', default="mse",
                        help="loss function: mse, rmse, skill, or ssm")
    parser.add_argument('--first_lead', '-fl', default=0,
                        help="first ecmwf lead to average into forecast (0-29)")
    parser.add_argument('--last_lead', '-ll', default=29,
                        help="last ecmwf lead to average into forecast (0-29)")
    parser.add_argument('--forecast_with', '-fw', default="c",
                        help="Generate forecast using the control (c), "
                        "average perturbed (p), single perturbed (p1, ..., p50), "
                        "or perturbed-control ensemble (p+c) ECMWF forecast.")
    parser.add_argument('--debias_with', '-dw', default="c",
                        help="Debias using the control (c), average perturbed (p), "
                        "or perturbed-control ensemble (p+c) ECMWF reforecast.")

    args, opt = parser.parse_known_args()

    # Assign variables
    gt_id = args.pos_vars[0]  # e.g. "us_precip_1.5x1.5" or "us_tmp2m_1.5x1.5"
    horizon = args.pos_vars[1]  # "12w", "34w", or "56w"
    target_dates = args.target_dates
    fit_intercept = args.fit_intercept
    if fit_intercept == "False":
        fit_intercept = False
    elif fit_intercept == "True":
        fit_intercept = True
    else:
        raise ValueError(f"unrecognized value {fit_intercept} for fit_intercept")
    train_years = args.train_years
    # "all" is kept as a string sentinel meaning "no lookback limit";
    # downstream cells test `train_years != "all"` before restricting years.
    if train_years != "all":
        train_years = int(train_years)
    if args.margin_in_days == "None":
        margin_in_days = None
    else:
        margin_in_days = int(args.margin_in_days)
    first_day = int(args.first_day)
    last_day = int(args.last_day)
    loss = args.loss
    first_lead = int(args.first_lead)
    last_lead = int(args.last_lead)
    debias_with = args.debias_with
    forecast_with = args.forecast_with
else:
    # Otherwise, specify arguments interactively
    # Alternatives: "global_precip_p1_1.5x1.5", "us_tmp2m_1.5x1.5";
    # must use 1.5x1.5 forecasts for ecmwf
    gt_id = "us_precip_1.5x1.5"
    horizon = "12w"
    target_dates = "std_paper_eval"
    fit_intercept = True
    loss = "mse"
    train_years = 20
    margin_in_days = 28
    # Default (first_day, last_day, first_lead, last_lead) per (variable, horizon)
    _default_lead_settings = {
        ("tmp2m", "12w"): (1, 1, 1, 1),
        ("precip", "12w"): (1, 1, 1, 1),
        ("tmp2m", "34w"): (1, 7, 15, 15),
        ("precip", "34w"): (1, 35, 15, 15),
        ("tmp2m", "56w"): (1, 14, 29, 29),
        ("precip", "56w"): (1, 21, 29, 29),
    }
    # tmp2m takes precedence over precip, matching the original check order
    if "tmp2m" in gt_id:
        _variable = "tmp2m"
    elif "precip" in gt_id:
        _variable = "precip"
    else:
        _variable = None
    try:
        first_day, last_day, first_lead, last_lead = \
            _default_lead_settings[(_variable, horizon)]
    except KeyError:
        # Fail here instead of with a NameError in a later cell
        raise ValueError(
            f"No default lead settings for gt_id {gt_id} and horizon {horizon}") from None
    debias_with = "p+c"
    forecast_with = "p+c"

In [None]:
""" 
Choose regression parameters and record standard settings
of these parameters
"""
x_cols = ['zeros']
if "tmp2m" in gt_id:
    base_col = 'ecmwf_tmp2m'
elif "precip" in gt_id:
    base_col = 'ecmwf_precip'
else:
    # Fail fast: every later cell indexes the ECMWF data by base_col
    raise ValueError(f"Unsupported gt_id {gt_id}; expected a tmp2m or precip target")
group_by_cols = ['lat', 'lon']

# Process model parameters

# Get list of target date objects
target_date_objs = pd.Series(get_target_dates(
    date_str=target_dates, horizon=horizon))

# Identify measurement variable name
measurement_variable = get_measurement_variable(gt_id)  # 'tmp2m' or 'precip'

# Column names for gt_col, clim_col and anom_col
gt_col = measurement_variable
clim_col = measurement_variable + "_clim"
anom_col = measurement_variable + "_anom"  # 'tmp2m_anom' or 'precip_anom'

# Store delta between target date and forecast issuance date
start_delta = timedelta(days=get_start_delta(horizon, gt_id))
base_shift_delta = timedelta(days=get_forecast_delta(horizon))

In [None]:
# Don't save forecasts for years earlier than LAST_SAVE_YEAR
LAST_SAVE_YEAR = get_first_year(model_name)

# Record model and submodel names
submodel_name = get_submodel_name(
    model_name, fit_intercept=fit_intercept,
    train_years=train_years, margin_in_days=margin_in_days,
    first_day=first_day, last_day=last_day, loss=loss,
    first_lead=first_lead, last_lead=last_lead,
    forecast_with=forecast_with, debias_with=debias_with)
print(submodel_name)

if not isnotebook():
    # Save output to log file
    logger = start_logger(model=model_name, submodel=submodel_name, gt_id=gt_id,
                          horizon=horizon, target_dates=target_dates)

    # Store parameter values in log. Each name is a module-level variable
    # set in an earlier cell, so look it up directly rather than eval-ing it.
    params_names = ['gt_id', 'horizon', 'target_dates',
                    'fit_intercept', 'train_years', 'margin_in_days',
                    'first_day', 'last_day', 'loss',
                    'first_lead', 'last_lead', 'forecast_with', 'debias_with',
                    'base_col', 'x_cols', 'group_by_cols']
    params_values = [globals()[param] for param in params_names]
    log_params(params_names, params_values)


def _column_mean(df):
    """Return the per-column (per grid point) mean of ``df``.

    A bare ``np.mean(df)`` delegates to ``DataFrame.mean(axis=None)``, which
    pandas 2.x changes (or deprecates, depending on version) to reduce over
    *both* axes and return a scalar; ``axis=0`` pins the per-column mean.
    """
    return np.mean(df, axis=0)


# Select estimator based on loss
if loss == "rmse":
    estimator = geometric_median
elif loss == "ssm":
    estimator = ssm
else:
    estimator = _column_mean

In [None]:
# Load and process data
printf("Loading ecmwf data")
# Choose data shift based on horizon and first day to be averaged
base_shift = get_forecast_delta(horizon)

if "1.5x1.5" in gt_id:
    # Get the mask
    if gt_id.startswith('us_'):
        mask_df = data_loaders.get_us_mask(fname="us_1_5_mask.nc", sync=False)
        # Create list of forecast and reforecast data to load
        load_names = []
        if forecast_with == "p+c":
            forecast_prefix = "ef"
        else:
            # Construct forecast name prefix with format cf, pf, pf1, ..., pf50
            forecast_prefix = forecast_with[:1] + "f" + forecast_with[1:]
        load_names.append(forecast_prefix + "-forecast")
        if debias_with == "p+c":
            debias_prefix = "ef"
        else:
            # Construct reforecast name prefix with format cf, pf
            debias_prefix = debias_with[:1] + "f" + debias_with[1:]
        load_names.append(debias_prefix + "-reforecast")
    elif gt_id.startswith('global_'):
        # Global data
        mask_df = None
        load_names = ["forecast", "reforecast"]
    else:
        raise ValueError(f"Unknown gt_id {gt_id}")

    # Per-lead forecast columns to average, named
    # "iri_ecmwf_<variable>-<lead>.5d_shift<base_shift>" for each lead
    cols = ["iri_ecmwf_" + gt_id.split("_")[1] + "-{}.5d_shift{}".format(col, base_shift)
            for col in range(first_lead, last_lead + 1)]

    def _load_ecmwf(load_name):
        """Load one ECMWF forecast or reforecast dataset for the current gt_id."""
        return data_loaders.get_forecast(
            f"ecmwf-{measurement_variable}-{gt_id.split('_')[0]}1_5-{load_name}",
            mask_df=mask_df,
            shift=base_shift,
            sync=False)

    load_data = {}
    for load_name in load_names:
        print("Loading data.")
        df = _load_ecmwf(load_name)
        is_reforecast = "reforecast" in load_name

        if is_reforecast:
            # Undo-shifting of model_issuance_date so that it matches the start date
            shifted_col = f'model_issuance_date_shift{base_shift}'
            if shifted_col not in df.columns:
                # Reload once instead of dropping into a debugger (which hangs
                # non-interactive runs); a still-missing column raises KeyError.
                printf(f"Column {shifted_col} missing from {load_name}; reloading")
                df = _load_ecmwf(load_name)
            df['model_issuance_date'] = df[shifted_col] + timedelta(days=base_shift)

        printf("Averaging leads for forecast and debias data.")
        df[base_col] = df[cols].mean(axis=1)

        printf('Pivoting dataframe to have one row per start_date')
        # Reforecasts keep model_issuance_date as an extra row-index level
        index_cols = ['lat', 'lon', 'start_date']
        if is_reforecast:
            index_cols.append('model_issuance_date')
        df = df[index_cols + [base_col]].set_index(index_cols).unstack(['lat', 'lon'])

        printf(f"Computing rolling mean over days {first_day}-{last_day}")
        days = last_day - first_day + 1
        if is_reforecast:
            df = df.rolling(
                f"{days}d", on=df.index.get_level_values("start_date")
            ).mean().dropna(how='any')
        else:
            df = df.rolling(f"{days}d").mean().dropna(how='any')
        load_data[load_name] = df
else:
    raise ValueError("ECMWF model only configured for 1.5 gt_ids for now.")

if gt_id.startswith('us_'):
    # forecast_prefix in {"ef","cf","pf","pf1",...,"pf50"}
    forecast_data = load_data[f"{forecast_prefix}-forecast"]
    # debias_prefix in {"ef","cf","pf"}
    debias_data = load_data[f"{debias_prefix}-reforecast"]
else:
    forecast_data = load_data["forecast"]
    debias_data = load_data["reforecast"]

In [None]:
"""
Load and merge ground truth 
"""
printf('Pivoting ground truth to have one row per start_date')
gt = data_loaders.get_ground_truth(gt_id).loc[:, ['lat', 'lon', 'start_date', gt_col]]

# Need gt for both the debias and the forecast dates. Index.union is used
# because `|` between two Index objects is an element-wise logical operator
# in pandas >= 2.0, not a set union.
gt_dates = debias_data.index.get_level_values("start_date").union(
    forecast_data.index.get_level_values("start_date"))
gt = gt.loc[gt.start_date.isin(gt_dates)].set_index(
    ['lat', 'lon', 'start_date']).unstack(['lat', 'lon'])

printf("Merging ground truth")
debias_data = debias_data.join(gt, how="left", on="start_date")
forecast_data = forecast_data.join(gt, how="left", on="start_date")

In [None]:
# Key dates separating reforecast-as-forecast targets from real-time
# forecast targets.

# 1. FIRST_FORECAST_TARGET_DATE: the first target date for which a
#    real-time ECMWF forecast exists.
FIRST_FORECAST_TARGET_DATE = datetime(2015, 5, 14)

# 2. FIRST_EVALUATION_TARGET_DATE: the earliest date in the
#    "std_paper_ecmwf" set of ECMWF evaluation target dates.
FIRST_EVALUATION_TARGET_DATE = sorted(get_target_dates("std_paper_ecmwf"))[0]

# 2'. FIRST_TUNING_TARGET_DATE: January 1st, three years before the first
#     evaluation year (first date used for tuning with num_years=3 and
#     margin=None).
FIRST_TUNING_TARGET_DATE = datetime(FIRST_EVALUATION_TARGET_DATE.year - 3, 1, 1)

# 3. LAST_REFORECAST_ISSUANCE_DATE: the issuance date associated with
#    FIRST_EVALUATION_TARGET_DATE, i.e. that date minus the horizon's
#    forecast offset.
LAST_REFORECAST_ISSUANCE_DATE = FIRST_EVALUATION_TARGET_DATE - base_shift_delta

In [None]:
def _drop_issuance_level(frame):
    """Re-index ``frame`` by start_date alone, discarding model_issuance_date."""
    flat = frame.reset_index()
    flat = flat.drop("model_issuance_date", axis=1, level=0)
    return flat.set_index("start_date")


# Step 5: for target dates before FIRST_FORECAST_TARGET_DATE a reforecast
# stands in for the (not yet existing) real-time forecast.

# 5.a. Keep only reforecasts issued strictly before LAST_REFORECAST_ISSUANCE_DATE
_refasf_issuance = debias_data.index.get_level_values("model_issuance_date")
debias_data_refasf = debias_data[_refasf_issuance < LAST_REFORECAST_ISSUANCE_DATE]

# 5.b. Base forecast = the reforecast itself, limited to start dates that
#      precede FIRST_EVALUATION_TARGET_DATE
_refasf_by_start = _drop_issuance_level(debias_data_refasf)
forecast_data_refasf = _refasf_by_start[_refasf_by_start.index < FIRST_EVALUATION_TARGET_DATE]
data = forecast_data_refasf.sort_index()

# 5.c. Regression target = reforecast bias (ground truth minus reforecast)
bias_refasf = _drop_issuance_level(debias_data_refasf[gt_col] - debias_data_refasf[base_col])
target = bias_refasf.sort_index()

In [None]:
# Reforecast-as-forecast setup: a covariate frame aligned with the training
# targets, plus base predictions for the requested target dates.
days_per_year = 365.242199  # approximately the mean tropical year, in days
covariate_names = ["delta", "dividend", "remainder"]

printf('Creating dataframes to store performance and date-based covariates')
tic()
X = pd.DataFrame(dtype=np.float64, columns=covariate_names, index=target.index)
toc()

printf('Initializing target date predictions to base column')
tic()
# Predictions are only formed for target dates present in the data matrix
valid_targets_refasf = data.index.intersection(target_date_objs)
preds_refasf = data.loc[valid_targets_refasf, base_col]
preds_refasf.index.name = "start_date"
toc()

In [None]:
# Compute debiasing correction
printf('Compute debiasing correction (ground-truth - base prediction) by month-day combination')

# Compute bias: one row per (start_date, model_issuance_date) reforecast
bias = (debias_data[gt_col] - debias_data[base_col])
# Index levels are loop-invariant; look them up once
bias_start_dates = bias.index.get_level_values("start_date")
bias_issuance_dates = bias.index.get_level_values("model_issuance_date")

# Initialize bias per start_date dataframe in forecast dir
avg_bias = pd.DataFrame(columns=bias.columns,
                        index=bias_issuance_dates.unique().sort_values())

margin_delta = None if margin_in_days is None else timedelta(days=margin_in_days)
# A finite integer train_years bounds the lookback; "all" (or inf) does not.
# (Formerly "all" crashed here via `date.year - "all"`.)
limit_years = train_years != "all" and np.isfinite(train_years)

for date, _ in bias.groupby(by="model_issuance_date"):
    # Use reforecasts whose start date is observable at this issuance date ...
    last_train_date = date - start_delta
    in_window = bias_start_dates <= last_train_date
    if limit_years:
        # ... within train_years of the issuance year ...
        in_window &= bias_start_dates >= datetime(date.year - train_years, 1, 1)
    if margin_delta is not None:
        # ... and issued within +/- margin_in_days of this issuance date
        in_window &= ((bias_issuance_dates <= date + margin_delta) &
                      (bias_issuance_dates >= date - margin_delta))

    avg_bias.loc[date] = estimator(bias[in_window])

In [None]:
# Set up RMSE storage and debiased predictions for real-time forecast dates
printf('Creating dataframe to store performance')
rmses = pd.Series(index=target_date_objs, dtype=np.float64)

printf('Forming debiased predictions for target dates')
# Keep target dates that appear in the forecast data AND have a debiasing
# correction available
valid_targets = avg_bias.index.intersection(
    forecast_data.index.intersection(target_date_objs))

base_preds = forecast_data.loc[valid_targets, base_col]
# Apply the bias correction only when an intercept is being fit
preds = (base_preds + avg_bias.loc[valid_targets]) if fit_intercept else base_preds
preds.index.name = "start_date"

In [None]:
# For every real-time and reforecast-as-forecast target date: produce (or
# reload) an absolute forecast, save it, and score it against ground truth.
valid_targets = valid_targets.union(valid_targets_refasf)
for target_date_obj in valid_targets:
    target_date_str = datetime.strftime(target_date_obj, '%Y%m%d')
    forecast_file = get_forecast_filename(
        model=model_name, submodel=submodel_name,
        gt_id=gt_id, horizon=horizon,
        target_date_str=target_date_str)
    if target_date_obj <= FIRST_TUNING_TARGET_DATE:
        # Dates up to the start of the tuning period are neither forecast nor scored
        continue
    elif os.path.isfile(forecast_file):
        # Skip recomputation if forecast already produced for this target;
        # the stored absolute forecast is still scored below
        printf(f"\nprior forecast exists for target={target_date_obj}")
        forecast = pd.read_hdf(forecast_file).set_index(['lat','lon']).pred
    elif target_date_obj >= FIRST_FORECAST_TARGET_DATE:
        printf(f'\nProcessing {model_name} forecast for {target_date_obj}')
        tic()
        # Real-time ECMWF forecast (debiased above when fit_intercept)
        forecast = preds.loc[target_date_obj,:]
        # Save prediction to file in standard format
        save_forecasts(
            preds.loc[[target_date_obj],:].unstack().rename("pred").reset_index(),
            model=model_name, submodel=submodel_name,
            gt_id=gt_id, horizon=horizon,
            target_date_str=target_date_str)
        toc()
    else:
        printf(f'\nProcessing {model_name} forecast for {target_date_obj}')
        tic()
        # Compute days from target date
        X['delta'] = (target_date_obj - target.index).days
        # Extract the dividend and remainder when delta is divided by the number of days per year
        # The dividend is analogous to the year
        # (Negative values will ultimately be excluded)
        X['dividend'] = np.floor(X.delta / days_per_year)
        # The remainder is analogous to the day of the year
        X['remainder'] = np.floor(X.delta % days_per_year)
        # Find the last observable training date for this target
        last_train_date = target_date_obj - start_delta
        # Restrict data based on training date, dividend, and remainder.
        # Kept as a plain boolean array so target.loc selects positionally
        # instead of aligning on a possibly duplicated index.
        indic = np.asarray(X.index <= last_train_date)
        if margin_in_days is not None:
            indic &= ((X.remainder <= margin_in_days)
                      | (X.remainder >= 365 - margin_in_days)).to_numpy()
        if train_years != "all":
            indic &= (X.dividend < train_years).to_numpy()
        toc()
        tic()
        if fit_intercept and not indic.any():
            # Do not adjust base prediction
            printf(f'-Warning: no training data for {target_date_str}; using base prediction')
        elif fit_intercept:
            # Add learned bias correction to base (reforecast) prediction
            preds_refasf.loc[target_date_obj,:] += estimator(target.loc[indic,:])
        forecast = preds_refasf.loc[target_date_obj,:]
        # Save prediction to file in standard format
        save_forecasts(
            preds_refasf.loc[[target_date_obj],:].unstack().rename("pred").reset_index(),
            model=model_name, submodel=submodel_name,
            gt_id=gt_id, horizon=horizon,
            target_date_str=target_date_str)
        toc()
    # Evaluate and store error if we have ground truth data. Every branch
    # above yields an absolute forecast, so a reloaded forecast file is scored
    # exactly like a freshly computed one (previously reloaded reforecast-era
    # forecasts were compared against the bias target instead).
    tic()
    if target_date_obj in gt.index:
        rmse = np.sqrt(np.square(forecast - gt.loc[target_date_obj, gt_col]).mean())
        rmses.loc[target_date_obj] = rmse
        printf("-rmse: {}, score: {}".format(rmse, mean_rmse_to_score(rmse)))
        mean_rmse = rmses.mean()
        printf("-mean rmse: {}, running score: {}".format(mean_rmse, mean_rmse_to_score(mean_rmse)))
    toc()

printf("Save rmses in standard format")
# Two-column frame: start_date, rmse (sorted by date)
rmses = (rmses.sort_index()
              .rename_axis('start_date')
              .reset_index(name='rmse'))
save_metric(rmses, model=model_name, submodel=submodel_name,
            gt_id=gt_id, horizon=horizon, target_dates=target_dates,
            metric="rmse")