L'objectif de ce TP est de réimplémenter vous-mêmes le modèle décrit dans la publication scientifique suivante : 

- https://www.frontiersin.org/journals/applied-mathematics-and-statistics/articles/10.3389/fams.2022.836349/full

C'est à vous !

## Partie I - Implémentation de base

In [None]:
# imports
import numpy as np
from scipy.optimize import minimize
import scipy.stats as ss
# ...

In [None]:
# Helper functions:
def generate_data(seed=121, nsteps=365, gt_sig=0.25, gt_eps=0.4, gt_pOut_g=0.03, nb_stds=2, init=11.0):
    # Script de génération des données. Ne pas modifier cette cellule dans le cadre de ce TP.
    
    # Initialisation de la graine :
    np.random.seed(seed)
    
    # Génération du vecteur de dates :
    dates = []
    dates.append(pd.to_datetime('2021-01-01'))
    
    for i in range(1,nsteps):
        dates.append(dates[0] + datetime.timedelta(days=i))
    
    df = pd.DataFrame()
    df['dates'] = dates
    df['weekday'] = df.dates.dt.weekday
    df['id'] = df.index.tolist()
    
    # Génération du processus latent X[t] selon un modele RW(1) :
    Xt = np.zeros(nsteps)
    Xt[0] = init # premier point fixé de manière à obtenir des valeurs crédibles pour le SARS-CoV-2
    for i in range(1,nsteps):
        Xt[i] = Xt[i-1] + np.random.normal(loc=0, scale=gt_sig)
    
    df['Xt'] = Xt
    
    # Génération du vecteur d'observations Y[t] dans le cas nominal :
    Yt = np.zeros(df.shape[0])
    for i in range(df.shape[0]):
        Yt[i] = Xt[i] + np.random.normal(loc=0, scale=gt_eps)
    
    df.loc[df.weekday.isin([1,2,3,5,6]), 'id'] = np.nan # on n'observe que les jours 0 et 4 (lundi et vendredi)
    obsAt = np.array(df.loc[~df['id'].isna()].index.tolist())
    
    # Mise à jour de la graine pour les outliers :
    np.random.seed(seed + 33)
    
    # Génération des outliers :
    Y_std = np.nanstd(Yt)
    a = np.nanmin(Yt) - nb_stds*Y_std
    b = np.nanmax(Yt) + nb_stds*Y_std
    outliers_indexes = np.random.choice(obsAt, size=int(obsAt.shape[0]*gt_pOut_g), replace=False)
    Yt[outliers_indexes] = np.random.uniform(low=a, high=b, size=outliers_indexes.shape[0])
    
    # Ajout de la censure sur 20% des échantillons :
    thresh = np.percentile(df.loc[~df.Xt.isna()].Xt.values, 20)
    df['Yt'] = Yt
    df.loc[df.Yt<= thresh, 'Yt'] = thresh
    df.loc[df.id.isna(), 'Yt'] = np.nan   

    return df, outliers_indexes, thresh

def visualize_raw_data(df, outliers_indexes):
    LABEL_SIZE = 30
    TICK_SIZE = 30
    TITLE_SIZE = 38
    LEGEND_SIZE = 30
    DATES_SIZE = 18
    figsize = (28, 10) 
    
    plt.rc('axes', labelsize=LABEL_SIZE)
    plt.rc('xtick', labelsize=TICK_SIZE)   
    plt.rc('ytick', labelsize=TICK_SIZE)
    plt.rc('figure', titlesize=TITLE_SIZE)
    plt.rc('legend', fontsize=LEGEND_SIZE)
    
    fig = plt.figure(figsize=figsize, layout="constrained")
    
    ax_dict = fig.subplot_mosaic(
        """
        A
        """
    )
    
    ### A
    ax_dict['A'].plot(df.dates.values, df.Xt.values, label='$X_t$', color='deepskyblue', linewidth=10)
    ax_dict['A'].plot(df.dates.values, df.Xt.values, color='black', linewidth=3)
    ax_dict['A'].scatter(df.dates.values, df.Yt.values, label='$\hat{X}_t$', color='orange', edgecolor='black', s=360, zorder=1,
                        linewidths=1.5, alpha=0.9)
    
    outliers_points = ax_dict['A'].scatter(df.dates.values[outliers_indexes], df.Yt.values[outliers_indexes],
                         color='none', edgecolor='red', s=520, zorder=2, linewidth=5, label="True outliers")
    
    
    ax_dict['A'].set_ylabel("Gene concentration (GU/L) - Log scale")
    ax_dict['A'].set_xlabel("Sampling date")
    ax_dict['A'].tick_params(axis='x', labelsize=TICK_SIZE)
    ax_dict['A'].tick_params(axis='y', labelsize=TICK_SIZE)
    
    plt.rcParams['text.usetex'] = False
    h1, l1 = ax_dict['A'].get_legend_handles_labels()
    fig.legend(h1, l1, loc='upper center', bbox_to_anchor=(0.5, 0), fancybox=True, shadow=True, ncol=3)
    plt.grid()
    plt.show()    

def visualize_smoothed_data(df, outliers_indexes, scou, solution, visualize_solution=True):
    LABEL_SIZE = 30
    TICK_SIZE = 30
    TITLE_SIZE = 38
    LEGEND_SIZE = 30
    DATES_SIZE = 18
    figsize = (28, 10)
    
    plt.rc('axes', labelsize=LABEL_SIZE)
    plt.rc('xtick', labelsize=TICK_SIZE)   
    plt.rc('ytick', labelsize=TICK_SIZE)
    plt.rc('figure', titlesize=TITLE_SIZE)
    plt.rc('legend', fontsize=LEGEND_SIZE)
    plt.rcParams['text.usetex'] = False#True
    
    fig = plt.figure(figsize=figsize, layout="constrained")
    
    ax_dict = fig.subplot_mosaic(
        """
        A
        """
    )
    
    ### A
    ax_dict['A'].plot(df.dates.values, df.Xt.values, label='$X_t$', color='deepskyblue', linewidth=8, zorder=3)
    ax_dict['A'].plot(df.dates.values, df.Xt.values, color='black', linewidth=3, zorder=3)

    if visualize_solution:
        ax_dict['A'].plot(df.dates.values, solution.muX.values, label='$X^*_{t_{opt}}$ - Solution', color='green', linewidth=8, zorder=3)
    ax_dict['A'].plot(df.dates.values, scou.muX, label='$X^*_{t_{opt}}$ - Your implementation', color='orange', linewidth=5, zorder=3)
    
    scatter_points = ax_dict['A'].scatter(df.dates.values, df.Yt.values, label='$\hat{X}_t$', 
                         c=scou.pointwise_pout,
                         cmap='bwr', edgecolor='black', s=360, zorder=2,
                         linewidths=1.5, alpha=0.9, vmin=0, vmax=1)
    
    censored_observations = df.loc[df.Yt<=thresh]
    
    censored_points = ax_dict['A'].scatter(censored_observations.dates.values,
                                           censored_observations.Yt.values, label='$\hat{X}_{t_{Censored}}$', 
                         color='none', edgecolor='darkorange', s=520, zorder=1, linewidth=5)
    
    outliers_points = ax_dict['A'].scatter(df.dates.values[outliers_indexes], df.Yt.values[outliers_indexes],
                         color='none', edgecolor='red', s=520, zorder=1, linewidth=5, label="True outliers")
    
    ax_dict['A'].set_ylabel("Gene concentration (GU/L) - Log scale")
    ax_dict['A'].set_xlabel("Sampling date")
    ax_dict['A'].tick_params(axis='x', labelsize=TICK_SIZE)
    ax_dict['A'].tick_params(axis='y', labelsize=TICK_SIZE)
    ax_dict['A'].grid(linewidth=1, color='black', alpha=0.8)
    
    ### Outlier probability legend:
    cmin, cmax = 0.0, 1.0 
    axins1 = inset_axes(ax_dict['A'], width='2%', height='100%', loc='right', borderpad=0)
    axins1.grid(False)
    cbar = fig.colorbar(scatter_points, cax=axins1, orientation='vertical')
    
    # Setting tick limits:
    cbar.set_ticks([cmin, cmax])
    cbar.ax.yaxis.set_major_locator(ticker.FixedLocator([cmin, cmax]))
    
    # Standardizing the float format displayed:
    decimal_places = 1
    cbar.ax.set_yticklabels([f"{cmin:.{decimal_places}f}", f"{cmax:.{decimal_places}f}"], fontsize=TICK_SIZE)
    
    # Placing the label at the right spot:
    cbar.ax.set_ylabel('Outlier probability', size=TICK_SIZE, labelpad=-35)
    
    # Main legend
    plt.rcParams['text.usetex'] = False
    h1, l1 = ax_dict['A'].get_legend_handles_labels()
    fig.legend(h1, l1, loc='upper center', bbox_to_anchor=(0.5, 0), fancybox=True, shadow=True, ncol=6)
    plt.grid()
    plt.show()

# Fonction de validation des résultats, ne pas modifier

def cmp(s, scou, solution_dict, rtol):

    if s=='pointwise_pout':
        tmp1 = getattr(scou, s)
        tmp2 = solution_dict[s]

        tmp1 = tmp1[np.where(~np.isnan(tmp1))]
        tmp2 = tmp2[np.where(~np.isnan(tmp2))]

    else:
        tmp1 = getattr(scou, s)
        tmp2 = solution_dict[s]
        
    maxdiff = np.max(np.abs(tmp1 - tmp2))
    ex = np.allclose(tmp1, tmp2, rtol)
    
    print(f'{s:15s} | exact: {str(ex):5s} | maxdiff: {maxdiff}')

In [None]:
class SCOU():

    def __init__(self, observations, lod_vect,                                                          # SCOU's inputs
                 nb_states,                                                                             # SCOU's hyperparameter
                 solver='Nelder-Mead', opt_tol=5e-3, max_iter=600, adaptive=True, disp=True):           # Nelder Mead's parameters

        self.observations = observations
        self.lod_vect = lod_vect
        self.nb_states = nb_states
        self.solver = solver
        self.opt_tol = opt_tol
        self.max_iter = max_iter
        self.adaptive = adaptive
        self.disp = disp

        self.init_parameters_vertex()
        self.get_T_ronde()
        self.get_X_ronde()
        self.observations_below_LoD = # ...
        self.observations_above_LoD = # ...
        self.get_outlier_emission_parts()

    def init_parameters_vertex(self):
        self.epsilon = np.nanstd(self.observations)
        self.sigma = self.epsilon * 0.5
        self.p_out = 0.1
    
    def get_T_ronde(self):
        # ...
        
    def get_X_ronde(self):
        # ...    
        
    def compute_transition_matrix(self):
        # ...
        
    def get_outlier_emission_parts(self):
        # ...    
        
    def compute_emission_matrix(self):
        # ...
        
    def compute_forward_matrix(self):
        # ...

    def compute_backward_matrix(self):
        # ...

    def compute_log_likelihood(self, params):
        self.sigma, self.epsilon, self.p_out = params
        self.compute_forward_matrix()
        LL = # ...

        return -LL
        
    def fit(self):
        initial_guess = [self.sigma, self.epsilon, self.p_out]
        result = minimize(fun=self.compute_log_likelihood, x0=initial_guess, method=self.solver, 
                          options={'maxiter':self.max_iter, 'xatol':self.opt_tol, 'disp':self.disp, 'adaptive': self.adaptive})
        
    def predict(self):
        # ...

    def compute_pointwise_outlier_probabilities(self):
        # ...

### Application à des données synthétiques

In [None]:
import pandas as pd
import datetime
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import matplotlib.ticker as ticker
import time

Il est maintenant l'heure de vérifier si votre implémentation est correcte ! Essayez de l'appliquer aux données suivantes :

In [None]:
df, outliers_indexes, thresh = generate_data()

In [None]:
visualize_raw_data(df, outliers_indexes)

In [None]:
lod_vect = np.ones(df.shape[0]) * thresh
observation_vect = df.Yt.values
nb_states = 600 

t0 = time.time()
scou = SCOU(observation_vect, lod_vect=lod_vect, nb_states=nb_states)
scou.fit()
scou.predict()
scou.compute_pointwise_outlier_probabilities()
t1 = time.time()

print("Computation took " + str(t1 - t0) + "seconds.")

In [None]:
solution = pd.read_csv('III_solution.csv', sep=";")
solution_dict = {}

for parameter in ['sigma', 'epsilon', 'p_out']:
    solution_dict[parameter] = solution[parameter].values[0]
for parameter in ['muX', 'IC95_lower', 'IC95_upper', 'pointwise_pout']:
    solution_dict[parameter] = solution[parameter].values


In [None]:
visualize_smoothed_data(df, outliers_indexes, scou, solution)

In [None]:
# Ne pas modifier cette cellule
rtol = 1e-2 
rtol2 = 1e-1

cmp('sigma', scou, solution_dict, rtol)
cmp('epsilon', scou, solution_dict, rtol)
cmp('p_out', scou, solution_dict, rtol)
cmp('muX', scou, solution_dict, rtol)
cmp('IC95_lower', scou, solution_dict, rtol)
cmp('IC95_upper', scou, solution_dict, rtol)
cmp('pointwise_pout', scou, solution_dict, rtol2)

Si toutes les vérifications sont à True dans la cellule ci-dessus, vous pouvez passer à la suite !

## Partie II - Optimisation de l'implémentation

Essayez maintenant d'appliquer votre implémentation au jeu de données ci-dessous. Que se passe-t-il ? Avez-vous une idée d'où peut venir le problème ?

In [None]:
df, outliers_indexes, thresh = generate_data(seed=122, nsteps=365*6, init=np.random.uniform())

In [None]:
visualize_raw_data(df, outliers_indexes)

In [None]:
lod_vect = np.ones(df.shape[0]) * thresh
observation_vect = df.Yt.values
nb_states = 600 

t0 = time.time()
scou = SCOU(observation_vect, lod_vect=lod_vect, nb_states=nb_states, max_iter=150)
scou.fit()
scou.predict()
scou.compute_pointwise_outlier_probabilities()
t1 = time.time()



print("Computation took " + str(t1 - t0) + "seconds.")

In [None]:
visualize_smoothed_data(df, outliers_indexes, scou, solution=None, visualize_solution=False)