# qmrpy Starter Notebook: EPGで信号生成 → 全解析モデルを一通り実行

このノートブックは **qmrpyのEPG（epgpy backend）で信号を合成**し、qmrpyに実装されている主要な解析モデルを **網羅的に最小例で実行**するスタータです。

対象（現状のqmrpy実装）:
- T1: `VfaT1`, `InversionRecovery`
- T2: `MonoT2`, `DecaesT2Map`, `DecaesT2Part`, `MultiComponentT2`（MWF）
- B1: `B1Dam`
- Noise/Denoise: `MPPCA`

実行例: `uv run --locked jupyter lab`


In [None]:
# Ensure we import the local qmrpy (repo ./src) even if the notebook kernel is not started via `uv run`.
from pathlib import Path
import sys

def _find_repo_root(start: Path) -> Path:
    """Return the closest directory at or above *start* containing ``pyproject.toml``.

    The search checks *start* first and then each of its parents in order.
    When no candidate holds a ``pyproject.toml``, *start* itself is returned.
    """
    candidates = (start, *start.parents)
    return next(
        (folder for folder in candidates if (folder / 'pyproject.toml').exists()),
        start,
    )

# Locate the repository root from the kernel's working directory and put its ./src
# directory at the front of sys.path so `import qmrpy` resolves to the local checkout.
_repo_root = _find_repo_root(Path.cwd())
_src = _repo_root / 'src'
if _src.exists():
    _src_entry = str(_src)
    # Re-running this cell must not pile up duplicate entries: drop any earlier copies
    # in place, then re-insert once at the front.
    sys.path[:] = [entry for entry in sys.path if entry != _src_entry]
    sys.path.insert(0, _src_entry)

import numpy as np
import pandas as pd

from plotnine import (
    ggplot, aes, geom_line, geom_point, geom_col, geom_histogram, geom_abline,
    facet_wrap, theme_bw, labs
)

from qmrpy.sim import add_rician_noise

from qmrpy.models.t1 import VfaT1, InversionRecovery
from qmrpy.models.t2 import MonoT2, DecaesT2Map, DecaesT2Part, MultiComponentT2
from qmrpy.models.t2.decaes_t2 import _basis_matrix  # internal helper for EPG forward synthesis
from qmrpy.models.b1 import B1Dam
from qmrpy.models.noise import MPPCA

# One seeded generator shared by the noise calls in the cells below, so the draws
# are reproducible when the notebook is run top to bottom.
rng = np.random.default_rng(0)
# Print arrays with 6 digits of precision and without scientific notation.
np.set_printoptions(precision=6, suppress=True)


## 1) EPG（epgpy backend）で Multi-component T2 のSEエコートレイン信号を合成
DECAES系のforwardは `basis matrix A(TE, T2, alpha)` を作って `signal = A @ distribution` として合成できます。


In [None]:
# --- protocol ---
# Echo-train and T2-grid settings; the same names are reused when configuring the fitters later.
n_te = 16  # number of echoes
te_s = 10e-3  # echo spacing [s]
n_t2 = 40  # number of T2 grid points
t2_range_s = (10e-3, 2.0)  # (min, max) of the T2 grid [s]
t1_s = 1.0  # T1 handed to the basis computation [s]
alpha_deg_true = 160.0  # ground-truth value passed as alpha_deg [deg]

# Log-spaced T2 grid, and echo times TE_k = k * te_s for k = 1..n_te.
t2_times_s = np.logspace(np.log10(t2_range_s[0]), np.log10(t2_range_s[1]), n_t2)
echotimes_s = te_s * np.arange(1, n_te + 1)

# Forward basis built by qmrpy's private helper; `A @ distribution` gives the echo-train signal.
# NOTE(review): _basis_matrix is an internal (underscore-prefixed) API and may change without notice.
A = _basis_matrix(
    n_te=n_te,
    te_s=te_s,
    t2_times_s=t2_times_s,
    t1_s=t1_s,
    alpha_deg=alpha_deg_true,
    refcon_angle_deg=180.0,
    epg_backend='epgpy',
)

# --- make a synthetic T2 distribution (3 peaks in log-space) ---
def log_gauss(t2, *, mu_s, sigma_log, amp):
    """Evaluate a Gaussian bump in log(T2) space.

    The peak sits at ``t2 == mu_s`` with height ``amp``; ``sigma_log`` is the
    standard deviation measured in natural-log units of T2.
    """
    log_offset = np.log(t2) - np.log(mu_s)
    exponent = -0.5 * (log_offset / sigma_log) ** 2
    return amp * np.exp(exponent)

# Sum of three log-Gaussian peaks centred at 20 ms, 80 ms and 800 ms on the T2 grid.
dist_true = (
    log_gauss(t2_times_s, mu_s=0.020, sigma_log=0.20, amp=1.0)
  + log_gauss(t2_times_s, mu_s=0.080, sigma_log=0.25, amp=2.5)
  + log_gauss(t2_times_s, mu_s=0.800, sigma_log=0.35, amp=0.6)
)
# scale to a plausible signal amplitude (tallest bin becomes 2000)
dist_true = dist_true / dist_true.max() * 2000.0

# Noise-free echo train from the forward basis, then one Rician-noise realisation
# (sigma=20) drawn from the shared `rng`.
signal_clean = A @ dist_true
signal = add_rician_noise(signal_clean, sigma=20.0, rng=rng)

# Quick look at the first five echoes before and after noise.
print('signal_clean[:5]:', signal_clean[:5])
print('signal[:5]      :', signal[:5])


In [None]:
# Wide table (one row per echo), then melted to long format so plotnine can colour by curve.
_decay_wide = pd.DataFrame({
    'echo': np.arange(1, n_te + 1),
    'te_ms': echotimes_s * 1e3,
    'clean': signal_clean,
    'obs': signal,
})
df_decay = _decay_wide.melt(
    id_vars=['echo', 'te_ms'],
    value_vars=['clean', 'obs'],
    var_name='kind',
    value_name='signal',
)

# Clean vs noisy decay curve; the bare name on the last line is what the notebook renders.
_decay_plot = (
    ggplot(df_decay, aes('te_ms', 'signal', color='kind'))
    + geom_line()
    + geom_point(size=2)
    + theme_bw()
    + labs(title='EPG SE echo train (clean vs observed)', x='TE [ms]', y='signal')
)
_decay_plot


In [None]:
# Ground-truth T2 distribution on the log-spaced grid (T2 converted to ms for the axis).
df_dist = pd.DataFrame({
    't2_ms': t2_times_s * 1e3,
    'weight': dist_true,
})

_dist_plot = (
    ggplot(df_dist, aes('t2_ms', 'weight'))
    + geom_line()
    + theme_bw()
    + labs(title='True T2 distribution (synthetic)', x='T2 [ms] (log grid)', y='weight')
)
_dist_plot


## 2) DecaesT2Map（EPG backend）で multi-component T2 を推定（reg=GCV）
`DecaesT2Map.fit(signal)` は **refocusing angle最適化 + NNLS + 正則化選択**（GCV/L-curve/chi2/MDP）をまとめて行います。


In [None]:
# Configure the fitter with the same echo train and T2 grid that were used for synthesis.
decaes = DecaesT2Map(
    n_te=n_te,
    te_s=te_s,
    n_t2=n_t2,
    t2_range_s=t2_range_s,
    t1_s=t1_s,
    reg='gcv',
    n_ref_angles=64,
    save_reg_param=True,
    save_decay_curve=True,
    epg_backend='epgpy',
)
# `out` is indexed like a dict in this notebook (alpha_deg, ggm, snr, distribution, ...).
out = decaes.fit(signal)

print('alpha_deg_hat:', out['alpha_deg'])
# .get() so a missing 'mu' key prints None instead of raising.
print('mu:', out.get('mu'))
# Scaled by 1e3 to match the [ms] label.
print('ggm [ms]:', out['ggm']*1e3)
print('snr:', out['snr'])


In [None]:
# Ground-truth and fitted distributions side by side, then melted for a colour-by-kind plot.
_fit_wide = pd.DataFrame({
    't2_ms': out['t2times_s'] * 1e3,
    'true': dist_true,
    'hat': out['distribution'],
})
df_fit = _fit_wide.melt(
    id_vars=['t2_ms'],
    value_vars=['true', 'hat'],
    var_name='kind',
    value_name='weight',
)

_fit_plot = (
    ggplot(df_fit, aes('t2_ms', 'weight', color='kind'))
    + geom_line()
    + theme_bw()
    + labs(title='DecaesT2Map: distribution (true vs fitted)', x='T2 [ms]', y='weight')
)
_fit_plot


In [None]:
# Observed echo train next to the decay curve stored by the fit (save_decay_curve=True).
_curve_wide = pd.DataFrame({
    'te_ms': out['echotimes_s'] * 1e3,
    'obs': signal,
    'fit': out['decaycurve'],
})
df_curve = _curve_wide.melt(
    id_vars=['te_ms'],
    value_vars=['obs', 'fit'],
    var_name='kind',
    value_name='signal',
)

_curve_plot = (
    ggplot(df_curve, aes('te_ms', 'signal', color='kind'))
    + geom_line()
    + geom_point(size=2)
    + theme_bw()
    + labs(title='DecaesT2Map: decay curve (observed vs fitted)', x='TE [ms]', y='signal')
)
_curve_plot


## 3) DecaesT2Part: distributionから short/medium成分の指標（SFR/SGM/MFR/MGM）
qMRLabの `T2partSEcorr` 相当で、**distributionを入力**として指標を計算します。


In [None]:
# Short-pool window 10-40 ms, medium-pool window 40-200 ms (values given in seconds,
# per the _s suffix), on the same T2 grid as the fit.
t2part = DecaesT2Part(
    n_t2=n_t2,
    t2_range_s=t2_range_s,
    spwin_s=(10e-3, 40e-3),
    mpwin_s=(40e-3, 200e-3),
    sigmoid_s=5e-3,
)
# Evaluate the same metrics on the ground-truth and on the fitted distribution for comparison.
parts_true = t2part.fit(dist_true)
parts_hat = t2part.fit(out['distribution'])
print('parts_true:', parts_true)
print('parts_hat :', parts_hat)


## 4) MultiComponentT2 (MWF): 指数基底（非EPG）のNNLSでT2スペクトル→MWF等
こちらはより簡略な指数基底モデル（`exp(-TE/T2)`）でNNLSを解き、MWFなどを算出します。


In [None]:
# This model takes echo times in milliseconds, hence the 1e3 factor on echotimes_s.
mwf = MultiComponentT2(te_ms=echotimes_s*1e3)
# Fit the same noisy echo train used for DecaesT2Map above.
# NOTE(review): regularization_alpha=0.0 presumably means unregularised NNLS — confirm against MultiComponentT2.fit.
mwf_out = mwf.fit(signal, regularization_alpha=0.0, cutoff_ms=40.0, upper_cutoff_iew_ms=200.0)
print('mwf:', mwf_out['mwf'])
print('t2mw_ms:', mwf_out['t2mw_ms'])
print('t2iew_ms:', mwf_out['t2iew_ms'])


In [None]:
# Fitted spectrum returned by MultiComponentT2: basis T2 values against their weights.
df_mwf = pd.DataFrame({
    't2_ms': mwf_out['t2_basis_ms'],
    'weight': mwf_out['weights'],
})

_mwf_plot = (
    ggplot(df_mwf, aes('t2_ms', 'weight'))
    + geom_line()
    + theme_bw()
    + labs(title='MultiComponentT2 (MWF): fitted spectrum (non-EPG basis)', x='T2 [ms]', y='weight')
)
_mwf_plot


## 5) MonoT2: 単一指数フィット


In [None]:
# Simulate a single-exponential decay (m0=2000, t2=80, TE in ms), add Rician noise, then fit.
mono = MonoT2(te=echotimes_s*1e3)  # ms
mono_true = mono.forward(m0=2000.0, t2=80.0)
mono_obs = add_rician_noise(mono_true, sigma=20.0, rng=rng)
mono_fit = mono.fit(mono_obs, fit_type='exponential')
print('mono_fit:', mono_fit)

# The fitted m0 is multiplied by max|obs| before regenerating the curve.
# NOTE(review): this presumes MonoT2.fit returns m0 relative to a max-normalised signal — confirm.
df_mono = pd.DataFrame({
    'te_ms': echotimes_s*1e3,
    'obs': mono_obs,
    'fit': mono.forward(m0=mono_fit['m0']*np.max(np.abs(mono_obs)), t2=mono_fit['t2']),
}).melt(id_vars=['te_ms'], value_vars=['obs','fit'], var_name='kind', value_name='signal')
# Last expression of the cell, so the notebook renders this plot.
(ggplot(df_mono, aes('te_ms', 'signal', color='kind'))
 + geom_line() + geom_point(size=2)
 + theme_bw()
 + labs(title='MonoT2: observed vs fitted', x='TE [ms]', y='signal'))


## 6) VfaT1: VFA T1（線形化フィット）


In [None]:
# Simulate a four-angle VFA acquisition, add Rician noise, then run the linearised fit.
_fa_deg = np.array([3.0, 8.0, 15.0, 25.0])
vfa = VfaT1(flip_angle_deg=_fa_deg, tr_s=0.015, b1=1.0)
vfa_true = vfa.forward(m0=2000.0, t1_s=1.0)
vfa_obs = add_rician_noise(vfa_true, sigma=20.0, rng=rng)
vfa_fit = vfa.fit_linear(vfa_obs)
print('vfa_fit:', vfa_fit)

# Clean and noisy signal per flip angle, melted to long format for plotting.
_vfa_wide = pd.DataFrame({
    'fa_deg': vfa.flip_angle_deg,
    'clean': vfa_true,
    'obs': vfa_obs,
})
df_vfa = _vfa_wide.melt(
    id_vars=['fa_deg'],
    value_vars=['clean', 'obs'],
    var_name='kind',
    value_name='signal',
)

_vfa_plot = (
    ggplot(df_vfa, aes('fa_deg', 'signal', color='kind'))
    + geom_line()
    + geom_point(size=2)
    + theme_bw()
    + labs(title='VfaT1: clean vs observed', x='flip angle [deg]', y='signal')
)
_vfa_plot


## 7) InversionRecovery: IR T1（magnitudeデータの極性復元付き）


In [None]:
# Simulate a magnitude inversion-recovery curve, add Rician noise, then fit it.
ti_ms = np.array([50, 100, 200, 400, 800, 1200, 2000], dtype=float)
ir = InversionRecovery(ti_ms=ti_ms)
ir_clean = ir.forward(t1_ms=900.0, ra=500.0, rb=-1200.0, magnitude=True)
ir_obs = add_rician_noise(ir_clean, sigma=20.0, rng=rng)
ir_fit = ir.fit(ir_obs, method='magnitude')
print('ir_fit:', ir_fit)

# Regenerate the magnitude curve from the fitted parameters for the overlay.
_ir_fitted_curve = ir.forward(
    t1_ms=ir_fit['t1_ms'],
    ra=ir_fit['ra'],
    rb=ir_fit['rb'],
    magnitude=True,
)
_ir_wide = pd.DataFrame({
    'ti_ms': ti_ms,
    'obs': ir_obs,
    'fit': _ir_fitted_curve,
})
df_ir = _ir_wide.melt(
    id_vars=['ti_ms'],
    value_vars=['obs', 'fit'],
    var_name='kind',
    value_name='signal',
)

_ir_plot = (
    ggplot(df_ir, aes('ti_ms', 'signal', color='kind'))
    + geom_line()
    + geom_point(size=2)
    + theme_bw()
    + labs(title='InversionRecovery: observed vs fitted (magnitude)', x='TI [ms]', y='signal')
)
_ir_plot


## 8) B1Dam: Double-Angle MethodでB1推定


In [None]:
# Double-angle example: simulate with alpha_deg=60 and B1=0.92, add Rician noise, then fit.
dam = B1Dam(alpha_deg=60.0)
b1_true = 0.92
dam_clean = dam.forward(m0=1000.0, b1=b1_true)
dam_obs = add_rician_noise(dam_clean, sigma=10.0, rng=rng)
dam_fit = dam.fit_raw(dam_obs)
# Print the ground truth next to the fit result for a quick comparison.
print('b1_true:', b1_true)
print('dam_fit:', dam_fit)


## 9) MPPCA: 4Dデータのデノイジング
最小例として小さな4Dデータ（x,y,z,t）を合成してMP-PCAを適用します。


In [None]:
# small synthetic 4D dataset
sx, sy, sz, tdim = 16, 16, 1, 20
_volume_shape = (sx, sy, sz, tdim)
t = np.arange(tdim)
# One period of an offset sine along the last axis, broadcast identically to every voxel.
base = (np.sin(2*np.pi*t/tdim) + 1.2).reshape(1, 1, 1, tdim)
img_clean = 1000.0 * base * np.ones(_volume_shape)
# Additive Gaussian noise (std 30) drawn from the shared generator.
img_noisy = img_clean + rng.normal(loc=0.0, scale=30.0, size=_volume_shape)
mask = np.ones(_volume_shape[:3], dtype=bool)

# Denoise with a 5x5x1 kernel and pull the two outputs used below.
mppca = MPPCA(kernel=(5, 5, 1))
mpp = mppca.fit(img_noisy, mask=mask)
den, sigma = mpp['denoised'], mpp['sigma']

print('sigma mean:', float(np.mean(sigma)))

# visualize one voxel time-course
vx = (8, 8, 0)
_volumes = {'clean': img_clean, 'noisy': img_noisy, 'denoised': den}
_mp_wide = pd.DataFrame({'t': t, **{label: volume[vx] for label, volume in _volumes.items()}})
df_mp = _mp_wide.melt(
    id_vars=['t'],
    value_vars=['clean', 'noisy', 'denoised'],
    var_name='kind',
    value_name='value',
)

_mp_plot = (
    ggplot(df_mp, aes('t', 'value', color='kind'))
    + geom_line()
    + geom_point(size=2)
    + theme_bw()
    + labs(title='MPPCA: voxel time-course', x='t', y='value')
)
_mp_plot


## 10) fit_image: 2D画像（ノイズあり）での動作確認
各モデルの `fit_image` は入力 `data` の最後の次元がプロトコル次元（TE/TI/FAなど）であることを前提に、mask内の各voxelを逐次 `fit`（または線形化fit）します。


In [None]:
# 2D phantom + noisy acquisition -> fit_image
# NOTE: rebinds sx, sy, which the MPPCA cell set to 16.
sx, sy = 24, 24
# Index grids of shape (sy, sx): yy varies along axis 0, xx along axis 1.
yy, xx = np.mgrid[0:sy, 0:sx]
# Centre of the index grid.
cx, cy = (sx - 1) / 2.0, (sy - 1) / 2.0
# Circular mask centred on the grid with radius 0.35 * min(sx, sy).
mask2d = (xx - cx)**2 + (yy - cy)**2 <= (min(sx, sy) * 0.35)**2

# --- VfaT1 fit_image ---
vfa_img = VfaT1(flip_angle_deg=np.array([3.0, 8.0, 15.0, 25.0]), tr_s=0.015, b1=1.0)
# Ground-truth T1 ramps linearly from 0.7 s to 1.5 s along x.
t1_true_s = 0.7 + 0.8 * (xx / (sx - 1))
m0_true = 1500.0
# Flip angles go on a trailing axis so they broadcast against the 2D T1 map.
alpha = np.deg2rad(vfa_img.flip_angle_deg)[None, None, :]
E = np.exp(-vfa_img.tr_s / t1_true_s)[..., None]
# Spoiled gradient-echo steady-state signal: M0 * sin(a) * (1 - E) / (1 - E * cos(a)).
vfa_clean = m0_true * np.sin(alpha) * (1.0 - E) / (1.0 - E * np.cos(alpha))
vfa_obs_img = add_rician_noise(vfa_clean, sigma=15.0, rng=rng)
vfa_map = vfa_img.fit_image(vfa_obs_img, mask=mask2d)

# True vs estimated T1 for the voxels inside the mask.
df_vfa_map = pd.DataFrame({
    't1_true_s': t1_true_s[mask2d].ravel(),
    't1_est_s': vfa_map['t1_s'][mask2d].ravel(),
})
# Jupyter only auto-renders the LAST expression of a cell. This plot sits mid-cell, so
# without an explicit display() it is built and silently discarded. `display` is the
# builtin that IPython kernels inject; no import is needed inside a notebook.
display(
    ggplot(df_vfa_map, aes('t1_true_s', 't1_est_s'))
    + geom_point(alpha=0.35, size=1.5)
    + theme_bw()
    + labs(title='VfaT1.fit_image (noisy): true vs estimated', x='T1 true [s]', y='T1 estimated [s]')
)

# --- MonoT2 fit_image ---
te_ms = echotimes_s * 1e3
mono_img = MonoT2(te=te_ms)
# Ground-truth T2 ramps linearly from 40 ms to 160 ms along y.
t2_true_ms = 40.0 + 120.0 * (yy / (sy - 1))
# Mono-exponential decay per voxel; echo times go on the trailing axis.
mono_clean = (2000.0 * np.exp(-te_ms[None, None, :] / t2_true_ms[..., None]))
mono_obs_img = add_rician_noise(mono_clean, sigma=25.0, rng=rng)
mono_map = mono_img.fit_image(mono_obs_img, mask=mask2d, fit_type='exponential')

# True vs estimated T2 for the voxels inside the mask.
df_t2_map = pd.DataFrame({
    't2_true_ms': t2_true_ms[mask2d].ravel(),
    't2_est_ms': mono_map['t2'][mask2d].ravel(),
})
# Jupyter only auto-renders the LAST expression of a cell. This plot sits mid-cell, so
# without an explicit display() it is built and silently discarded. `display` is the
# builtin that IPython kernels inject; no import is needed inside a notebook.
display(
    ggplot(df_t2_map, aes('t2_true_ms', 't2_est_ms'))
    + geom_point(alpha=0.35, size=1.5)
    + theme_bw()
    + labs(title='MonoT2.fit_image (noisy): true vs estimated', x='T2 true [ms]', y='T2 estimated [ms]')
)

# --- B1Dam fit_image ---
dam_img = B1Dam(alpha_deg=60.0)
# Ground-truth B1 ramps linearly from 0.8 to 1.2 along x.
# NOTE: rebinds `b1_true`, which the single-voxel B1Dam cell set to a scalar.
b1_true = 0.8 + 0.4 * (xx / (sx - 1))
# Effective flip angles (radians) for the alpha and 2*alpha acquisitions.
a1 = np.deg2rad(dam_img.alpha_deg) * b1_true
a2 = np.deg2rad(dam_img.alpha_deg * 2.0) * b1_true
# Stack the two signals along the last axis so each voxel carries both acquisitions.
dam_clean = np.stack([1000.0 * np.sin(a1), 1000.0 * np.sin(a2)], axis=-1)
dam_obs_img = add_rician_noise(dam_clean, sigma=10.0, rng=rng)
dam_map = dam_img.fit_image(dam_obs_img, mask=mask2d)

# True vs estimated B1 for the voxels inside the mask.
df_b1_map = pd.DataFrame({
    'b1_true': b1_true[mask2d].ravel(),
    'b1_est': dam_map['b1_raw'][mask2d].ravel(),
})
# Last expression of the cell, so the notebook renders this plot automatically.
(ggplot(df_b1_map, aes('b1_true', 'b1_est'))
 + geom_point(alpha=0.35, size=1.5)
 + theme_bw()
 + labs(title='B1Dam.fit_image (noisy): true vs estimated', x='B1 true', y='B1 estimated'))


---

次のステップ（用途に応じて）:
- 各モデルの `fit_image`（画像処理）: 本ノートの「10) fit_image」で最小確認
- `qmrpy.sim`（SimVary/SimRnd/CRLB等）をこのノートに統合
- 解析パラメータ（ノイズレベル、正則化、flip angle探索範囲）の推奨設定を追記
