# 自定义量化张量

参考：[writing-your-own-quantized-tensor](https://docs.pytorch.org/ao/stable/subclass_basic.html#writing-your-own-quantized-tensor)

`torchao` 中的量化建立在张量子类的基础上。它们是 `torchao` 提供灵活的低精度计算推理和训练支持的主要扩展点，同时可以与重要的 PyTorch 功能（如 `torch.compile`、`autograd` 和分布式原语）兼容。

在本节中，将展示利用张量子类相比模块替换的优势，并通过简单的示例来说明如何使用这种方法表达量化。

## 什么是张量子类？

Tensor 子类就是从 {class}`torch.Tensor` 继承的类。它们允许用户在其模型中现有的算子之间插入自定义的计算逻辑，使得顶级 {mod}`torch` 命名空间中的函数（如 {func}`torch.add`）能够无缝工作。

tensor 子类的方法的明显替代方案是模块替换：例如，将模型中的所有 `nn.Linear` 模块替换为自定义的 `Int8QuantizedLinear` 模块。与这种方法相比，使用 tensor 子类有几个重要的优势：
- **更细粒度的集成点**。模块替换在模块级别截获计算，因此对于依赖于 `torch` 函数或原生模块变体（例如，稍微修改过的 `nn.Linear` 版本）的模型来说不起作用。相比之下，由于 tensor 子类在函数/算子级别截获计算，只要使用相同的函数/算子，就可以对模型进行量化。
- **更好的组合性**。使用模块替换进行多个函数的组合是笨拙的。例如，将两个现有的 `Int8QuantizedLinear` 和 `DistributedLinear` 模块组合起来，用户需要创建另一个线性类来重复这些功能。通过子类化张量可以简单地通过将一个子类包裹在另一个子类中来绕过这个问题。如果外部张量（例如 `DTensor`）知道内部张量是量化过的，那么它可以在使用更少的网络和内存带宽的情况下执行昂贵的 `allgather` 算子，从而提供性能上的好处。
- **重用 PyTorch 组件**。使用张量子类来表达量化是自然的，因为量化张量只是具有不同数据类型的 torch.Tensors。模型结构不会改变（nn.Linears 仍然保持为 nn.Linears），因此后续的优化步骤也可以保持与之前完全相同。

## 使用模块替换进行量化

从简单的例子开始，说明如何使用模块替换实现仅权重的 8 位对称量化。所有代码都可以在该[示例脚本](https://github.com/pytorch/ao/tree/main/tutorials/examples/quantized_module_swap.py)中找到。将使用以下函数将 32 位浮点张量量化为 8 位整数张量：

In [1]:
from typing import Tuple
import torch

def int8_symmetric_quantize(
    fp32_tensor: torch.Tensor,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Symmetrically quantize the torch.float32 tensor into torch.int8.
    Return a 2-tuple of (quantized value, scale).

    input: dimensions=[M, N], dtype=torch.float32
    output: dimensions=[M, N], dtype=torch.int8
    scale: dimensions=[M, 1], dtype=torch.float32
    """
    quant_min = -128
    quant_max = 127
    min_val = torch.amin(fp32_tensor, dim=[1], keepdim=False)
    max_val = torch.amax(fp32_tensor, dim=[1], keepdim=False)
    min_val_neg = torch.min(min_val, torch.zeros_like(min_val))
    max_val_pos = torch.max(max_val, torch.zeros_like(max_val))
    max_val_pos = torch.max(-min_val_neg, max_val_pos)
    scale = max_val_pos / (float(quant_max - quant_min) / 2)
    scale = scale.view(fp32_tensor.shape[0], -1)
    out = torch.round(fp32_tensor * (1.0 / scale))
    out = torch.clamp(out, quant_min, quant_max).to(torch.int8)
    return out, scale

接下来，将创建新的 `QuantizedLinear` 模块，该模块会调用这个函数以动态量化权重：

In [2]:
class QuantizedLinear(torch.nn.Linear):
    """
    Linear module that performs dynamic and symmetric weight-only
    int8 quantization.
    """
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        w_int8, scale = int8_symmetric_quantize(self.weight)
        return torch.matmul(x, w_int8.t().to(x.dtype)) * scale.t()

    @classmethod
    def from_float(cls, mod: torch.nn.Linear):
        new_linear = cls(mod.in_features, mod.out_features, mod.bias)
        new_linear.weight = mod.weight
        return new_linear

那么，唯一需要做的的就是将模型中的 `nn.Linear` 模块替换为 `QuantizedLinear`。用这个玩具模型来进行演示：

In [3]:
import copy

class ToyModel(torch.nn.Module):
    def __init__(self, m: int, n: int, k: int):
        super().__init__()
        self.linear1 = torch.nn.Linear(m, n, bias=False)
        self.linear2 = torch.nn.Linear(n, k, bias=False)

    def forward(self, x):
        x = self.linear1(x)
        x = self.linear2(x)
        return x

float_model = ToyModel(64, 128, 32).cuda()
quantized_model = copy.deepcopy(float_model)

# Swap torch.nn.Linear with QuantizedLinear
for name, child in quantized_model.named_children():
    if type(child) == torch.nn.Linear:
        new_linear = QuantizedLinear.from_float(child)
        setattr(quantized_model, name, new_linear)

验证模型现在使用了 `QuantizedLinear` 模块。现在，模型可以使用了！

In [5]:
print(float_model)

ToyModel(
  (linear1): Linear(in_features=64, out_features=128, bias=False)
  (linear2): Linear(in_features=128, out_features=32, bias=False)
)


In [6]:
print(quantized_model)

ToyModel(
  (linear1): QuantizedLinear(in_features=64, out_features=128, bias=False)
  (linear2): QuantizedLinear(in_features=128, out_features=32, bias=False)
)


这种简单方法的重要缺点是灵活性不足。目前这种方法只适用于原生的 PyTorch 模块，但如果模型中有稍微修改过的线性模块，例如支持分布式训练的模块，这种方法就不适用了。此外，如果模型直接调用线性函数版本（{func}`torch.nn.functional.linear`），这种方法也无法工作。

此外，假设希望将该特征与分布进行组合，而分布也是通过模块替换实现的。除了创建同时包含这两种功能的新模块外，没有干净的方式来实现这一点。这些限制可以通过使用张量子类来解决，张量子类是一种更优雅的方式来在模型中插入自定义计算，例如量化。

## 基于 `__torch_dispatch__` 的张量子类实现量化

接下来，将使用基于 `__torch_dispatch__` 的张量子类重新实现上述量化技术。

Tensor 子类（通常利用 `__torch_dispatch__`）是 PyTorch 中非常强大且灵活的扩展点。作为扩展点，它们主要有两个目的：
- 张量子类允许你重写几乎所有 PyTorch API 的实现，并且广泛用于实现其他 PyTorch 功能
- 张量子类允许你将额外的元数据与你的张量数据结合。一些示例
    - `[分布式]` 张量在各个节点间如何划分的元数据（[`DTensor`](https://github.com/pytorch/pytorch/blob/main/torch/distributed/tensor/_api.py#L217)，[文档](https://pytorch.org/docs/stable/distributed.tensor.html#pytorch-dtensor-distributed-tensor)）
    - `[量化]` scale/zero_point 元数据（[`AffineQuantizedTensor`](https://github.com/pytorch/ao/blob/main/torchao/dtypes/affine_quantized_tensor.py#L46)）
    - `[raggedness]` 元数据（[`NestedTensor`](https://github.com/pytorch/pytorch/blob/main/torch/nested/_internal/nested_tensor.py#L53)，[文档](https://pytorch.org/tutorials/prototype/nestedtensor.html#getting-started-with-nested-tensors)）

In [7]:
from torchao.dtypes import affine_quantized_tensor

先不讨论这些，开始定义用于对称量化的基本张量子类：

In [15]:
from typing import Any
class Int8SymmetricTensor(torch.Tensor):
    """
    Our subclass represents a tensor that has been quantized to int8
    It will hold two inner tensors:
      int_data: int8[M, N]
      scale: fp32[M, 1]
    """

    @staticmethod
    @torch._dynamo.disable
    def __new__(cls, int_data: torch.Tensor, scale: torch.Tensor):
        return torch.Tensor._make_wrapper_subclass(
            cls,
            int_data.shape,
            strides=int_data.stride(),
            storage_offset=int_data.storage_offset(),
            dtype=scale.dtype,
            device=int_data.device,
        )

    @torch._dynamo.disable
    def __init__(self, int_data: torch.Tensor, scale: torch.Tensor):
        # inner data expected to be quantized already
        assert int_data.dtype is torch.int8
        # we could do more work to support ndim > 2!
        assert int_data.ndim == 2
        assert scale.ndim == 2
        self.int_data = int_data
        self.scale = scale

    def __tensor_flatten__(self) -> tuple[list[str], Any]:
        """
        Returns a tuple of:
          names of all inner tensor attributes (two in our case)
          any other additional, non-tensor metadata.

        Needed for PT2 support.
        """
        return ["int_data", "scale"], None

    @classmethod
    def __tensor_unflatten__(cls, tensor_data_dict, extra_metadata, outer_size=None, outer_stride=None):
        """
         __tensor_unflatten__ should effectively undo __tensor_flatten__.

        inputs:
          a dict mapping names of inner tensor attributes back to the tensors
          the constant metadata from __tensor_flatten__
        output:
          a new instance of your subclass

        Needed for PT2 support.
        """
        assert extra_metadata is None
        int_data = tensor_data_dict["int_data"]
        scale = tensor_data_dict["scale"]
        return Int8SymmetricTensor(int_data, scale)

    def __repr__(self):
        return f'Int8SymmetricTensor(int_data={repr(self.int_data)}, scale={repr(self.scale)})'

    @staticmethod
    def from_float(float_tensor):
        """
        Actually performs the symmetric quantization.
        In our simple inference example we will quantize weights "ahead-of-time",
        although later in a training example we can quantize/dequantize
        during model execution, inside of our __torch_dispatch__

        input:
          float32 torch.Tensor
        output:
          Int8SymmetricTensor
        """
        int8_tensor, scale = int8_symmetric_quantize(float_tensor)
        return Int8SymmetricTensor(int8_tensor, scale)

    @classmethod
    def __torch_dispatch__(cls, func, types, args, kwargs):
        """
        Called for each ATen operator that our subclass is passed as an input to.
        We need to define our own implementation for every operator here.
        """
        if kwargs is None:
            kwargs = {}
        if func not in op_implementations_dict:
            raise AssertionError(f'Int8SymmetricTensor does not yet support op: {str(func)}')
        return op_implementations_dict[func](func, *args, **kwargs)


# Convenience function for registering our own implementation
# to every ATen operator in PyTorch
op_implementations_dict = {}
def register_op(ops: list[torch._ops.OpOverload]):
    def impl_decorator(op_impl):
        global op_implementations_dict
        for op in ops:
            op_implementations_dict[op] = op_impl
        return op_impl

    return impl_decorator

在上述代码中，做了几件事情：

- 定义了基本的“包装”张量子类——它本质上是容器对象，包含一些内部数据（特别是，两个对应于 int8 数据和缩放的张量）
- 定义了`__torch_dispatch__`实现，它将在模型对任何子类输入调用任何 ATen 算子时被调用
- （为 PT2 支持）定义了`__tensor_flatten__`/`__tensor_unflatten__`方法。这是我们子类能够与 {func}`torch.compile` 协同工作的几个要求中最大的一个（稍后会有更多说明）。它实际上告诉 {func}`torch.compile` 如何将子类“去糖化”为其内部组件。
- （为 PT2 支持）为构造方法（`__new__`和`__init__`）添加了 {func}`torch._dynamo.disable` 装饰器（稍后会有更多说明）。

## 应该实现哪些算子？

PyTorch 的算子相当庞大。与其试图让新张量子类实现 $100\%$ 的覆盖，不如只专注于上面玩具模型所需的算子。

模型中调用哪些算子呢？这样就能知道首先需要实现什么。最直接的方法是反复运行模型，查看哪些算子在子类中出错。更优雅的方法是记录模型在执行过程中遇到的所有算子。这可以通过另一个 `LoggingTensor` 子类来实现，就像这个例子一样。

实现以下必要的算子：

In [16]:
from torch.utils._python_dispatch import return_and_correct_aliasing

@register_op([torch.ops.aten.mm.default])
def int8_mm(func, x, weight):
    assert isinstance(weight, Int8SymmetricTensor), "Int8SymmetricTensor: matmul currently only supports the weight in low precision, not the input!"
    return torch.mm(x, weight.int_data.to(x.dtype)) * weight.scale

@register_op([
    torch.ops.aten.detach.default,
    torch.ops.aten.t.default,
])
def int8_view_ops(func, *args, **kwargs):
    assert isinstance(args[0], Int8SymmetricTensor)
    out_data = func(args[0].int_data, *args[1:], **kwargs)
    out_scale = func(args[0].scale, *args[1:], **kwargs)
    out = Int8SymmetricTensor(out_data, out_scale)
    return return_and_correct_aliasing(func, args, kwargs, out)

你会很快注意到：模型本身只包含几个线性层，但看到一些如 `aten.t` 和 `aten.mm` 的算子会调用子类。背景如下：

- 有一些存在于 C++ 中的算子分解，它们运行在张量子类“之上”。linear 就是其中一个这样的算子（分解就在[这里](https://github.com/pytorch/pytorch/blob/main/aten/src/ATen/native/LinearAlgebra.cpp#L2006)）。
- 分解在某种程度上是有益的，因为它们可以缩小子类作者需要实现的 API 大小。但如果您宁愿重写“高级”算子而不是其分解中的底层算子，那么分解可能会带来痛苦。
- 如果你希望在更高层次上覆盖某些算子（如 Linear），可以使用`__torch_function__`（[示例](https://github.com/pytorch/pytorch/blob/main/torch/nested/_internal/nested_tensor.py#L336)）。值得注意的是，如果你需要自动求导支持，那么在`__torch_function__`层进行的任何覆盖都需要以可微的方式编写，而你在`__torch_dispatch__`中进行的任何覆盖将自动可微。

在实现中有一些值得注意的细节：

- 你会注意到不再需要在 `mm` 实现内部转置权重/scales。那是因为在到达 `aten.mm` 算子之前，转置“已经完成”了。
- `aten.mm` 实现不会返回张量子类输出。从这个意义上说，量化子类的“传播”在 matmuls 结束时结束。这对应于权重是低精度的，但需要在高精度下执行 matmuls 本身。通常，子类作者可以自由选择他们的子类对哪些算子进行或不对进行传播。如果你希望你的模型中的每个函数都被量化（包括所有逐点和归约算子），你可以编写你的子类实现来量化每个操算子的输出，并始终返回子类。
- 能够重用相同的实现来处理 4 种视图算子。通常，许多算子可能使用相当通用的实现：解包任何子类输入，在内部张量上运行底层算子，并将输出重新包装到子类中。
    - 然而，你能否始终重用某个实现，则取决于你试图做什么。例如，通过调用内部数据和内部缩放张量的相同 `transpose(dim0, dim1)` 方法，在子类中实现了 `transpose(dim0, dim1)`。如果缩放张量和数据张量的维度不同，这种方法将无法工作，因此在这种情况下，转置需要自定义实现。

## 比较输出

好了，现在用两种量化版本运行模型，并确认它们给出相同的输出！

In [17]:
float_model = ToyModel(64, 128, 32).cuda()
quantized_model_module_swap = copy.deepcopy(float_model)
quantized_model_subclass = copy.deepcopy(float_model)

# Swap torch.nn.Linear with QuantizedLinear
for name, child in quantized_model_module_swap.named_children():
    if type(child) == torch.nn.Linear:
        new_linear = QuantizedLinear.from_float(child)
        setattr(quantized_model_module_swap, name, new_linear)

# Swap torch.nn.Linear weights with Int8SymmetricTensor subclasses
for name, child in quantized_model_subclass.named_children():
    if type(child) == torch.nn.Linear:
        subclass_param = Int8SymmetricTensor.from_float(child.weight)
        child.weight = torch.nn.Parameter(subclass_param, requires_grad=True)

with torch.no_grad():
    x = torch.randn(64, 64, 64, device='cuda')
    out_module_swap = quantized_model_module_swap(x)
    out = quantized_model_subclass(x)
    print(torch.allclose(out, out_module_swap))  # prints True

    # We can also use torch.compile to fuse some of our quantized logic
    out_compiled = torch.compile(quantized_model_subclass)(x)
    print(torch.allclose(out, out_compiled))  # prints True

True




True


在本教程中，展示了如何构建简单的量化张量子类。这是本系列两个教程中的第一部分。[下一篇](https://docs.pytorch.org/ao/stable/subclass_advanced.html)文章将讨论如何为您的张量子类添加更多高级功能，例如使其可训练、与 DTensors 组合以及添加张量并行支持。有关 torchao 中 `AffineQuantizedTensor` 如何使用张量子类构建的更详细[示例](https://github.com/pytorch/ao/blob/main/tutorials/developer_api_guide/my_dtype_tensor_subclass.py)。