## Feedforward Neural Network Toolkit

In [None]:
import numpy as np
import pandas as pd
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Dict, Iterable, Iterator, Tuple

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
import matplotlib.pyplot as plt

# Integer class label -> letter; used when titling example plots.
LABEL_MAP: Dict[int, str] = {
    0: 'a',
    1: 'e',
    2: 'g',
    3: 'i',
    4: 'l',
    5: 'n',
    6: 'o',
    7: 'r',
    8: 't',
    9: 'u',
}
# Dataset folders (relative paths). The directory names are Chinese for
# "image classification - dataset" and "regression - dataset".
OCR_ROOT = Path('图像分类-dataset')
REG_ROOT = Path('回归-dataset')

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Return the element-wise logistic function ``1 / (1 + exp(-x))``.

    The input is clamped to ``[-60, 60]`` before exponentiation so that
    ``np.exp`` cannot overflow for large-magnitude values.
    """
    bounded = np.clip(x, -60.0, 60.0)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def relu(x: np.ndarray) -> np.ndarray:
    """Return the element-wise rectified linear unit, ``max(0, x)``."""
    floor = 0.0
    return np.maximum(floor, x)

def softmax(z: np.ndarray) -> np.ndarray:
    """Return the softmax of ``z`` taken along axis 1.

    The maximum along axis 1 is subtracted before exponentiating; this leaves
    the result unchanged mathematically but keeps ``np.exp`` from overflowing.
    """
    unnormalised = np.exp(z - np.max(z, axis=1, keepdims=True))
    normaliser = np.sum(unnormalised, axis=1, keepdims=True)
    return unnormalised / normaliser

def cross_entropy(probs: np.ndarray, targets: np.ndarray) -> float:
    """Return the cross-entropy between ``probs`` and ``targets``.

    The total ``-sum(targets * log(probs))`` is divided by ``len(targets)``.
    Probabilities are clipped to ``[1e-12, 1]`` so ``log`` never sees zero.
    """
    safe_probs = np.clip(probs, 1e-12, 1.0)
    total = np.sum(targets * np.log(safe_probs))
    return -float(total / len(targets))

def mse(preds: np.ndarray, targets: np.ndarray) -> float:
    """Return half the mean squared difference between ``preds`` and ``targets``.

    Note the factor of 0.5: the value is ``0.5 * mean((preds - targets) ** 2)``.
    """
    residual = preds - targets
    half_mean_square = 0.5 * np.mean(residual * residual)
    return float(half_mean_square)

@dataclass(frozen=True)
class ActivationSpec:
    """Immutable pairing of an activation function with its derivative function."""

    # Maps pre-activation values to activated values.
    forward: Callable[[np.ndarray], np.ndarray]
    # Called as backward(pre_activation, activated); returns the element-wise
    # derivative of the activation. Implementations may ignore either argument.
    backward: Callable[[np.ndarray, np.ndarray], np.ndarray]

def _sigmoid_grad(_: np.ndarray, activated: np.ndarray) -> np.ndarray:
    """Return the sigmoid derivative ``a * (1 - a)`` from the activated values.

    The first argument (the pre-activation) is accepted only to match the
    two-argument backward signature and is ignored.
    """
    complement = 1.0 - activated
    return activated * complement

def _relu_grad(pre_activation: np.ndarray, _: np.ndarray) -> np.ndarray:
    """Return the ReLU derivative: 1.0 where the pre-activation is positive, else 0.0.

    The second argument (the activated values) is accepted only to match the
    two-argument backward signature and is ignored.
    """
    positive_mask = pre_activation > 0.0
    return positive_mask.astype(float)

# Registry of hidden-layer activations, keyed by the name passed as
# ``activation`` to FeedForwardNetwork.
ACTIVATIONS: Dict[str, ActivationSpec] = {
    'sigmoid': ActivationSpec(forward=sigmoid, backward=_sigmoid_grad),
    'relu': ActivationSpec(forward=relu, backward=_relu_grad),
}

class FeedForwardNetwork:
    """Fully connected network with one shared hidden activation and a linear output layer.

    Every layer except the last applies the configured activation; the last
    layer returns its raw affine output (logits or regression values).

    Attributes:
        layer_sizes: Unit counts, input dimension first and output dimension last.
        activation: ``ActivationSpec`` applied after every hidden layer.
        params: One mutable ``[W, b]`` list per layer, with ``W`` of shape
            ``(fan_in, fan_out)`` and ``b`` of shape ``(fan_out,)``.
    """

    def __init__(self, layer_sizes: Iterable[int], activation: str = 'sigmoid', seed: int = 0):
        """Build the layers and draw their initial parameters.

        Args:
            layer_sizes: Unit counts from input to output; at least two entries.
            activation: Key into ``ACTIVATIONS`` selecting the hidden activation.
            seed: Seed of the NumPy generator used to draw the initial weights.

        Raises:
            ValueError: If fewer than two sizes are given or ``activation`` is
                not a key of ``ACTIVATIONS``.
        """
        self.layer_sizes = tuple(layer_sizes)
        if len(self.layer_sizes) < 2:
            raise ValueError('layer_sizes must include input and output dimensions')
        try:
            self.activation = ACTIVATIONS[activation]
        except KeyError as exc:
            raise ValueError(f'Unsupported activation: {activation}') from exc
        rng = np.random.default_rng(seed)
        # Glorot-style scaling: normal weights with std sqrt(2 / (fan_in + fan_out));
        # biases start at zero.
        self.params = [
            [rng.standard_normal((inp, out)) * np.sqrt(2.0 / (inp + out)), np.zeros(out)]
            for inp, out in zip(self.layer_sizes[:-1], self.layer_sizes[1:])
        ]

    def forward(self, X: np.ndarray, return_cache: bool = False):
        """Run a forward pass.

        Args:
            X: Input batch; multiplied on the right by the first weight matrix.
            return_cache: When true, also return the intermediates needed by
                ``backward``.

        Returns:
            The output of the last (linear) layer, or a tuple
            ``(output, (activations, pre_acts))`` when ``return_cache`` is true.
            ``activations[0]`` is ``X`` and ``activations[k + 1]`` is the output
            of layer ``k``; ``pre_acts[k]`` is that layer's affine output.
        """
        activations = [X]
        pre_acts = []
        last = len(self.params) - 1
        for idx, (W, b) in enumerate(self.params):
            z = activations[-1] @ W + b
            pre_acts.append(z)
            # The output layer stays linear; hidden layers are activated.
            activations.append(z if idx == last else self.activation.forward(z))
        if return_cache:
            return activations[-1], (activations, pre_acts)
        return activations[-1]

    def backward(self, grad_output: np.ndarray, cache: Tuple[Iterable[np.ndarray], Iterable[np.ndarray]]):
        """Back-propagate a loss gradient through the network.

        Args:
            grad_output: Gradient of the scalar loss with respect to the network
                output, one row per sample. Any averaging over the batch must
                already be contained in this gradient (the trainers in this file
                divide by the batch size before calling).
            cache: The ``(activations, pre_acts)`` pair returned by
                ``forward(..., return_cache=True)``.

        Returns:
            A list of ``(dW, db)`` tuples, one per layer in forward order, holding
            the exact gradient of that same loss with respect to each parameter.
        """
        activations, pre_acts = cache
        grads = []
        delta = grad_output
        for idx in reversed(range(len(self.params))):
            W, _ = self.params[idx]
            # Pure chain rule: sum the per-sample contributions. Dividing by the
            # batch size here as well would normalise an already-averaged
            # gradient a second time and shrink every update by 1 / batch_size.
            grads.append((activations[idx].T @ delta, np.sum(delta, axis=0)))
            if idx:
                # Allocates a new array, so the caller's grad_output is never mutated.
                delta = (delta @ W.T) * self.activation.backward(pre_acts[idx - 1], activations[idx])
        grads.reverse()
        return grads

    def apply_grads(self, grads, lr: float):
        """Take one gradient-descent step, updating ``params`` in place.

        Args:
            grads: ``(dW, db)`` pairs in layer order, as returned by ``backward``.
            lr: Learning rate multiplying each gradient.
        """
        for (dW, db), param in zip(grads, self.params):
            param[0] -= lr * dW
            param[1] -= lr * db

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return, for each row, the index of the largest output."""
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the softmax of the network output for each row."""
        return softmax(self.forward(X))

    def predict_regression(self, X: np.ndarray) -> np.ndarray:
        """Return the raw network output (no output activation)."""
        return self.forward(X)

def _create_encoder() -> OneHotEncoder:
    """Return a ``OneHotEncoder`` configured for dense (non-sparse) output.

    The ``sparse_output`` keyword is tried first; if the installed scikit-learn
    rejects it with ``TypeError``, the ``sparse`` keyword is used instead.
    """
    try:
        encoder = OneHotEncoder(sparse_output=False)
    except TypeError:
        # This scikit-learn does not accept ``sparse_output``.
        encoder = OneHotEncoder(sparse=False)
    return encoder

@dataclass(frozen=True)
class ClassificationData:
    """Train/test split of the OCR data, as produced by ``load_ocr_dataset``."""

    # Training features (one row per sample).
    X_train: np.ndarray
    # Integer training labels.
    y_train: np.ndarray
    # One-hot encoding of y_train produced by `encoder`.
    y_train_onehot: np.ndarray
    # Held-out features.
    X_test: np.ndarray
    # Integer held-out labels.
    y_test: np.ndarray
    # Encoder fitted on the training labels.
    encoder: OneHotEncoder

def load_ocr_dataset(dataset_size: int, test_ratio: float = 0.2) -> ClassificationData:
    """Load ``<dataset_size>Train.csv`` from ``OCR_ROOT`` and split it into train/test.

    The CSV is read without a header row; column 0 holds the integer label and
    the remaining columns hold the features.

    Args:
        dataset_size: Number used as the file-name prefix.
        test_ratio: Fraction of rows placed in the test split.

    Returns:
        A ``ClassificationData`` with a stratified split (fixed
        ``random_state=0``), one-hot training labels, and the fitted encoder.
    """
    df = pd.read_csv(OCR_ROOT / f'{dataset_size}Train.csv', header=None)
    y = df.iloc[:, 0].to_numpy(dtype=int)
    X = df.iloc[:, 1:].to_numpy(dtype=np.float32)
    # Values above 1 are treated as 0-255 intensities and rescaled to [0, 1].
    if X.max() > 1.0:
        X = X / 255.0
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_ratio, stratify=y, random_state=0
    )
    encoder = _create_encoder()
    y_train_onehot = encoder.fit_transform(y_train.reshape(-1, 1))
    # The test labels are not one-hot encoded here: ClassificationData has no
    # field for them, so the previous transform was computed and thrown away.
    return ClassificationData(X_train, y_train, y_train_onehot, X_test, y_test, encoder)

def iterate_minibatches(*arrays: np.ndarray, batch_size: int, rng: np.random.Generator) -> Iterator[Tuple[np.ndarray, ...]]:
    """Yield shuffled mini-batches drawn in lockstep from ``arrays``.

    A single permutation of the first array's row indices is drawn from ``rng``
    and applied to every array, so corresponding rows stay aligned. The final
    batch may be smaller than ``batch_size``. Yields nothing when called with
    no arrays.
    """
    if len(arrays) == 0:
        return
    n_rows = arrays[0].shape[0]
    shuffled = rng.permutation(n_rows)
    batch_starts = range(0, n_rows, batch_size)
    for lo in batch_starts:
        picked = shuffled[lo:lo + batch_size]
        yield tuple(member[picked] for member in arrays)

@dataclass
class ClassifierResult:
    """Everything ``train_classifier`` returns for one training run."""

    # The trained network.
    network: FeedForwardNetwork
    # Training-set cross-entropy recorded once per epoch.
    losses: list
    # Fraction of test samples predicted correctly.
    accuracy: float
    # Softmax probabilities for the test set.
    probs: np.ndarray
    # Arg-max class indices for the test set.
    preds: np.ndarray
    # The dataset split used for training and evaluation.
    data: ClassificationData

def train_classifier(
    dataset_size: int,
    activation: str = 'sigmoid',
    hidden_layers: Tuple[int, ...] = (128, 62),
    lr: float = 0.5,
    epochs: int = 50,
    batch_size: int = 64,
    seed: int = 0,
) -> ClassifierResult:
    """Train a softmax classifier on the OCR data with mini-batch gradient descent.

    Args:
        dataset_size: Forwarded to ``load_ocr_dataset`` to pick the CSV file.
        activation: Hidden-layer activation name for ``FeedForwardNetwork``.
        hidden_layers: Hidden-layer widths, placed between the input and output sizes.
        lr: Learning rate passed to ``apply_grads``.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size.
        seed: Seeds both the weight initialisation and the batch shuffling.

    Returns:
        A ``ClassifierResult`` holding the network, the per-epoch training
        cross-entropy, and the test-set accuracy, probabilities and predictions.
    """
    data = load_ocr_dataset(dataset_size)
    n_features = data.X_train.shape[1]
    n_classes = data.y_train_onehot.shape[1]
    net = FeedForwardNetwork((n_features, *hidden_layers, n_classes), activation=activation, seed=seed)
    shuffle_rng = np.random.default_rng(seed)
    losses = []
    for _epoch in range(epochs):
        batches = iterate_minibatches(
            data.X_train, data.y_train_onehot, batch_size=batch_size, rng=shuffle_rng
        )
        for xb, yb in batches:
            logits, cache = net.forward(xb, return_cache=True)
            # Softmax output minus one-hot target, divided by the batch size.
            grad_logits = (softmax(logits) - yb) / len(xb)
            net.apply_grads(net.backward(grad_logits, cache), lr)
        # Record the loss over the full training set after each epoch.
        epoch_probs = softmax(net.forward(data.X_train))
        losses.append(cross_entropy(epoch_probs, data.y_train_onehot))
    test_probs = net.predict_proba(data.X_test)
    test_preds = np.argmax(test_probs, axis=1)
    accuracy = float(np.mean(test_preds == data.y_test))
    return ClassifierResult(
        network=net,
        losses=losses,
        accuracy=accuracy,
        probs=test_probs,
        preds=test_preds,
        data=data,
    )

def plot_classifier_losses(results: Dict[str, ClassifierResult]):
    """Plot the per-epoch training loss of several runs on one figure.

    Args:
        results: Mapping from legend label to the run whose ``losses`` are drawn.
    """
    plt.figure(figsize=(7, 4))
    for label, run in results.items():
        # Epochs are numbered from 1 on the x axis.
        epoch_numbers = list(range(1, len(run.losses) + 1))
        plt.plot(epoch_numbers, run.losses, label=label)
    plt.xlabel('Epoch')
    plt.ylabel('Cross-Entropy')
    plt.title('Training Loss')
    plt.legend()
    plt.grid(True, linewidth=0.3)
    plt.tight_layout()

def show_classifier_examples(result: ClassifierResult, count: int = 5, seed: int = 0):
    """Show randomly chosen test images titled with their true and predicted letters.

    Args:
        result: Classifier run whose test split and predictions are displayed.
        count: Number of distinct test samples to draw; one subplot each.
        seed: Seed for the sample selection.
    """
    rng = np.random.default_rng(seed)
    indices = rng.choice(len(result.data.X_test), size=count, replace=False)
    # squeeze=False always returns a 2-D array of Axes. With the default
    # squeezing, count == 1 yields a single Axes object that cannot be zipped.
    _, axes = plt.subplots(1, count, figsize=(3 * count, 3), squeeze=False)
    for ax, idx in zip(axes[0], indices):
        # Each feature row is laid out as a 16-row by 8-column image.
        image = result.data.X_test[idx].reshape(16, 8)
        true_label = LABEL_MAP[result.data.y_test[idx]]
        pred_label = LABEL_MAP[result.preds[idx]]
        # Invert intensities for display.
        ax.imshow(1.0 - image, cmap='gray', vmin=0, vmax=1)
        ax.set_title(f'T:{true_label} P:{pred_label}')
        ax.axis('off')
    plt.tight_layout()

@dataclass(frozen=True)
class NormalizationStats:
    """Mean and standard deviation used to standardise regression inputs and targets."""

    # Mean of the inputs.
    x_mean: float
    # Standard deviation of the inputs.
    x_std: float
    # Mean of the targets.
    y_mean: float
    # Standard deviation of the targets.
    y_std: float

@dataclass(frozen=True)
class RegressionSplit:
    """Inputs and matching targets for one split of the regression data."""

    # Input values.
    X: np.ndarray
    # Target values aligned row-for-row with X.
    y: np.ndarray

@dataclass(frozen=True)
class RegressionDataset:
    """Regression splits together with the statistics used to normalise them."""

    # Split used for fitting.
    train: RegressionSplit
    # "id" evaluation split (the field name shadows the builtin only inside this class).
    id: RegressionSplit
    # "ood" evaluation split.
    ood: RegressionSplit
    # Additional splits keyed by name.
    raw: Dict[str, RegressionSplit]
    # Normalisation statistics associated with the splits.
    stats: NormalizationStats

def prepare_regression_data(train_size: int = 1800, seed: int = 42) -> RegressionDataset:
    """Load the regression CSVs and build normalised train / id / ood splits.

    ``data_train.csv`` is shuffled with ``seed``; its first ``train_size`` rows
    become the training split and the remainder the ``id`` split. The rows of
    ``data_valid.csv`` and ``data_test.csv`` are concatenated into the ``ood``
    split. All splits are standardised with the mean and standard deviation of
    the training split only (standard deviations are floored at ``1e-8``).

    Args:
        train_size: Number of shuffled training-file rows used for fitting.
        seed: ``random_state`` for the shuffle.

    Returns:
        A ``RegressionDataset`` with the normalised splits, the unnormalised
        splits under ``raw``, and the normalisation statistics.
    """
    def read(file_name: str) -> pd.DataFrame:
        # The first line of each file is skipped and no header row is parsed.
        return pd.read_csv(REG_ROOT / file_name, header=None, skiprows=1)

    def column(frame: pd.DataFrame, position: int) -> np.ndarray:
        return frame.iloc[:, position].to_numpy(float)

    train_df = read('data_train.csv')
    valid_df = read('data_valid.csv')
    test_df = read('data_test.csv')

    shuffled = train_df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
    X_all = column(shuffled, 0).reshape(-1, 1)
    y_all = column(shuffled, 1).reshape(-1, 1)
    X_train, X_id = X_all[:train_size], X_all[train_size:]
    y_train, y_id = y_all[:train_size], y_all[train_size:]
    X_ood = np.concatenate([column(valid_df, 0), column(test_df, 0)]).reshape(-1, 1)
    y_ood = np.concatenate([column(valid_df, 1), column(test_df, 1)]).reshape(-1, 1)

    stats = NormalizationStats(
        x_mean=float(X_train.mean()),
        x_std=max(float(X_train.std()), 1e-8),
        y_mean=float(y_train.mean()),
        y_std=max(float(y_train.std()), 1e-8),
    )
    raw = {
        'train': RegressionSplit(X_train, y_train),
        'id': RegressionSplit(X_id, y_id),
        'ood': RegressionSplit(X_ood, y_ood),
    }
    normalised = {
        name: RegressionSplit(
            (split.X - stats.x_mean) / stats.x_std,
            (split.y - stats.y_mean) / stats.y_std,
        )
        for name, split in raw.items()
    }
    return RegressionDataset(normalised['train'], normalised['id'], normalised['ood'], raw, stats)

def denormalize(y: np.ndarray, stats: NormalizationStats) -> np.ndarray:
    """Map standardised targets back to original units: ``y * y_std + y_mean``."""
    scale, offset = stats.y_std, stats.y_mean
    return y * scale + offset

@dataclass
class RegressionResult:
    """Everything ``train_regressor`` returns for one training run."""

    # The trained network.
    network: FeedForwardNetwork
    # The dataset the network was trained and evaluated on.
    data: RegressionDataset
    # Logged curves: 'steps' holds the logged step numbers; 'train', 'id' and
    # 'ood' hold the loss of the matching split at each of those steps.
    history: Dict[str, list]

def train_regressor(
    train_size: int = 1800,
    hidden_layers: Tuple[int, ...] = (64, 32),
    lr: float = 1e-3,
    epochs: int = 20000,
    log_every: int = 100,
    seed: int = 42,
) -> RegressionResult:
    """Fit a sigmoid network to the 1-D regression data with full-batch gradient descent.

    Args:
        train_size: Forwarded to ``prepare_regression_data``.
        hidden_layers: Hidden-layer widths between the single input and single output.
        lr: Learning rate passed to ``apply_grads``.
        epochs: Number of update steps; each step uses the whole training split.
        log_every: Losses are recorded every this many steps, and on the last step.
        seed: Seeds both the data shuffle and the weight initialisation.

    Returns:
        A ``RegressionResult`` with the network, the dataset and the logged
        loss history for the train, id and ood splits.
    """
    data = prepare_regression_data(train_size=train_size, seed=seed)
    net = FeedForwardNetwork((1, *hidden_layers, 1), activation='sigmoid', seed=seed)
    X_train, y_train = data.train.X, data.train.y
    # (history key, inputs, targets) for every split that gets logged.
    tracked = (
        ('train', X_train, y_train),
        ('id', data.id.X, data.id.y),
        ('ood', data.ood.X, data.ood.y),
    )
    history = {'steps': [], 'train': [], 'id': [], 'ood': []}
    for step in range(1, epochs + 1):
        preds, cache = net.forward(X_train, return_cache=True)
        # Residual divided by the number of training rows.
        grad = (preds - y_train) / len(X_train)
        net.apply_grads(net.backward(grad, cache), lr)
        should_log = step % log_every == 0 or step == epochs
        if not should_log:
            continue
        history['steps'].append(step)
        for key, inputs, targets in tracked:
            history[key].append(mse(net.predict_regression(inputs), targets))
    return RegressionResult(net, data, history)

def plot_regression_fit(result: RegressionResult, title: str = 'FFNN Regression', grid=(-4.0, 6.0, 300)):
    """Scatter the raw data splits and overlay the network's prediction curve.

    Args:
        result: Trained regression run to visualise.
        title: Figure title.
        grid: ``(start, stop, num_points)`` of the x values at which the
            prediction curve is evaluated, in original (unnormalised) units.
    """
    stats = result.data.stats
    start, stop, num_points = grid[0], grid[1], grid[2]
    x_grid = np.linspace(start, stop, num_points).reshape(-1, 1)
    # The network works in normalised space: standardise the inputs, predict,
    # then map the predictions back to original units.
    normalised_grid = (x_grid - stats.x_mean) / stats.x_std
    y_grid = denormalize(result.network.predict_regression(normalised_grid), stats)

    plt.figure(figsize=(6, 4))
    palette = {'train': 'tab:blue', 'id': 'tab:orange', 'ood': 'tab:green'}
    for key, color in palette.items():
        points = result.data.raw[key]
        plt.scatter(points.X, points.y, s=10, label=key.upper(), color=color)
    plt.plot(x_grid, y_grid, color='tab:red', linewidth=2, label='prediction')
    # NOTE(review): the dashed line at x = 2.0 presumably marks where the
    # training range ends -- confirm against the dataset.
    plt.axvline(x=2.0, color='black', linestyle='--')
    plt.legend()
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.title(title)
    plt.tight_layout()

def plot_regression_history(result: RegressionResult):
    """Plot the logged train / id / ood loss curves of a regression run."""
    hist = result.history
    plt.figure(figsize=(6, 4))
    # (history key, legend label) in drawing order.
    curves = (('train', 'train'), ('id', 'id test'), ('ood', 'ood test'))
    for key, legend_label in curves:
        plt.plot(hist['steps'], hist[key], label=legend_label)
    plt.xlabel('Epoch')
    plt.ylabel('MSE')
    plt.title('Regression Loss History')
    plt.legend()
    plt.grid(True, linewidth=0.3)
    plt.tight_layout()

def computational_graph_values(x: Iterable[float]):
    """Evaluate a fixed four-stage computational graph and two of its Jacobians.

    The graph maps ``x -> z -> u -> v -> y`` with three inputs and two outputs.

    Args:
        x: Exactly three input values ``(x1, x2, x3)``.

    Returns:
        A dict with ``'y'`` (the two outputs), ``'J_yv'`` (dy/dv, shape 2x3),
        ``'J_vu'`` (dv/du, shape 3x3) and ``'J_yu'`` (their product, dy/du).
    """
    x1, x2, x3 = x

    # Stage 1: z from x.
    z1 = 2 * x1 + x2
    z2 = x1 * 3 * x3
    z3 = -x3 * 2 * x2

    # Stage 2: u from z (u2 also depends on x3 directly).
    u1 = np.sin(z1)
    u2 = 6 * x3 + 2 * z2
    u3 = 2 * z1 + z3

    # Stage 3: v from u.
    v1 = u1 + np.cos(u3)
    v2 = np.sin(-u2)
    v3 = u1 * u3

    # Stage 4: y from v.
    outputs = np.array([v1 ** 2 + v2 ** 3, v2 * v3], dtype=float)

    # Rows follow (y1, y2); columns follow (v1, v2, v3).
    dy_dv = np.array([
        [2 * v1, 3 * v2 ** 2, 0.0],
        [0.0, v3, v2],
    ])
    # Rows follow (v1, v2, v3); columns follow (u1, u2, u3).
    dv_du = np.array([
        [1.0, 0.0, -np.sin(u3)],
        [0.0, -np.cos(-u2), 0.0],
        [u3, 0.0, u1],
    ])
    return {
        'y': outputs,
        'J_yv': dy_dv,
        'J_vu': dv_du,
        'J_yu': dy_dv @ dv_du,
    }

def numeric_jacobian(func, x: Iterable[float], h: float = 1e-5) -> np.ndarray:
    """Approximate the Jacobian of ``func`` at ``x`` with central differences.

    Args:
        func: Callable taking a 1-D float array. It may return an array, a
            sequence of numbers or a single scalar; the result is coerced to a
            1-D float array.
        x: Point of evaluation; any iterable of numbers (one-shot iterables
            such as generators are accepted).
        h: Non-zero step added to and subtracted from each coordinate in turn.

    Returns:
        Array of shape ``(m, n)`` whose entry ``[i, j]`` approximates
        ``d func_i / d x_j``, where ``n`` is the length of ``x`` and ``m`` the
        number of outputs.

    Raises:
        ValueError: If ``h`` is zero, which would yield 0/0 for every entry.
    """
    if h == 0:
        raise ValueError('h must be non-zero')
    # np.asarray cannot consume a generator, so materialise non-array inputs first.
    point = np.asarray(x if isinstance(x, np.ndarray) else list(x), dtype=float)

    def evaluate(at: np.ndarray) -> np.ndarray:
        # Coerce so scalar- and list-valued functions support len() and subtraction.
        return np.atleast_1d(np.asarray(func(at), dtype=float))

    n = len(point)
    # One evaluation at the point itself fixes the number of output rows.
    J = np.zeros((len(evaluate(point)), n))
    for j in range(n):
        dx = np.zeros_like(point)
        dx[j] = h
        J[:, j] = (evaluate(point + dx) - evaluate(point - dx)) / (2 * h)
    return J

def verify_computational_graph(x: Iterable[float], h: float = 1e-5):
    """Evaluate the computational graph at ``x`` and attach a finite-difference Jacobian.

    Args:
        x: The three graph inputs; any iterable, including one-shot iterators.
        h: Step size forwarded to ``numeric_jacobian``.

    Returns:
        The dict from ``computational_graph_values`` with an extra key
        ``'J_numeric'``: the numerically estimated Jacobian of the outputs
        ``y`` with respect to the inputs ``x``.
    """
    # x is needed twice below; materialise it so a one-shot iterator is not
    # already exhausted at the second use.
    point = tuple(x)
    values = computational_graph_values(point)
    # NOTE(review): J_numeric is dy/dx, whereas the analytic 'J_yu' is dy/du,
    # so the two matrices are not directly comparable entry by entry.
    values['J_numeric'] = numeric_jacobian(
        lambda inp: computational_graph_values(inp)['y'], point, h
    )
    return values
