# Evaluation of fitting functions

In [46]:
import json
import urllib.request as ur
from tqdm.notebook import tqdm

import xarray as xr
import numpy as np
import pandas as pd

import lmfit
import numdifftools
import corner
from scipy.stats import f_oneway

from matplotlib import pyplot as plt

from IPython.display import display, Markdown

# Configure NumPy floating-point error handling for the whole notebook:
# every category is silenced except division by zero, which raises
# FloatingPointError (this is what surfaces as a fit "ERR" further down).
# `np.seterr` returns the previous settings; they are kept in `_np`.
_np = np.seterr(all="ignore", divide="raise")

In [3]:
# Download the per-country COVID-19 time series (JSON) and parse it into
# the module-level `_data` mapping used by the fitting cells below.
url = 'https://raw.githubusercontent.com/maxdevblock/covid-19-time-series/master/json/COVID-COUNTRIES.json'
with ur.urlopen(url) as _url:
    _payload = _url.read()
    _data = json.loads(_payload.decode())

In [4]:
def gompertz_function(x, a, b, k, e):
    """Evaluate a shifted Gompertz curve.

    Computes ``a * exp(-exp(k * (b - x))) + e``.

    Parameters
    ----------
    x : scalar or array-like
        Point(s) at which to evaluate the curve. Plain Python sequences
        are accepted (they are converted with ``np.asarray``), matching
        the behaviour of ``logit_function``.
    a : float
        Multiplier of the double-exponential term.
    b : float
        Horizontal shift: at ``x == b`` the inner exponent is zero.
    k : float
        Rate applied to ``(b - x)``.
    e : float
        Constant vertical offset.

    Returns
    -------
    numpy scalar or numpy.ndarray
        Curve value(s), with the same shape as ``x``.
    """
    # Coerce so that list/tuple inputs support the arithmetic below.
    x = np.asarray(x)
    exp = - np.exp(k * (b - x))
    return a * np.exp(exp) + e

In [5]:
def double_gompertz_function(x, a1, b1, k1, a2, b2, k2, e):
    """Evaluate the sum of two Gompertz terms plus a constant offset.

    Computes::

        a1 * exp(-exp(k1 * (b1 - x)))
        + (a2 - a1) * exp(-exp(k2 * (b2 - x)))
        + e

    Note that the second term is scaled by ``a2 - a1``, not by ``a2``.

    Parameters
    ----------
    x : scalar or array-like
        Point(s) at which to evaluate the curve. Plain Python sequences
        are accepted (they are converted with ``np.asarray``), matching
        the behaviour of ``double_logit_function``.
    a1, b1, k1 : float
        Multiplier, horizontal shift and rate of the first term.
    a2, b2, k2 : float
        ``a2 - a1`` is the multiplier of the second term; ``b2`` and
        ``k2`` are its horizontal shift and rate.
    e : float
        Constant vertical offset.

    Returns
    -------
    numpy scalar or numpy.ndarray
        Curve value(s), with the same shape as ``x``.
    """
    # Coerce so that list/tuple inputs support the arithmetic below.
    x = np.asarray(x)
    exp1 = - np.exp(k1 * (b1 - x))
    g1 = a1 * np.exp(exp1)
    exp2 = - np.exp(k2 * (b2 - x))
    g2 = (a2 - a1) * np.exp(exp2)
    return g1 + g2 + e

In [6]:
def logit_function(x, a, b, k, e):
    """Evaluate a shifted logistic curve.

    Computes ``a / (1 + exp(k * (b - x))) + e`` after converting ``x``
    to a NumPy array.

    Parameters
    ----------
    x : scalar or array-like
        Point(s) at which to evaluate the curve.
    a : float
        Numerator of the logistic term.
    b : float
        Horizontal shift: at ``x == b`` the logistic term equals ``a / 2``.
    k : float
        Rate applied to ``(b - x)``.
    e : float
        Constant vertical offset.

    Returns
    -------
    numpy scalar or numpy.ndarray
        Curve value(s).
    """
    xs = np.asarray(x)
    denominator = 1 + np.exp(k * (b - xs))
    return (a / denominator) + e

In [7]:
def double_logit_function(x, a1, b1, k1, a2, b2, k2, e):
    """Evaluate the sum of two logistic terms plus a constant offset.

    Computes::

        a1 / (1 + exp(k1 * (b1 - x)))
        + (a2 - a1) / (1 + exp(k2 * (b2 - x)))
        + e

    Note that the second term is scaled by ``a2 - a1``, not by ``a2``.

    Parameters
    ----------
    x : scalar or array-like
        Point(s) at which to evaluate the curve.
    a1, b1, k1 : float
        Numerator, horizontal shift and rate of the first term.
    a2, b2, k2 : float
        ``a2 - a1`` is the numerator of the second term; ``b2`` and
        ``k2`` are its horizontal shift and rate.
    e : float
        Constant vertical offset.

    Returns
    -------
    numpy scalar or numpy.ndarray
        Curve value(s).
    """
    xs = np.asarray(x)
    first = a1 / (1 + np.exp(k1 * (b1 - xs)))
    second = (a2 - a1) / (1 + np.exp(k2 * (b2 - xs)))
    return first + second + e

In [8]:
def guessed_params(function, model, y):
    """Build the initial parameter guesses used to fit ``function`` to ``y``.

    Parameters
    ----------
    function : callable
        One of ``gompertz_function``, ``logit_function``,
        ``double_gompertz_function`` or ``double_logit_function``.
    model : object
        Object exposing ``make_params(**kwargs)``; in this notebook an
        ``lmfit.Model`` wrapping ``function``.
    y : numpy.ndarray
        Observed series. Must support ``.argmax()`` and integer indexing.

    Returns
    -------
    object
        Whatever ``model.make_params`` returns for the chosen guesses.

    Raises
    ------
    ValueError
        If ``function`` is not one of the supported model functions.
    """
    # Index of the largest observation; used as the guess for the
    # horizontal shift of the (first) curve.
    # NOTE(review): for a monotonically increasing cumulative series this
    # is simply the last index — confirm this is the intended guess.
    max_y_i = y.argmax()

    # Single-curve models share the same parameter names and guesses.
    if function in (gompertz_function, logit_function):
        return model.make_params(
            a=y[-1],
            b=max_y_i,
            k=.1,
            e=y[0]
        )

    # Two-curve models: the second curve is guessed to be centred at the
    # end of the series.
    if function in (double_logit_function, double_gompertz_function):
        return model.make_params(
            a1=y[max_y_i] * 2,
            b1=max_y_i,
            k1=.1,
            a2=max(y),
            b2=len(y),
            k2=.1,
            e=y[0]
        )

    # Fail loudly instead of hitting an unbound local on `return p`.
    raise ValueError(
        f"No initial parameter guess defined for function {function!r}"
    )

In [43]:
def get_stats(country, function, print_res=False):
    """Fit ``function`` to the "Confirmed" series of ``country``.

    The series is read from the module-level ``_data`` mapping as
    ``_data[country]["Confirmed"]``; its values are the observations and
    the x axis is the 0-based position of each observation.

    Parameters
    ----------
    country : str
        Key into ``_data``.
    function : callable
        Model function to fit; must be supported by ``guessed_params``.
    print_res : bool, optional
        When true, also report and plot the Nelder-Mead fit, then run an
        additional ``emcee`` sampling fit starting from the Nelder-Mead
        parameters and show its report, fit plot, acceptance fractions,
        autocorrelation times (when available) and corner plot.

    Returns
    -------
    lmfit ModelResult
        Result of the Nelder-Mead fit (never the emcee result).
    """
    confirmed = _data[country]["Confirmed"]
    y = np.array(list(confirmed.values()))
    x = np.arange(len(y))

    # define model
    model = lmfit.Model(function)

    initial_params = guessed_params(function, model, y)

    # get model fit optimal results
    result = model.fit(
        data=y, params=initial_params, x=x, method='Nelder', nan_policy='omit'
    )

    if print_res:
        lmfit.report_fit(result)
        result.plot()
        plt.show()

        # get stats for model fit
        emcee_kws = dict(
            steps=1000, burn=300, thin=20, is_weighted=False, progress=False
        )
        emcee_params = result.params.copy()
        emcee_params.add('__lnsigma', value=np.log(0.1), min=np.log(0.001), max=np.log(2.0))

        result_emcee = model.fit(
            data=y, x=x, params=emcee_params, method='emcee',
            nan_policy='omit', fit_kws=emcee_kws
        )

        lmfit.report_fit(result_emcee)

        # `plt.plot` returns a list of Line2D objects, not an Axes, so an
        # explicit Axes is created to share between both plots.
        _, ax = plt.subplots()
        ax.plot(x, model.eval(params=result.params, x=x), label='Nelder', zorder=100)
        result_emcee.plot_fit(ax=ax, data_kws=dict(color='gray', markersize=2))
        plt.show()

        plt.plot(result_emcee.acceptance_fraction)
        plt.xlabel('walker')
        plt.ylabel('acceptance fraction')
        plt.show()

        if hasattr(result_emcee, "acor"):
            print("Autocorrelation time for the parameters:")
            print("----------------------------------------")
            # The autocorrelation times live on the emcee result; the
            # Nelder-Mead result has no `acor` attribute.
            for name, acor in zip(result_emcee.var_names, result_emcee.acor):
                print(name, acor)

        corner.corner(
            result_emcee.flatchain, labels=result_emcee.var_names,
            truths=list(result_emcee.params.valuesdict().values())
        )
        plt.show()

    return result

In [132]:
# Model functions to compare, in the order of the result columns.
functions = [gompertz_function, logit_function, double_gompertz_function, double_logit_function]

# Column labels: "countries" followed by one title-cased label per function.
cols = ["countries"]
for function in functions:
    func_name = function.__name__.replace("_", " ").title()
    cols.append(func_name)

# One row per country: [country, <fit result or None> for each function].
results = []

for country in tqdm(list(_data.keys())):
    row = [f"{country}"]
    for function in functions:
        try:
            result = get_stats(country, function, print_res=False)
            row.append(result)
        except Exception as err:
            func_name = function.__name__.replace("_", " ").title()
            print(f"{country} - {func_name} ERR {err}")
            # Keep the row aligned with `cols`: without a placeholder the
            # results of the remaining functions would shift into the
            # wrong columns when the DataFrame is built.
            row.append(None)
    results.append(row)

HBox(children=(FloatProgress(value=0.0, max=185.0), HTML(value='')))

Sao Tome and Principe - Logit Function ERR divide by zero encountered in log



In [146]:
# Column labels: "countries" followed by one title-cased label per fitted
# function (e.g. "Gompertz Function").
cols = ["countries"] + [
    fitted.__name__.replace("_", " ").title() for fitted in functions
]

# Table of fit results indexed by country name.
res_df = pd.DataFrame(data=results, columns=cols).set_index("countries")

In [153]:
# Persist the table of fit results as a pickle file named "fit_data.pkl"
# (relative path, so it lands in the current working directory).
# Pickle files should only ever be loaded back from a trusted source.
res_df.to_pickle("fit_data.pkl")

In [159]:
# Display the stored Gompertz fit result for Italy (row label, column label).
res_df.loc["Italy", "Gompertz Function"]

0,1,2
fitting method,Nelder-Mead,
# function evals,370,
# data points,97,
# variables,4,
chi-square,2.0592e+08,
reduced chi-square,2214188.68,
Akaike info crit.,1421.12366,
Bayesian info crit.,1431.42250,

name,value,standard error,relative error,initial value,min,max,vary
a,224252.073,1589.23628,(0.71%),199414.0,-inf,inf,True
b,64.8350816,0.14101673,(0.22%),96.0,-inf,inf,True
k,0.06469555,0.0008236,(1.27%),0.1,-inf,inf,True
e,-0.00372319,228.398465,(6134477.90%),0.0,-inf,inf,True

0,1,2
a,k,-0.9253
a,b,0.842
b,k,-0.747
a,e,-0.3364
k,e,0.293


In [157]:
# Display the stored logistic fit result for China (row label, column label).
res_df.loc["China", "Logit Function"]

0,1,2
fitting method,Nelder-Mead,
# function evals,652,
# data points,97,
# variables,4,
chi-square,3.2353e+08,
reduced chi-square,3478787.88,
Akaike info crit.,1464.94805,
Bayesian info crit.,1475.24689,

name,value,standard error,relative error,initial value,min,max,vary
a,84324.2517,1158.8215,(1.37%),83918.0,-inf,inf,True
b,17.6135611,0.2176884,(1.24%),96.0,-inf,inf,True
k,0.20410178,0.00709834,(3.48%),0.1,-inf,inf,True
e,-2132.4012,1091.08524,(51.17%),548.0,-inf,inf,True

0,1,2
a,e,-0.9785
b,e,0.8128
a,k,-0.7746
a,b,-0.7566
k,e,0.7511
b,k,0.5817
