In [None]:
import pandas as pd
import datetime
import math
from math import sqrt
import itertools
import numpy as np
import scipy.stats as stats
from scipy.stats import norm

import statsmodels as sm
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant

# Bayesian Method

In [None]:
def atal_methode(x, grens, HBKRE):
    #gegevens HB en KRE uit de literatuur
    beta_afwijking = 0.35
    if HBKRE == "HB":
        bio_var = 0.027
        analyst_var = 0.0174
        var_tussen = 0.059
        ondergrens_bio_var = 0.017
    else:
        bio_var = 0.045
        analyst_var = 0.017
        var_tussen = 0.141
        ondergrens_bio_var = 0.042
    
    mu_mu = np.mean(grens)
    mu_sigma = mu_mu * var_tussen
    sigma_mu = mu_mu * np.sqrt(bio_var**2 + analyst_var**2)
    sigma_sigma = beta_afwijking * sigma_mu
    
    minimale_var = sqrt(ondergrens_bio_var**2 + analyst_var**2) * np.mean(x)
    
    #aanmaken parameter schatters
    x_max = np.max(x)
    x_min = np.min(x)

    if HBKRE == "HB":
        if x_max - x_min > 2.5:
            mu_i = np.linspace(x_min*0.80, x_max*1.20, 30)
        else:
            mu_i = np.arange(x_min*0.80, x_max*1.21, 0.1)    
    else:
        if x_max - x_min > 40:
            mu_i = np.linspace(x_min*0.80, x_max*1.20, 30)
        else:
            mu_i = np.arange(x_min*0.80, x_max*1.20 + 1.5, 1.5)
        
    if HBKRE == "HB":
        sigma_j = np.linspace(minimale_var, 1.2, 30)
    else:
        if x_max > 130:
            sigma_j = np.linspace(minimale_var, 35, 50)
        else:
            sigma_j = np.linspace(minimale_var, 12, 30)
            
    mu, sigma = np.meshgrid(mu_i, sigma_j)
    bayes = np.column_stack((mu.ravel(), sigma.ravel()))
    
    likelihoods = norm.pdf(x[:, np.newaxis], bayes[:,0], bayes[:,1])
    bayes_likelihood = np.prod(likelihoods, axis=0)
    
    #P(mu_i, sigma_j)
    #probability van mu sigma uit de a-priori verdeling
    prior = norm.pdf(bayes[:,0], loc=mu_mu, scale=mu_sigma) * norm.pdf(bayes[:,1], loc=sigma_mu, scale=sigma_sigma)
    
    #Vermenigvuldig de bovenstaande kansen
    prob = bayes_likelihood * prior
    
    #De totale kans wordt berekend om de kansen te normaliseren
    tot_prob = np.sum(prob)
    
    #Genormaliseerde kansen
    norm_prob = prob / tot_prob

    #Nieuwe waarden aanmaken voor probability berekening
    if HBKRE == "HB":
        waarde_nieuw = np.linspace(x_min * 0.75, x_max * 1.25, 40)
    else:
        waarde_nieuw = np.linspace(x_min * 0.75, x_max * 1.25, 40)
    
    #Matrix voor elke combinatie van mu en sigma met de mogelijke nieuwe waarden
    matrix_waarde_nieuw =  np.zeros((len(bayes), len(waarde_nieuw)))

    
    #Kansberekening voor elke combinatie van mu en sigma met mogelijke nieuwe waarde
    for k, waarde in enumerate(waarde_nieuw):
        #iterate over each row of bayes
        for j in range(len(bayes)):
            matrix_waarde_nieuw[j,k] = norm.pdf(waarde, loc=bayes[j,0], scale=bayes[j,1]) * norm_prob[j]
    
    #Kansen normaliseren
    kansen_waarde_nieuw = np.sum(matrix_waarde_nieuw, axis=0) / np.sum(matrix_waarde_nieuw)
    
    #Beste parameter schatter
    mu_post = np.sum(kansen_waarde_nieuw * waarde_nieuw)
    sigma_post = np.sqrt(np.sum(kansen_waarde_nieuw * (waarde_nieuw - mu_post)**2))
    
    #95% CI van de voorspelling
    ondergrens = mu_post-1.96*sigma_post
    bovengrens = mu_post+1.96*sigma_post
    
    res = np.array([ondergrens, bovengrens])
    return res

# Homeostatic method

In [None]:
def dixon_test(data, left=True, right=True):
    '''
    Dit is een functie voor Dixon test die ik online heb gevonden. 
    Sebastian Raschka heeft deze op zijn website staan.
    Het is overigens belangrijk om te bedenken met welke confidence er gewerkt wordt.
    Deze functie is nu gebaseerd op 95% confidence. 
    Bij wijziging meoten de waarden in Q95 ook aangepast worden
    
    Keyword arguments:
        data = A ordered or unordered list of data points (int or float).
        left = Q-test of minimum value in the ordered list if True
        right = Q-test of maximum value in the ordered list if True
        q_dict = A dictionary of Q-values for a given confidence level,
            where the dict. keys are sample sizes N, and the associated values 
            are the corresponding critical Q values.
            
    Returns a list of 2 values for the outliers, or None.   
    '''
    q_dict = {
        3: 0.941, 4: 0.765, 5: 0.642, 6: 0.560, 7: 0.507, 8: 0.468, 9: 0.437,
        10: 0.412, 11: 0.392, 12: 0.378, 13: 0.361, 14: 0.349, 15: 0.338, 16: 0.329,
        17: 0.320, 18: 0.313, 19: 0.306, 20: 0.300, 21: 0.295, 22: 0.290, 23: 0.285,
        24: 0.281, 25: 0.277, 26: 0.273, 27: 0.270, 28: 0.267, 29: 0.263, 30: 0.260
    } 
    
    assert(left or right), 'At least on of the variables, `left` or `right`, must be True.'
    assert(len(data) >= 3), 'At least 3 data points are required'
    assert(len(data) <= max(q_dict.keys())), 'Sample size too large'
    
    sdata = sorted(data)
    Q_mindiff, Q_maxdiff = (0, 0), (0, 0)
    
    if left:
        Q_min = (sdata[1] - sdata[0])
        try:
            Q_min /= (sdata[-1] - sdata[0])
        except ZeroDivisionError:
            pass
        Q_minddiff = (Q_min - q_dict[len(data)], sdata[0])
        
    if right:
        Q_max = abs((sdata[-2] - sdata[-1]))
        try:
            Q_max /= abs((sdata[0] - sdata[-1]))
        except ZeroDivisionError:
            pass
            Q_maxdiff = (Q_max - q_dict[len(data)], sdata[-1])
    
    if not Q_mindiff[0] > 0 and not Q_maxdiff[0] > 0:
        outliers = [None, None]
        
    elif Q_mindiff[0] == Q_maxdiff[0]:
        outliers = [Q_mindiff[1], Q_maxdiff[1]]
        
    elif Q_mindiff[0] > Q_maxdiff[0]:
        outliers = [Q_mindiff[1], None]
        
    else:
        outliers = [None, Q_maxdiff[1]]
        
    return outliers
    

def coskun_methode(y):
    alpha = 0.05
    y = np.array(y)
    y_non_na = y[~np.isnan(y)]
    n = len(y_non_na)
    
    if n >= 3 and np.all(y_non_na != 0):
        x = np.arange(1, (n + 1))
        
        #linear regression
        x_with_const = add_constant(x)
        model = OLS(y_non_na, x_with_const).fit(method='qr')
        
        slope = model.params[1]
        #calculate alternative steyx (instead of model.bse[1]) to match R approach
        steyx = np.sqrt(np.sum(model.resid**2)/ model.df_resid)
        
        #confidence intervals based on residual standard error
        s_ll = slope - steyx
        s_ul = slope + steyx
        
        #trend: s_ll * s_ul
        trend = s_ll * s_ul
        
        #Dixon test for outliers
        try:
            outliers = dixon_test(y_non_na)
            has_outlier = any(outliers)
            if has_outlier:
                y_non_na = y_non_na[~np.isin(y_non_na, outliers)]
                dix_res = f'Outliers are present, obs y={outlier_value} are removed'
            else:
                dix_res = "No outliers at 5% significance level"
        except ValueError as e:
            return {
                'LL': np.nan,
                'UL': np.nan,
                'Range': np.nan,
                'trend_test': str(e),
                'out_test': str(e)
            }

        
        #Check if trend is present
        if s_ll <= 0 <= s_ul:
            #Compute reference intervals
            tvset = stats.t.ppf(1-(alpha/2), df=n-1) * np.sqrt((n+1)/n) * np.std(y_non_na, ddof=1)
            ll = np.mean(y_non_na) - tvset
            ul = np.mean(y_non_na) + tvset
            range_ri = ul - ll
            trend_res = "Trend is not present"
        else:
            ll = ul = range_ri = np.nan
            trend_res = "Trend is present - intervals are not computed"
        
    else:
        ll = ul = range_i = npp.nan
        trend_res = "Not computed, n <  3"
        dix_res = "Not computed, n < 3"
        
    return ll, ul, range_ri, trend_res, dix_res


# Pipeline

In [None]:
def pipeline(df):
    results = []
    
    #iteratie over unieke metingen (HB en KRE)
    for HBKRE in df['meting'].unique():
        print(f'Verwerking {HBKRE} metingen...')
        
        sub_group = df[df['meting'] == HBKRE]
        aantal_patients = len(sub_group['patientID'].unique())
        huidige_patient_teller = 0

        #iteratie over unieke patienten 
        for patient in sub_group['patientID'].unique():
            huidige_patient_teller += 1
            print(f'    Verwerking patient {huidige_patient_teller} van {aantal_patients}...')
            
            patient_data = sub_group[sub_group.patientID == patient]
            x = patient_data['meetresultaat'].values 
            bestaande_interval = patient_data[['lg', 'rg']].iloc[0]

            #Atal methode toepassen (bayesiaans)
            ondergrens_atal, bovengrens_atal = atal_methode(x, bestaande_interval, HBKRE)

            results.append(pd.Series({
                'patient': patient,
                'meting': HBKRE, 
                'methode': 'ATAL',
                'ondergrens': ondergrens_atal,
                'bovengrens': bovengrens_atal
            }))

            #Coskun methode toepassen (homeostasis)
            ondergrens_coskun, bovengrens_coskun, _, _, _  = coskun_methode(x)

            results.append(pd.Series({
                'patient': patient,
                'meting': HBKRE, 
                'methode': 'COSKUN',
                'ondergrens': ondergrens_coskun,
                'bovengrens': bovengrens_coskun
            }))


    print('Pipeline verwerking compleet!')
    return pd.DataFrame(results)

In [None]:
df = pd.read_csv('dataset.csv').drop(['Unnamed: 0'], axis=1)
df.datum_meting = pd.to_datetime(df.datum_meting)

In [None]:
result_df = pipeline(df)