# Python数据模型

## 数据模型
Python数据模型对Python的整体框架进行了描述，其规范了Python构建模块的接口，包括且不限于序列、迭代器、函数、类以及上下文管理器

在Python中一些特殊方法会以两个下划线开始并以两个下划线结束，因此这些方法也被称为双下方法(dunder method)，这些特殊方法提供了自定义类与下述的功能交互的“接口”：
* 迭代
* 集合类
* 属性访问
* **运算符重载**
* 函数和方法的调用
* 对象的创建和销毁
* 字符串表示形式和格式化
* 管理上下文(with)

最为简单而常用的用法就是len()函数，对于len()函数，其默认会调用对应的\_\_len\_\_()方法。在自定义的类中则可以重载这一方法。这使得Python自带的数据结构以及自定义的类能够使用相同的函数执行类似的功能(一致性)。即这些特殊方法在保持内置类型的效率以及语言的一致性之间找到了一个平衡点。

显然，这样的处理是有相当意义的。例如对于一些自定义的运算，若想定义自定义类的运算，最为直接的方法就是定义对应的函数，但是Python允许通过对对应的特殊方法进行重载实现运算符的重载。例如numpy中既可以使用dot()进行点积运算(自定义函数)，也可以直接使用*符号(运算符重载，具体实现为重载\_\_mul\_\_方法)。

### 简单示例

In [30]:
# 简单示例
# 自定义类型实现了__len__以及__gititem__的重载
# 这使得类FrenchDeck能够直接使用Python内置len()函数以及[]操作
# 执行类似于Python内置类型"list"能够执行的功能

import collections

Card = collections.namedtuple("Card", ["rank", "suit"])

class FrenchDeck:
    ranks = [str(n) for n in range(2, 11)] + list("JQKA")
    suits = "spades diamonds clubs hearts".split()

    def __init__(self):
        self._cards = [Card(rank, suit) for suit in self.suits 
                                        for rank in self.ranks]

    def __len__(self):
        return len(self._cards)
    
    def __getitem__(self, position):
        return self._cards[position]


# -------  实例  -------
deck = FrenchDeck()

# -------  对len()的重载  -------
print("\n对len()函数的重载:")
print(len(deck))

# -------  对[]操作的重载 -------
# 根据索引取值
print("\n根据索引0取值:")
print(deck[0])
# 切片
print("\n切片:")
print(deck[:3])
# 迭代
print("\n迭代:")
for card in deck:
    print(card)
# 对其他标准库的支持
print("\n随机选择:")
import random
print(random.choice(deck))

# -------  重排序  -------
suit_values = dict(spades=3, hearts=2, diamonds=1, clubs=0)
def spades_high(card):
    rank_value = FrenchDeck.ranks.index(card.rank)
    return rank_value * len(suit_values) + suit_values[card.suit]
# 对sorted函数的支持
print("\n对sorted函数的支持:")
for card in sorted(deck, key=spades_high):
    print(card)


对len()函数的重载:
52

根据索引0取值:
Card(rank='2', suit='spades')

切片:
[Card(rank='2', suit='spades'), Card(rank='3', suit='spades'), Card(rank='4', suit='spades')]

迭代:
Card(rank='2', suit='spades')
Card(rank='3', suit='spades')
Card(rank='4', suit='spades')
Card(rank='5', suit='spades')
Card(rank='6', suit='spades')
Card(rank='7', suit='spades')
Card(rank='8', suit='spades')
Card(rank='9', suit='spades')
Card(rank='10', suit='spades')
Card(rank='J', suit='spades')
Card(rank='Q', suit='spades')
Card(rank='K', suit='spades')
Card(rank='A', suit='spades')
Card(rank='2', suit='diamonds')
Card(rank='3', suit='diamonds')
Card(rank='4', suit='diamonds')
Card(rank='5', suit='diamonds')
Card(rank='6', suit='diamonds')
Card(rank='7', suit='diamonds')
Card(rank='8', suit='diamonds')
Card(rank='9', suit='diamonds')
Card(rank='10', suit='diamonds')
Card(rank='J', suit='diamonds')
Card(rank='Q', suit='diamonds')
Card(rank='K', suit='diamonds')
Card(rank='A', suit='diamonds')
Card(rank='2', suit='clubs')
Ca

上述例子显示，自定义类完全可以有与内置类相似的功能。
值得注意的是双下方法通常不应该被直接调用，例如对于上述例子，通常不应该尝试使用deck.\_\_len\_\_()。通过内置函数使用特殊方法往往是最好的选择。

### 运算符重载

通过重载特殊方法对运算符进行重载是有好处的。其最明显的好处就是能够保证一致性。例如对于实数的二元加法可以使用运算符+。现在想实现向量的加法，一个选择是创建自定义函数，例如定义函数vector_add(x_1, x_2)；另一个选择就是进行运算符重载，通过运算符重载实现使用运算符+对向量进行加法运算的目的。

In [31]:
from math import hypot

class Vector:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2
    
    def __repr__(self):
        return "Vector({}, {})".format(self.x_1, self.x_2)
    
    def __abs__(self):
        return hypot(self.x_1, self.x_2)
    
    def __bool__(self):
        return bool(abs(self))

    def __add__(self, other):
        x_1 = self.x_1 + other.x_1
        x_2 = self.x_2 + other.x_2
        return Vector(x_1, x_2)
    
    def __mul__(self, scalar):
        return Vector(self.x_1 * scalar, self.x_2 * scalar)


# -------  实例  -------
v1 = Vector(2, 4)
v2 = Vector(2, 1)

# -------  加法重载 -------  
print("\n加法:")
print(v1 + v2)

# -------  绝对值重载  -------  
print("\n绝对值:")
print(abs(v1))

# -------  数乘重载 -------
print("\n数乘:")
print(v1 * 3)


加法:
Vector(4, 5)

绝对值:
4.47213595499958

数乘:
Vector(6, 12)


#### \_\_repr\_\_ & \_\_str\_\_
上述定义的Vector除了实现一些运算符的重载外还实现了方法\_\_repr\_\_，这个方法定义了一个对象的字符串形式。当需要打印该对象时，\_\_repr\_\_会被调用。而若没有定义\_\_repr\_\_则会打印该对象所在的内存地址。与\_\_repr\_\_类似的还有方法\_\_str\_\_，不同在于后者在被print输出或者str()函数转换为字符串时调用。当没有实现\_\_str\_\_但使用print输出该实例时，会自动调用\_\_repr\_\_

In [32]:
# vector 1
# 不实现__repr__以及__str__
class ReprVector1:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2

# vector2
# 实现__repr__，但不实现__str__
class ReprVector2:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2
    
    def __repr__(self):
        return "Vector({}, {})".format(self.x_1, self.x_2)

# vector3
# 实现__str__，但不实现__repr__
class ReprVector3:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2
    
    def __str__(self):
        return "Vector({}, {})".format(self.x_1, self.x_2)


v1 = ReprVector1(1, 1)
v2 = ReprVector2(1, 1)
v3 = ReprVector3(1, 1)

# print 输出
print("\nprint函数输出")
print("向量v1:", v1)
print("向量v2:", v2)
print("向量v3:", v3)

# 直接打印实例
print("\n直接打印实例:")
print("向量v3")
v3



print函数输出
向量v1: <__main__.ReprVector1 object at 0x00000211E8736C08>
向量v2: Vector(1, 1)
向量v3: Vector(1, 1)

直接打印实例:
向量v3


<__main__.ReprVector3 at 0x211e8cf07c8>

可以看到，上述的Vector3在控制台直接打印时没有正确输出其对应的"字符串表示形式"，但是在print函数中，Vector3正确输出。

#### \_\_bool\_\_

bool()函数的返回结果同样可以自定义。当使用bool()函数时，会自动调用\_\_bool\_\_()方法，若不存在\_\_bool\_\_()方法则会尝试调用\_\_len\_\_()，若\_\_len\_\_()也没有实现则会默认为真。

In [33]:
class BoolVector1:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2

class BoolVector2:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2

    def __bool__(self):
        return bool(self.x_1 or self.x_2)

class BoolVector3:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2
    
    def __len__(self):
        return bool(self.x_1 or self.x_2)

class BoolVector4:

    def __init__(self, x_1=0, x_2=0):
        self.x_1 = x_1
        self.x_2 = x_2
    
    def __len__(self):
        return bool(2)


v1 = BoolVector1(0, 0)
v2 = BoolVector2(0, 0)
v3 = BoolVector3(0, 0)
v4 = BoolVector4(0, 0)

print(bool(v1))
print(bool(v2))
print(bool(v3))
print(bool(v4))

True
False
False
True


可以看到，

BoolVector1既没有实现\_\_bool\_\_也没有实现\_\_len\_\_，因此输出为True

BoolVector2实现了\_\_bool\_\_，没有实现\_\_len\_\_，因此输出为False

BoolVector3没有实现\_\_bool\_\_，但是实现了\_\_len\_\_，并判断向量是否为0，因此输出为False

BoolVector4没有实现\_\_bool\_\_，但是实现了\_\_len\_\_，并始终返回bool(2)，因此输出为True

#### 一个例子：自动微分

自动微分的实现可以被视为上述运算符重载的一个特殊应用。对于自动微分来说，每一次运算不仅要计算得到运算结果，还需要记录梯度，因此需要对运算符进行重载以实现这一功能需求。

总的来说，对于自动微分需要重载大量运算符，下面就是带有自动微分功能的一个基本实现。这里实现的是自动微分的反向模式(Reverse mode)

In [34]:
import random
import numpy as np


random.seed(1024)
np.random.seed(1024)

def as_tensor(obj):
    if not isinstance(obj, MyTensor):
        obj = MyTensor(obj)
    return obj

def build_binary_ops_result_tensor(tensor1, tensor2, grad_func_tensor1, grad_func_tensor2, values):
    """
    建立二元运算结果的tensor
    """
    requires_grad = tensor1.requires_grad or tensor2.requires_grad
    
    dependency = list()
    if tensor1.requires_grad:
        dependency.append({"tensor": tensor1, "grad_func": grad_func_tensor1})
    if tensor2.requires_grad:
        dependency.append({"tensor": tensor2, "grad_func": grad_func_tensor2})

    return MyTensor(values, requires_grad, dependency)

def build_unary_ops_result_tensor(tensor, grad_func, values):
    """
    建立一元运算结果的tensor
    """

    requires_grad = tensor.requires_grad
    dependency = list()

    if tensor.requires_grad:
        dependency.append({"tensor": tensor, "grad_func": grad_func})
    
    return MyTensor(values, requires_grad, dependency) 


class MyTensor(object):

    def __init__(self, values, requires_grad=False, dependency=None):
        self._values = np.array(values)
        self._shape = self._values.shape

        self.grad = None
        self.requires_grad = requires_grad

        if self.requires_grad:
            self.zero_grad()

        if dependency is None:
            self.dependency = list()
        else:
            self.dependency = dependency
        
    def zero_grad(self):
        self.grad = np.zeros(self._shape)
    
    @property
    def values(self):
        return self._values
    
    @property
    def shape(self):
        return self._shape
    
    @values.setter
    def values(self, values):
        self._values = np.array(values)
        self.grad = None

    def backward(self, grad=None):
        """
        反向传播核心代码，累计当前tensor的梯度，同时反向传播梯度
        """
        assert self.requires_grad, "Call backward() on a non-requires-grad tensor"

        if grad is None:
            grad = 1.0
        grad = np.array(grad)

        # 梯度叠加
        self.grad += grad

        # 反向传播梯度到直接依赖的运算
        for dep in self.dependency:
            # 注意这里是直接将梯度传播到依赖的运算，而不是传播梯度叠加的结果
            # 其原因在于这里使用的是深度优先遍历
            # 由于在反向传播梯度后，会进行直接依赖的运算反向传播，因此是深度优先
            # 这么一来在遍历完成后，所有经过当前运算传递到依赖运算的路径均会被搜索到
            
            # 比如
            # 假设当前节点前有两个后继节点
            # 那么反向传播流程为

            # 梯度清零
            # 后继节点一反向传播，当前节点梯度+grad_1
            # 当前节点继续反向传播，前置节点梯度+grad_1
            # 另一条路径：后继节点二反向传播，当前节点梯度+grad_2
            # 当前节点继续反向传播，前置节点梯度+grad_2

            # 若在上述过程中每次传递的是累计梯度，则会导致前置节点梯度+grad_1+(grad_1+grad_2)
            # 即在第二条路径中对第一条路径的梯度又叠加了一次
            grad_cal_method_for_dep = dep["grad_func"](grad)
            # 迭代调用直接依赖的运算的反向传播
            dep["tensor"].backward(grad_cal_method_for_dep)
    
    # 运算重载，对于反向传播，其不仅要计算值，还需要计算梯度
    # 重载运算

    # 二元运算：
    # 矩阵乘法：左乘、右乘、原位乘
    # 加法：左加、右加、原位加
    # 减法：左减、右减、原位减
    # 按位乘法：左乘、右乘、原位乘

    # 一元运算：
    # 取反

    def __add__(self, other):
        """
        右加法重载
        
        return: self + other
        """
        return self._add(self, as_tensor(other))
    
    def __radd__(self, other):
        """
        左加法重载

        return: other + self
        """
        return self._add(as_tensor(other), self)
    
    def __iadd__(self, other):
        """
        原位加法重载

        self += other

        原位加法不会生成新的节点，因此不会记录梯度
        """
        self.values = self.values + as_tensor(other).values
        return self

    def _add(self, tensor1, tensor2):
        """
        运算为：c = a + b

        Dc/Da = 1
        Dc/Db = 1

        在本运算中，要求 a.shape == b.shape == c.shape
        或者使用numpy默认的broadcast机制

        对于维度匹配的加法，梯度直接传递即可
        对于维度无法匹配需要采用broadcast机制进行计算的加法，需要对维度进行处理

        broadcast机制：
        这里仅考虑二维的情况

        1. 对于形如 (m, n) + (n, ) -> (m, n)形式的broadcast:
        对于第一个加数，将计算结果的梯度直接赋予当前加数即可
        对于第二个加数，其需要将计算结果的梯度沿第一维度叠加，然后赋予当前加数

        2. 对于形如(m, n) + (1, n) -> (m, n)形式的broadcast：
        对于第一个加数，将计算结果的梯度直接赋予当前加数即可
        对于第二个加数，其需要将计算结果的梯度沿第一维度叠加，并保留维度，然后赋予当前加数
        """
        _result = tensor1.values + tensor2.values

        def grad_func_tensor1(grad):

            # 处理第一种情况
            # 此时，若当前tensor的维度要小于grad的维度，则说明存在第一种broadcast
            for _ in range(grad.ndim - tensor1.values.ndim):
                grad = grad.sum(axis=0)
            # 处理第二种情况
            # 此时，若当前tensor的维度等于grad的维度，但是存在为1的维度，则说明可能是第二种broadcast
            for i, dim in enumerate(tensor1.values.shape):
                if dim == 1:
                    grad = grad.sum(axis=i, keepdims=True)
            return grad

        def grad_func_tensor2(grad):
            # 同上
            for _ in range(grad.ndim - tensor2.values.ndim):
                grad = grad.sum(axis=0)
            for i, dim in enumerate(tensor2.values.shape):
                if dim == 1:
                    grad = grad.sum(axis=i, keepdims=True)
            return grad
        
        return build_binary_ops_result_tensor(tensor1, tensor2, grad_func_tensor1, grad_func_tensor2, _result)

    def __matmul__(self, other):
        """
        矩阵右乘

        return self @ other
        """
        return self._matmul(self, as_tensor(other))
    
    def __rmatmul__(self, other):
        """
        矩阵左乘
        
        return other @ self
        """
        return self._matmul(as_tensor(other), self)
    
    def __imatmul__(self, other):
        """
        原位乘法
        self @= other
        """
        self.values = self.values @ as_tensor(other).values
        return self

    def _matmul(self, tensor1, tensor2):
        """
        运算为: c = a @ b

        Dc/Da = grad @ b.T
        Dc/Db = a.T @ grad
        """
        if tensor1.shape[1] != tensor2.shape[0]:
            raise RuntimeError("RuntimeError: size mismatch, m1: {}, m2: {}".format(tensor1.shape, tensor2.shape))

        _result = tensor1.values @ tensor2.values
        def grad_func_tensor1(grad):
            return grad @ tensor2.values.T
        
        def grad_func_tensor2(grad):
            return tensor1.values.T @ grad

        return build_binary_ops_result_tensor(tensor1, tensor2, grad_func_tensor1, grad_func_tensor2, _result)

    def __mul__(self, other):
        """
        按位右乘

        reture self * other
        """
        return self._mul(self, as_tensor(other))

    def __rmul__(self, other):
        """
        按位左乘

        return other * self
        """
        return self._mul(as_tensor(other), self)
    
    def __imul__(self, other):
        """
        原位按位乘

        self *= other
        """
        self.values = self.values * as_tensor(other).values
        return self

    def _mul(self, tensor1, tensor2):
        """
        运算为: c = a * b

        Dc/Da = b
        Dc/Db = a

        在本运算中，要求 a.shape == b.shape == c.shape
        或者使用numpy默认的broadcast机制

        对于维度匹配的加法，梯度直接传递即可
        对于维度无法匹配需要采用broadcast机制进行计算的加法，需要对维度进行处理

        由于
        Dc/Da = b
        Dc/Db = a
        因此在计算梯度时，不同于add，按位乘法需要依赖numpy的broadcast首先计算梯度值，然后处理维度问题
        即：
        tensor1.grad = grad * tensor2.grad
        tensor2.grad = grad * tensor1.grad

        维度处理同add
        """
        _result = tensor1.values * tensor2.values

        def grad_func_tensor1(grad):
            grad = grad * tensor2.values

            # 处理第一种情况
            # 此时，若当前tensor的维度要小于grad的维度，则说明存在第一种broadcast
            for _ in range(grad.ndim - tensor1.values.ndim):
                grad = grad.sum(axis=0)
            # 处理第二种情况
            # 此时，若当前tensor的维度等于grad的维度，但是存在为1的维度，则说明可能是第二种broadcast
            for i, dim in enumerate(tensor1.values.shape):
                if dim == 1:
                    grad = grad.sum(axis=i, keepdims=True)
            return grad

        def grad_func_tensor2(grad):
            grad = grad * tensor1.values
            # 同上
            for _ in range(grad.ndim - tensor2.values.ndim):
                grad = grad.sum(axis=0)
            for i, dim in enumerate(tensor2.values.shape):
                if dim == 1:
                    grad = grad.sum(axis=i, keepdims=True)
            return grad
        
        return build_binary_ops_result_tensor(tensor1, tensor2, grad_func_tensor1, grad_func_tensor2, _result)


# 假设进行如下的运算
# x1 : [2, 2]
# x2 : [2, 1]
# x3 : [2, 1]
# y = x3 * (x1 @ (x2 + 1))

# y 对x1的偏导为：x3 @ (x2 + 1)^T
# y 对x2的偏导为：x1^T @ x3
# y 对x3的偏导为：x1 @ (x2 + 1)

tensor1 = MyTensor(np.ones((2, 2)) * 2, requires_grad=True)
tensor2 = MyTensor(np.zeros((2, 1)), requires_grad=True)
tensor3 = MyTensor(np.ones((2, 1)) * 3, requires_grad=True)

output_tensor = tensor3 * (tensor1 @ (tensor2 + 1))

# 梯度清零
output_tensor.zero_grad()
# 梯度反向传播
output_tensor.backward()

print("\n输出tensor对tensor1的偏导:")
print(tensor1.grad)
print("\n输出tensor对tensor2的偏导:")
print(tensor2.grad)
print("\n输出tensor对tensor3的偏导:")
print(tensor3.grad)


输出tensor对tensor1的偏导:
[[3. 3.]
 [3. 3.]]

输出tensor对tensor2的偏导:
[[12.]
 [12.]]

输出tensor对tensor3的偏导:
[[4.]
 [4.]]


上述定义的tensor类，重载了各类运算符，这使得在计算运算结果的同时也会记录相应的梯度，实现自动微分功能。相对更完整的自动微分实现（基本的激活函数和求和等操作）以及应用(全连接网络)可以参考[基本自动微分](https://github.com/koolo233/NeuralNetworks/blob/main/BasicAutomaticDifferentiation.ipynb)

### 总结

“通过实现特殊方法，自定义数据类型可以表现得跟内置类型一样，从而让我们写出更具表达力的代码——或者说，更具Python风格的代码”

确实相当方便，并且能够有效降低记忆成本