# 卷积层

In [1]:
from abc import abstractmethod, ABC
import numpy as np

np.random.seed(99)

## Foundation

### Tensor

In [2]:
class Tensor:

    def __init__(self, data):
        self.data = np.array(data)
        self.grad = 0
        self.gradient_fn = lambda: None
        self.parents = set()

    def backward(self):
        if self.gradient_fn:
            self.gradient_fn()

        for p in self.parents:
            p.backward()

    def shape(self):
        return self.data.shape

    def size(self):
        return np.prod(self.data.shape[1:])

    def __str__(self):
        return str(self.data)

### Base Dataset

In [3]:
class Dataset(ABC):

    def __init__(self, batch_size=1):
        self.batch_size = batch_size
        self.load()
        self.train()

    @abstractmethod
    def load(self):
        pass

    def train(self):
        self.features = self.train_features
        self.labels = self.train_labels

    def eval(self):
        self.features = self.test_features
        self.labels = self.test_labels

    def shape(self):
        return Tensor(self.features).size(), Tensor(self.labels).size()

    def items(self):
        return Tensor(self.features), Tensor(self.labels)

    def __len__(self):
        return len(self.features) // self.batch_size

    def __getitem__(self, index):
        start = index * self.batch_size
        end = start + self.batch_size

        feature = Tensor(self.features[start: end])
        label = Tensor(self.labels[start: end])
        return feature, label

    @abstractmethod
    def estimate(self, predictions):
        pass

### Base Layer

In [4]:
class Layer(ABC):

    def __init__(self):
        self.training = True

    def __call__(self, x: Tensor):
        return self.forward(x)

    def train(self):
        self.training = True

    def eval(self):
        self.training = False

    @abstractmethod
    def forward(self, x: Tensor):
        pass

    def parameters(self):
        return []

    def __str__(self):
        return ''

### Base Loss Function

In [5]:
class Loss(ABC):

    def __call__(self, p: Tensor, y: Tensor):
        return self.loss(p, y)

    @abstractmethod
    def loss(self, p: Tensor, y: Tensor):
        pass

### Base Optimizer

In [6]:
class Optimizer(ABC):

    def __init__(self, parameters, lr):
        self.parameters = parameters
        self.lr = lr

    def clear(self):
        for p in self.parameters:
            p.grad = 0

    @abstractmethod
    def step(self):
        pass

### Base Model

In [7]:
class Model(ABC):

    def __init__(self, layer, loss, optimizer):
        self.layer = layer
        self.loss = loss
        self.optimizer = optimizer

    @abstractmethod
    def train(self, dataset, epochs):
        pass

    @abstractmethod
    def test(self, dataset):
        pass

## Data

### CNN Dataset

In [8]:
class CNNDataset(Dataset):

    def __init__(self, filename, batch_size=1):
        self.filename = filename
        super().__init__(batch_size)

    def load(self):
        with (np.load(self.filename, allow_pickle=True) as f):
            self.train_features, self.train_labels = self.normalize(f['x_train'], f['y_train'])
            self.test_features, self.test_labels = self.normalize(f['x_test'], f['y_test'])

    @staticmethod
    def normalize(x, y):
        inputs = np.expand_dims(x / 255, axis=1)
        targets = np.zeros((len(y), 10))
        targets[range(len(y)), y] = 1
        return inputs, targets

    def estimate(self, predictions):
        count = (predictions.data.argmax(axis=1) == self.labels.argmax(axis=1)).sum()
        total = len(self.labels)
        return count / total

## Model

### Linear Layer

In [9]:
class Linear(Layer):

    def __init__(self, in_size, out_size):
        super().__init__()
        self.weight = Tensor(np.random.rand(out_size, in_size) / in_size)
        self.bias = Tensor(np.zeros(out_size))

    def forward(self, x: Tensor):
        p = Tensor(x.data @ self.weight.data.T + self.bias.data)

        def gradient_fn():
            self.weight.grad += p.grad.T @ x.data / len(x.data)
            self.bias.grad += np.sum(p.grad, axis=0) / len(x.data)
            x.grad += p.grad @ self.weight.data

        p.gradient_fn = gradient_fn
        p.parents = {self.weight, self.bias, x}
        return p

    def parameters(self):
        return [self.weight, self.bias]

    def __str__(self):
        return f'weight: {self.weight}\nbias: {self.bias}'

### Sequential Layer

In [10]:
class Sequential(Layer):

    def __init__(self, layers):
        super().__init__()
        self.layers = layers

    def train(self):
        for l in self.layers:
            l.train()

    def eval(self):
        for l in self.layers:
            l.eval()

    def forward(self, x: Tensor):
        for l in self.layers:
            x = l(x)
        return x

    def parameters(self):
        return [p for l in self.layers for p in l.parameters()]

    def __str__(self):
        return '\n'.join(str(l) for l in self.layers if str(l))

### Convolutional Layer

In [11]:
class Convolution2D(Layer):

    def __init__(self, channel_size, kernel_size, out_size):
        super().__init__()
        self.channel_size = channel_size
        self.kernel_size = kernel_size
        self.out_size = out_size
        in_size = kernel_size ** 2 * channel_size
        self.weight = Tensor(np.random.rand(out_size, in_size) / in_size)
        self.bias = Tensor(np.zeros(out_size))

    def forward(self, x: Tensor):
        batches, channels, rows, columns = x.data.shape
        rows = rows - self.kernel_size + 1
        columns = columns - self.kernel_size + 1

        patches = []
        for b in range(batches):
            for c in range(channels):
                for r in range(rows):
                    for l in range(columns):
                        patch = x.data[b,
                                       c:c + self.channel_size,
                                       r:r + self.kernel_size,
                                       l:l + self.kernel_size]
                        patches.append(patch)
        patches = np.array(patches).reshape(batches, channels, rows, columns, -1)

        p = Tensor(patches @ self.weight.data.T + self.bias.data)

        def gradient_fn():
            self.weight.grad += p.grad.reshape(-1, self.out_size).T @ (patches.reshape(-1, self.kernel_size ** 2))
            self.bias.grad += np.sum(p.grad.reshape(-1, self.out_size), axis=0)

        p.gradient_fn = gradient_fn
        p.parents = {self.weight, self.bias}
        return p

    def parameters(self):
        return [self.weight, self.bias]

    def __str__(self):
        return f'weight: {self.weight}\nbias: {self.bias}'

### Flatten Layer

In [12]:
class Flatten(Layer):

    def forward(self, x: Tensor):
        p = Tensor(np.array(x.data.reshape(x.data.shape[0], -1)))

        def gradient_fn():
            x.grad += p.grad.reshape(x.data.shape)

        p.gradient_fn = gradient_fn
        p.parents = {x}
        return p

### Dropout Layer

In [13]:
class Dropout(Layer):

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        self.dropout_rate = dropout_rate

    def forward(self, x: Tensor):
        if not self.training:
            return x

        mask = np.random.random(x.data.shape) > self.dropout_rate
        p = Tensor(x.data * mask)

        def gradient_fn():
            x.grad += p.grad * mask

        p.gradient_fn = gradient_fn
        p.parents = {x}
        return p

### ReLU Activation Function

In [14]:
class ReLU(Layer):

    def forward(self, x: Tensor):
        p = Tensor(np.maximum(0, x.data))

        def gradient_fn():
            x.grad += (p.data > 0) * p.grad

        p.gradient_fn = gradient_fn
        p.parents = {x}
        return p

### Tanh Activation Function

In [15]:
class Tanh(Layer):

    def forward(self, x: Tensor):
        p = Tensor(np.tanh(x.data))

        def gradient_fn():
            x.grad += p.grad * (1 - p.data ** 2)

        p.gradient_fn = gradient_fn
        p.parents = {x}
        return p

### Sigmoid Activation Function

In [16]:
class Sigmoid(Layer):

    def __init__(self, clip_range=(-100, 100)):
        super().__init__()
        self.clip_range = clip_range

    def forward(self, x: Tensor):
        z = np.clip(x.data, self.clip_range[0], self.clip_range[1])
        p = Tensor(1 / (1 + np.exp(-z)))

        def gradient_fn():
            x.grad += p.grad * p.data * (1 - p.data)

        p.gradient_fn = gradient_fn
        p.parents = {x}
        return p

### Softmax Activation Function

In [17]:
class Softmax(Layer):

    def __init__(self, axis=-1):
        super().__init__()
        self.axis = axis

    def forward(self, x: Tensor):
        exp = np.exp(x.data - np.max(x.data, axis=self.axis, keepdims=True))
        p = Tensor(exp / np.sum(exp, axis=self.axis, keepdims=True))

        def gradient_fn():
            grad = np.sum(p.data * p.grad, axis=self.axis, keepdims=True)
            x.grad += p.data * (p.grad - grad)

        p.gradient_fn = gradient_fn
        p.parents = {x}
        return p

### MSE Loss Function

In [18]:
class MSELoss(Loss):

    def loss(self, p: Tensor, y: Tensor):
        mse = Tensor(np.mean(np.square(y.data - p.data)))

        def gradient_fn():
            p.grad += -2 * (y.data - p.data)

        mse.gradient_fn = gradient_fn
        mse.parents = {p}
        return mse

### SGD Optimizer

In [19]:
class SGDOptimizer(Optimizer):

    def step(self):
        for p in self.parameters:
            p.data -= p.grad * self.lr

### Neural Network Model

In [20]:
class NNModel(Model):

    def train(self, dataset, epochs):
        self.layer.train()
        dataset.train()

        for epoch in range(epochs):
            for i in range(len(dataset)):
                features, labels = dataset[i]

                predictions = self.layer(features)
                error = self.loss(predictions, labels)

                self.optimizer.clear()
                error.backward()
                self.optimizer.step()

    def test(self, dataset):
        self.layer.eval()
        dataset.eval()

        features, labels = dataset.items()
        return self.layer(features)

## Configuration

### Learning Rate

In [21]:
LEARNING_RATE = 0.01

### Batch

In [22]:
BATCH_SIZE = 2

### Kernel

In [23]:
KERNEL_SIZE = 3

### Epoch

In [24]:
EPOCHS = 10

## Traning

### Iteration Training

In [25]:
dataset = CNNDataset('mini-mnist.npz', BATCH_SIZE)
feature, label = dataset[0]
_, channels, rows, columns = feature.shape()
conv_rows = rows - KERNEL_SIZE + 1
conv_columns = columns - KERNEL_SIZE + 1
layer = Sequential([Convolution2D(channels, KERNEL_SIZE, 16),
                    Flatten(),
                    Dropout(),
                    Linear(conv_rows * conv_columns * 16, 64),
                    ReLU(),
                    Linear(64, dataset.shape()[1]),
                    Softmax()])
loss = MSELoss()
optimizer = SGDOptimizer(layer.parameters(), lr=LEARNING_RATE)

model = NNModel(layer, loss, optimizer)
model.train(dataset, EPOCHS)

## Testing

### Estimating

In [26]:
predictions = model.test(dataset)

print(f'Accuracy: {dataset.estimate(predictions)}')

Accuracy: 0.928
