# 汇聚层

实现汇聚层的前向传播

In [1]:
import torch
from torch import nn
from d2l import torch as d2l

def pool2d(X, pool_size, mode='max'):
    """Slide a pooling window over a 2-D tensor (stride 1, no padding).

    Args:
        X: 2-D tensor of shape (height, width).
        pool_size: pair ``(p_h, p_w)`` giving the window height and width.
        mode: ``'max'`` for max pooling or ``'avg'`` for average pooling.

    Returns:
        Tensor of shape ``(height - p_h + 1, width - p_w + 1)`` holding the
        max / mean of every window position.

    Raises:
        ValueError: if ``mode`` is neither ``'max'`` nor ``'avg'``.
    """
    # Reject unknown modes up front; previously they silently produced an
    # all-zero result because neither branch in the loop ever fired.
    if mode not in ('max', 'avg'):
        raise ValueError(f"mode must be 'max' or 'avg', got {mode!r}")
    p_h, p_w = pool_size
    Y = torch.zeros((X.shape[0] - p_h + 1, X.shape[1] - p_w + 1))
    for i in range(Y.shape[0]):
        for j in range(Y.shape[1]):
            window = X[i: i + p_h, j: j + p_w]
            Y[i, j] = window.max() if mode == 'max' else window.mean()
    return Y

验证二维最大汇聚层的输出

In [2]:
X = torch.tensor([[0.0, 1.0, 2.0], [3.0, 4.0, 5.0], [6.0, 7.0, 8.0]])
pool2d(X, (2, 2))

tensor([[4., 5.],
        [7., 8.]])

验证平均汇聚层

In [3]:
pool2d(X, (2, 2), 'avg')

tensor([[2., 3.],
        [5., 6.]])

填充和步幅

In [4]:
X = torch.arange(16, dtype=torch.float32).reshape((1, 1, 4, 4))
X

tensor([[[[ 0.,  1.,  2.,  3.],
          [ 4.,  5.,  6.,  7.],
          [ 8.,  9., 10., 11.],
          [12., 13., 14., 15.]]]])

默认情况下，深度学习框架中的步幅与汇聚窗口的大小相同

In [5]:
# Give the layer its own name so the pool2d() function defined earlier stays
# callable. Only the window size is passed, so the stride defaults to it.
max_pool = nn.MaxPool2d(3)
max_pool(X)

tensor([[[[10.]]]])

In [7]:
from torch.nn import functional as F

F.max_pool2d(X, 2)

tensor([[[[ 5.,  7.],
          [13., 15.]]]])

In [11]:
Y = torch.arange(64, dtype=torch.float32).reshape(1,1,8,8)
Y

tensor([[[[ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.],
          [ 8.,  9., 10., 11., 12., 13., 14., 15.],
          [16., 17., 18., 19., 20., 21., 22., 23.],
          [24., 25., 26., 27., 28., 29., 30., 31.],
          [32., 33., 34., 35., 36., 37., 38., 39.],
          [40., 41., 42., 43., 44., 45., 46., 47.],
          [48., 49., 50., 51., 52., 53., 54., 55.],
          [56., 57., 58., 59., 60., 61., 62., 63.]]]])

In [13]:
F.max_pool2d(Y, 2)

tensor([[[[ 9., 11., 13., 15.],
          [25., 27., 29., 31.],
          [41., 43., 45., 47.],
          [57., 59., 61., 63.]]]])

In [14]:
F.max_pool2d(Y, 3)

tensor([[[[18., 21.],
          [42., 45.]]]])

In [15]:
F.max_pool2d(Y, 4)

tensor([[[[27., 31.],
          [59., 63.]]]])

In [17]:
F.max_pool2d(Y, 5)

tensor([[[[36.]]]])

填充和步幅可以手动设定

In [18]:
# Give the layer its own name so the pool2d() function defined earlier stays
# callable. Padding and stride are set explicitly here.
max_pool = nn.MaxPool2d(3, padding=1, stride=2)
max_pool(X)

tensor([[[[ 5.,  7.],
          [13., 15.]]]])

In [20]:
F.max_pool2d(X, 3, padding=1, stride=3)

tensor([[[[ 5.,  7.],
          [13., 15.]]]])

设定一个任意大小的矩形汇聚窗口，并分别设定填充和步幅的高度和宽度

In [8]:
# Give the layer its own name so the pool2d() function defined earlier stays
# callable. Window, stride and padding are each given as (height, width).
max_pool = nn.MaxPool2d((2, 3), stride=(2, 3), padding=(0, 1))
max_pool(X)

tensor([[[[ 5.,  7.],
          [13., 15.]]]])

汇聚层在每个输入通道上单独运算

In [23]:
torch.stack((X, X+1), 0)

tensor([[[[[ 0.,  1.,  2.,  3.],
           [ 4.,  5.,  6.,  7.],
           [ 8.,  9., 10., 11.],
           [12., 13., 14., 15.]]]],



        [[[[ 1.,  2.,  3.,  4.],
           [ 5.,  6.,  7.,  8.],
           [ 9., 10., 11., 12.],
           [13., 14., 15., 16.]]]]])

In [30]:
torch.stack((X, X+1), 0).shape

torch.Size([2, 1, 1, 4, 4])

In [28]:
torch.cat((X, X+1), 0).shape

torch.Size([2, 1, 4, 4])

In [29]:
torch.cat((X, X+1), 1).shape

torch.Size([1, 2, 4, 4])

In [31]:
# Build the two-channel input from the first channel only, so re-running this
# cell gives the same tensor instead of doubling the channel count each time.
first_channel = X[:, :1]
X = torch.cat((first_channel, first_channel + 1), 1)
X

tensor([[[[ 0.,  1.,  2.,  3.],
          [ 4.,  5.,  6.,  7.],
          [ 8.,  9., 10., 11.],
          [12., 13., 14., 15.]],

         [[ 1.,  2.,  3.,  4.],
          [ 5.,  6.,  7.,  8.],
          [ 9., 10., 11., 12.],
          [13., 14., 15., 16.]]]])

In [32]:
# Give the layer its own name so the pool2d() function defined earlier stays
# callable. Pooling runs on each channel separately, so both channels remain.
max_pool = nn.MaxPool2d(3, padding=1, stride=2)
max_pool(X)

tensor([[[[ 5.,  7.],
          [13., 15.]],

         [[ 6.,  8.],
          [14., 16.]]]])

## 练习

1. 尝试将平均汇聚层作为卷积层的特殊情况实现。

In [65]:
import numpy as np

class AvgPool(nn.Module):
    """Average pooling expressed as a 2-D convolution with a constant kernel.

    Every kernel entry equals ``1 / (k_h * k_w)``, so each convolution output
    is the mean of its window. The same kernel is applied to every input
    channel independently (grouped convolution), as a pooling layer does.

    Args:
        kernel_size: window size; an int for a square window or a pair
            ``(k_h, k_w)`` given as a tuple or list.
        padding: zero padding forwarded to ``F.conv2d``. Padded zeros take
            part in the mean.
        stride: stride forwarded to ``F.conv2d``; defaults to ``kernel_size``
            (non-overlapping windows).

    Raises:
        TypeError: if ``kernel_size`` is not an int or a pair.
    """

    def __init__(self, kernel_size, padding=0, stride=None):
        super().__init__()
        if isinstance(kernel_size, int):
            window = (kernel_size, kernel_size)
        elif isinstance(kernel_size, (tuple, list)) and len(kernel_size) == 2:
            window = tuple(kernel_size)
        else:
            # Fail here rather than leaving self.weight undefined until forward().
            raise TypeError(
                f"kernel_size must be an int or a pair of ints, got {kernel_size!r}")
        k_h, k_w = window
        # Single-channel averaging kernel; it is replicated per channel in forward().
        self.weight = nn.Parameter(
            torch.ones((1, 1, k_h, k_w), dtype=torch.float32) / (k_h * k_w),
            requires_grad=False)
        self.padding = padding
        if stride is None:
            stride = kernel_size if isinstance(kernel_size, int) else window
        self.stride = stride

    def forward(self, x):
        """Average-pool ``x`` of shape (N, C, H, W) or (C, H, W)."""
        channels = x.shape[-3]
        # One copy of the kernel per channel with groups=channels keeps the
        # channels separate, so inputs with more than one channel work too.
        weight = self.weight.repeat(channels, 1, 1, 1)
        return F.conv2d(x, weight=weight, padding=self.padding,
                        stride=self.stride, groups=channels)

In [66]:
X = torch.arange(64, dtype=torch.float32).reshape((1, 1, 8, 8))
F.avg_pool2d(X, 2), F.avg_pool2d(X, 3)

(tensor([[[[ 4.5000,  6.5000,  8.5000, 10.5000],
           [20.5000, 22.5000, 24.5000, 26.5000],
           [36.5000, 38.5000, 40.5000, 42.5000],
           [52.5000, 54.5000, 56.5000, 58.5000]]]]),
 tensor([[[[ 9., 12.],
           [33., 36.]]]]))

In [54]:
# Module-API counterpart of F.avg_pool2d(X, 2) and F.avg_pool2d(X, 3).
nn.AvgPool2d(2)(X), nn.AvgPool2d(3)(X)

(tensor([[[[ 4.5000,  6.5000,  8.5000, 10.5000],
           [20.5000, 22.5000, 24.5000, 26.5000],
           [36.5000, 38.5000, 40.5000, 42.5000],
           [52.5000, 54.5000, 56.5000, 58.5000]]]]),
 tensor([[[[ 9., 12.],
           [33., 36.]]]]))

In [67]:
AvgPool(2)(X), AvgPool(3)(X)

(tensor([[[[ 4.5000,  6.5000,  8.5000, 10.5000],
           [20.5000, 22.5000, 24.5000, 26.5000],
           [36.5000, 38.5000, 40.5000, 42.5000],
           [52.5000, 54.5000, 56.5000, 58.5000]]]]),
 tensor([[[[ 9.0000, 12.0000],
           [33.0000, 36.0000]]]]))

In [75]:
def corr2d_stride(X, K):
    """Cross-correlate 2-D ``X`` with 2-D ``K`` using a stride equal to K's size.

    Each K-sized tile of ``X`` is flattened into a row; one matrix product
    with the flattened kernel then yields a single value per tile.

    Returns:
        Tensor of shape ``(X.shape[0] // K.shape[0], X.shape[1] // K.shape[1])``.
    """
    k_rows, k_cols = K.shape
    x_rows, x_cols = X.shape
    tiles = []
    for top in range(0, x_rows - k_rows + 1, k_rows):
        for left in range(0, x_cols - k_cols + 1, k_cols):
            tile = X[top:top + k_rows, left:left + k_cols]
            tiles.append(tile.reshape(1, -1))
    flat_kernel = K.reshape(-1, 1)
    responses = torch.stack(tiles, 0) @ flat_kernel
    return responses.reshape(x_rows // k_rows, x_cols // k_cols)

class AvgPool2(nn.Module):
    """Average pooling of a 2-D matrix over non-overlapping windows.

    The layer stores a fixed kernel whose entries all equal ``1 / (k_h * k_w)``
    and passes it, together with the input, to ``corr2d_stride``.

    Args:
        kernel_size: window size; an int for a square window or a pair
            ``(k_h, k_w)`` given as a tuple or list.

    Raises:
        TypeError: if ``kernel_size`` is not an int or a pair.
    """

    def __init__(self, kernel_size):
        super().__init__()
        if isinstance(kernel_size, int):
            window = (kernel_size, kernel_size)
        elif isinstance(kernel_size, (tuple, list)) and len(kernel_size) == 2:
            window = tuple(kernel_size)
        else:
            # Fail here rather than leaving self.weight undefined until forward().
            raise TypeError(
                f"kernel_size must be an int or a pair of ints, got {kernel_size!r}")
        k_h, k_w = window
        self.weight = nn.Parameter(
            torch.ones(window, dtype=torch.float32) / (k_h * k_w),
            requires_grad=False)

    def forward(self, x):
        """Return the window means of the 2-D tensor ``x``."""
        return corr2d_stride(x, self.weight)

X = torch.arange(64, dtype=torch.float32).reshape((8, 8))
AvgPool2(2)(X), AvgPool2(3)(X)

(tensor([[ 4.5000,  6.5000,  8.5000, 10.5000],
         [20.5000, 22.5000, 24.5000, 26.5000],
         [36.5000, 38.5000, 40.5000, 42.5000],
         [52.5000, 54.5000, 56.5000, 58.5000]]),
 tensor([[ 9.0000, 12.0000],
         [33.0000, 36.0000]]))

In [78]:
def corr2d_avg(X, K):
    """Mean of ``window * K`` over non-overlapping K-sized windows of 2-D ``X``.

    Returns:
        Tensor of shape ``(X.shape[0] // K.shape[0], X.shape[1] // K.shape[1])``.
    """
    win_h, win_w = K.shape
    in_h, in_w = X.shape
    out = torch.zeros((in_h // win_h, in_w // win_w))
    n_rows, n_cols = out.shape
    for r in range(n_rows):
        top = r * win_h
        for c in range(n_cols):
            left = c * win_w
            window = X[top:top + win_h, left:left + win_w]
            out[r, c] = (window * K).mean()
    return out

class AvgPool3(nn.Module):
    """Average pooling of a 2-D matrix over non-overlapping windows.

    The layer stores an all-ones kernel of the window size and passes it,
    together with the input, to ``corr2d_avg``, which takes the window mean.

    Args:
        kernel_size: window size; an int for a square window or a pair
            ``(k_h, k_w)`` given as a tuple or list.

    Raises:
        TypeError: if ``kernel_size`` is not an int or a pair.
    """

    def __init__(self, kernel_size):
        super().__init__()
        if isinstance(kernel_size, int):
            window = (kernel_size, kernel_size)
        elif isinstance(kernel_size, (tuple, list)) and len(kernel_size) == 2:
            window = tuple(kernel_size)
        else:
            # Fail here rather than leaving self.weight undefined until forward().
            raise TypeError(
                f"kernel_size must be an int or a pair of ints, got {kernel_size!r}")
        self.weight = nn.Parameter(
            torch.ones(window, dtype=torch.float32), requires_grad=False)

    def forward(self, x):
        """Return the window means of the 2-D tensor ``x``."""
        return corr2d_avg(x, self.weight)

X = torch.arange(64, dtype=torch.float32).reshape((8, 8))
AvgPool3(2)(X), AvgPool3(3)(X)

(tensor([[ 4.5000,  6.5000,  8.5000, 10.5000],
         [20.5000, 22.5000, 24.5000, 26.5000],
         [36.5000, 38.5000, 40.5000, 42.5000],
         [52.5000, 54.5000, 56.5000, 58.5000]]),
 tensor([[ 9., 12.],
         [33., 36.]]))

2. 尝试将最大汇聚层作为卷积层的特殊情况实现。

In [72]:
def corr2d_max(X, K):
    """Maximum of ``window * K`` over non-overlapping K-sized windows of 2-D ``X``.

    Returns:
        Tensor of shape ``(X.shape[0] // K.shape[0], X.shape[1] // K.shape[1])``.
    """
    win_h, win_w = K.shape
    in_h, in_w = X.shape
    out = torch.zeros((in_h // win_h, in_w // win_w))
    n_rows, n_cols = out.shape
    for r in range(n_rows):
        top = r * win_h
        for c in range(n_cols):
            left = c * win_w
            window = X[top:top + win_h, left:left + win_w]
            out[r, c] = (window * K).max()
    return out

class MaxPool2(nn.Module):
    """Max pooling of a 2-D matrix over non-overlapping windows.

    The layer stores an all-ones kernel of the window size and passes it,
    together with the input, to ``corr2d_max``, which takes the window maximum.

    Args:
        kernel_size: window size; an int for a square window or a pair
            ``(k_h, k_w)`` given as a tuple or list.

    Raises:
        TypeError: if ``kernel_size`` is not an int or a pair.
    """

    def __init__(self, kernel_size):
        super().__init__()
        if isinstance(kernel_size, int):
            window = (kernel_size, kernel_size)
        elif isinstance(kernel_size, (tuple, list)) and len(kernel_size) == 2:
            window = tuple(kernel_size)
        else:
            # Fail here rather than leaving self.weight undefined until forward().
            raise TypeError(
                f"kernel_size must be an int or a pair of ints, got {kernel_size!r}")
        self.weight = nn.Parameter(
            torch.ones(window, dtype=torch.float32), requires_grad=False)

    def forward(self, x):
        """Return the window maxima of the 2-D tensor ``x``."""
        return corr2d_max(x, self.weight)
    
X = torch.arange(64, dtype=torch.float32).reshape((8, 8))
MaxPool2(2)(X), MaxPool2(3)(X)

(tensor([[ 9., 11., 13., 15.],
         [25., 27., 29., 31.],
         [41., 43., 45., 47.],
         [57., 59., 61., 63.]]),
 tensor([[18., 21.],
         [42., 45.]]))

In [73]:
nn.MaxPool2d(2)(X.reshape((1,1,8,8))), nn.MaxPool2d(3)(X.reshape((1,1,8,8)))

(tensor([[[[ 9., 11., 13., 15.],
           [25., 27., 29., 31.],
           [41., 43., 45., 47.],
           [57., 59., 61., 63.]]]]),
 tensor([[[[18., 21.],
           [42., 45.]]]]))

3. 我们是否需要最小汇聚层？可以用已知函数替换它吗？

In [83]:
X = torch.arange(64, dtype=torch.float32).reshape((1, 1, 8, 8))
F.max_pool2d(X * -1, 2) * -1

tensor([[[[ 0.,  2.,  4.,  6.],
          [16., 18., 20., 22.],
          [32., 34., 36., 38.],
          [48., 50., 52., 54.]]]])