# __`Calcolo delle distribuzioni stazionarie`__
Osserviamo come viene calcolato il vettore $\pi$ della distrubuzione stazionaria in varie librerie e <br>
implementiamo vari metodi e confrontiamo tempi e precisione.

## `Librerie`

In [2]:
import time
import copy

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as spla
import networkx as nx
import matplotlib.pyplot as plt

from quantecon import gth_solve
from pydtmc import MarkovChain

## `Funzioni`

### `Generazione di P`

#### P irriducibile

> **Nota sulla generazione**: questa funzione è adatta per costruire matrici di transizione $P$ casuali, con un grado controllabile di sparsità, garantendo l'irriducibilità tramite controllo di forte connessione su $G(P)$.  
>
> Tuttavia, quando il numero di zeri richiesto si avvicina al massimo teorico $n(n-1)$, trovare una matrice fortemente connessa diventa sempre più difficile, e i tempi di generazione crescono drasticamente.  
> Inoltre, anche se $P$ risulta irriducibile, un numero troppo basso di archi può portare a **periodicità**, rendendo il **Power Method inefficace**.
>
> In questi casi estremi è preferibile usare una generazione alternativa con struttura più controllata (es. `genera_P_k_random`), che garantisce a priori la forte connessione e, con un minimo numero di archi extra, anche l’aperiodicità.

In [3]:
def genera_P_irriducibile(n, zeros=0, seed=123, tentativi=50000):
    """

    Genera P (nxn) casuale, riproducibile e irriducibile.
    Affidandosi al generatore interno di pydctm

    Args:
        n (int)                 :            dimensione.
        zeros (int)             :            zeri totali nella matrice (anche nella diagonale).
        seed (int)              :            seed iniziale.
        tentativi (int)         :            tentativi massimi.

    Returns:
        np.ndarray              :            matrice di transizione P.

    Raises:
        RuntimeError            :            se non trova una matrice connessa nei tentativi previsti.
    
    """
    
    for k in range(tentativi):

        P = MarkovChain.random(n, zeros=zeros, seed=seed + k).p

        X = nx.from_numpy_array(P, create_using=nx.DiGraph)

        if nx.is_strongly_connected(X):
            return P
        
    raise RuntimeError(f"Non ho P connessa per n={n}, zeros={zeros}")

#### P irriducibile con k archi per nodo

> **Nota sulla generazione**: questo approccio garantisce la $\textbf{forte connessione}$ della matrice $P$ per costruzione,  
> grazie all’arco obbligato $( i \rightarrow (i+1) \mod n )$ che forma un ciclo completo tra tutti i nodi.  
>
> Aggiungendo poi \( k-1 \) archi casuali per riga, si rompe facilmente anche la $\textbf{periodicità}$ del ciclo, rendendo la matrice adatta all'uso del Power Method.  
>
> Questo metodo è particolarmente utile quando si desidera generare matrici $\emph{molto sparse}$, ma con garanzie strutturali.  
> A differenza della funzione `genera_P_irriducibile`, qui non è necessario alcun controllo esplicito sulla connettività,  
> e i tempi di generazione restano contenuti anche con pochissimi archi (es. $ k = 2 $).

In [4]:
def genera_P_k_random(n, k, seed=123):
    """

    Genera P (nxn) irriducibile con:
      - un arco obbligato {i→(i+1)} mod n
      - altri k-1 archi random (possono includere la diagonale)

    Args:
        n (int)                 :           dimensione
        k (int)                 :           numero totale di archi per riga (deve essere 1 ≤ k ≤ n)
        seed (int)              :           seed per RNG

    Returns:
        np.ndarray              :           matrice di transizione P

    """
    
    if not (1 <= k <= n):
        raise ValueError("Serve 1 ≤ k ≤ n")
    
    rng = np.random.default_rng(seed)
    P = np.zeros((n, n))
    
    
    for i in range(n):
        # arco obbligato
        targets = {(i + 1) % n}
        if k > 1:
            # scelgo altri k-1 target (possono includere la diagonale i)
            possible_j = [j for j in range(n) if j != (i + 1) % n]
            targets.update(rng.choice(possible_j, size=k-1, replace=False))
      
        for j in targets:
            P[i, j] = rng.uniform(np.nextafter(0, 1), 1)

        # normalizzo la riga i affinché sommi a 1
        row_sum = P[i].sum()
        P[i] /= row_sum

    
    return P

### `Generazione di Q`

#### Q irriducibile

> **Nota sulla generazione**: questa funzione genera matrici generatore $Q$ **irriducibili** (fortemente connesse)  
> a partire da una struttura sparsa con zeri distribuiti casualmente fuori diagonale.  
>
> L’algoritmo impone un massimo di $n(n - 2)$ zeri fuori diagonale, per garantire che sia matematicamente possibile ottenere  
> un grafo fortemente connesso.  
>
> **Osservazioni:**
> - Quando il numero di zeri si avvicina al massimo consentito, la probabilità di ottenere una $Q$ fortemente connessa si riduce drasticamente, e può essere necessario un numero elevato di tentativi.
> - Anche se la matrice è fortemente connessa, un numero troppo basso di archi rende probabile la **periodicità** della catena. Tuttavia, nei test questa si è rivelata molto rara.
> - Questa funzione è più robusta rispetto alla generazione di $P$ sparse: riesce a garantire connettività anche in presenza di elevata sparsità.
>
> Utile quando si desiderano CTMC reversibili con controllo sulla sparsità e mantenimento dell’ergodicità.

In [5]:
def genera_Q_irriducibile(n, zeros=0, tasso_massimo=1.0, seed=123, tentativi=50000):
    """

    Genera Q (nxn) irriducibile.

    Args:
        n (int)                 :           dimensione.
        zeros (int)             :           zeri off-diagonali totali.
        tasso_massimo (float)   :           inserisce tassi tra 0 e tasso_massimo.
        seed (int)              :           seed iniziale.
        tentativi (int)         :           tentativi massimi.

    Returns:
        np.ndarray              :           generator matrix Q.

    """
    
    max_zeros = n * (n - 2)
    if zeros > max_zeros:
        raise ValueError(f"Numero di zeri incompatibile: massimo {max_zeros}")

    for k in range(tentativi):
        rng = np.random.default_rng(seed + k)
        Q = np.zeros((n, n))

        # Calcola quanti zeri per riga (distribuzione uniforme)
        zeros_per_row = zeros // n
        extra_zeros = zeros % n  # Zeri extra da distribuire

        # Assegna zeri per ogni riga
        for i in range(n):
            possible_j = [j for j in range(n) if j != i]  # Indici fuori diagonale
            num_zeros = zeros_per_row + (1 if i < extra_zeros else 0)
            
            zero_positions = rng.choice(possible_j, num_zeros, replace=False)
            for j in possible_j:
                if j not in zero_positions:
                    Q[i, j] = rng.uniform(np.nextafter(0, 1), tasso_massimo)

        # Calcola la diagonale
        Q[np.diag_indices(n)] = -Q.sum(axis=1)

        # Verifica irreducibilità
        if nx.is_strongly_connected(nx.from_numpy_array((Q > 0).astype(int), create_using=nx.DiGraph)):
            return Q

    raise RuntimeError(f"Non ho Q irreducibile per n={n}, zeros={zeros}")

#### Q irriducibile con k archi per nodo

> **Nota sulla generazione**: questa funzione costruisce matrici generatore $Q$ irriducibili,  
> fissando per ogni riga un arco obbligato $i \to (i+1) \bmod n$ per garantire la connessione,  
> e aggiungendo $k-1$ archi off-diagonali scelti a caso. I tassi sono estratti da $(0, \texttt{tasso\_massimo}]$,  
> mentre la diagonale viene calcolata per ottenere righe a somma nulla, come richiesto per i generatori CTMC. <br>
> La struttura ciclica garantisce la connettività, ma **non assicura di per sé l’aperiodicità**:  
> tuttavia, questa non è un requisito per metodi come GTH, sistemi lineari o autovalori.  
> Quando si usa il Power Method, invece, la matrice $Q$ viene trasformata con l’uniformizzazione  
> $P = I + Q / B$, che introduce autoloop e rompe la periodicità, rendendo il metodo sempre applicabile.

In [6]:
def genera_Q_k_random(n, k, tasso_massimo=1.0, seed=123):
    """
    Genera una matrice Q (nxn) irriducibile con:
      - arco obbligato i → (i+1)%n per garantire connessione
      - altri k-1 archi off-diagonali random per riga
      - pesi positivi ∈ (0, tasso_massimo]
      - diagonale negativa, righe a somma zero

    Args:
        n (int)                     :               dimensione della matrice
        k (int)                     :               archi totali per riga (deve essere ≥1 e ≤ n - 1)
        tasso_massimo (float)       :               massimo valore dei tassi
        seed (int)                  :               seed per random

    Returns:
        np.ndarray                  :               matrice generatore Q valida e irriducibile
    """
    if not (1 <= k <= n-1):
        raise ValueError("Serve 1 ≤ k ≤ n-1")

    rng = np.random.default_rng(seed)
    Q = np.zeros((n, n))

    for i in range(n):
        targets = {(i + 1) % n}  # arco obbligato

        pool = [j for j in range(n) if j != i and j != (i + 1) % n]
        num_extra = k - 1
        replace = num_extra > len(pool)

        extra = rng.choice(pool, size=num_extra, replace=False)
        targets.update(extra)

        for j in targets:
            Q[i, j] = rng.uniform(np.nextafter(0, 1), tasso_massimo)

    Q[np.diag_indices(n)] = -Q.sum(axis=1)

    G = nx.from_numpy_array((Q > 0).astype(int), create_using=nx.DiGraph)
    if nx.is_strongly_connected(G):
        return Q

    raise RuntimeError(f"Q non irriducibile con n={n}, k={k}, seed={seed}")

### `Mia implementazione di ght_solve()`

In [7]:
def my_gth_solve(P):

    """
    Calcola la distribuzione stazionaria π tramite algoritmo GTH.

    Args:
        P (np.ndarray)      :       matrice di transizione o generatore Metzler.

    Returns:
        np.ndarray          :       vettore stazionario π.
    """

    A = P.copy().astype(float)
    n = A.shape[0]
    pi = np.zeros(n)

    # Forward elimination senza pivot 
    for k in range(n - 1):
        scala = A[k, k+1:].sum()
        if scala <= 0:
            raise ValueError("Scala non positiva: catena non ergodica o input non valido.")
        for i in range(k + 1, n):
            A[i, k] /= scala
            for j in range(k + 1, n):
                A[i, j] += A[i, k] * A[k, j]

    # Backward substitution
    pi[-1] = 1.0
    for k in range(n - 2, -1, -1):
        for i in range(k + 1, n):
            pi[k] += A[i, k] * pi[i]

    # Normalizzazione finale
    pi /= pi.sum()
    return pi

### `Ricerca tramite risoluzione di sistema`

In [8]:
def sistema_numpy(P_or_Q, discrete=True):
    """

    Calcola π risolvendo il sistema (I - P^T)x = 0 (DTMC) o Q^T x = 0 (CTMC).

    Args:
        P_or_Q (np.ndarray)     :           matrice di transizione P o generatore Q.
        discrete (bool)         :           True per P, False per Q.

    Returns:
        np.ndarray              :           vettore stazionario π.

    """

    n = P_or_Q.shape[0]
    M = np.eye(n) - P_or_Q.T if discrete else P_or_Q.T.copy()
    M[-1, :] = 1
    b = np.zeros(n)
    b[-1] = 1
    pi, *_ = np.linalg.lstsq(M, b, rcond=None)
    pi /= pi.sum()
    return pi

In [9]:
def sistema_scipy(P_or_Q, discrete=True):
    """
    
    Calcola π risolvendo il sistema (I - P^T)x = 0 (DTMC) o Q^T x = 0 (CTMC).
    Versione con SciPy su matrici sparse.

    Args:
        P_or_Q (np.ndarray)     :           matrice di transizione P o generatore Q.
        discrete (bool)         :           True per P, False per Q.

    Returns:
        np.ndarray              :           vettore stazionario π.

    """

    n = P_or_Q.shape[0]
    M = np.eye(n) - P_or_Q.T if discrete else P_or_Q.T.copy()
    M[-1, :] = 1
    b = np.zeros(n)
    b[-1] = 1

    M_sparse = sp.csr_matrix(M)
    x = spla.spsolve(M_sparse, b)
    x /= x.sum()
    return x

### `Ricerca tramite spettro`

In [10]:
def solve_via_eig_numpy(P_or_Q, discrete=True):
    """
    Calcola π come autovettore sinistro dominante tramite NumPy.

    - Per DTMC (P)              :       cerca λ=1
    - Per CTMC (Q)              :       cerca λ=0

    Args:
        P_or_Q (np.ndarray)     :       matrice di transizione (P) o generatore (Q)
        discrete (bool)         :       True se P (DTMC), False se Q (CTMC)

    Returns:
        np.ndarray              :       distribuzione stazionaria π normalizzata
    """
     
    w, v = np.linalg.eig(P_or_Q.T)
    target = 1 if discrete else 0
    idx = np.argmin(np.abs(w - target))
    pi = np.real(v[:, idx])
    pi /= pi.sum()
    return pi

In [11]:
def solve_via_eig_scipy(P_or_Q, discrete=True):
    """
    Calcola π come autovettore sinistro dominante di P o Q tramite ARPACK.
    
    - Per DTMC (P)              :           usa which='LR' (autovalore reale più grande, 1)
    - Per CTMC (Q)              :           usa which='SM' (autovalore reale più piccolo in magnitudine, 0)
    
    Args:
        P_or_Q (np.ndarray)     :           matrice di transizione (P) o generatore (Q)
        discrete (bool)         :           True per DTMC, False per CTMC

    Returns:
        np.ndarray              :           distribuzione stazionaria π normalizzata
    """
    M = sp.csr_matrix(P_or_Q.T)
    which = 'LR' if discrete else 'SM'

    vals, vecs = spla.eigs(M, k=1, which=which)
    pi = np.real(vecs[:, 0])
    pi /= pi.sum()
    return pi

### `Uso del Power-Method`

In [22]:
def solve_via_power_numpy(P_or_Q, discrete=True, tol=1e-15, max_iter=500000, test=False):
    """
    Calcola il vettore stazionario π usando il Power Method (dense).

    - Se `discrete=True` (DTMC)     :       π_{n+1} = π_n @ P
    - Se `discrete=False` (CTMC)    :       usa uniformizzazione
                                            → P = I + Q / B con B ≥ max q(i)

    Args:
        P_or_Q (np.ndarray)         :       matrice P (DCTM) o Q (CTMC)
        discrete (bool)             :       True per P, False per Q
        tol (float)                 :       tolleranza su ||π_{n+1} - π_n||₁    {Norma L1}
        max_iter (int)              :       numero massimo di iterazioni

    Returns:
        np.ndarray                  :       vettore stazionario π normalizzato
    """
    n = P_or_Q.shape[0]
    pi = np.ones(n) / n

    if discrete:
        P = P_or_Q
    else:
        B = (-np.diag(P_or_Q)).max()
        if B <= 0:
            raise ValueError("Uniformizzazione fallita: Q ha diagonale non negativa.")

        P = np.eye(n) + P_or_Q / B

    for i in range(max_iter):
        pi_new = pi @ P
        pi_new_sum = pi_new.sum()

        if pi_new_sum == 0 or not np.isfinite(pi_new_sum):
            raise RuntimeError("Power Method fallito: somma non valida in π_new.")

        pi_new /= pi_new_sum

        if np.linalg.norm(pi_new - pi, 1) < tol:
            return i if test else pi_new

        pi = pi_new

    raise RuntimeError("Power Method non converge entro max_iter.")

In [13]:
def solve_via_power_scipy(P_or_Q, discrete=True, tol=1.1e-15, max_iter=500000):
    """
    Calcola π col Power Method (sparse).

    - Se discrete=True (DTMC)       :           π_{n+1} = π_n @ P
    - Se discrete=False (CTMC)      :           usa uniformizzazione
                                                P = I + Q / B  con B ≥ max q(i)

    Args:
        P_or_Q (array o csr_matrix) :           matrice P (DTMC) o Q (CTMC)
        discrete (bool)             :           True per P, False per Q
        tol (float)                 :           tolleranza ||π_{n+1} - π_n||₁  {Norma L1}
        max_iter (int)              :           iterazioni massime

    Returns:
        np.ndarray                  :           vettore stazionario π
    """
    n = P_or_Q.shape[0]
    pi = np.ones(n) / n
    
    if not sp.issparse(P_or_Q):
        P_or_Q = sp.csr_matrix(P_or_Q)

    if not discrete:
        B = (-P_or_Q.diagonal()).max()
        if B <= 0:
            raise ValueError("Uniformizzazione fallita: Q ha diagonale non negativa.")
        
        I = sp.identity(n, format="csr")
        P = I + P_or_Q / B

    else:
        P = P_or_Q

    for i in range(max_iter):
        pi_new = pi @ P
        pi_new = np.asarray(pi_new).ravel()
        pi_new_sum = pi_new.sum()

        if pi_new_sum <= 0 or not np.isfinite(pi_new_sum):
            raise RuntimeError("Power Method (sparse) fallito: somma non valida.")
        pi_new /= pi_new_sum

        if np.linalg.norm(pi_new - pi, 1) < tol:
            return pi_new
        
        pi = pi_new

    raise RuntimeError("Power Method (sparse) non converge entro max_iter.")

### `Bound per le k-iterazioni`

Stima precisa del bound superiore sul numero di iterazioni $k$ affinché
$$
\|\pi^{(k)} - \pi\|_1 < \text{tol}
$$
usando il bound spettrale completo derivato dalla decomposizione spettrale
di $P^\top$ (o della matrice uniformizzata, nel caso continuo).

Il bound teorico utilizzato è:
$$
\|\pi^{(k)} - \pi\|_1 \le \sqrt{n} \cdot \left(\sum_{i=2}^{n} \left|\frac{\alpha_i}{\alpha_1}\right|^2 \cdot \left|\frac{\lambda_i}{\lambda_1}\right|^{2k}\right)^{1/2}
$$
dove:
- $\lambda_i$ sono gli autovalori (in ordine di modulo decrescente),
- $\alpha_i$ sono i coefficienti nella decomposizione $\pi^{(0)} = \sum \alpha_i v_i$ degli autovettori destri di $P^\top$,
- $\pi^{(0)}$ è il vettore iniziale uniforme,
- $n$ è la dimensione della matrice.

Da cui segue che, per ottenere un errore inferiore a una data tolleranza, si cerca il minimo intero $k$ tale che:
$[
k \ge \min \left\{ k \in \mathbb{N} \;\middle|\; \sqrt{n} \cdot \left( \sum_{i=2}^{n} \left| \frac{\alpha_i}{\alpha_1} \right|^2 \cdot \left| \frac{\lambda_i}{\lambda_1} \right|^{2k} \right)^{1/2} < \text{tol} \right\}
]$

Questa stima non può essere risolta in forma chiusa, per questo motivo, nel nostro codice cerchiamo iterativamente <br>
il minimo intero $k$ tale che il bound sia minore della tolleranza desiderata.

In [14]:
def stima_k_necessario_precisa(P_or_Q, discrete=True, tol=1e-15):
    """
    Stima precisa del bound superiore di iterazioni k necessarie affinché
    ||pi^(k) - pi||_1 < tol, usando il bound spettrale completo con tutti gli autovalori.
    Supporta sia P (DTMC) che Q (CTMC via uniformizzazione).

    Args:
        P_or_Q (np.ndarray)                 :           matrice P (stocastica per riga) o Q (generatore)
        discrete (bool)                     :           True se P (DTMC), False se Q (CTMC)
        tol (float)                         :           tolleranza desiderata (default 1e-15)

    Returns:
        dict: con chiavi:
            - 'k_upper' (int)               :           stima superiore sul numero di iterazioni
            - 'lambda_star' (float)         :           |λ⋆| (modulo massimo degli autovalori ≠ λ1)
            - 'C_precisa' (float)           :           costante teorica effettiva del bound
            - 'diagonalizzabile' (bool)     :           True se P^T è diagonalizzabile
    """
    n = P_or_Q.shape[0]

    if not discrete:
        B = (-np.diag(P_or_Q)).max()
        if B <= 0:
            raise ValueError("Uniformizzazione fallita: Q ha diagonale non negativa.")
        P = np.eye(n) + P_or_Q / B
        if not np.allclose(P.sum(axis=1), 1, atol=1e-12):
            raise ValueError("La matrice uniformizzata non è stocastica.")
    else:
        P = P_or_Q

    A = P.T
    w, V = np.linalg.eig(A)
    rank = np.linalg.matrix_rank(V)
    idx = np.argsort(-np.abs(w))
    w = w[idx]
    V = V[:, idx]
    V = V / np.linalg.norm(V, axis=0)

    print(V)

    
    diagonalizzabile = (rank == n)
    if not diagonalizzabile:
        return {
            'k_upper': None,
            'lambda_star': None,
            'C_precisa': None,
            'diagonalizzabile': False
        }

    pi0 = np.ones(n) / n
    Vinv = np.linalg.inv(V)
    alpha = Vinv @ pi0
    lambda1 = w[0]
    lambda_others = w[1:]
    alpha_others = alpha[1:]

    def errore_spettrale_teorico(k):
        somma = np.sum(np.abs(alpha_others / alpha[0])**2 * np.abs(lambda_others / lambda1)**(2 * k))
        return np.sqrt(n) * np.sqrt(somma)

    for k in range(1, 100000):
        if errore_spettrale_teorico(k) < tol:
            return {
                'k_upper': k,
                'lambda_star': np.max(np.abs(lambda_others)),
                'diagonalizzabile': True
            }

    return {
        'k_upper': None,
        'lambda_star': np.max(np.abs(lambda_others)),
        'diagonalizzabile': True
    }

In [70]:
P = np.array([
    [0.25,  0.625, 0.125],
    [0.125, 0.25, 0.625 ],
    [0.125, 0.125, 0.75 ]
])
print(stima_k_necessario_precisa(P))
print(solve_via_power_numpy(P, test=True))

w, v = np.linalg.eig(P.T)
print(np.linalg.matrix_rank(v))  
np.linalg.cond(v)
w, v = np.linalg.eig(P)
print(np.linalg.matrix_rank(v))  
np.linalg.cond(v)

[[ 2.08145362e-01  4.42617186e-09  4.42617145e-09]
 [ 3.27085568e-01  7.07106779e-01 -7.07106783e-01]
 [ 9.21786602e-01 -7.07106783e-01  7.07106779e-01]]
{'k_upper': 26, 'lambda_star': 0.12500000312977616, 'diagonalizzabile': True}
19
3
2


1.216922753992663e+17

### `Funzioni per costo computazionale`

In [14]:
def risoluzione():
    """Calcola la risoluzione del timer con time.monotonic."""
    start = time.monotonic()
    while time.monotonic() == start:
        pass
    return time.monotonic() - start

In [15]:
METODI = {
    "gth"                   :                   lambda P, **kw: gth_solve(P),
    "system_numpy"          :                   lambda P, **kw: sistema_numpy(P, **kw),
    "system_scipy"          :                   lambda P, **kw: sistema_scipy(P, **kw),
    "eig_numpy"             :                   lambda P, **kw: solve_via_eig_numpy(P, **kw),
    "eig_scipy"             :                   lambda P, **kw: solve_via_eig_scipy(P, **kw),
    "power_numpy"           :                   lambda P, **kw: solve_via_power_numpy(P, **kw),
    "power_scipy"           :                   lambda P, **kw: solve_via_power_scipy(P, **kw),
}

In [16]:
def tempo_per_n(P, metodo, resolution, **kwargs):
    if metodo not in METODI:
        raise ValueError(f"Metodo '{metodo}' non riconosciuto")
    tmin = resolution
    count = 0
    start = time.monotonic()
    while time.monotonic() - start < tmin:
        METODI[metodo](P.copy(), **kwargs)
        count += 1
    return (time.monotonic() - start) / count

In [17]:
def errore_su_P_o_Q(P_or_Q, metodo, discrete=True, **kwargs):
    """
    Calcola l'errore della distribuzione stazionaria:
    - ||πP - π||₁ per DTMC
    - ||πQ - 0||₁ per CTMC

    Args:
        P_or_Q (np.ndarray)             :           matrice P o Q
        metodo (str)                    :           nome del metodo
        discrete (bool)                 :           True per DTMC, False per CTMC

    Returns:
        float                           :           errore numerico
    """
    if metodo not in METODI:
        raise ValueError(f"Metodo '{metodo}' non riconosciuto")
    
    pi = METODI[metodo](P_or_Q.copy(), discrete=discrete, **kwargs)
    pi = np.ravel(pi)
    
    if discrete:
        return np.linalg.norm(pi @ P_or_Q - pi, 1)
    else:
        return np.linalg.norm(pi @ P_or_Q, 1)

In [18]:
def calcola_costo_e_errore(funzione, C=100, n_punti=20, n_matrici = 10, sparsity=0.98, k=2, tipo_gen_P_orQ=genera_P_irriducibile, discrete=True):
    """

    Confronta tempi ed errori su matrici connesse, sia P (DTMC) che Q (CTMC).
    
    Se discrete=True                        →           genera e testa P
    Se discrete=False                       →           genera e testa Q 

    Args:
        funzione (str)                      :           nome del metodo da usare (es. 'solve_via_power_numpy')
        C (int)                             :           parametro crescita geometrica
        n_punti (int)                       :           numero di dimensioni da testare
        sparsity (float)                    :           percentuale di zeri (per genera_P_irriducibile o genera_Q_irriducibile)
        k (int)                             :           numero di archi per riga (per genera_P_k_random o genera_Q_k_random)
        tipo_gen_P_orQ (callable)           :           generatore di P o Q
        discrete (bool)                     :           True per DTMC (P), False per CTMC (Q)

    Returns:
        list of (n, tempo medio, errore)

    """
    
    A = 10
    B = C ** (1 / (n_punti - 1))
    risultati = []
    resolution = risoluzione() * 10000

    for i in range(n_punti):
        n = int(A * (B ** i))
        tempi = []
        errori = []

        for j in range(n_matrici):
            # Genera P o Q in base al tipo
            if tipo_gen_P_orQ.__name__.endswith("_irriducibile"):
                max_zeros = n * (n - 1)
                zeros = int(max_zeros * sparsity)
                P_or_Q = tipo_gen_P_orQ(n, zeros=zeros, seed = j)
            else:
                P_or_Q = tipo_gen_P_orQ(n, k, seed = j)

            tempi.append(tempo_per_n(P_or_Q, funzione, resolution, discrete=discrete))
            errori.append(errore_su_P_o_Q(P_or_Q, funzione, discrete=discrete))

        t_median = np.median(tempi)
        e_median = np.median(errori)

        risultati.append((n, t_median, e_median))

    return risultati

## `Quantecon`

### Quantecon usa `gth_solve()` che implementa l'algoritmo GTH.

`gth_solve()` accetta matrici `P` o `Q`, cioè che siano di tipo **Metzler**,  
ovvero con **elementi off-diagonali non negativi**.

Effettua una **copia** della matrice di input in `A`,  
che viene modificata **in-place** per calcolare una **fattorizzazione numericamente stabile**,  
**senza uso di pivot**.

Ad ogni passo `k`:
- calcola la **scala** come somma degli elementi a destra del pivot nella riga `k`
- **normalizza la colonna `k` sotto la diagonale** dividendo per questa scala
- **manipola la sottomatrice in basso a destra** aggiornando ogni elemento secondo:
$A[i,j] += A[i,k] * A[k,j]$   per $i > k, j > k$ <br>

---

Per calcolare il vettore stazionario $\pi$,  
la backward substitution parte da `π[n-1] = 1` e risale:
$π[k] = ∑ A[i,k] * π[i]$   per $i = k+1 … n-1$ <br>

Ovvero, per ogni `π[k]`, si usano **gli elementi nella colonna `k` sotto la diagonale**,  
e per ciascuno di essi (posizione `A[i, k]`), si **moltiplica per il valore `π[i]` già calcolato**,  
dove `i` è proprio l’indice di riga di quell’elemento.

Si sommano tutti questi prodotti per ottenere `π[k]`.

Alla fine, `π` viene normalizzato: `π /= π.sum()`.

### Confronto su matrice stocastica P con gth_solve usata in `Quantecon` e la nostra implementazione

In [19]:
P = genera_P_k_random(5, 2)

pi_builtin = gth_solve(P)
pi_manual = my_gth_solve(P)

print("π (quantecon):", pi_builtin, "\n")
print("π (nostra GTH):", pi_manual, "\n")
print("Verifica π • P (nostra GTH):", pi_manual @ P)

π (quantecon): [0.05001137 0.28940231 0.14810138 0.33704751 0.17543743] 

π (nostra GTH): [0.05001137 0.28940231 0.14810138 0.33704751 0.17543743] 

Verifica π • P (nostra GTH): [0.05001137 0.28940231 0.14810138 0.33704751 0.17543743]


### Confronto su matrice generatore Q

In [20]:
Q = genera_Q_irriducibile(5, 4)

pi_builtin = gth_solve(Q)
pi_q = my_gth_solve(Q)

print("π (quantecon):", pi_builtin, "\n")
print("π per Q (nostra GTH):", pi_q, "\n")
print("Verifica π @ Q:", pi_q @ Q)  # Deve essere vicino a 0

π (quantecon): [0.52727537 0.03153829 0.14031068 0.20518315 0.09569252] 

π per Q (nostra GTH): [0.52727537 0.03153829 0.14031068 0.20518315 0.09569252] 

Verifica π @ Q: [-2.57889294e-17 -2.31599712e-18  5.08435308e-17  9.45116248e-18
  2.72221568e-17]


## `PyDTMC`

### PyDTMC: calcolo di π e test di reversibilità

* **Calcolo di π**  
  PyDTMC impiega lo stesso algoritmo **GTH** di *quantecon*, ma lo applica soltanto a
  matrici di transizione **P** (catene DTMC), dopo aver verificato che la somma di
  ogni riga sia pari a 1.

* **Reversibilità**  
  Una volta ottenuta la distribuzione stazionaria $\pi$,
  PyDTMC costruisce la **matrice dei flussi**

  $
    F_{ij}= \pi_i\,P_{ij}.
  $

  Se $F$ risulta (entro la tolleranza numerica) **simmetrica**
  $(F \approx F^{\mathsf{T}})$, la catena soddisfa il bilancio dettagliato
  e viene etichettata come **reversible = YES**.

In [21]:
mc = MarkovChain(P)
print(mc)
print(mc.steady_states)


DISCRETE-TIME MARKOV CHAIN
 SIZE:           5
 RANK:           5
 CLASSES:        1
  > RECURRENT:   1
  > TRANSIENT:   0
 ERGODIC:        YES
  > APERIODIC:   YES
  > IRREDUCIBLE: YES
 ABSORBING:      NO
 MONOTONE:       NO
 REGULAR:        YES
 REVERSIBLE:     NO
 SYMMETRIC:      NO

[array([0.05001137, 0.28940231, 0.14810138, 0.33704751, 0.17543743])]


## `Calcolo diretto di` $\pi$ `tramite sistemi lineari e autovalori`

### Metodo dei sistemi lineari
- **DTMC**: risolve (I−Pᵀ)x=0  
- **CTMC**: risolve Qᵀx=0  


### Metodo eigen-solver
- π è l'autovettore **sinistro** associato a:
  - λ=1 per P (DTMC)  
  - λ=0 per Q (CTMC)

In [22]:
pi1 = sistema_numpy(P)
pi2 = sistema_scipy(P)
pi3 = solve_via_eig_numpy(P)
pi4 = solve_via_eig_scipy(P)
print(pi1,pi2,pi3,pi4,sep="\n\n")

[0.05001137 0.28940231 0.14810138 0.33704751 0.17543743]

[0.05001137 0.28940231 0.14810138 0.33704751 0.17543743]

[0.05001137 0.28940231 0.14810138 0.33704751 0.17543743]

[0.05001137 0.28940231 0.14810138 0.33704751 0.17543743]


## `Metodo Power, ad iterazioni`

Algoritmo iterativo semplice ed efficace per trovare la distribuzione stazionaria.

### Passaggi

1. **Inizializzazione**  
   $[
   \pi^{(0)} = \left[ \tfrac{1}{n}, \dots, \tfrac{1}{n} \right]
   $]

2. **Iterazione**  
   $[
   \pi^{(t+1)} =
   \begin{cases}
   \pi^{(t)} P, & \text{(DTMC)} \\
   \pi^{(t)} \left(I + \frac{Q}{B}\right), & \text{(CTMC)}
   \end{cases}
   \quad
   \text{poi normalizzazione: } \pi^{(t+1)} \leftarrow \frac{\pi^{(t+1)}}{\sum_i \pi^{(t+1)}_i}
   $]

3. **Convergenza**  
   $[
   \| \pi^{(t+1)} - \pi^{(t)} \|_1 < \text{tol}
   \quad (\text{tol} = 10^{-15})
   $]

In [23]:
pi1 = solve_via_power_numpy(P)
pi2 = solve_via_power_scipy(P)
print(pi1, pi2, sep="\n\n")

61

[0.05001137 0.28940231 0.14810138 0.33704751 0.17543743]


## `Andamento precisione e costo computazionale`

In [49]:
# === Lista delle funzioni da confrontare ===
metodi = [
    "gth",          
    "system_numpy",
    "system_scipy",
    "eig_numpy",
    "eig_scipy",
    "power_numpy",
    "power_scipy",
]

# === Calcolo dati per ogni metodo ===
risultati = {}

for metodo in sorted(metodi):
    dati = calcola_costo_e_errore(
        funzione=metodo,
        C=100,
        n_punti=30,
        n_matrici=10,
        sparsity=0,
        k=9,
        tipo_gen_P_orQ=genera_Q_k_random,
        discrete=False
    )
    risultati[metodo] = {
        "x": [d[0] for d in dati],
        "tempo": [d[1] for d in dati],
        "errore": [d[2] for d in dati]
    }

ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 10 is different from 1)

In [None]:
# === Grafico 1: Tempo di esecuzione ===
plt.figure(figsize=(18,10))
for metodo in metodi:
    plt.plot(risultati[metodo]["x"], risultati[metodo]["tempo"], 'o-', label=metodo, markersize=3)
plt.xscale('log')
plt.yscale('log')
plt.grid(True, which="both", ls="--")
plt.xlabel(f"Dimensione matrice $n$", fontsize=12)
plt.ylabel('Tempo medio (secondi)', fontsize=12)
plt.title('Confronto dei tempi di calcolo di $\pi$ su matrice Q sparsa con 9 archi per nodo', fontsize=14)
plt.legend(fontsize=9)
plt.savefig("Q_SPARSA.svg", dpi = 300, format="svg", bbox_inches = "tight")
plt.show()

# === Grafico 2: Errore ||πP - π||₁ ===
plt.figure(figsize=(18,10))
for metodo in metodi:
    plt.plot(risultati[metodo]["x"], risultati[metodo]["errore"], 'o-', label=metodo, markersize=3)
plt.xscale('log')
plt.yscale('log')
plt.grid(True, which="both", ls="--")
plt.xlabel('Dimensione matrice $n$', fontsize=12)
plt.ylabel(r'Errore', fontsize=12)
plt.title('Confronto degli errori di calcolo di $\pi$ su matrice Q sparsa con 9 archi per nodo', fontsize=14)
plt.legend(fontsize=9)
plt.savefig("Q_SPARSA_ERRORE.svg", dpi = 300, format="svg", bbox_inches = "tight")
plt.show()