In [None]:
import numpy as np
from scipy.optimize import minimize_scalar
from scipy.stats import t as student_t
from sklearn.metrics import adjusted_rand_score  
import matplotlib.pyplot as plt
import math
from typing import Callable, Tuple, List, Dict

In [None]:

QuadPiece = Tuple[float, float, float, float, float, int]
#quadpiece is a tuple containing:
#first float a = left bound of the interval
#second flat b: right bound of the interval
#3 other floats A,B,C = coefficients of the quadratic polynom on interval [a,b]: AX^2+BX=c 
#last int tau = index of last changepoint 


INF = 1e18 #To represent infinity 
EPS = 1e-12



#algorithm 2 from appendix D 
def rfpop_algorithm2_add_Qstar_and_gamma(Qstar_pieces: List[QuadPiece], 
                                         #Q_star est représenté sous forme [ (a1,b1,A1,B1,C1,tau1), (a2,b2,A2,B2,C2,tau2), ..., ]
                                         gamma_pieces: List[QuadPiece]):

    out: List[QuadPiece] = []
    i = 0
    j = 0
    while i < len(Qstar_pieces) and j < len(gamma_pieces):
        pa, pb, pA, pB, pC, p_tau = Qstar_pieces[i]
        ga, gb, gA, gB, gC, g_tau = gamma_pieces[j]

        #new interval 
        a = max(pa, ga)
        b = min(pb, gb)
        
        # sum of Q_star and gamma on the new interval
        newA = pA + gA
        newB = pB + gB
        newC = pC + gC
        # tau comes from Q* (p_tau) 
        out.append((a, b, newA, newB, newC, p_tau))
        # avancer indices selon bornes
        if abs(b - pb) < 1e-12:
            i += 1
        if abs(b - gb) < 1e-12:
            j += 1
        # to avoid infinite loop
        if (i < len(Qstar_pieces) and j < len(gamma_pieces)) and a >= b - 1e-14 and abs(b - Qstar_pieces[i][1]) > 1e-12 and abs(b - gamma_pieces[j][1]) > 1e-12:
            break
    # to do as they say on page 12 and merge identical neighbouring intervals
    if not out:
        return []
    merged: List[QuadPiece] = [out[0]]
    for pc in out[1:]:
        a,b,A,B,C,tau = pc
        ma,mb,mA,mB,mC,mtau = merged[-1]
        if (abs(A - mA) < 1e-14 and abs(B - mB) < 1e-14 and abs(C - mC) < 1e-9 and tau == mtau and abs(a - mb) < 1e-9):
            merged[-1] = (ma, b, mA, mB, mC, mtau)
        else:
            merged.append(pc)
    return merged



In [None]:
def rfpop_algorithm3_min_over_theta(Qt_pieces: List[QuadPiece]) -> Tuple[float, int]:
    """
    Algorithme 3 (Appendice D) :
    Pour Qt partitionnée, calculer min_theta Qt(theta) et renvoyer (val_min, tau_argmin).
    - Pour chaque pièce quadratique on calcule l'argmin interne (-B/(2A)) si A != 0,
      sinon on évalue aux bornes internes ; on projette sur (a,b].
    - On retourne la valeur minimale et le tau associé à la pièce où le min est atteint.
    """
    best_val = float('inf')
    best_tau = 0
    for (a,b,A,B,C,tau) in Qt_pieces:
        # calculer argmin sur (a,b] (projeter si nécessaire)
        if abs(A) < 1e-16:
            # linéaire ou constante : comparer point intérieur proche de a et b
            left = a + 1e-12
            right = b
            vleft = A*left*left + B*left + C
            vright = A*right*right + B*right + C
            theta_star = left if vleft <= vright else right
        else:
            theta_star = -B / (2.0 * A)
            if theta_star <= a:
                theta_star = a + 1e-12
            elif theta_star > b:
                theta_star = b
        val = A*theta_star*theta_star + B*theta_star + C
        if val < best_val:
            best_val = val
            best_tau = tau
    return float(best_val), int(best_tau)

def rfpop_algorithm4_prune_compare_to_constant(Qt_pieces: List[QuadPiece],
                                               Qt_val: float,
                                               beta: float,
                                               t_index_for_new: int) -> List[QuadPiece]:
    """
    Algorithme 4 (Appendice D) :
    Génère Q*_{t+1} en comparant Qt(theta) aux constantes Qt_val + beta.
    - Pour chaque pièce p, résoudre p(theta) = Qt_val + beta (équation quadratique)
      pour obtenir des racines dans (a,b], découper l'intervalle et garder,
      sur chaque sous-intervalle, la quadratique si p(mid) <= thr, sinon remplacer par la constante thr
      avec tau = t_index_for_new.
    - Retourne la partition fusionnée.
    """
    thr = Qt_val + beta
    out: List[QuadPiece] = []
    for (a,b,A,B,C,tau) in Qt_pieces:
        # résoudre A x^2 + B x + (C - thr) = 0 sur [a,b]
        roots: List[float] = []
        if abs(A) < 1e-16:
            # cas quasi-linéaire B x + (C - thr) = 0
            if abs(B) > 1e-16:
                x = -(C - thr) / B
                if x + EPS >= a and x - EPS <= b:
                    x_clamped = max(min(x, b), a)
                    roots.append(x_clamped)
        else:
            D = B*B - 4.0*A*(C - thr)
            if D >= -1e-14:
                D = max(D, 0.0)
                sqrtD = math.sqrt(D)
                x1 = (-B - sqrtD) / (2.0*A)
                x2 = (-B + sqrtD) / (2.0*A)
                for x in (x1, x2):
                    if x + EPS >= a and x - EPS <= b:
                        x_clamped = max(min(x, b), a)
                        # éviter doublons
                        if not any(abs(x_clamped - r) < 1e-9 for r in roots):
                            roots.append(x_clamped)
        roots.sort()
        # découper l'intervalle en [a, r1, r2, ..., b]
        breaks = [a] + roots + [b]
        for k in range(len(breaks)-1):
            lo = breaks[k]
            hi = breaks[k+1]
            mid = (lo + hi) / 2.0
            val_mid = A*mid*mid + B*mid + C
            if val_mid <= thr + 1e-12:
                # garder la quadratique originale sur (lo,hi]
                out.append((lo, hi, A, B, C, tau))
            else:
                # remplacer par la constante thr et tau = t_index_for_new
                out.append((lo, hi, 0.0, 0.0, thr, t_index_for_new))
    # fusionner adjacences égales
    if not out:
        return []
    merged: List[QuadPiece] = [out[0]]
    for pc in out[1:]:
        a,b,A,B,C,tau = pc
        ma,mb,mA,mB,mC,mtau = merged[-1]
        if (abs(A - mA) < 1e-14 and abs(B - mB) < 1e-14 and abs(C - mC) < 1e-9 and tau == mtau and abs(a - mb) < 1e-9):
            merged[-1] = (ma, b, mA, mB, mC, mtau)
        else:
            merged.append(pc)
    return merged

def rfpop_algorithm1_main(y: List[float], gamma_builder, beta: float):
    """
    Algorithme 1 (Appendice D) - boucle principale R-FPOP.
    - y : liste/array des observations (index 0..n-1)
    - gamma_builder : fonction (y_t: float, tau_idx: int) -> List[QuadPiece] représentant gamma(y_t,theta)
    - beta : pénalité

    Retourne :
      - cp_tau : liste tau_t pour t=0..n-1 (entiers)
      - Qt_vals : liste Qt minima pour t=0..n-1
      - Qstar_final : partition Q*_{n+1} (liste de QuadPiece)
    """
    y_arr = np.asarray(y, dtype=float)
    n = len(y_arr)
    # initialisation Q*_1(theta) = 0 sur un intervalle borné large
    lo = float(np.min(y_arr)) - 1.0
    hi = float(np.max(y_arr)) + 1.0
    Qstar = [(lo, hi, 0.0, 0.0, 0.0, 0)]  # Q*_1
    cp_tau = [0] * n
    Qt_vals = [0.0] * n
    for t_idx in range(n):
        # Sub-routine 2 : Qt = Q*_t + gamma(y_t)
        gamma_pcs = gamma_builder(float(y_arr[t_idx]), t_idx)
        Qt_pcs = rfpop_algorithm2_add_Qstar_and_gamma(Qstar, gamma_pcs)
        # Sub-routine 3 : min over theta Qt
        Qt_val, tau_t = rfpop_algorithm3_min_over_theta(Qt_pcs)
        cp_tau[t_idx] = tau_t
        Qt_vals[t_idx] = Qt_val
        # Sub-routine 4 : compute Q*_{t+1}
        Qstar = rfpop_algorithm4_prune_compare_to_constant(Qt_pcs, Qt_val, beta, t_idx)
    return cp_tau, Qt_vals, Qstar

# Exemple d'utilisation (commenté) :
# def gamma_builder_biweight(y_t, tau_idx, K=3.0):
#     return [(-INF, y_t - K, 0.0, 0.0, K*K, tau_idx),
#             (y_t - K, y_t + K, 1.0, -2.0*y_t, y_t*y_t, tau_idx),
#             (y_t + K, INF, 0.0, 0.0, K*K, tau_idx)]
#
# y = [0.1, 0.0, 0.2, 5.0, 5.1, 5.2, 0.3, 0.2]
# beta = 2.0
# cp_tau, Qt_vals, Qstar_final = rfpop_algorithm1_main(y, lambda y_t, t: gamma_builder_biweight(y_t, t, K=1.0), beta)
# print("tau_t:", cp_tau)
