# Reflective Error - Part II

## A Real World Example in Hydrology
This second notebook reproduces the second set of experiments, the application of the Reflective Error functions within a hydrological context, in the Environmental Data Science paper "Reflective Error: A Metric for s Predictive Performance at Extreme Events" and is designed to provide a step by step guide on the application of both the metric and the associated loss function.  As explained in the paper, the metric and loss function are designed to give a generalisable quantification of model error with respect to the underlying probability distribution that best fits the data.  This notebook follows on from the [Part I notebook](./reflective-error-part-I.ipynb).

To get started, we'll import all of the prerequisite libraries, including numpy, pandas, scipy, xarray, and pytorch, and we'll also set some global parameters for graph presentation, random number seeds for reproducability, and torch device configuration.

In [36]:
import numpy as np
import pandas as pd
import scipy as sp
import xarray as xr
import torch
import torch.nn as nn
import cdsapi
import matplotlib.pyplot as plt
import matplotlib.ticker as mtk

plt.rcParams['font.size'] = 20
plt.rcParams['font.family'] = 'serif'
fontlibrary = str('font.' + 'serif')
plt.rcParams[fontlibrary] = ['Times New Roman']

np.random.seed(42)
torch.manual_seed(42)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

As per the [Part I notebook](./reflective-error-part-I.ipynb), the metrics that we'll be using in this notebook are Root Mean Squared Error (RMSE), R$^2$, and Reflective Error (RE).  For RE, $\Psi_{\phi}(y_i)$ is the weighting to be applied to the error at any given $y$ based on the underlying, probability distribution corresponding to values $y$ within the local subdomain, $\phi$, of the target domain, $Y$.  Note that for the stationary form of RE, $\phi = Y$.  The scaling factor $\kappa = \underset{y}{\max} \big(U(y)\big)$ and $\alpha$ and $\beta$ are parameters set to $1$ for the metric and hyperparameters to be tuned by the user for the reflective loss function.

In [25]:
def RMSE(y_o, y_p):
    '''
    Function to evalate the root mean squared error
    between a set of observations and predictions
    
    Parameters
    ----------
    y_o : Float, Numpy Array, or Pandas DataFrame Column
        Set of observations, y
    y_p : Float, Numpy Array, or Pandas DataFrame Column
        Set of predictions, y'
        
    Returns
    -------
    rmse : Float
        Root Mean Squared Error
    '''
    total = (((y_o - y_p)**2)/len(y_o))
    rmse = sum(total)**0.5
    return rmse


def R2(y_o, y_p):
    '''
    Function to evalate the r2 value between
    a set of observations and predictions
    
    Parameters
    ----------
    y_o : Float, Numpy Array, or Pandas DataFrame Column
        Set of observations, y
    y_p : Float, Numpy Array, or Pandas DataFrame Column
        Set of predictions, y'
        
    Returns
    -------
    r2 : Float 
        R2
    '''
    cache1 = ((y_o - y_p)**2)
    mu = np.mean(y_o)
    cache2 = ((y_o - mu)**2)
    r2 = 1 - (sum(cache1))/(sum(cache2))
    return r2

def PSI(u_of_y, kappa, alpha=1, beta=1):
    '''
    Function to evalate the weighting to be applied
    elementwise to error terms with 

    Parameters
    ----------
    u_of_y : Float, Numpy Array, or Pandas DataFrame Column
        Probability of y, elementwise, according to
        the distribution fitted to training data
    alpha : Float
        Reflective scaling hyperparameter.
    beta : Float
        Reflective shift hyperparameter.
    kappa : Float
        Global maximum of u_of_y (recommend using calculus to find).

    Returns
    -------
    psi : Float, Numpy Array, or Pandas DataFrame Column
        Reflective weighting to be applied in the loss
        function during network training.
    '''
    psi  = -1 * alpha * (u_of_y/kappa) + beta
    return psi


def RE(y_o, y_p, psi):
    '''
    Function to evalate the reflective error
    between a set of observations and predictions
    
    Parameters
    ----------
    y_o : Float, Numpy Array, or Pandas DataFrame Column
        Set of observations, y
    y_p : Float, Numpy Array, or Pandas DataFrame Column
        Set of predictions, y'
        
    Returns
    -------
    r_rmse : Float
        Reflective Root Mean Squared Error
    '''
    cache1 = ((y_o - y_p)**2)
    re = ((sum(cache1 * psi))/(sum(cache1)))**(0.5)
    return re


def RELossFunc(prediction, target, psi):
    '''
    Function to calculate unaggregated loss that can be processed
    using the machine learning library specified by the user

    Parameters
    ----------
    prediction : Float, array, or Tensor
        Machine learning algorithm output.
    target :  Float, array, or Tensor
        Observation to compare the prediction against.
    gamma : Float, array, or Tensor
        Reflective weighting penalty applied elementwise to the
        prediction and target pairs.

    Returns
    -------
    interrim_loss : Float, array, or Tensor
        Unaggregated loss between prediction(s) and target(s).
    '''
    interrim_loss = (((prediction - target)**2) * psi)
    return interrim_loss

## Data Acquisition
There are two sources from which we need to obtain data: the first is the streamflow data itself, which is taken from the Centre for Ecology & Hydrology's [National River Flow Archive](https://nrfa.ceh.ac.uk/); and the second is ERA5 meteorological data, which can be downloaded using the European Centre for Medium-Range Weather Forecasts' Copernicus Data Store.

The following two links provide direct access to the csv files containing the streamflow records for the two rivers used in this study: [Avon at Bathford](https://nrfa.ceh.ac.uk/data/station/meanflow/53018) & [Exe at Pixton](https://nrfa.ceh.ac.uk/data/station/meanflow/45009).  The NRFA also has an API that can be queried for additional data, with instructions on how to use said API available [here](https://nrfa.ceh.ac.uk/content/nrfa-api).

In order to download data through the CDS API, users will need to set up a personal access token and download the requisite python library.  Full instructions can be found [here](https://cds.climate.copernicus.eu/how-to-api).  The following script will download the required data once setup.

In [49]:
def query(filename, product, variables, gridsquare, years, months,
                   days, times, pressure_levels='n/a'):
    '''
    Creates a standardised dataquery in dictionary format that can be passed
    to initialise the era5 class and subsequent download from the copernicus
    data store.

    Parameters
    ----------
    filename : String
        Filename stem to which the data will be saved.
    product : String
        ERA5 data product that is being accessed.  Refer to the CDS API for
        guidance on product names if further information is required.
    variables : String, List of strings
        Meteorologial variables to extract from the data product.
    gridsquare : String, List of strings
        Lat/Lon gridsquare over which to extract meteorological data.
    years : Integer, String, List of integers or strings
        Years at which to extract meteorological data.
    months : Integer, String, List of integers or strings
        Months at which to extract meteorological data.
    days : Integer, String, List of integers or strings
        Days at which to extract meteorological data.
    times : Integer, String, List of integers or strings
        Hours at which to extract meteorological data.
    pressure_levels : Integer, String, List of integers or strings, optional
        Pressure levels from which to extract varibles. The default is 'n/a'.

    Returns
    -------
    query : Dictionary
        Standardised data query format.
    '''
    query = {'file_stem':filename, 'product':product, 'variables':variables,
               'area':gridsquare, 'years':years, 'months':months, 'days':days,
               'times':times, 'pressure_levels':pressure_levels
               }
    return query

class era5():
    def __init__(self, query):
        '''
        Initialises the era5 data query class for accessing and downloading
        data via the CDS API.

        Parameters
        ----------
        query : Dictionary
            Standardised data query.

        Returns
        -------
        None.
        '''
        self.c = cdsapi.Client()
        self.query = query
        self.pressure_set = query['pressure_levels']

    def download(self):
        '''
        Initiates the download from the CDS, outputting data to the 
        target filename.

        Returns
        -------
        None.
        '''
        if self.pressure_set=='n/a':
            filename = str(self.query['file_stem']) + '.nc'
            self.c.retrieve(self.query['product'],
                            {"product_type":   ["reanalysis"],
                            "data_format":         ["netcdf"],
                            "variable":       self.query['variables'],
                            "area":           self.query['area'],
                            "year":           self.query['years'],
                            "month":          self.query['months'],
                            "day":            self.query['days'],
                            "time":           self.query['times']},
                            filename)
        else:
            for p in self.pressure_set:
                filename = str(self.query['file_stem']) + str(p) + 'hPa.nc'
                self.c.retrieve(self.query['product'],
                                {"product_type":   "reanalysis",
                                "data_format":         "netcdf",
                                "pressure_level": [p],
                                "variable":       self.query['variables'],
                                "area":           self.query['area'],
                                "year":           self.query['years'],
                                "month":          self.query['months'],
                                "day":            self.query['days'],
                                "time":           self.query['times']},
                                filename)

area = ['60.00/-8.00/48.00/4.00']
yyyy = [str(y) for y in range(1979,2022,1)]
mm = [str(m) for m in range(1,13,1)]
dd = [str(d) for d in range(1,32,1)]
hh = [str(t).zfill(2) + ':00' for t in range(0, 24, 1)]
met = ['total_precipitation','temperature','u_component_of_wind',
       'v_component_of_wind','relative_humidity',
       'volumetric_soil_water_layer_1','volumetric_soil_water_layer_2',
       'volumetric_soil_water_layer_3','volumetric_soil_water_layer_4',]

for yy in yyyy:
    filename = 'Rainfall_' + str(yy)
    rain_query = er.query(filename, 'reanalysis-era5-single-levels', met[0],
                          area, yy, mm, dd, hh)
    rain_data = er.era5(rain_query).download()
    er.aggregate_mean(str(rain_query['file_stem']) + '.nc',
                      str(rain_query['file_stem']) + '_aggregated.nc')
full_rain_data = xr.open_mfdataset('Rainfall_*_aggregated.nc', concat_dim='time')
full_rain_data.to_netcdf(path='Rainfall.nc')
pressure_query = er.query('Pressure','reanalysis-era5-pressure-levels',
                          met[1:5], area, yyyy, mm, dd, '12:00', ['1000'])
pressure_data = er.era5(pressure_query).download()
soil_query = er.query('Soil_Moisture','reanalysis-era5-land', met[5:],
                      area, yyyy, mm, dd, '12:00')
soil_data = er.era5(soil_query).download()

## Data Preprocessing
With the data downloaded, we will now preprocess it by extracting meteorological variables, including antecedent proxies, at the centroid of the catchment, with catchment boundaries taken from the NRFA database as shape files along with the gauged daily flow data for the pair of target catchments; for convenience, metadata for these catchments is stored in the file ['Catchment_Database.csv'](./Catchment_Database.csv).

In [None]:
def rename(df):
    '''
    Dictionary based renaming of columns within a dataframe from ECMWF
    nomenclature to plain language.

    Parameters
    ----------
    df : Pandas dataframe
        Original dataframe of variables pulled from ERA5.

    Returns
    -------
    df : Pandas dataframe
        Dataframe with renamed columns for human readability.
    '''
    df = df.rename(columns={'time':'Date','tp':'Rain',
                            't':'Temperature','r':'Humidity',
                            'u':'U Windspeed','v':'V Windspeed',
                            'swvl1':'Soil Moisture 1',
                            'swvl2':'Soil Moisture 2',
                            'swvl3':'Soil Moisture 3',
                            'swvl4':'Soil Moisture 4'})
    return df

def weather_shift(df, variable, ndays, past=True):
    '''
    Creates additional columns by copying meteorological variables and
    shifting them back or forward in time for a given number of steps,
    creating rows of data with the required antecedent record that can be
    passed directly into a model.

    Parameters
    ----------
    xf : Pandas dataframe
        Dataframe of variables to be shifted in time.
    variable : String
        Name of the feature column to be shifted.
    ndays : Integer
        Number of days, or timesteps, to apply the shift.
    past : Boolean, optional
        Applying either a rearward or forward looking shift.
        The default is True.

    Returns
    -------
    xf : Pandas dataframe
        Dataframe with time shifted variables in each row.
    '''
    if past == True:
        for i in range (1, ndays):
            colname = str(variable) + '-{0}'.format(i)
            df[colname] = df[variable].shift(+i)
    else:
        for i in range (1, ndays):
            colname = str(variable) + '+{0}'.format(i)
            df[colname] = df[variable].shift(-i)
    return df

class hydrobase():
    def __init__(self, station, flowpath, boundarypath):
        '''
        Initialises a class instance of a streamflow database.
        
        Parameters
        ----------
        station : String
            Gauging station number, keyed to flow data files.
        flowpath : String
            File path for the flow data file.
        boundarypath : String
            File path for the .shp catchment boundary file.  Note that the
            .shx file must also be present in the same location.

        Returns
        -------
        None.
        '''
        self.station = station
        self.flowpath = flowpath
        self.flow = pd.read_csv(flowpath)
        self.flow = self.flow.drop(self.flow.columns[2], axis=1)
        self.flow = self.flow.drop(self.flow.index[0:19])
        self.flow.columns = ['Date', 'Flow']
        self.flow['Date'] = pd.to_datetime(self.flow['Date'],
                                           format='%Y-%m-%d').dt.date
        self.flow['Flow'] = self.flow['Flow'].astype(float)
        self.boundarypath = boundarypath
        self.boundary = gp.read_file(self.boundarypath)
        self.points = self.boundary.centroid
        self.lat, self.lon = osg.BNG_2_latlon(self.points.x[0],
                                              self.points.y[0])
        
    def meteorological_extraction(self, domain_data):
        '''
        Extracts meteorological data from the domain datafiles using xarray
        and interpolates across the spatial dimensions to obtain a single
        time series of the meteorological variables.

        Parameters
        ----------
        domain_data : Xarray
            Array of spatio-temporally distributed variables.

        Returns
        -------
        local_data : Xarray
            Interpolated variables at the centroid coordinates.
        '''
        local_data = domain_data.interp(longitude=self.lon, latitude=self.lat)
        local_data = local_data.to_dataframe().reset_index()
        return local_data
    
    def flow_meteorolgy_combine(self, domain_weather, surface_data, days):
        '''
        Takes the standard CEH flow file format and combines it with
        meteorological data interpolated at the centroid of the catchment.

        Parameters
        ----------
        domain_weather : Xarray opened netcdf file
            Meteorological, or other, netcdf files opened using xarray.
        surface_data : Xarray opened netcdf file
            Soil moisture, or other, surface netcdf files opened using xarray
        days : Integer
            Number of days needed for antecedent record.

        Returns
        -------
        combined : Pandas dataframe
            A single dataframe combining all input meteorological and output
            streamflow variables.
        '''
        weather = self.meteorological_extraction(domain_weather)
        surface = self.meteorological_extraction(surface_data)
        weather = rename(weather)
        surface = rename(surface)
        weather['Rain'] = weather['Rain']*1000*24
        weather['Date'] = pd.to_datetime(weather['Date'],
                                         format='%Y-%m-%d').dt.date
        surface['Date'] = pd.to_datetime(surface['Date'],
                                         format='%Y-%m-%d').dt.date
        weather = weather.drop(['longitude', 'latitude'], axis=1)
        surface = surface.drop(['longitude', 'latitude'], axis=1)
        weather['Resultant Windspeed'] = (weather['U Windspeed']**2
                                          + weather['V Windspeed']**2)**(1/2)
        for f in ['Rain','Temperature','Resultant Windspeed','Humidity']:
            weather = weather_shift(weather, f, days)
            for window in [28, 90, 180]:
                ma.stat_roller(weather, f, window, method='mean')
        combined = pd.merge(weather, surface, on='Date')
        combined = pd.merge(self.flow, combined, how='inner', on='Date')
        combined = combined.drop(combined.index[0:179])
        return combined
    
    def output_file(self, domain_weather, surface_data, days):
        '''
        Applies the meteorological extraction and dataset combination functions
        and returns a single combined dataframe which is then exported as a 
        lumped regression .csv file.

        Parameters
        ----------
        domain_weather : Xarray opened netcdf file
            Meteorological, or other, netcdf files opened using xarray.
        surface_data : Xarray opened netcdf file
            Soil moisture, or other, surface netcdf files opened using xarray
        days : Integer
            Number of days needed for antecedent record.

        Returns
        -------
        None.
        '''
        outdf = self.flow_meteorolgy_combine(domain_weather, surface_data, days)
        outpath = str(self.station) + '_lumped.csv'
        outdf.to_csv(outpath, index=True)

domain_weather = xr.open_mfdataset(['Rainfall.nc',
                                    'Pressure.nc'])
surface_data = xr.open_dataset('Soil_Moisture.nc')
db = pd.read_csv('Catchment_Database.csv')
for i in range(len(db)):
    cache = hydrobase(db.loc[i][0], db.loc[i][3], db.loc[i][4])
    output = cache.output_file(domain_weather, surface_data, 14)

## Variable Handling
The following pair of functions will be used to extract the required columns from the pandas dataframes and then normalise the input variables.

In [58]:
def featurelocator(df, features):
    '''
    Identifies the column numbers for features from a dataframe
    for use with indexing the array form of that dataframe

    Parameters
    ----------
    df : Pandas dataframe
        Pandas dataframe of all input and output information
    features : List
        List of target variables or features.

    Returns
    -------
    array_indices : List
        List of indices for slicing the converted array.
    '''
    array_indices = [df.columns.get_loc(f) for f in features]
    return array_indices


def normalise(df, feature, norm_dict={}, write_cache=True):
    '''
    Normalises the feature columns of a dataframe, with the option
    to retain a cache of the normalisation parameters for use on additional
    datasets.    
    
    Parameters
    ----------
    df : Pandas dataframe
        Dataframe containing columns of variables to be normalised
    feature : String
        Name of feature column to be normalised
    norm_dict : Dictionary
        Dictionary in which normalisation cache parameters are stored
    write_cache : Boolean
        Determines whether or not the cache key values will be overwritten

    Returns
    -------
    array_indices : List
        The input feature column normalised with a dictionary key value entry
        made containing all normalisation parameters.
    '''
    if write_cache == True:
        favg = np.mean(df[feature])
        fmax = np.max(df[feature])
        fmin = np.min(df[feature])
        cachelist = [favg, fmax, fmin]
        norm_dict[feature] = cachelist
        return(df[feature] - favg)/(fmax-fmin)
    elif write_cache == False:
        try:
            cache = norm_dict[feature]
            return(df[feature] - cache[0])/(cache[1]-cache[2])
        except:
            raise KeyError('Normalisation cache specified incorrectly')

## Creation of Reflective Weights & Tensor Arrays
Using the above functions, we'll: load in the preprocessed data files; partition the data into the training and validation + test sets; create the input and output tensors for the neural network; and normalise the input features using parameters from the training set only.  We'll also create the Reflective Weights needed for both the assessment of our model's performance and for the Reflective loss function, again using information from the training set only.

In [None]:
station = 53018
filename = str(str(station) + '_lumped.csv')
rf = pd.read_csv(filename)
rf['Date'] = pd.to_datetime(rf['Date'], format='%Y-%m-%d').dt.date

### Identify features (with either antecedent proxies or soil moisture levels)
days = 13
features = ['Rain'] + ['Rain-' + f'{d+1}' for d in range(days)] \
            + ['Temperature'] \
            + ['Temperature-' + f'{d+1}' for d in range(days)] \
            + ['Resultant Windspeed'] \
            + ['Resultant Windspeed-' + f'{d+1}' for d in range(days)] \
            + ['Humidity'] + ['Humidity-' + f'{d+1}' for d in range(days)] \
            + ['Rain_28_Mu','Rain_90_Mu','Rain_180_Mu',
               'Temperature_28_Mu','Temperature_90_Mu','Temperature_180_Mu']
targets = ['Flow']
xspace = featurelocator(rf, features)
yspace = featurelocator(rf, targets)

###Test/Train data split by years
yearlist = [2009+i for i in range(12)]
rftrain = rf[~pd.to_datetime(rf['Date']).dt.year.isin(yearlist)]

### Fit distribution to training set data & set reflective penalty
flows = rftrain['Flow'].values
sigma, loc, scale = sp.stats.lognorm.fit(flows)
alpha = 1
beta = 2
kappa = np.exp(-2*(sigma**2)) * (loc*np.exp(2*(sigma**2))+scale)
relevance = (1/(sigma*(kappa-loc)/scale*((np.pi*2)**(1/2)))) \
        * np.exp(-(np.log((kappa-loc)/scale)**2)/(2*sigma**2)) / scale*2
rf['U_of_y'] = (1/(sigma*(rf['Flow']-loc)/scale*((np.pi*2)**(1/2)))) \
        * np.exp(-(np.log((rf['Flow']-loc)/scale)**2)/(2*sigma**2)) / scale*2
rf['Psi'] = RELossWeight(rf['U_of_y'], 1, 1, relevance)
rf['Relevance'] = RELossWeight(rf['U_of_y'], 1, 2, relevance)

### Normalise features using parameters cached from the training set
norm_cache = {}
for f in features:
    rftrain[f] = ma.normalise(rftrain, f, norm_cache, write_cache=True)
    rf[f] = ma.normalise(rf, f, norm_cache, write_cache=False)

### Convert dataframe subsets to arrays and then to PyTorch variables
rftrain = rf[~pd.to_datetime(rf['Date']).dt.year.isin(yearlist)]
trnset = rftrain.to_numpy()
X = trnset[:,xspace].reshape(len(trnset), len(xspace)).astype(float)
Y = trnset[:,yspace].reshape(len(trnset), len(yspace)).astype(float)
G = trnset[:,rf.columns.get_loc('Relevance')].reshape(len(trnset), 1).astype(float)
x = torch.from_numpy(X).to(device)
y = torch.from_numpy(Y).to(device)
g = torch.from_numpy(G).to(device)
x, y = Variable(x), Variable(y)

## Model Setup
In this experiment, we're using a relatively simple multilayer perceptron; it will have just 3 layers and will use the swish activiation function.  We'll also randomly initialise the network weights at this point.

In [55]:
class AntecedentNET(nn.Module):
    def __init__(self, in_dim, out_dim):
        super(AntecedentNET, self).__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.linear_layers = nn.Sequential(
            nn.Linear(in_dim, 64),
            nn.SiLU(),
            nn.Linear(64, 16),
            nn.SiLU(),
            nn.Linear(16, 1))
    
    def forward(self, z):
        z = self.linear_layers(z)
        return z

def init_weights(m):
    if type(m) == torch.nn.Linear:
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

net = AntecedentNET(len(xspace), len(yspace))
net.apply(init_weights)

NameError: name 'xspace' is not defined

## Model Training
The optimisation algorithm we use is the Adam algorithm.  As the network is relatively simple, it doesn't require an excessive number of epochs and should train relatively quickly on a modern hardware setup (we used a Macbook with an M2 chip featuring 8 cores and 24 GB of memory and it took a minute or two).  We're using the Reflective Loss function with the alpha and beta hyperparameters set above.

In [None]:
net = nn.DataParallel(net)
net = net.train()
net = net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=0.001, weight_decay=0.1)
loss_list = []
for i in range(5000):
    y_pred = net(x.float())
    loss = torch.abs(torch.mean(RELossFunc(y_pred, y.float(), g.float())))
    net.zero_grad()
    loss.backward()
    optimizer.step()
    loss_list.append(loss.data)
    if(i % 500 == 0):
        print('epoch {}, loss {}'.format(i, loss.data))

## Model Evaluation
With the model trained, we can now examine it's performance on the test set.  The code below will report the metrics evaluated on the training, validation, and test sets but these can be deleted or retained as appropriate.

In [None]:
net = net.eval()
fullset = rf.to_numpy()
Z = fullset[:,xspace].reshape(len(fullset), len(xspace)).astype(float)
z = torch.from_numpy(Z).to(device)
predicted = net(z.float()).data.cpu().numpy()
rf['Predicted'] = predicted
maxflow = 300
rftrain = rf[~pd.to_datetime(rf['Date']).dt.year.isin(yearlist)]
rfvalid = rf[pd.to_datetime(rf['Date']).dt.year.isin(yearlist[0:1])]
rftest = rf[pd.to_datetime(rf['Date']).dt.year.isin(yearlist[1:])]
for df in (rftrain, rfvalid, rftest):
    print('- - - - - - - - - - - - - - -')
    print('RMSE: ' + str(RMSE(df['Flow'], df['Predicted'])))
    print('R\N{SUPERSCRIPT TWO}: ' + str(R2(df['Flow'], df['Predicted'])))
    print('RE: ' + str(RE(df['Flow'], df['Predicted'], df['Psi'])))