# Pipeline Unificado SWV → FOPDT → PD (com modo Progress)

Este notebook reúne e aprofunda todo o pipeline de análise SWV do projeto, incluindo:

1) Sanitização dos dumps SWV/ITM em CSV canônico (`filter_swv_export.py`)
2) Cálculo de séries derivadas e métricas temporais (`analyze_step_responses.py`)
3) Identificação FOPDT (K, L, τ) por análise de degrau
4) Síntese de ganhos PD (Ziegler–Nichols, curva de reação) e conversão para firmware
5) Emissão de summary + CSVs derivados
6) Consolidação em `tuning_profiles.py`
7) Demonstração de simulação no modo 'progress' (compat) como em `interactive_old_sim.py`

Inclui fórmulas, heurísticas, checklist de diagnóstico e uma seção de **FOPDT em detalhes**. Ao final, há uma seção de **Referências** com livros e artigos clássicos.

## 1. Sanitização (SWV → *_filtered.csv)

Objetivo: transformar dumps SWV possivelmente ruidosos em CSVs consistentes com linhas do tipo:

```
axis,id,time_ms,encoder,pulses
```

Regras de consistência implementadas em `filter_swv_export.py`:
- Aceita apenas linhas que casam uma regex numérica inteira com 5 campos.
- `axis` é fixado pela primeira linha válida; linhas com eixo divergente são descartadas.
- `id` deve ser estritamente crescente; `time_ms` e `pulses` não podem regredir (≥).
- `encoder` é convertido para módulo (sempre "positivo").
- Arquivo que não mantém nenhuma linha é descartado (remove-se a saída vazia).

Racional: regressões em `time_ms`/`pulses` invalidam derivadas; eixo misto polui constantes físicas; `id` estrito facilita detectar perdas/duplicações.

## 2. Séries Derivadas (velocidades)

Com janelas de tamanho `N` (stride):
- $t_k = time\_ms(k)/1000$; $\Delta t_k = t_k - t_{k-N+1}$.
- $v_{cmd}(k) = [pulses(k)-pulses(k-N+1)]/\Delta t_k$.
- $v_{enc}(k) = [encoder(k)-encoder(k-N+1)]/\Delta t_k$.

Trade-off do stride ("N"):
- N pequeno: resposta rápida, porém ruído maior.
- N grande: suave, porém pode atrasar/achatar o degrau.

Dica: ajuste `--stride` conforme ruído/Δt mínimo; valide com plots.

## 3. Identificação FOPDT (K, L, τ)

Modelo: $G_v(s) = \dfrac{K e^{-Ls}}{\tau s + 1}$.

- Ganho estático: $K = v_{enc,ss}/v_{cmd,ss}$ (médias na janela final).
- Tempo morto: $L = t_{enc}^{5\%} - t_{cmd}^{5\%}$.
- Constante de tempo: $\tau = t_{63} - L$, com $t_{63}$ tal que $v_{enc}(t_{63}) \ge 0,632\,v_{enc,ss}$.\n
Heurísticas de robustez:
- Se $L \le 0$ ou indefinido, use piso $\min(\Delta t)$.\n- Se $\tau \le 0$ ou indefinido, idem.\n- Descarte valores NaN/inf nas janelas.

Classificação de forma do degrau (opcional): *step-like* se subida 10–90% ≤ 15% da duração; senão *ramp-like*.

## 4. Síntese PD (Ziegler–Nichols, Curva de Reação)

$K_p = 1{,}2 \cdot \dfrac{\tau}{K\,L}$,  $T_d = 0{,}5 L$,  $K_d = K_p T_d$,  $K_i = 0$.\n
- Sugestão segura: iniciar com $K_p/2$ e subir gradualmente.
- Conversão firmware (escala 256): `kp_i = round(Kp·256)`, etc.
- CPR do encoder: menor CPR → menor $K$ → maior $K_p$ (mantidos L, τ).

## 5. Summary e Artefatos
- `analysis_summary.txt`: consolida K, L, τ, ganhos PD e estatísticas.
- `analysis_data/*_derived.csv`: séries derivadas para auditoria.
- Use estes arquivos para verificar thresholds, Δt mínimo e forma do degrau.

## 6. Integração com `tuning_profiles.py`
- Tabela consolidada `(axis, microstep) → (Kp, Ki, Kd, L, τ, steady_*)`.
- Simuladores (`fopdt_simulator.py`, `interactive_sim.py`) consomem esses perfis e aplicam `K_SCALE=256`.
- Pode-se automatizar o carregamento do `analysis_summary.txt` como melhoria futura.

## FOPDT em detalhes
**Modelo**: $G(s) = K e^{-L s}/(τ s + 1)$. Resposta a degrau $u_0$: $y(t)=0$ (t<L); $y(t)=K\,u_0\,[1 - e^{-(t - L)/τ}]$ (t≥L).

- 63,2%: $t_{63} = L + τ$; 90%: $t_{90} ≈ L + 2{,}303·τ$; $t_s(±2\%) ≈ L + 4·τ$.
- Freq.: $|G(jω)| = K/√(1 + (ω·τ)^2)$, $φ = −arctan(ω·τ) − ω·L$. Atraso reduz margem de fase.
- Discreto: $n_d = round(L/T_s)$, $α = 1 − e^{−T_s/τ}$, $x[k] = x[k−1] + α·(K·u[k−n_d] − x[k−1])$.
- Identificação por degrau com thresholds (5% e 63,2%) + pisos por Δt mínimo.
- Tuning PD (Z–N): $K_p = 1{,}2·τ/(K·L)$, $T_d = 0{,}5·L$, $K_d = K_p·T_d$.
- Limites: processos com integrador/2ª ordem subamortecida podem exigir modelos alternativos (IDT, 2ª ordem + atraso).


## 7. Demo: Simulação no modo 'progress' (compat)
Executa a simulação headless com as mesmas flags do `interactive_old_sim.py` (estratégia de mestre por progresso), gravando CSV e imprimindo um resumo rápido.

Observação: esta célula usa as classes de `interactive_sim.py` e pode imprimir mensagens do backend do Matplotlib.

In [None]:
# Simulação headless no modo 'progress' (compat)
from pathlib import Path
import csv

from tuning_profiles import AnalysisCatalog
from interactive_sim import (
    PlantConfig, Scenario, parse_axis_map, gains_from_catalog, InteractiveSim,
)

# Eixos e microsteps (edite conforme necessidade)
axis_map = parse_axis_map('X:256,Y:256,Z:256')
kp_xyz, ki_xyz, kd_xyz = gains_from_catalog(axis_map)

# Configuração de planta e cenário compatíveis com interactive_old_sim.py
cfg = PlantConfig(
    microstep_factor=axis_map[0][1],
    enc_cpr_xyz=(40000, 2500, 40000),
    kd_alpha_bits=8,
    step_high_ticks=1,
    step_low_ticks=1,
)
scn = Scenario(
    s_xyz=(20000, 16000, 12000),
    v_xyz=(10000, 8000, 6000),
    dir_xyz=(1, 1, 1),
    kp_xyz=kp_xyz, ki_xyz=ki_xyz, kd_xyz=kd_xyz,
    sim_time_s=5.0,
    use_dda=True,
)

sim = InteractiveSim(
    cfg, scn, log_dir=Path('sim_logs'), enable_logging=True, auto_analyze=True, headless=True
)

# Flags de compatibilidade (modo 'progress')
sim.master_select_strategy = 'progress'
sim.prefer_loaded_master = False
sim.master_switch_margin_steps = 0.0
sim.sync_err_feed_threshold = 200.0
sim.sync_err_feed_min_fraction = 0.25
sim.sync_hold_enabled = False
sim.finish_all_axes = False
sim.ramp_use_worst_remaining = False
sim.global_stop_all_axes = True
sim.finish_window_steps = 0.0
sim.finish_disable_stall = False
sim.finish_extra_budget_steps = 0

# Execução headless (como no interactive_old_sim)
sim._start_log_session()
sim._log_state(
    t=sim.t,
    pos_steps=sim.pos_real.copy(),
    pos_enc=sim._encoder_rel_dda(),
    vel_sps=sim.v_real.copy(),
    casc_err=sim.g_casc_err_s32.copy(),
    load_c=sim.active_C_load.copy(),
    load_timer=sim.load_timer_xyz.copy(),
    span_steps=0.0,
    global_stop=False,
)
while sim.k < sim.N_steps_total:
    sim._step()
sim._stop_log_session()

# Resumo do último log
if getattr(sim, 'last_log_path', None) and sim.last_log_path.exists():
    with sim.last_log_path.open() as f:
        r = csv.DictReader(f)
        rows = list(r)
    if rows:
        span_final = float(rows[-1]['span_steps'])
        stop_frac = sum(int(x['global_stop']) for x in rows)/len(rows)
        print(f'Log: {sim.last_log_path.name} | amostras={len(rows)} | span_final={span_final:.0f} | stop_frac={stop_frac:.1%}')
else:
    print('Sem log gerado.')


## 8. Utilitários demonstrativos (K, L, τ e Z–N)
Pequenas funções para ilustrar as mesmas fórmulas usadas nos scripts.

In [None]:
from pathlib import Path
import csv, math
from typing import List, Dict, Optional, Tuple

def load_filtered_rows(csv_path: Path) -> List[Dict[str, int]]:
    rows: List[Dict[str, int]] = []
    with csv_path.open('r', encoding='utf-8', newline='') as fh:
        reader = csv.reader(fh)
        for cols in reader:
            if len(cols) < 5: continue
            try:
                rows.append({'axis': int(cols[0]), 'seq': int(cols[1]), 'time_ms': int(cols[2]), 'encoder': int(cols[3]), 'pulses': int(cols[4])})
            except:
                pass
    return rows

def compute_raw_series(rows: List[Dict[str, int]]):
    T: List[float] = []
    Vc: List[float] = []
    Ve: List[float] = []
    dt_values: List[float] = []
    prev = None
    for r in rows:
        if prev is None: prev = r; continue
        dt_ms = r['time_ms'] - prev['time_ms']
        if dt_ms <= 0: prev = r; continue
        dt = dt_ms / 1000.0
        Vc.append((r['pulses'] - prev['pulses']) / dt)
        Ve.append((r['encoder'] - prev['encoder']) / dt)
        T.append(r['time_ms'] / 1000.0)
        dt_values.append(dt)
        prev = r
    return T, Vc, Ve, dt_values

def steady_value(values: List[float], frac: float = 0.2) -> Optional[float]:
    if not values: return None
    start = len(values) - max(1, int(len(values) * frac))
    window = [v for v in values[start:] if (v is not None and not math.isnan(v))]
    if not window: window = [v for v in values if (v is not None and not math.isnan(v))]
    if not window: return None
    return sum(window) / len(window)

def first_crossing(times: List[float], values: List[float], thr: float) -> Optional[float]:
    if thr is None: return None
    for t, v in zip(times, values):
        if v is None or math.isnan(v): continue
        if v >= thr: return t
    return None

def identify_fopdt(times: List[float], Vc: List[float], Ve: List[float], dt_vals: List[float]):
    vc_ss = steady_value(Vc); ve_ss = steady_value(Ve)
    K = (ve_ss / vc_ss) if (vc_ss and vc_ss > 0) else None
    tc = first_crossing(times, Vc, (vc_ss*0.05) if (vc_ss and vc_ss>0) else None)
    te = first_crossing(times, Ve, (ve_ss*0.05) if (ve_ss and ve_ss>0) else None)
    L = (te - tc) if (tc is not None and te is not None) else None
    if L is not None and L < 0: L = 0.0
    t63 = first_crossing(times, Ve, (ve_ss*0.632) if ve_ss else None)
    tau = (t63 - L) if (t63 is not None and L is not None) else None
    if tau is not None and tau < 0: tau = None
    mdt = min(dt_vals) if dt_vals else None
    if (L is None or L <= 0) and mdt: L = mdt
    if (tau is None or tau <= 0) and mdt: tau = mdt
    return K, L, tau, vc_ss, ve_ss

def zn_pd(K: Optional[float], L: Optional[float], tau: Optional[float]):
    if not (K and L and tau) or (K <= 0 or L <= 0 or tau <= 0):
        return None, 0.0, None
    Kp = 1.2 * (tau / (K * L))
    Td = 0.5 * L
    Kd = Kp * Td
    return Kp, 0.0, Kd

def firmware_gains(kp: float, ki: float, kd: float, k_scale: int = 256) -> Tuple[int, int, int]:
    return (int(round(kp * k_scale)), int(round(ki * k_scale)), int(round(kd * k_scale)))


## 9. Checklist de Diagnóstico
- Sanitização: linhas suficientes? `id` estrito? eixo único?
- Stride: Δt mínimo suficiente? ruído aceitável?
- Regime: `v_cmd,ss` e `v_enc,ss` bem definidos? saturações?
- Crossings: thresholds atingidos (5% e 63,2%)?
- Ganhos: `Kp_safe` estável no simulador? overshoot dentro do alvo?
- Firmware: inteiros coerentes com `K_SCALE=256`?

## 10. Referências
- Ziegler, J. G.; Nichols, N. B. (1942). Optimum Settings for Automatic Controllers. Transactions of the ASME, 64, 759–768.
- Cohen, G. H.; Coon, G. A. (1953). Theoretical Considerations of Time-Delayed Control. Ind. Eng. Chem., 45(12), 243–251.
- Åström, K. J.; Hägglund, T. (1995). PID Controllers: Theory, Design, and Tuning (2nd ed.). ISA.
- Åström, K. J.; Murray, R. M. (2008). Feedback Systems: An Introduction for Scientists and Engineers. Princeton Univ. Press.
- Seborg, D. E.; Edgar, T. F.; Mellichamp, D. A.; Doyle, F. (2011). Process Dynamics and Control (3rd ed.). Wiley.
- Skogestad, S. (2003). Simple analytic rules for model reduction and PID controller tuning. Journal of Process Control, 13(4), 291–309.
- Rivera, D. E.; Morari, M.; Skogestad, S. (1986). Internal Model Control. 4. PID controller design. Ind. Eng. Chem. Process Des. Dev., 25(1), 252–265.
- Smith, O. J. M. (1957). Close control of loops with dead time. Chemical Engineering Progress, 53(5), 217–219.
- Franklin, G. F.; Powell, J. D.; Emami-Naeini, A. (2015). Feedback Control of Dynamic Systems (7th ed.). Pearson.
- Ogata, K. (2010). Modern Control Engineering (5th ed.). Prentice Hall.