In [1]:
import pandas as pd
import numpy as np 


In [10]:
class DivergenceScore:
    def __init__(self,df=None):
        """
        o, f    :   1-D series of observations and forecasts
        """      
        self.df = df
        self.N = np.sum(self.df.nk)
        self.pks = np.array([self.df.pk])
        
        self.eps = self._determine_eps(None)
        self.base = 2
        self.log = self._determine_log(self.base)
        self.ks = self.df.columns
        
    @staticmethod
    def _determine_log(base):
        if base == 2:
            return np.log2
        elif base == "e":
            return np.log 
        elif base == 10:
            return np.log10
        else:
            raise Exception("Choose log base from 2, 10, or e")
        
    @classmethod
    def compute_dkl(cls,q,p,base,eps=None):
        eps = cls._determine_eps(eps)
        log = cls._determine_log(base)
        dkl = q*log(q/(p+eps)) + (1-q)*log((1-q)/(1-p+eps))
        return dkl
    
    @staticmethod
    def _determine_eps(eps):
        if eps is None:
            return np.finfo(float).eps
        else:
            assert eps > np.finfo(float).eps
            return eps
    
    @classmethod
    def compute_ds_from_components(cls,rel=None,res=None,unc=None):
        assert rel and res and unc 
        return unc - res + rel 
    
    def compute_ds(self,from_components=False,return_all=False):
        if from_components:
            rel = self.compute_rel()
            res = self.compute_res()
            unc = self.compute_unc()
            if return_all:
                print("Note that return_all is ignored outputting components")
            return self.compute_ds_from_components(rel=rel,res=res,unc=unc)
        else:
            try:
                ds_series = self.compute_dkl(self.o,self.f, self.base)
            except AttributeError:
                # or "if self.o is None"
                print("You can only ask for DS computed from a time series.")
            if return_all:
                return ds_series
            return np.mean(ds_series)
        
    def compute_rel(self):
        total_dkl = 0
        for i, k in enumerate(self.ks):   
            ok_bar = self.df.loc["ok",k]/self.df.loc["nk",k]
            dkl = self.compute_dkl(ok_bar,self.df.loc["pk",k],self.base)
            total_dkl += dkl * self.df.loc["nk",k]
        return total_dkl/self.N
    
    def compute_res(self):
        total_dkl = 0
        for i, k in enumerate(self.ks):   
            ok_bar = self.df.loc["ok",k]/self.df.loc["nk",k]
            o_bar = np.sum(self.df.loc["ok"])/self.N
            dkl = self.compute_dkl(ok_bar,o_bar,base=self.base)
            total_dkl += dkl * self.df.loc["nk",k]
            # print(i,k,ok_bar,o_bar,dkl,total_dkl)
        return total_dkl/self.N
    
    def compute_unc(self):
        o_bar = self.eps + np.sum(self.df.loc["ok"])/self.N
        unc = -(o_bar * self.log(o_bar) + (1-o_bar) * self.log(1-o_bar)) 
        return unc 

        
        

In [8]:
import numpy as np
import pandas as pd

# Given parameters
probability_bins = np.array([0.01, 0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 0.95, 0.99])
observations = np.array([90, 45, 54, 54, 36, 15, 14, 16, 18, 8, 3, 2, 2]) * 123

# Step 1: Add more variation to nk
# Generate random variation factors with a larger range to increase variability
np.random.seed(42)  # For reproducibility
random_variation = np.random.uniform(0.5, 1.5, size=len(probability_bins))

# Adjust nk with added variation and ensure integers
nk = np.round(observations * random_variation).astype(int)

# Step 2: Ensure the sum of ok and nk are the same
total_ok = np.sum(observations)
total_nk = np.sum(nk)

# If total_nk is different from total_ok, adjust nk values proportionally
if total_nk != total_ok:
    adjustment_ratio = total_ok / total_nk
    nk = np.round(nk * adjustment_ratio).astype(int)

    # Fine-tune to ensure exact match in totals due to rounding
    while np.sum(nk) != total_ok:
        difference = total_ok - np.sum(nk)
        indices = np.random.choice(range(len(nk)), abs(difference), replace=True)
        for i in indices:
            nk[i] += np.sign(difference)

# Create a pandas DataFrame
forecast_df = pd.DataFrame({
    'pk': probability_bins,
    'ok': observations,
    'nk': nk,
})

print(f"Total observations (sum of ok): {np.sum(observations)}")
print(f"Total forecasts (sum of nk): {np.sum(nk)}")
forecast_df


Total observations (sum of ok): 43911
Total forecasts (sum of nk): 43911


Unnamed: 0,pk,ok,nk
0,0.01,11070,9350
1,0.05,5535,7756
2,0.1,6642,7903
3,0.2,6642,7048
4,0.3,4428,2806
5,0.4,1845,1168
6,0.5,1722,928
7,0.6,1968,2597
8,0.7,2214,2355
9,0.8,984,1148


In [11]:
# Initialize DivergenceScore object
div_score = DivergenceScore(df=forecast_df)

# Compute and print REL, RES, UNC, DKL, and DSS
rel = div_score.compute_rel()
res = div_score.compute_res()
unc = div_score.compute_unc()
ds = div_score.compute_ds(from_components=True)
dss = div_score.compute_ds()  

print(f"REL: {rel}, RES: {res}, UNC: {unc}, DKL: {ds}")

# Create a DataFrame to store these metrics for visualization
metrics_df = pd.DataFrame({
    'Metric': ['REL', 'RES', 'UNC', 'DKL'],
    'Value': [rel, res, unc, ds]
})


KeyError: 'pk'

In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns

# Bar plot for REL, RES, UNC, and DKL
sns.barplot(x='Metric', y='Value', data=metrics_df)
plt.title('Forecast Metrics')
plt.show()
