# 要点总结
1. 常用函数，比如 jax.tree.map
2. 自定义 pytree 结点
3. 常见的与 pytree 兼容的结构，比如 NamedTuple 和 dataclass
4. Pytrees 和 JAX 变换，如何用 vmap 处理字典数据
5. 结点资格设置
6. 形状陷阱
7. pytree 的转置
8. 显式键路径

In [1]:
import jax
import jax.numpy as jnp
import numpy as np

# pytree 中的常用函数

In [3]:
# map 函数

# 我不太会写 lambda 函数，先快速上手一个
double = lambda x: 2*x
print(f"这是lambda语句的一个快速例子：{double(5)}")

# 再试试第二个例子
test = [1, 2, 3]
# 使用 lambda 和 map 生成新列表
new_list = list(map(lambda x: x + 1, test))
print(new_list)  # 输出: [2, 3, 4]

list_of_lists = [
    [1, 2, 3],
    [1, 2],
    [1, 2, 3, 4]
]

print(jax.tree.map(lambda x: double(x), list_of_lists))
#或者不用我的double函数，效果是一样的：
print(jax.tree.map(lambda x: 2*x, list_of_lists))
print(jax.tree.map(double, list_of_lists)) # 完全不适用 lambda 更简单

这是lambda语句的一个快速例子：10
[2, 3, 4]
[[2, 4, 6], [2, 4], [2, 4, 6, 8]]
[[2, 4, 6], [2, 4], [2, 4, 6, 8]]
[[2, 4, 6], [2, 4], [2, 4, 6, 8]]


In [4]:
# 我对 zip 的用法不熟，首先快速上手一个
list1 = ['a', 'b', 'c', 'd']
list2 = [1, 2, 3, 4]

# zip 函数的作用是将两个列表中的元素一一对应地组合成一个元组，返回一个迭代器
for x in zip(list1, list2):
    print(x)

# zip 函数的返回值是一个迭代器，所以我们可以将其转换为列表
list_of_tuples = list(zip(list1, list2))
print(list_of_tuples)

# zip 函数的返回值是一个迭代器，所以我们也可以将其转换为字典
dict_of_tuples = dict(zip(list1, list2))
print(dict_of_tuples)

# zip 函数的返回值是一个迭代器，所以我们也可以将其转换为集合
set_of_tuples = set(zip(list1, list2))
print(set_of_tuples)


# 以及创建一个 dict 的两种方法
## 使用 dict() 创建字典
my_dict = dict(a=1, b=2, c=3)
## 使用大括号 {} 创建字典
my_dict_alt = {'a': 1, 'b': 2, 'c': 3}

print(my_dict == my_dict_alt)  # 输出: True

('a', 1)
('b', 2)
('c', 3)
('d', 4)
[('a', 1), ('b', 2), ('c', 3), ('d', 4)]
{'a': 1, 'b': 2, 'c': 3, 'd': 4}
{('a', 1), ('d', 4), ('b', 2), ('c', 3)}
True


In [5]:
# 初始化神经网络的参数并使用 jax.tree.map 打印参数的类型和形状

def init_mlp_params(layer_widths):
  params = []
  for n_in, n_out in zip(layer_widths[:-1], layer_widths[1:]):
    params.append(
        dict(weights=np.random.normal(size=(n_in, n_out)) * np.sqrt(2/n_in),
             biases=np.ones(shape=(n_out,))
            )
    )
  return params

params = init_mlp_params([1, 128, 128, 1])

jax.tree.map(lambda x: (type(x),x.shape), params)

[{'biases': (numpy.ndarray, (128,)), 'weights': (numpy.ndarray, (1, 128))},
 {'biases': (numpy.ndarray, (128,)), 'weights': (numpy.ndarray, (128, 128))},
 {'biases': (numpy.ndarray, (1,)), 'weights': (numpy.ndarray, (128, 1))}]

In [None]:
# 现在我们可以写出 MLP 的前向传播函数、损失函数、以及参数更新方法

# Define the forward pass.
def forward(params, x):
  *hidden, last = params
  for layer in hidden:
    x = jax.nn.relu(x @ layer['weights'] + layer['biases'])
  return x @ last['weights'] + last['biases']

# Define the loss function.
def loss_fn(params, x, y):
  return jnp.mean((forward(params, x) - y) ** 2)

# Set the learning rate.
LEARNING_RATE = 0.0001

# Using the stochastic gradient descent, define the parameter update function.
# Apply `@jax.jit` for JIT compilation (speed).

def update_one(p, g):
    return p - LEARNING_RATE * g

@jax.jit
def update(params, x, y):
  # Calculate the gradients with `jax.grad`.
  grads = jax.grad(loss_fn)(params, x, y)  # 注意 jax.grad 自动对第一个参数求导
  # Note that `grads` is a pytree with the same structure as `params`.
  # `jax.grad` is one of many JAX functions that has
  # built-in support for pytrees.
  # This is useful - you can apply the SGD update using JAX pytree utilities.
  #return jax.tree.map(
  #    lambda p, g: p - LEARNING_RATE * g, params, grads
  #)
  return jax.tree.map(
      update_one, params, grads
  )

# 自定义 pytree 结点

In [None]:
class Special(object):
#object 是 Python 中所有类的基类 提供了一些基础方法，即使不显式继承 object，Python 也会隐式地将类继承自 object
  def __init__(self, x, y):
    self.x = x
    self.y = y

# jax.tree.leaves 是 JAX 提供的一个函数，用于提取 pytree 中的所有叶子节点。叶子节点是指 pytree 中不可再分解的最小单元。
jax.tree.leaves(
    [Special(0, 1),Special(2, 4)]
)

In [None]:
# 因为没有将 special 注册为 pytree，所以我们可以看到它的类型是 <class '__main__.Special'>，所以下面的句子会报错
# TypeError: unsupported operand type(s) for +: 'Special' and 'int'

'''
jax.tree.map(lambda x: x + 1,
  [Special(0, 1),Special(2, 4)]
             )
'''

In [None]:
# 注册自定义容器示例

# 继承：RegisteredSpecial 继承了 Special，因此它拥有 Special 的所有属性和方法。
# __repr__ 方法：重写了 __repr__ 方法，用于返回对象的可读字符串表示，便于调试和打印。
class RegisteredSpecial(Special):
  def __repr__(self):
    return f"RegisteredSpecial(x={self.x}, y={self.y})"

def special_flatten(v):
  """Specifies a flattening recipe.

  Params:
    v: The value of the registered type to flatten.
  Returns:
    A pair of an iterable with the children to be flattened recursively,
    and some opaque auxiliary data to pass back to the unflattening recipe.
    The auxiliary data is stored in the treedef for use during unflattening.
    The auxiliary data could be used, for example, for dictionary keys.
  """
  children = (v.x, v.y)
  aux_data = None
  return (children, aux_data)

def special_unflatten(aux_data, children):
  """Specifies an unflattening recipe.

  Params:
    aux_data: The opaque data that was specified during flattening of the
      current tree definition.
    children: The unflattened children

  Returns:
    A reconstructed object of the registered type, using the specified
    children and auxiliary data.
  """
  return RegisteredSpecial(*children)

from jax.tree_util import register_pytree_node
# Global registration
register_pytree_node(
    RegisteredSpecial,
    special_flatten,    # Instruct JAX what are the children nodes.
    special_unflatten   # Instruct JAX how to pack back into a `RegisteredSpecial`.
)
'''
register_pytree_node 需要三个输入参数：
需要注册的类：这里是 RegisteredSpecial，表示你要将哪个类注册为 pytree 节点。
flatten 方法：定义如何将对象分解为子节点（children）和辅助数据（auxiliary data）。
unflatten 方法：定义如何从子节点和辅助数据重新构造对象。
这两个方法（flatten 和 unflatten）共同指定了对象的序列化和反序列化方式，以便 JAX 能够正确处理该类的实例。

NB: flatten 输出的顺序和 unflatten 参数的顺序不同
'''

jax.tree.map(lambda x: x + 1,
  [RegisteredSpecial(0, 1),RegisteredSpecial(2, 4)]
             )

In [None]:
# 我对 NamedTuple 和 Tuple 的用法不熟，首先快速上手一个
# NamedTuple 可以用来创建不可变的对象
from typing import NamedTuple

class Point(NamedTuple):
    name: str
    x: int
    y: int

# 创建一个 Point 实例
p = Point("point", 3, 4)

# 访问属性
print(p) # 注意到 name 这个属性其实是被当成一个普通数据了
print(f"x: {p.x}, y: {p.y}")

# NamedTuple 实例是不可变的
# p.x = 5  # 这会报错，因为 NamedTuple 是不可变的

a_tuple = (1, 2, 3)
print(f"a_tuple的类型是{type(a_tuple)}，其第一个元素是{a_tuple[0]}.")
# 但是一个 namedtuple 的第一个元素却不算 name
# test[0] = 4 # 这会报错，因为元组是不可变的

In [None]:
# 有些类型的对象已经与 pytree 兼容了，所以我们可以直接使用不需要注册它们
# 比如 NamedTuple

from typing import NamedTuple, Any

class MyOtherContainer(NamedTuple):
  name: str
  a: Any
  b: Any
  c: Any

test = MyOtherContainer('test', 1, 2, 3)
for i in test:
  print(i)

# NamedTuple subclasses are handled as pytree nodes, so
# this will work out-of-the-box.
jax.tree.leaves([
    MyOtherContainer('Alice', 1, 2, 3),
    MyOtherContainer('Bob', 4, 3.4, 'test')
])
# 但是在转换后， name 这个属性却被视为数据了

In [None]:
# 我对 dataclass 的用法不熟，首先快速上手一个
# dataclass 是 Python 3.7 引入的一个装饰器，用于简化类的定义，特别是用于存储数据的类。

# dataclass 可以自动生成一些特殊方法，比如 __init__、__repr__ 和 __eq__ 等等。
# 这使得我们可以更方便地创建和使用数据类，而不需要手动编写这些方法。
# 我试过有没有类似的基类而不用 dataclass 装饰器的类，结果发现没有
from dataclasses import dataclass

@dataclass
class Point:
    x: int
    y: int


# 创建实例
p1 = Point(3, 4)
p2 = Point(3, 4)

# 自动生成的 __repr__ 方法
print(p1)  # 输出: Point(x=3, y=4)

# 自动生成的 __eq__ 方法
print(p1 == p2)  # 输出: True

In [None]:
# dataclass 可以简化注册流程

from dataclasses import dataclass
import functools

@functools.partial(jax.tree_util.register_dataclass,
                   data_fields=['a', 'b', 'c'],
                   meta_fields=['name'])
@dataclass
class MyDataclassContainer(object):
  name: str
  a: Any
  b: Any
  c: Any

# MyDataclassContainer is now a pytree node.
jax.tree.leaves([
  MyDataclassContainer('apple', 5.3, 1.2, jnp.zeros([4])),
  MyDataclassContainer('banana', np.array([3, 4]), -1., 0.)
])

In [None]:
# 同时dataclass 的对象对jit编译也很友好

@jax.jit
def f(x: MyDataclassContainer | MyOtherContainer):
  return x.a + x.b

# Works fine! `mdc.name` is static.
mdc = MyDataclassContainer('mdc', 1, 2, 5)
y = f(mdc)
print(y)

In [None]:
# namedtuple 对 jit 编译支持就一般了

moc = MyOtherContainer('moc', 1, 2, 3)
# 下面的句子会报错：
# TypeError: Error interpreting argument to <function f at 0x000002741BCB7100> as an abstract array. The problematic value is of type <class 'str'> and was passed to the function at path x.name.

#y = f(moc)

# Common pytree gotchas

In [None]:
a_tree = [jnp.zeros((2, 3)), jnp.zeros((3, 4))]
print("这是原来的全零：",a_tree)

# Try to make another pytree with ones instead of zeros.
shapes = jax.tree.map(lambda x: x.shape, a_tree)
#jax.tree.map(jnp.ones, shapes)

# 手动实现
ones = []
for x in a_tree:
    ones.append(jnp.ones(x.shape))
print(ones)

# 更加智能的实现 反正我拒绝使用 lambda 函数
ones_tree = jax.tree.map(jnp.ones_like, a_tree)
print(ones_tree)

In [7]:
# 这个句子会得到一个空列表
jax.tree.leaves([None, None, None])

[]

In [8]:
# 这个句子会得到一个包含三个 None 的列表
jax.tree.leaves([None, None, None], is_leaf=lambda x: x is None)

[None, None, None, 1, 2]

In [9]:
# is_leaf 函数可以有更多的用途

tree = [[], {}, [1, 2], {"key": 3}]

# 将空列表和空字典视为叶子节点
leaves = jax.tree_util.tree_leaves(tree)
print(leaves)  # 输出: [[], {}, 1, 2, 3]

leaves = jax.tree_util.tree_leaves(tree, is_leaf=lambda x: x == [] or x == {})
print(leaves)  # 输出: [[], {}, 1, 2, 3]


# 反正我就是想方设法不用 lambda 函数
def is_leaf(x):
    return x==[] or x=={}
leaves = jax.tree_util.tree_leaves(tree, is_leaf=is_leaf)
print(leaves)  # 输出: [[], {}, 1, 2, 3]

[1, 2, 3]
[[], {}, 1, 2, 3]
[[], {}, 1, 2, 3]


# Pytrees and JAX transformations
# 使用 vmap 处理 字典数据

In [None]:
import jax
import jax.numpy as jnp

def process_data(data_dict, constant_param):
  return {'processed_a': data_dict['a'] * 2 + constant_param,
          'processed_b': data_dict['b'] - 5}

# 假设我们有一批数据
batch_data = {
    'a': jnp.array([[1, 2], [3, 4], [5, 6]]),  # 形状 (3, 2)，我们想对第一个轴映射
    'b': jnp.array([10, 20, 30])             # 形状 (3,)，我们也想对第一个轴映射
}
constant_value = 100

In [None]:
# in_axes 指定了如何映射 process_data 的参数
# 第一个参数 (data_dict) 是一个字典，所以我们用一个字典来指定其内部元素的映射
#   'a': 0 表示映射 batch_data['a'] 的第 0 轴
#   'b': 0 表示映射 batch_data['b'] 的第 0 轴
# 第二个参数 (constant_param) 是一个标量，我们不希望映射它，所以用 None
vectorized_process_data = jax.vmap(process_data, in_axes=({'a': 0, 'b': 0}, None))

result = vectorized_process_data(batch_data, constant_value)
print(result)
# 输出将会是:
# {
#   'processed_a': DeviceArray([[102, 104], [106, 108], [110, 112]], dtype=int32),
#   'processed_b': DeviceArray([ 5, 15, 25], dtype=int32)
# }

In [None]:
def another_process(data_dict):
  return {'sum_a': jnp.sum(data_dict['a']), # 假设这里返回一个标量
          'expanded_b': data_dict['b'] * 2}

# 假设输入数据
input_dicts = {
    'a': jnp.array([[1,2],[3,4],[5,6]]), # 映射轴 0
    'b': jnp.array([[10],[20],[30]])     # 映射轴 0
}

# 我们希望 'sum_a' 在输出中仍然是一个批量的标量（即一个一维数组），
# 而 'expanded_b' 正常地将批处理轴放在第 0 轴。
# 如果 'sum_a' 在原始函数中返回的是标量，vmap后会变成一个向量。
# 如果我们想特殊处理 'sum_a' 的输出轴（例如，如果它不是被vmap化的），
# 或者确保输出的批处理轴在特定位置，out_axes可以是一个字典。

# 默认out_axes=0行为:
vectorized_another_process_default_out = jax.vmap(
    another_process,
    in_axes=({'a': 0, 'b': 0},)
)
output_default = vectorized_another_process_default_out(input_dicts)
# output_default['sum_a'] 将会是 jnp.array([3, 7, 11]) (批处理轴为0)
# output_default['expanded_b'] 将会是 jnp.array([[20],[40],[60]]) (批处理轴为0)
print(output_default)

In [None]:
# 使用 out_axes 的一个例子，虽然对于上面的函数不是特别必要，
# 但演示了其结构：
vectorized_another_process_custom_out = jax.vmap(
    another_process,
    in_axes=({'a': 0, 'b': 0},),
    out_axes={'sum_a': 0, 'expanded_b': 0} # 明确指定输出轴
)
output_custom = vectorized_another_process_custom_out(input_dicts)
# 结果与上面相同，因为0是默认值

# 更复杂的 out_axes 用法通常在需要将批处理轴插入到输出数组的特定位置时，
# 或者当函数输出的 pytree 的某些叶子节点不应该有新的批处理轴时（用 None）。
# 例如，如果原始函数返回 {'res': mapped_array, 'static_val': some_scalar}
# 并且我们希望 'static_val' 在 vmap 后仍然是那个标量（不添加批处理维度），
# 我们可以用 out_axes={'res': 0, 'static_val': None}。
# (注意: 对于标量，vmap 通常会自动处理，但对于已经是数组且不想再添加轴的情况，None 更为关键)

def complex_output_func(x_dict):
    return {'mapped': x_dict['val'] ** 2, 'fixed': jnp.array(99)}

vectorized_complex = jax.vmap(
    complex_output_func,
    in_axes=({'val': 0},),
    out_axes={'mapped': 0, 'fixed': None} # 'fixed' 将不会有批处理维度
)

input_complex = {'val': jnp.array([1,2,3])}
result_complex = vectorized_complex(input_complex)
print(result_complex)
# 输出将会是:
# {
#   'mapped': DeviceArray([2, 4, 6], dtype=int32),
#   'fixed': DeviceArray(99, dtype=int32) # 注意这里没有批处理维度
# }

In [None]:
# 我自己写的例子 注意vamp参数的形状和逗号

import jax
import jax.numpy as jnp

# 定义函数，计算 x 的平方加上 bias
def func(d):
    return {"result": d["x"] + 2 + d["bias"]}

def func(d):
    return d["x"]+ 2 + d["bias"]

# 数据字典，x 是一个批量的数组，bias 是一个标量
data = {
    "x": jnp.array([[12,23,34],[784,785,786]]), # 这是一个二维数组，形状是 (2, 3)
    "bias": 10.0             # 这是一个标量，会广播给每个元素
}

# 使用 vmap 并设置 in_axes

result0 = jax.vmap(func, in_axes=({'x': 0, 'bias': None},), out_axes=0)(data)
result1 = jax.vmap(func, in_axes=({'x': 0, 'bias': None},), out_axes=1)(data)
'''
# 或者分开写
vectorized_func = jax.vmap(func, in_axes=({'x': 0, 'bias': None},)) # 注意这里末尾的逗号 ,
result = vectorized_func(data)
# 效果是一样的
'''

print(result0,'\n', result1)

# Pytree 的转置

In [None]:
# 自己写函数 使用 lambda

def tree_transpose(list_of_trees):
  """
  Converts a list of trees of identical structure into a single tree of lists.
  """
  return jax.tree.map(lambda *xs: list(xs), *list_of_trees)

# Convert a dataset from row-major to column-major.
episode_steps = [dict(t=1, obs=3), dict(t=2, obs=4)]
tree_transpose(episode_steps)

In [None]:
# 自己写函数，不用 lambda

import jax

# 首先定义辅助函数
def pack_elements_into_list(*elements):
  """
  将所有传入的元素收集到一个列表中。
  """
  print(type(elements), elements)
  return elements #list(elements)

def tree_transpose_with_named_function(list_of_trees):
  """
  将具有相同结构的树的列表转换为由列表组成的单一树结构。
  使用一个命名的辅助函数代替 lambda。
  """
  # jax.tree.map 和 jax.tree_util.tree_map 效果一样
  return jax.tree.map(pack_elements_into_list, *list_of_trees)

# 示例数据
episode_steps = [dict(t=1, obs=3), dict(t=2, obs=4)]
print(episode_steps)

# 使用修改后的函数
result = tree_transpose_with_named_function(episode_steps)
print(result)
# 预期输出: {'t': [1, 2], 'obs': [3, 4]}

In [None]:
# 使用 JAX 内置的转置工具

jax.tree.transpose(
  outer_treedef = jax.tree.structure([0 for e in episode_steps]),
  inner_treedef = jax.tree.structure(episode_steps[0]),
  pytree_to_transpose = episode_steps
)

# [0 for e in episode_steps] 是一个遍历语句

# Explicit key paths

In [None]:
import collections

ATuple = collections.namedtuple("ATuple", ('name'))

tree = [1, {'k1': 2, 'k2': (3, 4)}, ATuple('foo')]
flattened, _ = jax.tree_util.tree_flatten_with_path(tree)

for key_path, value in flattened:
  print(f'Value of tree{jax.tree_util.keystr(key_path)}: {value}')

for key_path, _ in flattened:
  print(f'Key path of tree{jax.tree_util.keystr(key_path)}: {key_path}')
  #print(f'Key path of tree{jax.tree_util.keystr(key_path)}: {repr(key_path)}')
  # 这两种写法都是一样的呀
# repr 是 Python 内置的函数，用于返回对象的“官方字符串表示。