In [8]:
import numpy as np


class Context:
    """Context to save variables for backward computation."""

    def __init__(self):
        self.saved_tensors = ()

    def save_for_backward(self, *tensors):
        self.saved_tensors = tuple(tensors)


class Op:
    @staticmethod
    def forward(ctx, *args):
        raise NotImplementedError

    @staticmethod
    def backward(ctx, grad_output):
        raise NotImplementedError

    @classmethod
    def apply(cls, *inputs):
        ctx = Context()
        raw_inputs = [x.data if isinstance(x, Tensor) else x for x in inputs]
        result_data = cls.forward(ctx, *raw_inputs)

        requires_grad = any(isinstance(x, Tensor) and x.requires_grad for x in inputs)
        out = Tensor(result_data, requires_grad=requires_grad)
        if requires_grad:
            out.grad_fn = (cls, ctx, inputs)
        return out


def reduce_grad_to_shape(grad, shape):
    """
    Match the gradient dimension for the input tensor
    - grad: output gradient
    - shape: original input tensor shape
    """
    # remove expanded dimension for batched data
    while grad.ndim > len(shape):
        grad = grad.sum(axis=0)

    # remove the broadcasted axis
    for i, dim in enumerate(shape):
        if dim == 1 and (grad.shape[i] != 1):
            grad = grad.sum(axis=i, keepdims=True)

    return grad


class Add(Op):
    @staticmethod
    def forward(ctx, a, b):
        ctx.save_for_backward(a.shape, b.shape)
        return a + b

    @staticmethod
    def backward(ctx, grad_output):
        a_shape, b_shape = ctx.saved_tensors
        grad_a = reduce_grad_to_shape(grad_output, a_shape)
        grad_b = reduce_grad_to_shape(grad_output, b_shape)
        return grad_a, grad_b


class Mul(Op):
    @staticmethod
    def forward(ctx, a, b):
        ctx.save_for_backward(a, b)
        return a * b

    @staticmethod
    def backward(ctx, grad_output):
        a, b = ctx.saved_tensors
        grad_a = reduce_grad_to_shape(b * grad_output, a.shape)
        grad_b = reduce_grad_to_shape(a * grad_output, b.shape)
        return grad_a, grad_b


class MatMul(Op):
    @staticmethod
    def forward(ctx, a, b):
        ctx.save_for_backward(a, b)
        return a @ b

    @staticmethod
    def backward(ctx, grad_output):
        a, b = ctx.saved_tensors
        grad_a = grad_output @ b.T
        grad_b = a.T @ grad_output
        return grad_a, grad_b


class Pow(Op):
    @staticmethod
    def forward(ctx, a, b):
        ctx.save_for_backward(a, b)
        return a**b

    @staticmethod
    def backward(ctx, grad_output):
        a, b = ctx.saved_tensors
        grad_a = grad_output * a ** (b - 1) * b
        grad_b = grad_output * a**b * np.log(a)
        return grad_a, grad_b


class Sum(Op):
    @staticmethod
    def forward(ctx, a, axis=None, keepdims=False):
        ctx.save_for_backward(a.shape, axis, keepdims)
        return np.sum(a, axis=axis, keepdims=keepdims)

    @staticmethod
    def backward(ctx, grad_output):
        (a_shape, axis, keepdims) = ctx.saved_tensors

        # Restore reduced dimensions
        if not keepdims and axis is not None:
            to_expand = axis
            grad_output = np.expand_dims(grad_output, to_expand)

        grad_a = np.broadcast_to(grad_output, a_shape)
        return grad_a


class ReLU(Op):
    @staticmethod
    def forward(ctx, a):
        ctx.save_for_backward(a)
        return np.maximum(a, 0)

    @staticmethod
    def backward(ctx, grad_output):
        (a,) = ctx.saved_tensors
        grad_a = (a > 0) * grad_output
        return reduce_grad_to_shape(grad_a, a.shape)


class CrossEntropyLoss(Op):
    @staticmethod
    def forward(ctx, logits, targets):
        assert logits.ndim <= 2, "Multidimensional classes not supported"

        # Log-Softmax, see https://arxiv.org/pdf/1909.03469
        logits_max = logits.max(axis=-1, keepdims=True)
        logits_shifted = logits - logits_max
        logsumexp = np.log(np.sum(np.exp(logits_shifted), axis=-1, keepdims=True))
        log_probs = logits_shifted - logsumexp

        losses = -np.sum(targets * log_probs, axis=-1)
        ctx.save_for_backward(log_probs, targets, np.size(losses))

        return np.mean(losses)

    @staticmethod
    def backward(ctx, grad_output):
        log_probs, targets, batch_size = ctx.saved_tensors

        grad_logits = (
            grad_output
            * (targets.sum(-1, keepdims=True) * np.exp(log_probs) - targets)
            / batch_size
        )
        grad_targets = grad_output * -log_probs / batch_size
        return grad_logits, grad_targets


class Log(Op):
    @staticmethod
    def forward(ctx, x):
        ctx.save_for_backward(x)
        return np.log(x)

    @staticmethod
    def backward(ctx, grad_output):
        (x,) = ctx.saved_tensors
        grad_x = grad_output * (1.0 / x)
        return reduce_grad_to_shape(grad_x, x.shape)


class NLLLoss(Op):
    @staticmethod
    def forward(ctx, log_probs, targets_one_hot):
        assert log_probs.ndim <= 2, "Multidimensional classes not supported"

        # PyTorch batches on dim 0 or 1
        losses = -np.sum(targets_one_hot * log_probs, axis=-1)
        ctx.save_for_backward(log_probs, targets_one_hot, np.size(losses))

        return np.mean(losses)

    @staticmethod
    def backward(ctx, grad_output):
        log_probs, targets_one_hot, batch_size = ctx.saved_tensors

        grad_log_probs = grad_output * -targets_one_hot / batch_size
        grad_targets = grad_output * -log_probs / batch_size

        return grad_log_probs, grad_targets


class Softmax(Op):
    @staticmethod
    def forward(ctx, logits):
        # Shifting for stability, see https://arxiv.org/pdf/1909.03469
        logits_max = logits.max(axis=-1, keepdims=True)
        logits_exp_shifted = np.exp(logits - logits_max)
        probs = logits_exp_shifted / logits_exp_shifted.sum(axis=-1, keepdims=True)
        ctx.save_for_backward(probs)

        return probs

    @staticmethod
    def backward(ctx, grad_output):
        (probs,) = ctx.saved_tensors
        # https://eli.thegreenplace.net/2016/the-softmax-function-and-its-derivative/
        # https://github.com/jax-ml/jax/blob/f74467851b1186b434d4b538d0be419378a47a69/jax/_src/nn/functions.py#L648-L652
        return probs * (grad_output - (probs * grad_output).sum(axis=-1, keepdims=True))


In [9]:
class Tensor:
    def __init__(self, data, requires_grad=False):
        if isinstance(data, Tensor):
            data = data.data  # unwrap
        self.data = np.array(data, dtype=np.float32)
        self.requires_grad = requires_grad
        self.grad = None
        self.grad_fn = None  # (OpClass, ctx, inputs) for non-leaf tensors

    # Operator overloads delegate to our Op classes via apply:
    def __add__(self, other):
        return Add.apply(self, other)

    def __radd__(self, other):
        return Add.apply(other, self)

    def __sub__(self, other):
        return Add.apply(self, Mul.apply(other, Tensor(-1.0)))

    def __rsub__(self, other):
        return Add.apply(other, Mul.apply(self, Tensor(-1.0)))

    def __mul__(self, other):
        return Mul.apply(self, other)

    def __rmul__(self, other):
        return Mul.apply(other, self)

    def __matmul__(self, other):
        return MatMul.apply(self, other)  # matrix @

    def __neg__(self):
        return Mul.apply(self, Tensor(-1.0))

    def __truediv__(self, other):
        return Mul.apply(self, Tensor(1.0) / other)

    def __pow__(self, exponent):
        return Pow.apply(self, exponent)

    def __rpow__(self, base):
        return Pow.apply(base, self)

    def __repr__(self):
        return f"Tensor(data={self.data}, grad={self.grad}, requires_grad={self.requires_grad})"

    # initialization
    @staticmethod
    def zeros(shape, requires_grad=False):
        return Tensor(np.zeros(shape, dtype=np.float32), requires_grad=requires_grad)

    @staticmethod
    def ones(shape, requires_grad=False):
        return Tensor(np.ones(shape, dtype=np.float32), requires_grad=requires_grad)

    def backward(self, grad_output=None):
        if not self.requires_grad:
            raise RuntimeError(
                "Cannot call backward on a tensor that does not require grad."
            )
        # If no grad specified, tensor must be scalar
        if grad_output is None:
            if self.data.size != 1:
                raise RuntimeError(
                    "grad_output must be provided for non-scalar tensors"
                )
            grad_output = np.ones_like(self.data, dtype=np.float32)
        else:
            grad_output = np.array(grad_output, dtype=np.float32)
        # Initialize this tensor's gradient
        self.grad = grad_output

        # Build a topologically sorted list of tensors (post-order DFS)
        topo_order = []
        visited = set()

        def build_graph(t):
            if isinstance(t, Tensor) and t not in visited:
                visited.add(t)
                if t.grad_fn is not None:  # not a leaf
                    op_cls, ctx, inputs = t.grad_fn
                    for inp in inputs:
                        build_graph(inp)
                topo_order.append(t)

        build_graph(self)

        # Traverse graph in reverse topological order, apply chain rule
        for t in reversed(topo_order):
            if t.grad_fn is None:
                continue  # leaf node (no backward op)
            op_cls, ctx, inputs = t.grad_fn
            grad_out = t.grad  # gradient of the output w.rt. this tensor
            # Compute gradients of inputs via this op's backward
            grad_inputs = op_cls.backward(ctx, grad_out)
            if grad_inputs is None:
                grad_inputs = ()
            elif not isinstance(grad_inputs, tuple):
                grad_inputs = (grad_inputs,)
            # Accumulate gradients into input tensors
            for inp, grad in zip(inputs, grad_inputs):
                if isinstance(inp, Tensor) and inp.requires_grad and grad is not None:
                    grad = np.array(grad, dtype=np.float32)  # ensure numpy
                    if inp.grad is None:
                        inp.grad = grad
                    else:
                        inp.grad += grad


In [11]:
def _node_name(t: "Tensor"):
    if t.grad_fn is None:
        return f"Leaf(shape={tuple(t.data.shape)}, requires_grad={t.requires_grad})"
    op_cls, _, _ = t.grad_fn
    return f"{op_cls.__name__}(out_shape={tuple(t.data.shape)})"


def _short_id(obj):
    return hex(id(obj))[-5:]  # 보기 좋은 짧은 id


def trace_graph(root: "Tensor"):
    """루트 텐서에서 시작해 모든 조상(부모 텐서)들을 수집하고
    토폴로지 순서를 반환합니다."""
    visited = set()
    topo = []

    def build(t):
        if not isinstance(t, Tensor) or t in visited:
            return
        visited.add(t)
        if t.grad_fn is not None:
            op_cls, ctx, inputs = t.grad_fn
            for inp in inputs:
                if isinstance(inp, Tensor):
                    build(inp)
        topo.append(t)

    build(root)
    return topo  # 부모가 먼저, root가 마지막(포스트오더)


def print_autograd_graph(root: "Tensor"):
    topo = trace_graph(root)
    print("=== Autograd Graph (parents -> node) ===")
    for node in topo:
        label_node = _node_name(node)
        nid = _short_id(node)
        if node.grad_fn is None:
            print(f"[{nid}] {label_node}")
            continue
        op_cls, _, inputs = node.grad_fn
        # 부모들 나열 (부모) --op--> (자식=node)
        for inp in inputs:
            if isinstance(inp, Tensor):
                pid = _short_id(inp)
                print(
                    f"[{pid}] {_node_name(inp)} --{op_cls.__name__}--> [{nid}] {label_node}"
                )
            else:
                print(f"[const {inp}] --{op_cls.__name__}--> [{nid}] {label_node}")

    # backward 방문 순서(실제로 backward에서 사용하는 역토폴로지)
    print("\n=== Backward visit order (reverse topological) ===")
    for node in reversed(topo):
        print(f"[{_short_id(node)}] {_node_name(node)}")


# f(x, w, b) = ReLU(x*w + b)
x = Tensor(2.0, requires_grad=True)
w = Tensor(3.0, requires_grad=True)
b = Tensor(-4.0, requires_grad=True)

z = x * w + b  # Add(Mul(x,w), b)
y = ReLU.apply(z)  # ReLU(z)
loss = y  # 스칼라처럼 취급 (여기선 y가 스칼라)

print_autograd_graph(loss)
loss.backward()

print("\n=== Gradients ===")
print("dl/dx =", x.grad)  # ReLU'(z) * w  (z>0이면 w, 아니면 0)
print("dl/dw =", w.grad)  # ReLU'(z) * x
print("dl/db =", b.grad)  # ReLU'(z) * 1

=== Autograd Graph (parents -> node) ===
[34650] Leaf(shape=(), requires_grad=True)
[4dc40] Leaf(shape=(), requires_grad=True)
[34650] Leaf(shape=(), requires_grad=True) --Mul--> [b9dc0] Mul(out_shape=())
[4dc40] Leaf(shape=(), requires_grad=True) --Mul--> [b9dc0] Mul(out_shape=())
[8c440] Leaf(shape=(), requires_grad=True)
[b9dc0] Mul(out_shape=()) --Add--> [c8d40] Add(out_shape=())
[8c440] Leaf(shape=(), requires_grad=True) --Add--> [c8d40] Add(out_shape=())
[c8d40] Add(out_shape=()) --ReLU--> [37da0] ReLU(out_shape=())

=== Backward visit order (reverse topological) ===
[37da0] ReLU(out_shape=())
[c8d40] Add(out_shape=())
[8c440] Leaf(shape=(), requires_grad=True)
[b9dc0] Mul(out_shape=())
[4dc40] Leaf(shape=(), requires_grad=True)
[34650] Leaf(shape=(), requires_grad=True)

=== Gradients ===
dl/dx = 3.0
dl/dw = 2.0
dl/db = 1.0


In [12]:
x = Tensor(2.0, requires_grad=True)
w = Tensor(3.0, requires_grad=True)
b = Tensor(-4.0, requires_grad=True)

z = x * w + b  # Add(Mul(x,w), b)
y = ReLU.apply(z)  # ReLU(z)
loss = y  # 스칼라처럼 취급 (여기선 y가 스칼라)

print_autograd_graph(loss)
loss.backward()

print("\n=== Gradients ===")
print("dl/dx =", x.grad)  # ReLU'(z) * w  (z>0이면 w, 아니면 0)
print("dl/dw =", w.grad)  # ReLU'(z) * x
print("dl/db =", b.grad)  # ReLU'(z) * 1

=== Autograd Graph (parents -> node) ===
[43d70] Leaf(shape=(), requires_grad=True)
[8dc40] Leaf(shape=(), requires_grad=True)
[43d70] Leaf(shape=(), requires_grad=True) --Mul--> [41e20] Mul(out_shape=())
[8dc40] Leaf(shape=(), requires_grad=True) --Mul--> [41e20] Mul(out_shape=())
[40ef0] Leaf(shape=(), requires_grad=True)
[41e20] Mul(out_shape=()) --Add--> [40830] Add(out_shape=())
[40ef0] Leaf(shape=(), requires_grad=True) --Add--> [40830] Add(out_shape=())
[40830] Add(out_shape=()) --ReLU--> [402c0] ReLU(out_shape=())

=== Backward visit order (reverse topological) ===
[402c0] ReLU(out_shape=())
[40830] Add(out_shape=())
[40ef0] Leaf(shape=(), requires_grad=True)
[41e20] Mul(out_shape=())
[8dc40] Leaf(shape=(), requires_grad=True)
[43d70] Leaf(shape=(), requires_grad=True)

=== Gradients ===
dl/dx = 3.0
dl/dw = 2.0
dl/db = 1.0


In [13]:
a = Tensor(np.array([0.4, 1.0, -2.0], dtype=np.float32), requires_grad=True)

y = ReLU.apply(a * Tensor(2.0) - Tensor(1.0))  # elementwise
loss = Sum.apply(y)  # Sum op로 스칼라화

print_autograd_graph(loss)
loss.backward()

print("\n=== Values & Gradients ===")
print("y =", y.data)  # [0., 1., 0.]  (ReLU 결과)
print("dl/da =", a.grad)  # ReLU>0인 위치에서 2, 나머지는 0  → [0., 2., 0.]

=== Autograd Graph (parents -> node) ===
[42930] Leaf(shape=(3,), requires_grad=True)
[42450] Leaf(shape=(), requires_grad=False)
[42930] Leaf(shape=(3,), requires_grad=True) --Mul--> [43650] Mul(out_shape=(3,))
[42450] Leaf(shape=(), requires_grad=False) --Mul--> [43650] Mul(out_shape=(3,))
[42a50] Leaf(shape=(), requires_grad=False)
[43650] Mul(out_shape=(3,)) --Add--> [43170] Add(out_shape=(3,))
[42a50] Leaf(shape=(), requires_grad=False) --Add--> [43170] Add(out_shape=(3,))
[43170] Add(out_shape=(3,)) --ReLU--> [42f90] ReLU(out_shape=(3,))
[42f90] ReLU(out_shape=(3,)) --Sum--> [43a70] Sum(out_shape=())

=== Backward visit order (reverse topological) ===
[43a70] Sum(out_shape=())
[42f90] ReLU(out_shape=(3,))
[43170] Add(out_shape=(3,))
[42a50] Leaf(shape=(), requires_grad=False)
[43650] Mul(out_shape=(3,))
[42450] Leaf(shape=(), requires_grad=False)
[42930] Leaf(shape=(3,), requires_grad=True)

=== Values & Gradients ===
y = [0. 1. 0.]
dl/da = [0. 2. 0.]


In [14]:
import os, gzip
from urllib.request import urlretrieve


def load_mnist(path=None):
    """Download MNIST and load it into NumPy arrays."""
    # url_base = "http://yann.lecun.com/exdb/mnist/"
    url_base = "https://ossci-datasets.s3.amazonaws.com/mnist/"

    files = {
        "train_images": "train-images-idx3-ubyte.gz",
        "train_labels": "train-labels-idx1-ubyte.gz",
        "test_images": "t10k-images-idx3-ubyte.gz",
        "test_labels": "t10k-labels-idx1-ubyte.gz",
    }
    # Default path to ./data/mnist
    if path is None:
        path = os.path.join(os.path.expanduser("./"), "data", "mnist")
    os.makedirs(path, exist_ok=True)

    # Download missing files
    for name, filename in files.items():
        filepath = os.path.join(path, filename)
        if not os.path.isfile(filepath):
            urlretrieve(url_base + filename, filepath)
            print(f"Downloaded {filename}")

    # Load images
    def _read_images(filename):
        with gzip.open(os.path.join(path, filename), "rb") as f:
            data = f.read()
            # The first 16 bytes are header (magic, num, rows, cols)
            images = np.frombuffer(data, dtype=np.uint8, offset=16)
        images = images.reshape(-1, 28 * 28).astype(np.float32) / 255.0
        return images

    # Load labels
    def _read_labels(filename):
        with gzip.open(os.path.join(path, filename), "rb") as f:
            data = f.read()
            # First 8 bytes are header (magic, num)
            labels = np.frombuffer(data, dtype=np.uint8, offset=8)
        # Convert to one-hot vectors of length 10
        one_hot = np.zeros((labels.size, 10), dtype=np.float32)
        one_hot[np.arange(labels.size), labels] = 1.0
        return one_hot

    # Read all parts
    X_train = _read_images(files["train_images"])
    y_train = _read_labels(files["train_labels"])
    X_test = _read_images(files["test_images"])
    y_test = _read_labels(files["test_labels"])
    return X_train, y_train, X_test, y_test


# Load the MNIST data (this will download if not already present)
X_train, y_train, X_test, y_test = load_mnist()
print("MNIST data loaded:", X_train.shape, y_train.shape)

MNIST data loaded: (60000, 784) (60000, 10)


In [16]:
# Initialize network parameters
class MLP3:
    def __init__(self, in_dim=784, h1=128, h2=64, out_dim=10, seed=42):
        rng = np.random.default_rng(seed)
        self.W1 = Tensor(rng.normal(0, 0.01, (in_dim, h1)), requires_grad=True)
        self.b1 = Tensor(np.zeros(h1, dtype=np.float32), requires_grad=True)
        self.W2 = Tensor(rng.normal(0, 0.01, (h1, h2)), requires_grad=True)
        self.b2 = Tensor(np.zeros(h2, dtype=np.float32), requires_grad=True)
        self.W3 = Tensor(rng.normal(0, 0.01, (h2, out_dim)), requires_grad=True)
        self.b3 = Tensor(np.zeros(out_dim, dtype=np.float32), requires_grad=True)

    @property
    def params(self):
        return [self.W1, self.b1, self.W2, self.b2, self.W3, self.b3]

    def __call__(self, x: Tensor) -> Tensor:
        h1 = ReLU.apply(x @ self.W1 + self.b1)
        h2 = ReLU.apply(h1 @ self.W2 + self.b2)
        logits = h2 @ self.W3 + self.b3
        return logits


model = MLP3()
learning_rate = 0.1
batch_size = 100
num_epochs = 10

for epoch in range(num_epochs):
    # Shuffle training data
    perm = np.random.permutation(X_train.shape[0])
    X_train, y_train = X_train[perm], y_train[perm]
    total_loss = 0.0

    for i in range(0, X_train.shape[0], batch_size):
        # Mini-batch slice
        X_batch = Tensor(X_train[i : i + batch_size])  # input batch
        y_batch = Tensor(y_train[i : i + batch_size])  # target batch

        # Forward pass:
        logits = model(X_batch)
        probs = Softmax.apply(logits)
        logp = Log.apply(probs)
        loss = NLLLoss.apply(logp, y_batch)
        # loss = CrossEntropyLoss.apply(logits, y_batch)  # Compute cross-entropy loss over the batch
        total_loss += float(loss.data)

        # Backward pass:
        loss.backward()  # compute gradients for all weights/biases

        # Update parameters with SGD:
        for param in model.params:
            # simple gradient descent step
            param.data -= learning_rate * param.grad
            # reset gradient to zero for next batch
            param.grad = None

        # Evaluate accuracy on the training set (optional) or just print loss
        if ((i) % 1000) == 0:
            print(
                f"Epoch {epoch + 1}, Itr {i} / {X_train.shape[0]}: avg loss = {total_loss / (X_train.shape[0] / batch_size * (i + 1)):.6f}"
            )

    # quick test accuracy
    Xt = Tensor(X_test, requires_grad=False)
    logits = model(Xt)
    preds = np.argmax(logits.data, axis=1)
    true = np.argmax(y_test, axis=1)
    acc = (preds == true).mean()
    print(f"Epoch {epoch + 1} / Test accuracy: {acc * 100:.2f}%")

Epoch 1, Itr 0 / 60000: avg loss = 0.003838
Epoch 1, Itr 1000 / 60000: avg loss = 0.000042
Epoch 1, Itr 2000 / 60000: avg loss = 0.000040
Epoch 1, Itr 3000 / 60000: avg loss = 0.000040
Epoch 1, Itr 4000 / 60000: avg loss = 0.000039
Epoch 1, Itr 5000 / 60000: avg loss = 0.000039
Epoch 1, Itr 6000 / 60000: avg loss = 0.000039
Epoch 1, Itr 7000 / 60000: avg loss = 0.000039
Epoch 1, Itr 8000 / 60000: avg loss = 0.000039
Epoch 1, Itr 9000 / 60000: avg loss = 0.000039
Epoch 1, Itr 10000 / 60000: avg loss = 0.000039
Epoch 1, Itr 11000 / 60000: avg loss = 0.000039
Epoch 1, Itr 12000 / 60000: avg loss = 0.000039
Epoch 1, Itr 13000 / 60000: avg loss = 0.000039
Epoch 1, Itr 14000 / 60000: avg loss = 0.000039
Epoch 1, Itr 15000 / 60000: avg loss = 0.000039
Epoch 1, Itr 16000 / 60000: avg loss = 0.000039
Epoch 1, Itr 17000 / 60000: avg loss = 0.000039
Epoch 1, Itr 18000 / 60000: avg loss = 0.000039
Epoch 1, Itr 19000 / 60000: avg loss = 0.000039
Epoch 1, Itr 20000 / 60000: avg loss = 0.000039
Epoch