# CNN神经网络训练模型

## CNN神经网络类

In [1]:
import numpy as np
class CNN:
    def __init__(self,input_size,conlayer,poolingLayer,fclayer):
        '''
        input_size: 数据大小，一个元组，例如图片（28，28，1）
        conlayer: 卷积层结构，一个列表，包含每个卷积层的参数元组 (num_filters, filter_size, padding, stride)
        poolingLayer: 池化层结构(最大池化层)，一个整数，池化层的池化大小
        fclayer: 全连接层，一个元组，(num_units, num_classes)
        '''
        self.input_size = input_size
        self.conlayer = conlayer
        self.poolingLayer = poolingLayer
        self.fclayer = fclayer
        self.learn_rate = 0.1
        
        self.conv_layers = []
        for i, (num_filters, filter_size, padding, stride) in enumerate(conlayer):
            if i == 0:
                input_channels = input_size[-1]
            else:
                input_channels = conlayer[i - 1][0]  # 上一层的滤波器数量

            # 初始化卷积层的权重
            conv_weights = np.random.randn(num_filters, filter_size, filter_size, input_channels) / (filter_size * filter_size)
            conv_bias = np.zeros(num_filters)
            self.conv_layers.append((conv_weights, conv_bias, padding, stride))

        num_fc_input = (input_size[0] // (2 ** len(conlayer))) * (input_size[1] // (2 ** len(conlayer))) * conlayer[-1][0]
        num_fc_units, num_classes = fclayer

        # 初始化全连接层的权重
        self.fc_weights = np.random.randn(num_fc_input, num_fc_units) / num_fc_input
        self.fc_bias = np.zeros(num_fc_units)
        # 初始化输出层权重
        self.output_weights = np.random.randn(num_fc_units, num_classes) / num_fc_units
        self.output_bias = np.zeros(num_classes)
    def ReLU(x):
        return np.maximum(0, x)
    
    def conv_forward(self, x, conv_weights, conv_bias, padding, stride):
        """
        卷积层的前向传播
        参数：
        - x: 输入数据，例如图像数据
        - conv_weights: 卷积核的权重
        - conv_bias: 卷积核的偏置
        - padding: 填充大小
        - stride: 步长
        
        return：
        - output: 经过卷积操作后的输出
        """
        xsize, h, w, _ = x.shape[1:]
        num_filters, _, _, _ = conv_weights.shape
        conv_output = np.zeros(xsize, ((h - 2*padding) // stride + 1,
                                (w - 2*padding) // stride + 1), num_filters)

        # 填充0
        padded_input = np.pad(x, ((0, 0), (padding, padding), (padding, padding), (0, 0)), mode='constant')

        for k in range(xsize):
            for i in range(0, h - 2*padding, stride):
                for j in range(0, w - 2*padding, stride):
                        im_region = padded_input[k, i:(i + conv_weights.shape[1] + 1), j:(j + conv_weights.shape[1] + 1),:]
                        im_region = [im_region*num_filters]
                        conv_output[k, i, j] = ReLU(np.sum(im_region * conv_weights, axis=(1, 2, 3)) + conv_bias)

        return conv_output
    
    def pool_forward(self, x):
        """
        池化层的前向传播
        参数：
        - x: 输入数据，例如经过卷积层后的输出
        return：
        - output: 经过池化操作后的输出
        """
        xsize, h, w, num_filters = x.shape
        pool_output = np.zeros(xsize, ((h // self.poolingLayer), (w // self.poolingLayer), num_filters))
        
        for k in range(xsize):
            for f in range(num_filters):
                for i in range(0, h, self.poolingLayer):
                    for j in range(0, w, self.poolingLayer):
                        pool_output[k, i // self.poolingLayer, j // self.poolingLayer, f] = np.max(x[i:i+self.poolingLayer, j:j+self.poolingLayer, f])

        return pool_output
    
    def fc_forward(self, x, weights, bias):
        """
        全连接层的前向传播
        参数：
        - x: 输入数据，例如经过池化层后的输出
        - weights: 全连接层的权重
        - bias: 全连接层的偏置
        return：
        - output: 经过全连接层操作后的输出
        """
        self.fc_input = x.reshape((x.shape[0], -1))
        fc_output = np.dot(self.fc_input, weights) + bias
        return fc_output
    
    def softmax(self, x):
        exp_values = np.exp(x - np.max(x, axis=-1, keepdims=True))
        probabilities = exp_values / np.sum(exp_values, axis=-1, keepdims=True)
        return probabilities
    
    def cross_entropy_loss(self, y_pred, y_true):
        """
        交叉熵损失函数
        """
        batch_size = y_pred.shape[0]
        loss = -np.sum(y_true * np.log(y_pred + 1e-9)) / batch_size
        return loss
    
    def cross_entropy_loss_backward(self, y_pred, y_true):
        """
        交叉熵损失函数的反向传播
        return：
        d_L_d_out: 梯度
        """
        batch_size = y_true.shape[0]
        d_L_d_out = -y_true / (y_pred + 1e-9)
        return d_L_d_out / batch_size
    
    def fc_backward(self, d_L_d_out):
        """
        全连接层的反向传播
        """
        d_L_d_fc_weights = np.dot(self.fc_input.T, d_L_d_out)
        d_L_d_fc_bias = np.sum(d_L_d_out, axis=0, keepdims=True)
        d_L_d_fc_input = np.dot(d_L_d_out, self.fc_weights.T)

        self.fc_weights -= self.learn_rate * d_L_d_fc_weights
        self.fc_bias -= self.learn_rate * d_L_d_fc_bias

        return d_L_d_fc_input
    
    def pool_backward(self, d_L_d_pool, y):
        """
        池化层的反向传播
        """
        d_L_d_input = np.zeros_like(y)

        for i in range(d_L_d_pool.shape[1]):
            for j in range(d_L_d_pool.shape[2]):
                for k in range(d_L_d_pool.shape[3]):
                    patch = y[:, i*self.pool_size:i*self.pool_size+self.pool_size, j*self.pool_size:j*self.pool_size+self.pool_size, k]
                    max_val = np.max(patch, axis=(1, 2))
                    d_L_d_input[:, i*self.pool_size:i*self.pool_size+self.pool_size, j*self.pool_size:j*self.pool_size+self.pool_size, k] = (patch == max_val[:, None, None]) * d_L_d_pool[:, i, j, k][:, None, None]

        return d_L_d_input
    
    def conv_backward(self, d_L_d_conv, x):
        """
        卷积层的反向传播

        """
        h, w, _ = x.shape[1:]
        num_filters, _, _, _ = self.conv_layers[0][0].shape
        d_L_d_input = np.zeros_like(x)
        d_L_d_conv_weights = np.zeros_like(self.conv_layers[0][0])
        d_L_d_conv_bias = np.zeros_like(self.conv_layers[0][1])

        padded_input = np.pad(x, ((0, 0), (1, 1), (1, 1), (0, 0)), mode='constant')

        for i in range(0, h, self.conv_layers[0][3]):
            for j in range(0, w, self.conv_layers[0][3]):
                im_region = padded_input[:, i:i+self.conv_layers[0][1], j:j+self.conv_layers[0][1]]
                for f in range(num_filters):
                    d_L_d_conv[:, i, j, f] = np.sum(d_L_d_conv[:, i:i+1, j:j+1, f][:, :, :, None] * self.conv_layers[0][0][None, :, :, :], axis=(1, 2, 3))
                    d_L_d_conv_weights[f, :, :, :] += np.sum(im_region * (d_L_d_conv[:, i, j, f])[:, None, None, None], axis=0)
                    d_L_d_conv_bias[f] += np.sum(d_L_d_conv[:, i, j, f], axis=0)

        self.conv_layers[0][0] -= self.learn_rate * d_L_d_conv_weights
        self.conv_layers[0][1] -= self.learn_rate * d_L_d_conv_bias

        return d_L_d_input

    def train(self, x_train, y_train, epochs, batch_size, learn_rate=0.1):
        """
        参数：
        - x_train: 训练数据
        - y_train: 训练标签
        - epochs: 迭代次数
        - batch_size: 批量大小
        - learn_rate: 学习率
        """
        self.learn_rate = learn_rate
        for epoch in range(epochs):
            for i in range(0, len(x_train), batch_size):
                x_batch = x_train[i:i+batch_size]
                y_batch = y_train[i:i+batch_size]

                conv_output = x_batch
                for conv_weights, conv_bias, padding, stride in self.conv_layers:
                    conv_output = self.conv_forward(conv_output, conv_weights, conv_bias, padding, stride)
                    conv_output = self.pool_forward(conv_output)
                
                # 全连接层输出
                fc_output = self.fc_forward(conv_output, self.fc_weights, self.fc_bias)
                output = self.softmax(fc_output)
                
                loss = self.cross_entropy_loss(output, y_batch)

                d_L_d_out = self.cross_entropy_loss_backward(output, y_batch)
                d_L_d_fc = self.fc_backward(d_L_d_out)
                d_L_d_pool = self.pool_backward(d_L_d_fc, conv_output)
                d_L_d_conv = self.conv_backward(d_L_d_pool, x_batch)

                acc = np.mean(np.argmax(output, axis=-1) == np.argmax(y_batch, axis=-1))
                print(f"Epoch {epoch + 1}, Batch {i // batch_size + 1}, Loss: {loss:.4f}, Accuracy: {acc:.4f}")

                
        

### 获取MINIST数据

In [2]:
import numpy as np
# 获取数据集
def read_labels(filename):
    with open(filename, 'rb') as f:
        f.read(4)
        num_items = int.from_bytes(f.read(4), 'big')
        # 读取标签数据
        labels = [int.from_bytes(f.read(1), 'big') for _ in range(num_items)]
    return labels


def read_images(filename):
    with open(filename, 'rb') as f:
        f.read(4)
        num_images = int.from_bytes(f.read(4), 'big')
        rows = int.from_bytes(f.read(4), 'big')
        cols = int.from_bytes(f.read(4), 'big')
        # 读取图像数据
        images = np.zeros((num_images, rows, cols), dtype=np.uint8)
        for i in range(num_images):
            for row in range(rows):
                for col in range(cols):
                    pixel = int.from_bytes(f.read(1), 'big')
                    images[i, row, col] = pixel
        images = images.reshape(-1,28,28,1)

    return images

test_labels = read_labels('./data/MNIST/test-labels-idx1-ubyte')
train_labels = read_labels("./data/MNIST/train-labels-idx1-ubyte")
test_images = read_images('./data/MNIST/test-images-idx3-ubyte')
train_images = read_images('./data/MNIST/train-images-idx3-ubyte')

# 归一化、one-hot编码
train_images = train_images.astype(np.float32) / 255.0
train_labels = np.eye(10)[train_labels]
test_images = test_images.astype(np.float32) / 255.0
test_labels = np.eye(10)[test_labels]

In [4]:
train_images.shape[1:]

(28, 28, 1)

In [6]:
inputSize = train_images.shape[1:]
conlSize = [(32,3,1,1), (32,3,1,1)]
fcSize = (128, 10)
poolSize = 2
learning_rate = 0.1
cnn_mnist = CNN(inputSize,conlSize,poolSize,fcSize)

In [7]:
cnn_mnist.train(train_images,train_labels, epochs=10, batch_size=64, learn_rate=0.1)

ValueError: operands could not be broadcast together with remapped shapes [original->remapped]: (3,2)  and requested shape (4,2)

In [8]:
import numpy as np  
  
# 假设的 im_region 和 conv_weights 数组  
im_region = np.random.rand(2, 3, 3, 1)  
conv_weights = np.random.rand(2, 3, 3, 1)  
  
# 计算乘积并求和  
result = np.sum(im_region * conv_weights, axis=(1, 2, 3))  
  
# 输出结果数组的形状  
print(result.shape)  # 输出: (2,)

(2,)


In [9]:
result

array([1.44728344, 2.31644442])