<a href="https://colab.research.google.com/github/kaiky-ferreira/PerceptronAspirador/blob/main/PerceptronAspirador.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from dataclasses import dataclass
from typing import Tuple, List, Optional, Dict
import unicodedata


# Utilidades e codificação


# Accepted spellings for each canonical floor type (Portuguese and English
# variants). Keys are the canonical names; insertion order defines the order
# of the one-hot encoding used elsewhere in this file.
FLOOR_ALIASES: Dict[str, List[str]] = dict(
    [
        ("madeira", ["madeira", "wood", "assoalho", "laminado"]),
        ("cerâmica", ["cerâmica", "ceramica", "tile", "ceramic", "porcelanato"]),
        ("carpete", ["carpete", "carpet", "tapete"]),
    ]
)

# Canonical floor names, in encoding order (same order as the alias table).
FLOORS = list(FLOOR_ALIASES)

def _strip_accents(s: str) -> str:
    """Return *s* without combining marks (e.g. ``"cerâmica"`` -> ``"ceramica"``).

    The string is decomposed with Unicode NFD normalization and every
    character whose category is ``Mn`` (nonspacing mark) is dropped.
    """
    decomposed = unicodedata.normalize("NFD", s)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != "Mn"]
    return "".join(kept)

def canonical_floor_name(raw: str) -> str:
    """Map a user-supplied floor name to its canonical name.

    The input is stripped, lower-cased and accent-stripped, then compared
    against every alias in ``FLOOR_ALIASES`` (also accent-stripped). An alias
    matches when the normalized input equals it or starts with it.

    Args:
        raw: Floor name as typed by the user.

    Returns:
        The canonical floor name (a key of ``FLOOR_ALIASES``).

    Raises:
        ValueError: If no alias matches.
    """
    s = _strip_accents(raw.strip().lower())
    for canon, variants in FLOOR_ALIASES.items():
        # Equality is a special case of "starts with", so one prefix test
        # against all aliases of this canonical name is sufficient.
        prefixes = tuple(_strip_accents(v) for v in variants)
        if s.startswith(prefixes):
            return canon
    raise ValueError(f"Tipo de piso desconhecido: {raw}. Use um de {FLOORS} (aliases aceitos: {FLOOR_ALIASES})")

def one_hot_floor(floor: str) -> np.ndarray:
    """Return the one-hot encoding of a floor type.

    The position of the 1.0 follows the order of the module-level ``FLOORS``
    list, so the encoding stays in sync with the canonical names instead of
    relying on a second hard-coded copy of that list.

    Args:
        floor: Floor name or any accepted alias.

    Returns:
        Float vector of length ``len(FLOORS)`` with a single 1.0.

    Raises:
        ValueError: If the floor name is not recognized
            (raised by ``canonical_floor_name``).
    """
    canon = canonical_floor_name(floor)
    vec = np.zeros(len(FLOORS), dtype=float)
    vec[FLOORS.index(canon)] = 1.0
    return vec

def scale_inputs(dirt_0_10: float, dist_0_5m: float) -> Tuple[float, float]:
    """Normalize dirt level and distance to ``[0, 1]``, clamping out-of-range values.

    Args:
        dirt_0_10: Dirt level; values outside ``[0, 10]`` are clamped.
        dist_0_5m: Distance in meters; values outside ``[0, 5]`` are clamped.

    Returns:
        ``(dirt, dist)`` where each component lies in ``[0.0, 1.0]``.

    Raises:
        ValueError: If either value is NaN. Without this check the
            ``min``/``max`` clamp silently turns NaN into the upper bound.
    """
    dirt_raw = float(dirt_0_10)
    dist_raw = float(dist_0_5m)
    # Comparisons with NaN are always False, so min(10.0, nan) returns 10.0;
    # reject NaN explicitly rather than reporting "maximum dirt/distance".
    if np.isnan(dirt_raw) or np.isnan(dist_raw):
        raise ValueError("Sujeira e distância devem ser números válidos (NaN recebido).")
    dirt = max(0.0, min(10.0, dirt_raw)) / 10.0
    dist = max(0.0, min(5.0, dist_raw)) / 5.0
    return dirt, dist

def encode_row(floor: str, dirt: float, dist: float) -> np.ndarray:
    """Encode one raw observation as a single-row feature matrix.

    The row is the one-hot floor vector followed by the normalized dirt
    level and the normalized distance.

    Returns:
        Array with one row (a leading axis is added to the feature vector).
    """
    floor_part = one_hot_floor(floor)
    numeric_part = list(scale_inputs(dirt, dist))
    features = np.concatenate([floor_part, numeric_part])
    return np.expand_dims(features, axis=0)



# Ativação (sigmoide limitada) estável


def _sigmoid_stable(z: np.ndarray) -> np.ndarray:
    """Numerically stable logistic sigmoid, evaluated element-wise.

    Only ``exp`` of a non-positive number is ever computed, so the
    exponential cannot overflow for large ``|z|``.
    """
    z = np.asarray(z, dtype=float)
    # exp(-|z|) equals exp(-z) for z >= 0 and exp(z) for z < 0.
    decay = np.exp(-np.abs(z))
    return np.where(z >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))

@dataclass
class BoundedSigmoid:
    """Sigmoid activation rescaled to ``[y_min, y_max]``.

    ``y_min`` and ``y_max`` may be scalars or per-output vectors; they are
    broadcast against the pre-activations passed to ``forward``/``grad``.
    Lists, tuples and plain numbers are accepted and converted to float
    arrays on construction.
    """
    y_min: np.ndarray  # lower bound per output
    y_max: np.ndarray  # upper bound per output

    def __post_init__(self) -> None:
        # Coerce once so that ``y_max - y_min`` below works even when the
        # caller passed Python lists/tuples instead of arrays.
        self.y_min = np.asarray(self.y_min, dtype=float)
        self.y_max = np.asarray(self.y_max, dtype=float)

    def forward(self, z: np.ndarray) -> np.ndarray:
        """Return ``y_min + (y_max - y_min) * sigmoid(z)``."""
        sig = _sigmoid_stable(z)
        return self.y_min + (self.y_max - self.y_min) * sig

    def grad(self, z: np.ndarray) -> np.ndarray:
        """Return the derivative of ``forward`` with respect to ``z``."""
        sig = _sigmoid_stable(z)
        return (self.y_max - self.y_min) * sig * (1.0 - sig)



# Perceptron Multi-Saída (2 saídas: velocidade e sucção)


class MultiOutputPerceptron:
    """Single-layer model with two outputs and a bounded activation.

    The weight matrix ``W`` has shape ``(n_features + 1, 2)``; the last row
    is the bias, matched by the column of ones appended by ``_add_bias``.
    The activation object must provide ``forward(z)`` and ``grad(z)``.
    """

    def __init__(self, n_features: int, activation: "BoundedSigmoid", seed: int = 42):
        """Initialize weights uniformly in ``[-0.5, 0.5)`` from a seeded generator."""
        self.n_features = n_features
        self.activation = activation
        rng = np.random.default_rng(seed)
        self.W = rng.uniform(-0.5, 0.5, size=(n_features + 1, 2))  # two outputs

    def _add_bias(self, X: np.ndarray) -> np.ndarray:
        """Return ``X`` as a float matrix with a trailing column of ones.

        Raises:
            ValueError: If ``X`` is not two-dimensional.
        """
        X = np.asarray(X, dtype=float)
        if X.ndim != 2:
            raise ValueError(f"X must be 2-D (samples, features); got shape {X.shape}.")
        ones = np.ones((X.shape[0], 1), dtype=float)
        return np.hstack([X, ones])

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the activated outputs for each row of ``X``, shape ``(N, 2)``."""
        Xb = self._add_bias(X)              # (N, F+1)
        Z = Xb @ self.W                     # (N, 2)
        return self.activation.forward(Z)   # (N, 2)

    @staticmethod
    def _mse(y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Mean squared error over all elements."""
        return float(np.mean((y_pred - y_true) ** 2))

    @staticmethod
    def _mae(y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Mean absolute error over all elements."""
        return float(np.mean(np.abs(y_pred - y_true)))

    def fit(
        self,
        X: np.ndarray,
        y_true: np.ndarray,              # shape (N, 2)
        lr: float = 0.2,
        epochs: int = 1000,
        verbose: bool = False,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
        patience: int = 50
    ):
        """Train with full-batch gradient descent on the squared error.

        Args:
            X: Training features, shape ``(N, n_features)``.
            y_true: Training targets, shape ``(N, 2)``.
            lr: Learning rate.
            epochs: Maximum number of epochs.
            verbose: Print progress roughly ten times over the run, plus the
                final epoch and an early-stopping message.
            X_val: Optional validation features. Must be given together
                with ``y_val``.
            y_val: Optional validation targets, one row per row of ``X_val``.
            patience: Number of consecutive epochs without validation
                improvement after which training stops.

        Raises:
            ValueError: If the training set is empty, if array shapes do not
                match the model, if only one of ``X_val``/``y_val`` is
                given, or if the validation set is empty.

        When validation data is supplied, the weights with the lowest
        validation MSE are restored at the end.
        """
        Xb = self._add_bias(X)            # (N, F+1)
        y_true = np.asarray(y_true, dtype=float)
        N = Xb.shape[0]
        n_out = self.W.shape[1]

        if N == 0:
            raise ValueError("X must contain at least one sample.")
        if Xb.shape[1] != self.W.shape[0]:
            raise ValueError(
                f"X has {Xb.shape[1] - 1} features but the model expects {self.W.shape[0] - 1}."
            )
        # An (N, 1) or (2,) target would otherwise broadcast silently against
        # the (N, 2) predictions and train on the wrong error signal.
        if y_true.shape != (N, n_out):
            raise ValueError(f"y_true must have shape {(N, n_out)}; got {y_true.shape}.")
        if (X_val is None) != (y_val is None):
            raise ValueError("X_val and y_val must be provided together.")

        use_val = X_val is not None
        if use_val:
            # The bias-augmented validation matrix does not change between
            # epochs, so build it once.
            Xb_val = self._add_bias(X_val)
            y_val = np.asarray(y_val, dtype=float)
            if Xb_val.shape[0] == 0:
                raise ValueError("Validation set must contain at least one sample.")
            if Xb_val.shape[1] != self.W.shape[0]:
                raise ValueError(
                    f"X_val has {Xb_val.shape[1] - 1} features but the model expects {self.W.shape[0] - 1}."
                )
            if y_val.shape != (Xb_val.shape[0], n_out):
                raise ValueError(
                    f"y_val must have shape {(Xb_val.shape[0], n_out)}; got {y_val.shape}."
                )

        best_val = np.inf
        best_W = self.W.copy()
        no_improve = 0
        log_every = max(1, epochs // 10)

        for ep in range(epochs):
            Z = Xb @ self.W                               # (N, 2)
            y_pred = self.activation.forward(Z)           # (N, 2)
            dAct = self.activation.grad(Z)                # (N, 2)

            # Squared-error gradient, averaged over the N samples.
            dL_dZ = (2.0 / N) * (y_pred - y_true) * dAct  # (N, 2)
            grad_W = Xb.T @ dL_dZ                         # (F+1, 2)
            self.W -= lr * grad_W

            if use_val:
                # Validation loss is measured with the freshly updated weights.
                y_val_pred = self.activation.forward(Xb_val @ self.W)
                val_loss = self._mse(y_val_pred, y_val)
                if val_loss + 1e-10 < best_val:
                    best_val = val_loss
                    best_W = self.W.copy()
                    no_improve = 0
                else:
                    no_improve += 1

            if verbose and (ep % log_every == 0 or ep == epochs - 1):
                # The train MSE shown here uses the predictions made before
                # this epoch's weight update.
                tr_loss = self._mse(y_pred, y_true)
                if use_val:
                    print(f"Epoch {ep+1:4d}/{epochs} | train MSE: {tr_loss:.5f} | val MSE: {val_loss:.5f}")
                else:
                    print(f"Epoch {ep+1:4d}/{epochs} | train MSE: {tr_loss:.5f}")

            if use_val and no_improve >= patience:
                if verbose:
                    print(f"Early stopping at epoch {ep+1} (best val MSE: {best_val:.5f}).")
                break

        # Restore the best weights seen on the validation set.
        if use_val:
            self.W = best_W

    # Public metric helper
    def evaluate(self, X: np.ndarray, y_true: np.ndarray) -> Tuple[float, float]:
        """Return ``(MSE, MAE)`` of the model's predictions on ``X``."""
        y_pred = self.predict(X)
        return self._mse(y_pred, y_true), self._mae(y_pred, y_true)



# Heurística-alvo e dataset


def target_speed_and_suction(floor_vec: np.ndarray, dirt_n: float, dist_n: float) -> Tuple[float, float]:
    """Heuristic targets for one sample.

    Args:
        floor_vec: One-hot floor vector ordered (wood, ceramic, carpet).
        dirt_n: Normalized dirt level.
        dist_n: Normalized distance.

    Returns:
        ``(speed, suction)`` with speed clipped to ``[1, 5]`` and suction
        clipped to ``[1, 3]``.
    """
    is_wood, is_ceramic, is_carpet = floor_vec

    # Per-floor contributions: carpet needs the most suction and slows the
    # robot the most, wood contributes nothing to either term.
    floor_suction = 0.0 * is_wood + 0.3 * is_ceramic + 1.0 * is_carpet
    floor_slowdown = 0.0 * is_wood + 0.3 * is_ceramic + 0.8 * is_carpet

    raw_speed = 5.0
    raw_speed = raw_speed - 2.0 * dirt_n
    raw_speed = raw_speed - 2.2 * (1.0 - dist_n)
    raw_speed = raw_speed - 1.5 * floor_slowdown

    raw_suction = 1.0 + 0.9 * dirt_n + 0.8 * floor_suction

    speed = float(np.clip(raw_speed, 1.0, 5.0))
    suction = float(np.clip(raw_suction, 1.0, 3.0))
    return speed, suction

def make_dataset(n_samples: int = 1500, seed: int = 777):
    """Generate a synthetic dataset labelled by the target heuristic.

    For each sample a floor type, a dirt level in ``[0, 10)`` and a distance
    in ``[0, 5)`` are drawn (in that order) from a generator seeded with
    ``seed``; the features are the one-hot floor vector followed by the
    normalized dirt and distance.

    Args:
        n_samples: Number of samples to generate (may be 0).
        seed: Seed for ``np.random.default_rng``.

    Returns:
        ``(X, Y)`` with ``X`` of shape ``(n_samples, len(FLOORS) + 2)`` and
        ``Y`` of shape ``(n_samples, 2)`` holding ``[speed, suction]``.

    Raises:
        ValueError: If ``n_samples`` is negative.
    """
    if n_samples < 0:
        raise ValueError(f"n_samples must be non-negative; got {n_samples}.")

    rng = np.random.default_rng(seed)
    n_floor = len(FLOORS)
    # Preallocating keeps the 2-D shape even when n_samples == 0; building
    # the arrays from empty Python lists would give shape (0,).
    X = np.empty((n_samples, n_floor + 2), dtype=float)
    Y = np.empty((n_samples, 2), dtype=float)

    for i in range(n_samples):
        # Keep the draw order (floor, dirt, dist) so a given seed always
        # reproduces the same dataset.
        floor = rng.choice(FLOORS)
        dirt = rng.uniform(0, 10)
        dist = rng.uniform(0, 5)
        fvec = one_hot_floor(floor)
        dirt_n, dist_n = scale_inputs(dirt, dist)
        X[i, :n_floor] = fvec
        X[i, n_floor:] = (dirt_n, dist_n)
        Y[i] = target_speed_and_suction(fvec, dirt_n, dist_n)
    return X, Y

def train_val_split(X: np.ndarray, Y: np.ndarray, val_ratio: float = 0.2, seed: int = 2024):
    """Shuffle the rows and split them into training and validation sets.

    Args:
        X: Feature matrix with one row per sample.
        Y: Target matrix with the same number of rows as ``X``.
        val_ratio: Fraction of rows assigned to validation, in ``[0, 1]``.
        seed: Seed for the shuffling generator.

    Returns:
        ``(X_train, Y_train, X_val, Y_val)``.

    Raises:
        ValueError: If ``X`` and ``Y`` have different numbers of rows, or if
            ``val_ratio`` is outside ``[0, 1]`` (a negative ratio would
            otherwise be interpreted as a negative slice bound and yield a
            wrong split).
    """
    N = X.shape[0]
    if Y.shape[0] != N:
        raise ValueError(f"X and Y must have the same number of rows; got {N} and {Y.shape[0]}.")
    if not 0.0 <= val_ratio <= 1.0:
        raise ValueError(f"val_ratio must be in [0, 1]; got {val_ratio}.")

    rng = np.random.default_rng(seed)
    idx = rng.permutation(N)
    n_val = int(round(val_ratio * N))
    val_idx = idx[:n_val]
    tr_idx = idx[n_val:]
    return X[tr_idx], Y[tr_idx], X[val_idx], Y[val_idx]



# Treino, salvar, carregar, predição


# File where train_model() stores the trained weights and output bounds
# (via np.savez); also the default path read by load_model().
WEIGHTS_PATH = "vacuum_multi.npz"

def train_model(verbose: bool = False):
    """Train the two-output perceptron on synthetic data and save it.

    Builds a 2000-sample dataset, holds out 20% for validation, trains with
    early stopping, and writes the weights plus output bounds to
    ``WEIGHTS_PATH``.

    Args:
        verbose: Print training progress and the final metrics.

    Returns:
        ``(model, metrics)`` where ``metrics`` maps ``"train"`` and ``"val"``
        to ``(MSE, MAE)`` tuples.
    """
    features, targets = make_dataset(n_samples=2000, seed=777)
    Xtr, Ytr, Xva, Yva = train_val_split(features, targets, val_ratio=0.2, seed=42)

    # Output bounds per column: speed in [1, 5], suction in [1, 3].
    lower = np.array([1.0, 1.0], dtype=float)
    upper = np.array([5.0, 3.0], dtype=float)

    model = MultiOutputPerceptron(
        n_features=features.shape[1],
        activation=BoundedSigmoid(y_min=lower, y_max=upper),
        seed=7,
    )
    model.fit(Xtr, Ytr, lr=0.2, epochs=1500, verbose=verbose, X_val=Xva, y_val=Yva, patience=80)

    # Final evaluation on both splits.
    metrics = {"train": model.evaluate(Xtr, Ytr), "val": model.evaluate(Xva, Yva)}
    if verbose:
        tr_mse, tr_mae = metrics["train"]
        va_mse, va_mae = metrics["val"]
        print(f"[FINAL] train MSE={tr_mse:.4f}, MAE={tr_mae:.4f} | val MSE={va_mse:.4f}, MAE={va_mae:.4f}")

    # Persist the weights together with the activation bounds.
    np.savez(WEIGHTS_PATH, W=model.W, y_min=lower, y_max=upper)
    return model, metrics

def load_model(path: str = WEIGHTS_PATH) -> MultiOutputPerceptron:
    """Rebuild a trained model from an ``.npz`` file written by ``train_model``.

    Args:
        path: Archive containing the arrays ``W``, ``y_min`` and ``y_max``.

    Returns:
        A ``MultiOutputPerceptron`` whose weights and activation bounds come
        from the file.

    Raises:
        ValueError: If the stored ``W`` is not a non-empty 2-D matrix.
        KeyError: If one of the expected arrays is missing from the archive.
    """
    # np.load returns an NpzFile that keeps the file open; the context
    # manager closes it once the arrays have been read into memory.
    with np.load(path, allow_pickle=False) as data:
        W = data["W"]
        y_min = data["y_min"]
        y_max = data["y_max"]

    if W.ndim != 2 or W.shape[0] < 1:
        raise ValueError(f"Stored W must be a 2-D matrix with at least the bias row; got shape {W.shape}.")

    act = BoundedSigmoid(y_min=y_min, y_max=y_max)
    # The constructor's random weights are discarded; the seed is irrelevant.
    model = MultiOutputPerceptron(n_features=W.shape[0] - 1, activation=act, seed=0)
    model.W = W
    return model

def predict_one(model: MultiOutputPerceptron, floor: str, dirt: float, dist: float) -> Tuple[float, float]:
    """Predict speed and suction for a single raw observation.

    Args:
        model: Trained model.
        floor: Floor name or alias.
        dirt: Dirt level on the 0-10 scale.
        dist: Distance in meters on the 0-5 scale.

    Returns:
        ``(speed, suction)`` clipped to ``[1, 5]`` and ``[1, 3]``
        respectively and rounded to two decimals.
    """
    outputs = model.predict(encode_row(floor, dirt, dist))[0]  # [speed, suction]
    limits = ((1.0, 5.0), (1.0, 3.0))
    speed, suction = (
        round(float(np.clip(outputs[i], lo, hi)), 2)
        for i, (lo, hi) in enumerate(limits)
    )
    return speed, suction



# Sanity checks (opcionais)


def sanity_checks(model: MultiOutputPerceptron, rng_seed: int = 1) -> None:
    """Run loose behavioral checks on a trained model.

    Checks that predictions stay within the output ranges, that suction
    does not decrease from minimum to maximum dirt for each floor, and that
    speed does not decrease from minimum to maximum distance. Prints a
    confirmation when all checks pass.

    Raises:
        AssertionError: If any check fails.
    """
    rng = np.random.default_rng(rng_seed)

    # 1) Output ranges on a freshly generated dataset.
    dataset_seed = rng.integers(1, 10_000)
    X, _ = make_dataset(n_samples=500, seed=dataset_seed)
    Yp = model.predict(X)
    speed_col = Yp[:, 0]
    suction_col = Yp[:, 1]
    assert ((speed_col >= 1.0) & (speed_col <= 5.0)).all(), "Velocidade fora de [1,5]"
    assert ((suction_col >= 1.0) & (suction_col <= 3.0)).all(), "Sucção fora de [1,3]"

    # 2) Suction vs. dirt, per floor, at maximum distance. Only the end
    #    points are compared, so small non-monotonic wiggles are tolerated.
    dirt_levels = np.linspace(0, 10, 11)
    for f in ("madeira", "cerâmica", "carpete"):
        rows = np.vstack([encode_row(f, d, 5.0) for d in dirt_levels])
        ys = model.predict(rows)[:, 1]  # suction
        assert (ys[-1] >= ys[0] - 1e-6), f"Sucção não cresceu de d=0 para d=10 em {f}"

    # 3) Speed vs. distance on wood with high dirt.
    distances = np.linspace(0, 5, 6)
    rows = np.vstack([encode_row("madeira", 9.0, d) for d in distances])
    vs = model.predict(rows)[:, 0]  # speed
    assert (vs[-1] >= vs[0] - 1e-6), "Velocidade não aumentou de dist=0 para dist=5"

    print("Sanity checks: OK")



# Execução de script


# Script entry point: train the model, report metrics, run a few demo
# predictions and finish with the sanity checks.
if __name__ == "__main__":
    # train_model also writes the weights to WEIGHTS_PATH.
    model, metrics = train_model(verbose=True)

    print("\nMétricas (MSE, MAE):")
    print(f"   Treino: {metrics['train']}")
    print(f"Validação: {metrics['val']}")

    print("\nDemonstração:")
    # Each scenario is (floor, dirt on the 0-10 scale, distance in meters).
    scenarios = [
        ("madeira", 2, 5.0),
        ("cerâmica", 6, 2.5),
        ("carpete", 9, 0.5),
        ("carpete", 3, 5.0),
        ("madeira", 10, 0.0),
        ("porcelanato", 7, 4.0),  # alias of "cerâmica"
        ("wood", 5, 2.0),         # alias of "madeira"
        ("carpet", 8, 1.0),       # alias of "carpete"
    ]
    for s in scenarios:
        v, p = predict_one(model, *s)
        print(f"Entrada: piso={s[0]}, sujeira={s[1]}, dist={s[2]} m  ->  velocidade={v}  potência={p}")

    print("\nRodando sanity checks...")
    sanity_checks(model)

Epoch    1/1500 | train MSE: 0.44520 | val MSE: 0.35904
Epoch  151/1500 | train MSE: 0.01514 | val MSE: 0.01503
Epoch  301/1500 | train MSE: 0.00889 | val MSE: 0.00880
Epoch  451/1500 | train MSE: 0.00762 | val MSE: 0.00755
Epoch  601/1500 | train MSE: 0.00724 | val MSE: 0.00720
Epoch  751/1500 | train MSE: 0.00711 | val MSE: 0.00709
Epoch  901/1500 | train MSE: 0.00707 | val MSE: 0.00705
Epoch 1051/1500 | train MSE: 0.00705 | val MSE: 0.00704
Epoch 1201/1500 | train MSE: 0.00705 | val MSE: 0.00704
Early stopping at epoch 1350 (best val MSE: 0.00704).
[FINAL] train MSE=0.0070, MAE=0.0586 | val MSE=0.0070, MAE=0.0598

Métricas (MSE, MAE):
   Treino: (0.007047474067817064, 0.05860346412135196)
Validação: (0.007038230400436376, 0.05977081372699358)

Demonstração:
Entrada: piso=madeira, sujeira=2, dist=5.0 m  ->  velocidade=4.5  potência=1.25
Entrada: piso=cerâmica, sujeira=6, dist=2.5 m  ->  velocidade=2.16  potência=1.76
Entrada: piso=carpete, sujeira=9, dist=0.5 m  ->  velocidade=1.1  p