# 手撕深度学习代码系列

## 注意力机制系列

> 参考：https://zhuanlan.zhihu.com/p/366592542

**1. Attention**

输入是`query`，`key`和`value`，注意力机制首先计算`query`与每个`key`的关联性（compatibility），每个关联性作为每个`value`的权重（weight），各个权重与`value`的乘积相加得到输出。

> 例如，厨房里有苹果、青菜、西红柿、玛瑙筷子、朱砂碗，每个物品都有一个key（$d_k$维向量）和value（ $d_v$维向量）。现在有一个“红色”的query（$d_q$维向量），注意力机制首先计算“红色”的query与苹果的key、青菜的key、西红柿的key、玛瑙筷子的key、朱砂碗的key的关联性，再计算得到每个物品对应的权重，最终输出 =（苹果的权重x苹果的value + 青菜的权重x青菜的value + 西红柿的权重x西红柿的value + 玛瑙筷子的权重x玛瑙筷子的value + 朱砂碗的权重x朱砂碗的value）。最终输出包含了每个物品的信息，由于苹果、西红柿的权重较大（因为与“红色”关联性更大），因此最终输出受到苹果、西红柿的value的影响更大。

![Scaled Dot-Product Attention计算过程](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/11111.jpg)

注意：
+ 注意力掩码的作用是允许我们发送不同长度的批次数据一次性的发送到transformer中。在代码中是通过将所有序列填充到相同的长度。注意力掩码中，我们的输入是0和1，但是在最终的计算时，会将在将无效位置的注意力权重设置为一个很小的值，通常为负无穷（-inf），以便在计算注意力分数时将其抑制为接近零的概率。

![y9pRHt](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/y9pRHt.png)

In [1]:
# 1.Scaled Dot-Product Attention from "Attention is all you need"
import torch
import torch.nn as nn
import numpy as np
class ScaledDotProductAttention(nn.Module):
    def __init__(self, scale):
        super().__init__()
        self.scale = scale
        self.softmax = nn.Softmax(dim=2)
        
    def forward(self, q, k, v, mask=None):
        # q: [b, c_q, d_q], k: [b, c_q, d_k], v: [b, c_q, d_v]
        u = torch.bmm(q, k.transpose(1, 2)) / self.scale  # 1&2. Matmul and scale -> [b, c_q, c_k]
        
        if mask is not None:
            u = u.masked_fill(mask, -np.inf) # 3.mask
        attn = self.softmax(u) # 4
        output = torch.bmm(attn, v) # 5.output [b, c_q, c_k]*[b, c_v, d_v] -> [b, c_q, d_v]
        
        return attn, output

if __name__ == "__main__":
    n_q, n_k, n_v = 2, 4, 4
    d_q, d_k, d_v = 128, 128, 64
    batch = 2

    q = torch.randn(batch, n_q, d_q)
    k = torch.randn(batch, n_k, d_k)
    v = torch.randn(batch, n_v, d_v)
    mask = torch.zeros(batch, n_q, n_k).bool()

    attention = ScaledDotProductAttention(scale=np.power(d_k, 0.5))
    attn, output = attention(q, k, v, mask=mask)

    print(attn.shape)
    print(output.shape)

torch.Size([2, 2, 4])
torch.Size([2, 2, 64])


知识点：
+ `torch.bmm`: batch matrix multiplication, `[b, n ,d]`和`[b, d, p]`两个矩阵相乘
+ `u.masked_fill(mask, value)`: 对tensor将mask中为True的元素对应的基础Tensor的元素设置为值value。


**2. multi-head attention**

只求一次注意力的过程可以叫做单头注意力。多头注意力就是对同样的Q, K, V求多次注意力，得到多个不同的output，再把这些不同的output连接起来得到最终的output。
**多头注意允许模型在不同位置共同注意来自不同表示子空间的信息**。不同头部的output就是从不同层面（representation subspace）考虑关联性而得到的输出。

> 例如，以“红色”为query，第一个头部（从食物层面考虑）得到的output受到苹果、西红柿的value的影响更大；第二个头部（从餐具层面考虑）得到的output受到玛瑙筷子、朱砂碗的value的影响更大。相比单头注意力，多头注意力可以考虑到更多层面的信息。

![mnI5Bk](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/mnI5Bk.jpg)

In [2]:
class MultiHeadAttention(nn.Module):
    """ Multi-Head Attention """

    def __init__(self, n_head, d_k_, d_v_, d_k, d_v, d_o):
        super().__init__()

        self.n_head = n_head
        self.d_k = d_k
        self.d_v = d_v

        self.fc_q = nn.Linear(d_k_, n_head * d_k)
        self.fc_k = nn.Linear(d_k_, n_head * d_k)
        self.fc_v = nn.Linear(d_v_, n_head * d_v)

        self.attention = ScaledDotProductAttention(scale=np.power(d_k, 0.5))

        self.fc_o = nn.Linear(n_head * d_v, d_o)

    def forward(self, q, k, v, mask=None):

        n_head, d_q, d_k, d_v = self.n_head, self.d_k, self.d_k, self.d_v

        batch, n_q, d_q_ = q.size()
        batch, n_k, d_k_ = k.size()
        batch, n_v, d_v_ = v.size()

        q = self.fc_q(q) # 1.单头变多头
        k = self.fc_k(k)
        v = self.fc_v(v)
        q = q.view(batch, n_q, n_head, d_q).permute(2, 0, 1, 3).contiguous().view(-1, n_q, d_q)
        k = k.view(batch, n_k, n_head, d_k).permute(2, 0, 1, 3).contiguous().view(-1, n_k, d_k)
        v = v.view(batch, n_v, n_head, d_v).permute(2, 0, 1, 3).contiguous().view(-1, n_v, d_v)

        if mask is not None:
            mask = mask.repeat(n_head, 1, 1)
        attn, output = self.attention(q, k, v, mask=mask) # 2.当成单头注意力求输出

        output = output.view(n_head, batch, n_q, d_v).permute(1, 2, 0, 3).contiguous().view(batch, n_q, -1) # 3.Concat
        output = self.fc_o(output) # 4.仿射变换得到最终输出

        return attn, output


if __name__ == "__main__":
    n_q, n_k, n_v = 2, 4, 4
    d_q_, d_k_, d_v_ = 128, 128, 64

    q = torch.randn(batch, n_q, d_q_)
    k = torch.randn(batch, n_k, d_k_)
    v = torch.randn(batch, n_v, d_v_)    
    mask = torch.zeros(batch, n_q, n_k).bool()

    mha = MultiHeadAttention(n_head=8, d_k_=128, d_v_=64, d_k=256, d_v=128, d_o=128)
    attn, output = mha(q, k, v, mask=mask)

    print(attn.size())
    print(output.size())

torch.Size([16, 2, 4])
torch.Size([2, 2, 128])


注意：
+ 多头注意力主要是多了linear层，

**3. self-attention**

当注意力的query和key、value全部来自于同一堆东西时，就称为自注意力。如下图所示，query和key、value全都来源于X。

> 举个例子：厨房里有苹果、青菜、西红柿、玛瑙筷子、朱砂碗，每个物品都会计算得到一个query，以及一个key和value。“苹果”的query与苹果、青菜、西红柿、玛瑙筷子、朱砂碗的key和value做注意力，得到最终输出。其他物品的query也如此操作。这样，输入5个物品，有5个query，得到5个输出，相当于将这5个物品换了一种表示形式，而这新的表示形式（得到的输出）每个都是是考虑了所有物品的信息的。

自注意力通过X求query和key、value的计算过程如下图所示：

![568hTZ](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/568hTZ.png)

In [3]:
class SelfAttention(nn.Module):
    
    def __init__(self, n_head, d_k, d_v, d_x, d_o):
        super().__init__()
        
        self.wq = nn.Parameter(torch.Tensor(d_x, d_q))
        self.wk = nn.Parameter(torch.Tensor(d_x, d_k))
        self.wv = nn.Parameter(torch.Tensor(d_x, d_v))
        self.init_parameters()
        
        self.mha = MultiHeadAttention(n_head, d_k, d_v, d_k, d_v, d_o)

    def init_parameters(self):
        for param in self.parameters():
            nn.init.xavier_uniform_(param)
            
    def forward(self, x, mask=None):
        q = torch.matmul(x, self.wq)
        k = torch.matmul(x, self.wk)
        v = torch.matmul(x, self.wv)
        
        attn, output = self.mha(q, k, v, mask=mask)
        return attn, output

if __name__ == "__main__":
    n_x = 4
    d_x = 80

    x = torch.randn(batch, n_x, d_x)
    mask = torch.zeros(batch, n_x, n_x).bool()

    selfattn = SelfAttention(n_head=8, d_k=128, d_v=64, d_x=80, d_o=80)
    attn, output = selfattn(x, mask=mask)

    print(attn.size())
    print(output.size())


torch.Size([16, 4, 4])
torch.Size([2, 4, 80])


**4.关于mask的设置**
对于self-attention，其mask如下：

![ALQi0b](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/ALQi0b.png)

![97z24v](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/97z24v.png)



## Conv2D卷积实现

> 参考：https://www.zhihu.com/tardis/bd/art/349683405

In [4]:

def conv2d(x, kernel, bias, stride=None, pad=None):
    n, c, h, w = x.shape
    d, c, k, j = kernel.shape
    
    # padding
    x_pad = torch.zeros(n, c, h+2*padding, w+2*padding)
    if pad is not None and pad > 0:
        x_pad[:, :, pad:-pad, pad:-pad] = x
    else:
        x_pad = x
        
    # conv
    x_pad = x_pad.unfold(2, k, stride)  # [n, c, h, w] -> [n, c, (h+2*padding-k*2//2)//stride, h, j]
    x_pad = x_pad.unfold(3, j, stride)  # [n, c, (h+2*padding-k*2//2)//stride, h, j] -> [n, c, (h+2*padding-k*2//2)//stride, (h+2*padding-k*2//2)//stride, k, j]
    out = torch.einsum(                          # 按照滑动窗相乘，
        'nchwkj,dckj->ndhw',                    # 并将所有输入通道卷积结果累加 [nchwkj,dckj]->ndhw 表示在k,j维度上进行点乘，注意h,w和我们前面定义的h,w不一样
        x_pad, weight)
    out = out + bias.view(1, -1, 1, 1)          # 添加通道维度偏置值
    return out
    
    

说明：
1. `Einsum`：在Einsum中，箭头从左边到右边消失了什么参数，那公式前就加一个带什么参数的求和符。本案例中消失了k，因此我们需要在加上对带k的求和符，转化为数学公式如下：

![kD99VS](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/kD99VS.png)

![Uqx74s](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/Uqx74s.png)

2. `torch.unfold()`将原始张量进行分片，也就是分成每个kernel乘的对应位置的大小的矩阵

## Python实现BN批量归一化

> 参考
> + 代码: https://zhuanlan.zhihu.com/p/100672008
> + 原理分析: https://zhuanlan.zhihu.com/p/609131550, https://www.cnblogs.com/huwj/p/10765114.html

![gK8Q6I](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/gK8Q6I.png)

In [5]:
class MyBN:
    def __init__(self, momentum=0.9, eps, n_features):
        self._momentum = momentum
        self._eps = eps
        self._running_mean = 0
        self._running_var = 1
        self._beta = np.zeros(shape=(n_features,))
        self._gamma = np.ones(shape=(n_features,))
        
   def batch_norm(self, x):
        mu = x.mean(axis=0)
        var = v.var(axis=0)
        self.runing_mean = (1-self._momentum)*mu + self._momentum*self._running_mean
        self.runing_var = (1-self._momentum)*var + self._momentum*self._running_var
        x_hat = (x - mu) / np.sqrt(var + self._eps)
        y = self._gamma * x_hat + self._beta
        
       return y

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 10)

## Dice Loss
> 参考： https://blog.csdn.net/liangjiu2009/article/details/107352164

![s6Ndmh](https://ossjiyaoliu.oss-cn-beijing.aliyuncs.com/uPic/s6Ndmh.png)

In [None]:
def dice_loss(pred, target, smooth = 1.):
    # shape: b, ...
    b = pred.shape[0]
    pred = pred.view(b, -1)
    target = target.view(b, -1)
    
    intersection = (pred * target).sum(1)
    dice = (2 * intersection + smooth) / (pred.sum(1) + target.sum(1) + smooth)
    
    return (1 - dice).mean()
    

## Pytorch 使用torch.utils.data.Dataset类来构建自定义的数据集

In [6]:
import os
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms

class CustomDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        初始化数据集。
        :param root_dir: 包含图像文件的根目录。
        :param transform: 应用于图像的可选变换。
        """
        self.root_dir = root_dir
        self.transform = transform
        self.images = []
        self.labels = []

        # 遍历目录，收集图像路径和标签
        for filename in os.listdir(root_dir):
            if filename.endswith('.jpg'):  # 假设图像文件后缀为.jpg
                label = filename.split('_')[0]  # 从文件名中提取标签
                image_path = os.path.join(root_dir, filename)
                self.images.append(image_path)
                self.labels.append(label)

    def __len__(self):
        """
        返回数据集中的图像数量。
        """
        return len(self.images)

    def __getitem__(self, idx):
        """
        根据索引获取一个图像和它的标签。
        """
        image_path = self.images[idx]
        image = Image.open(image_path).convert('RGB')  # 确保图像是RGB格式

        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]
        return image, label

# 创建数据集的变换
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # 调整图像大小
    transforms.ToTensor(),  # 转换为Tensor
])

# 创建数据集实例
dataset = CustomDataset(root_dir='path_to_your_dataset', transform=transform)

# 现在可以使用PyTorch的DataLoader来加载数据集
from torch.utils.data import DataLoader

data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
# 

