autograd


In [None]:
from typing import Any, List, Optional, Tuple, Dict
import numpy

NDArray = numpy.ndarray
TENSOR_COUNTER = 0
LAZY_MODE = False

class ABC:
    def __call__(self, *args, **kwargs):
        raise NotImplementedError()
    
    def compute(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError()
    
    def gradient(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError()

class Op(ABC):
    """Operator definition."""

    def __call__(self, *args, **kwargs):
        raise NotImplementedError()

    def compute(self, *args: Tuple["NDArray"])->NDArray:
        raise NotImplementedError()

    def gradient(self, out_grad, node):
        raise NotImplementedError()

    def gradient_as_tuple(self, out_grad, node):
        """ Convenience method to always return a tuple from gradient call"""
        output = self.gradient(out_grad, node)
        if isinstance(output, tuple):
            return output
        elif isinstance(output, list):
            return tuple(output)
        else:
            return (output,)

class Value:
    op: Optional[Op]
    inputs: List["Value"]
    cached_data: NDArray
    requires_grad: bool

    def realize_cached_data(self):
        if self.cached_data is not None:
            return self.cached_data
        self.cached_data = self.op.compute(*[x.realize_cached_data() for x in self.inputs])
        return self.cached_data

    def is_leaf(self):
        return self.op is None

    def _init(self, op: Optional[Op], inputs: List["Value"],  *, num_outputs: int=1, cached_data: NDArray = None,
        requires_grad: Optional[bool] = None
    ):
        if requires_grad is None:
            requires_grad = any(x.requires_grad for x in inputs)
        self.op = op
        self.inputs = inputs
        self.cached_data = cached_data
        self.requires_grad = requires_grad

    @classmethod
    def make_const(cls, data, *, requires_grad=False):
        value = cls.__new__(cls)
        value._init(
            None,
            [],
            cached_data=data,
            requires_grad=requires_grad,
        )
        return value
    
    def make_from_op(cls, op: Op, inputs: List["Value"]):
        value = cls.__new__(cls)
        value._init(op, inputs)

        if not LAZY_MODE:
            if not value.requires_grad:
                return value.detach()
            value.get_outputs()
        return value

class Tensor(Value):
    grad: "Tensor"
    def __init__(self, array, *, dtype="float32", requires_grad=True, **kwargs):
        self.array = numpy.array(array, dtype=dtype)
        self.requires_grad = requires_grad
    
    @staticmethod
    def from_numpy(numpy_array, dtype):
        return numpy.array(numpy_array, dtype=dtype)
    
    @staticmethod
    def make_from_op(op: Op, inputs: List["Value"]):
        tensor = Tensor.__new__(Tensor)
        tensor._init(op, inputs)
        if not LAZY_MODE:
            tensor.realize_cached_data()
        return tensor
    
    @staticmethod
    def make_const(data, requires_grad=False):
        tensor = Tensor.__new__(Tensor)
        if isinstance(data, Tensor):
            tensor_data = data.realize_cached_data()
        else:
            tensor_data = data
        tensor._init(None, cached_data=tensor_data, requires_grad=requires_grad)
        return tensor
    
    @property
    def data(self):
        return self.cached_data
    
    def detach(self):
        return Tensor.make_const(self.realize_cached_data())

    @data.setter
    def data(self, value):
        self.cached_data = value.realize_cached_data()

    @property
    def shape(self):
        return self.realize_cached_data().shape
    
    @property
    def dtype(self):
        return self.realize_cached_data().dtype

    def backward(self, out_grad=None):
        if out_grad:
            out_grad = out_grad
        else:
            out_grad = Tensor(numpy.ones(self.shape))
        compute_gradient_of_variables(self, out_grad)

    def __add__(self, other):
        if isinstance(other, Tensor):
            return EWiseAdd()(self, other)
        else:
            return AddScalar(other)(self)
        
    def __mul__(self, other):
        if isinstance(other, Tensor):
            return EWiseMul()(self, other)
        else:
            return MulScalar(other)(self)
        
    def __sub__(self, other):
        if isinstance(other, Tensor):
            return EWiseAdd()(self, Negate()(other))
        else:
            return AddScalar(-other)(self)


def compute_gradient_of_variables(output_tensor, out_grad):
    node_to_output_grads_list: Dict[Tensor, List[Tensor]] = {} # dict结构，用于存储partial adjoint
    node_to_output_grads_list[output_tensor] = [out_grad] 
    reverse_topo_order = list(reversed(find_topo_sort([output_tensor]))) # 请自行实现拓扑排序函数
    for node in reverse_topo_order:
        node.grad = sum(node_to_output_grads_list[node]) # 求node的partial adjoint之和，存入属性grad
        if node.is_leaf():
            continue
        for i, grad in enumerate(node.op.gradient(node.grad, node)): # 计算node.inputs的partial adjoint
            input_node = node.inputs[i]
            if input_node not in node_to_output_grads_list:
                node_to_output_grads_list[input_node] = []
            node_to_output_grads_list[input_node].append(grad)

def find_topo_sort(node_list: List[Value]) -> List[Value]:
    visited = set()
    topo_order = []
    topo_sort_dfs(node_list[-1], visited, topo_order)
    return topo_order


def topo_sort_dfs(node, visited, topo_order):
    if node in visited:
        return
    visited.add(node)
    for pre_node in node.inputs:
        topo_sort_dfs(pre_node, visited, topo_order)
    topo_order.append(node)

class TensorOp(Op):
    def __call__(self, *args):
        return Tensor.make_from_op(self, args)

class EWiseAdd(TensorOp):
    def compute(self, a: NDArray, b: NDArray):
        return a+b
    
    def gradient(self, out_grad: Tensor, node: Tensor):
        return out_grad, out_grad
    
class AddScalar(TensorOp):
    def __init__(self, scalar):
        self.scalar = scalar
    
    def compute(self, a: NDArray):
        return a + self.scalar

    def gradient(self, out_grad: Tensor, node: Tensor):
        return (out_grad, )
    
class EWiseMul(TensorOp):
    def compute(self, a: NDArray, b: NDArray):
        return a * b

    def gradient(self, out_grad: Tensor, node: Tensor):
        a, b = node.inputs
        return out_grad * a, out_grad * b
    
class MulScalar(TensorOp):
    def __init__(self, scalar):
        self.scalar = scalar

    def compute(self, a: NDArray):
        return a * self.scalar

    def gradient(self, out_grad: Tensor, node: Tensor):
        return (out_grad * self.scalar,)
    
class Negate(TensorOp):
    def compute(self, a):
        return numpy.negative(a)

    def gradient(self, out_grad, node):
        return MulScalar(-1)(out_grad)

optim

In [None]:
from abc import abstractmethod

class ABC():
    pass

class Optimizer(ABC):
    def __init__(self, params):
        self.params = params
    
    @abstractmethod
    def step(self):
        pass

    def reset_grad(self):
        for p in self.params:
            p.grad = None

class SGD(Optimizer):
    def __init__(self, params, lr = 0.001):
        super().__init__(params)
        self.lr = lr
    
    def step(self):
        for i, param in enumerate(self.params):
            grad = Tensor(param.grad, dtype='float32').data
            param.data = param.data - grad * self.lr

ops

In [None]:
class ReLU(TensorOp):
    def compute(self, a):
        return numpy.maximum(a, 0)

    def gradient(self, out_grad, node):
        a = node.inputs[0].realize_cached_data()
        return out_grad * Tensor(a > 0)


def relu(a):
    return ReLU()(a)

init

In [None]:
import math
import numpy as np

def randn(*shape, mean=0.0, std=1.0, dtype="float32", requires_grad=False):
    array = np.random.randn(*shape) * std + mean
    return array

def init_He(in_features, out_features):
    s = np.random.normal(0, 1/in_features, in_features*out_features)
    s = s.reshape(in_features, out_features)
    return s

def init_Xavier(in_features, out_features, dtype):
    v = math.sqrt(2/(in_features+out_features))
    return randn(in_features, out_features,std=v, dtype=dtype)


nn

In [None]:
from abc import abstractmethod
from functools import reduce
from typing import List, Any

class ABC:
    pass

class Module(ABC):
    def __init__(self):
        self.training = True
    
    def parameters(self)->List["Tensor"]:
        return _unpack_params(self.__dict__)
    
    def _children(self):
    # 遍历当前对象属性，找出子模块（例如是 Module 类型的实例）
        children = []
        for attr in self.__dict__.values():
            if isinstance(attr, Module):
                children.append(attr)
        return children

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)
    
    @abstractmethod
    def forward(self):
        pass

    def eval(self):
        self.training = False
        for m in self._children():
            m.training = False

    def train(self):
        self.training = True
        for m in self._children():
            m.training = True


class Parameter(Tensor):
    pass

def _unpack_params(value: object) -> List[Tensor]:
    if isinstance(value, Parameter):
        return [value]
    elif isinstance(value, Module):
        return value.parameters()
    elif isinstance(value, dict):
        params = []
        for k, v in value.items():
            params += _unpack_params(v)
        return params
    elif isinstance(value, (list, tuple)):
        params = []
        for v in value:
            params += _unpack_params(v)
        return params
    else:
        return []
    
def _child_modules(value: object) -> List["Module"]:
    if isinstance(value, Module):
        modules = [value]
        modules.extend(_child_modules(value.__dict__))
        return modules
    if isinstance(value, dict):
        modules = []
        for k, v in value.items():
            modules += _child_modules(v)
        return modules
    elif isinstance(value, (list, tuple)):
        modules = []
        for v in value:
            modules += _child_modules(v)
        return modules
    else:
        return []
    
class Linear(Module):
    def __init__(self, in_features, out_features, bias=True, device=None, dtype="float32"):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        s = init_He(in_features, out_features)
        self.weight = Parameter(s, dtype="float32")
        if bias:
            self.bias = Tensor(np.zeros(self.out_features),dtype="float32")
        else:
            self.bias = None

    def forward(self, X: Tensor) -> Tensor:
        X_out = X @ self.weight
        if self.bias:
            return X_out + self.bias.broadcast_to(X_out.shape)
        return X_out
    
class Sequential(Module):
    def __init__(self, *modules):
        super().__init__()
        self.modules = modules

    def forward(self, x: Tensor)->Tensor:
        for module in self.modules:
            x = module(x)
        return x
    
class relu(Module):
    def forward(self, x: Tensor)->Tensor:
        return relu(x)

class Flatten(Module):
    def forward(self, X):
        size = reduce(lambda a, b: a * b, X.shape)
        return X.reshape((X.shape[0], size // X.shape[0]))

dataload

In [None]:
class Dataset():
    def __init__(self, X, y, transforms: Optional[List] = None):
        self.transforms = transforms        
        self.X = X
        self.y = y

    def __getitem__(self, index) -> object:
        imgs = self.X[index]
        labels = self.y[index]
        if len(imgs.shape) > 1:
            imgs = np.vstack([self.apply_transforms(img.reshape(28, 28, 1)) for img in imgs])
        else:
            imgs = self.apply_transforms(imgs.reshape(28, 28, 1))
        return (imgs, labels)

    def __len__(self) -> int:
        return self.X.shape[0]
    
    def apply_transforms(self, x):
        if self.transforms is not None:
            for tform in self.transforms:
                x = tform(x)
        return x

class DataLoader:
    dataset: Dataset
    batch_size: Optional[int]

    def __init__(self, dataset: Dataset, batch_size: Optional[int] = 1, shuffle: bool = False):
        self.dataset = dataset
        self.shuffle = shuffle
        self.batch_size = batch_size
        if not self.shuffle:
            self.ordering = np.array_split(np.arange(len(dataset)), range(batch_size, len(dataset), batch_size))

    def __iter__(self):
        if self.shuffle:
            self.ordering = np.array_split(np.random.permutation(len(self.dataset)), range(self.batch_size, len(self.dataset), self.batch_size))
        self.idx = -1
        return self

    def __next__(self):
        self.idx += 1
        if self.idx >= len(self.ordering):
            raise StopIteration
        batch_indices = self.ordering[self.idx]
        return tuple([Tensor(x, device = self.device) for x in self.dataset[batch_indices]])
'''
def load_svmfile(filename):
    X = []
    Y = []
    with open(filename, 'r') as f:
        filelines = f.readlines()
        for fileline in filelines:
            fileline = fileline.strip().split(' ')
            #print(fileline)
            Y.append(int(fileline[0]))
            tmp = []
            for t in fileline[1:]:
                if len(t)==0:
                    continue
                tmp.append(float(t.split(':')[1]))
            X.append(tmp)
    
    return np.array(X), np.array(Y)

dataset1 = 'svmguide1.bin'
dataset2 = 'svmguide1.t.bin'
print('Start loading dataset {}'.format(dataset1))
X, Y = load_svmfile(dataset1) # train set
X_test, Y_test = load_svmfile('{}'.format(dataset2))
training_data = Dataset(X, Y)
test_data = Dataset(X_test, Y_test)
train_dataloader = DataLoader(training_data, batch_size=64)
test_dataloader = DataLoader(test_data, batch_size=64)'
'''

network

In [None]:
class NeuralNetwork(Module):
    def __init__(self):
        super().__init__()
        self.flatten = Flatten()
        self.linear_relu_stack = Sequential(
            Linear(28*28, 512),
            ReLU(),
            Linear(512, 512),
            ReLU(),
            Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork()

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    # 将model设为训练模式，对batch normalization和dropout很重要
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # 计算模型输出和损失
        pred = model(X)
        loss = loss_fn(pred, y)

        # 反向传播
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * batch_size + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test_loop(dataloader, model, loss_fn):
    # 将model设为测试模式，对batch normalization和dropout很重要
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    for X, y in dataloader:
        pred = model(X)
        test_loss += loss_fn(pred, y).item()
        correct += (pred.argmax(1) == y).type(float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

def CrossEntropyLoss(y, pred):
    c = 0
    for i in range(len(y)):
        a = 0
        for j in range(len(y[i])):
            a += -y[i][j] * math.log(pred[i][j])
        c += a
    return c

learning_rate = 0.001
loss_fn = CrossEntropyLoss
optimizer = SGD(model.parameters(), lr=learning_rate)
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)
print("Done!")