In [12]:
from jax import numpy as jnp


Polynomial = jnp.ndarray


def poly_x() -> Polynomial:
    return jnp.array([1, 0])


def poly_int(coeffs: Polynomial) -> Polynomial:
    return jnp.concatenate([coeffs / jnp.arange(len(coeffs), 0, -1), jnp.zeros(1)])


def poly_definite_int(coeffs: Polynomial, l: float, r: float) -> float:
    integral = poly_int(coeffs)
    return jnp.polyval(integral, r) - jnp.polyval(integral, l)


def poly_shift(p: Polynomial, h: float) -> Polynomial:
    """
    p(x) -> p(x - h)
    """

    res = jnp.zeros([1])

    x_m_h = jnp.array([1, -h])
    x_m_h_p = jnp.ones([1])

    for i in range(len(p)):
        res = jnp.polyadd(res, x_m_h_p * p[-i - 1])
        x_m_h_p = jnp.polymul(x_m_h_p, x_m_h)

    return res



In [13]:
"""
这个文件定义了两个“张量列车/张量链”相关的数据结构：

1) TT: Tensor Train（张量列车）——用一串三维 core 来表示一个高维张量
   - 每个 core 的形状是 (r_left, dim, r_right)
   - r_left, r_right 是“TT rank”（内部连接的秩）
   - dim 是这个维度的物理维度（例如每个变量离散取值个数）

2) TTOperator: TT 格式的线性算子（矩阵/算子）——用一串四维 core 来表示一个高维线性变换
   - 每个 core 的形状是 (r_left, dim_from, dim_to, r_right)
   - dim_from 是输入维度，dim_to 是输出维度

此外提供一些常用操作：
- 生成全 0 的 TT
- 随机生成 TT / TTOperator
- 把 TT / Operator “还原成完整张量/完整算子”（full_tensor / full_operator）
- TT 的 reverse / astype / 减法
- TT core 的转置（用于 reverse）
- 两个 TT 的减法（返回一个新的 TT，秩会增大）

注意：代码用 flax.struct.dataclass 让这个类是“不可变 pytree”，方便 JAX 的 jit/vmap/grad。
"""

from __future__ import annotations

from typing import Sequence, List

import jax
from jax import numpy as jnp
from flax import struct


@struct.dataclass
class TT:
    """
    Tensor Train (TT) 表示法。

    一个 n 维张量 A[i1, i2, ..., in] 被表示成 n 个 core 的连乘：
      core_k 的形状是 (r_{k}, dim_k, r_{k+1})
    其中：
      - dim_k 是第 k 维的大小（物理维度）
      - r_k 是 TT rank（内部连接维度）
      - 约定 r_0 = r_{n} = 1，这样整条链最终收缩成标量/张量元素

    这里 cores 存放的是一个 list，每个元素是 jnp.ndarray（三维）
    """

    @classmethod
    def zeros(cls, dims: Sequence[int], rs: Sequence[int]) -> TT:
        """
        构造一个全 0 的 TT。

        参数：
          dims: 每个维度的大小 [dim1, dim2, ..., dim_n]
          rs:   TT ranks（不包含两端的 1）[r1, r2, ..., r_{n-1}]
                注意长度必须是 n-1

        返回：
          TT 对象，其中每个 core 都是全 0 数组。
        """
        # TT 的标准约束：n 个 dims 对应 n-1 个内部 rank
        assert len(dims) == len(rs) + 1

        # 两端 rank 固定为 1：r0=1, rn=1
        rs = [1] + list(rs) + [1]

        # 逐个维度创建 core：形状 (r_left, dim, r_right)
        cores = [jnp.zeros((rs[i], dim, rs[i + 1])) for i, dim in enumerate(dims)]

        return cls(cores)

    @classmethod
    def generate_random(cls, key: jnp.ndarray, dims: Sequence[int], rs: Sequence[int]) -> TT:
        """
        随机生成一个 TT（每个 core 元素 ~ N(0,1)）。

        参数：
          key:  JAX 随机数 key
          dims: 每个维度大小
          rs:   内部 ranks（长度 n-1）

        返回：
          TT 对象，cores 为随机正态。
        """
        assert len(dims) == len(rs) + 1

        rs = [1] + list(rs) + [1]

        # 为每个 core 分配一个子 key，避免随机数重复
        keys = jax.random.split(key, len(dims))

        # 对每个维度 dim 生成 (r_left, dim, r_right) 的随机 core
        cores = [
            jax.random.normal(key, (rs[i], dim, rs[i + 1]))
            for i, (dim, key) in enumerate(zip(dims, keys))
        ]

        return cls(cores)

    # TT 的核心数据：n 个 core，每个 core 是 (r_left, dim, r_right)
    cores: List[jnp.ndarray]

    @property
    def n_dims(self):
        """返回张量的维数 n（也就是 core 的个数）。"""
        return len(self.cores)

    @property
    def full_tensor(self) -> jnp.ndarray:
        """
        把 TT 还原成“完整的高维张量”。

        实现方式：
          从第一个 core 开始，依次与后续 core 做 einsum 收缩 TT rank 维度。
          每次收缩掉上一个结果的右 rank，与下一个 core 的左 rank 对齐。

        结果形状：
          (dim1, dim2, ..., dim_n)
        """
        res = self.cores[0]  # (1, dim1, r2)
        for core in self.cores[1:]:
            # res:  (..., r)   core: (r, i, R)
            # -> (..., i, R)  把 r 收缩掉，拼接出新的物理维度 i
            res = jnp.einsum('...r,riR->...iR', res, core)

        # TT 两端 rank 都是 1，所以第一维和最后一维可以 squeeze 掉
        return jnp.squeeze(res, (0, -1))

    def reverse(self) -> TT:
        """
        把 TT 的维度顺序反过来（核心顺序翻转）。

        注意：
          翻转 core 的顺序后，每个 core 的左右 rank 方向也反了，
          所以需要对 core 做 transpose_core 来交换左右 rank 轴。
        """
        return TT([transpose_core(core) for core in self.cores[::-1]])

    def astype(self, dtype: jnp.dtype) -> TT:
        """把 TT 的所有 core 转成指定 dtype（例如 float32 / float64）。"""
        return TT([core.astype(dtype) for core in self.cores])

    def __sub__(self, other: TT):
        """定义 TT 的减法：self - other。"""
        return subtract(self, other)


@struct.dataclass
class TTOperator:
    """
    TT 格式的线性算子（你可以理解成高维矩阵/算子）。

    如果普通矩阵是 2D：A[out, in]
    那高维算子可以看成：A[i1..in, j1..jn]（输入 n 维 -> 输出 n 维）

    TT Operator 用 n 个 4D core 表示，每个 core 形状：
      (r_left, dim_from, dim_to, r_right)
    """

    @classmethod
    def generate_random(
        cls, key: jnp.ndarray, dims_from: Sequence[int], dims_to: Sequence[int], rs: Sequence[int]
    ) -> TTOperator:
        """
        随机生成一个 TT Operator（每个 core 元素 ~ N(0,1)）。

        参数：
          key:       JAX random key
          dims_from: 输入每维大小 [din1, din2, ..., din_n]
          dims_to:   输出每维大小 [dout1, dout2, ..., dout_n]
          rs:        内部 ranks（长度 n-1）

        返回：
          TTOperator 对象
        """
        n_dims = len(dims_from)

        # 基本一致性检查
        assert len(dims_from) == n_dims
        assert len(dims_to) == n_dims
        assert len(rs) + 1 == n_dims

        rs = [1] + list(rs) + [1]
        keys = jax.random.split(key, n_dims)

        # 每个 core 是 4D：(r_left, dim_from, dim_to, r_right)
        cores = [
            jax.random.normal(key, (rs[i], dim_from, dim_to, rs[i + 1]))
            for i, (dim_from, dim_to, key) in enumerate(zip(dims_from, dims_to, keys))
        ]

        return cls(cores)

    cores: List[jnp.ndarray]

    @property
    def full_operator(self) -> jnp.ndarray:
        """
        把 TT Operator 还原成完整算子（一个巨大的高维张量/矩阵）。

        逐 core einsum：
          res:  (..., r)
          core: (r, i, j, R)
          -> (..., i, j, R)

        最终 squeeze 掉两端 rank=1。
        结果形状：
          (din1, dout1, din2, dout2, ..., din_n, dout_n)
        （具体排列取决于 einsum 的写法，这里是按每个维度生成一对 (i,j)）
        """
        res = self.cores[0]
        for core in self.cores[1:]:
            res = jnp.einsum('...r,rijR->...ijR', res, core) # Einstein求和约定
        return jnp.squeeze(res, (0, -1)) 

    def reverse(self):
        """
        反转 operator 的 core 顺序。

        这里作者留了句注释：
          "idk, what should I do with axes 1 and 2."
        意思是：输入/输出物理轴 (dim_from, dim_to) 是否要交换、怎么交换，
        其实取决于你希望 reverse 后代表什么数学对象（是反转维度顺序？还是转置算子？）。

        当前实现：
          - core 顺序翻转
          - 把 rank 轴对调：把 axis 0 和 axis 3 互换
          - axis 1/2（输入/输出物理轴）保持不变
        """
        return TTOperator(
            [jnp.moveaxis(core, (0, 1, 2, 3), (3, 1, 2, 0)) for core in self.cores[::-1]]
        )


def transpose_core(core: jnp.ndarray) -> jnp.ndarray:
    """
    对 TT core 做“左右 rank 交换”。

    输入 core 形状：(r_left, dim, r_right)
    输出形状：(r_right, dim, r_left)

    用途：
      TT.reverse() 翻转 core 顺序时，需要把连接方向也翻过来。
    """
    return jnp.moveaxis(core, (0, 1, 2), (2, 1, 0))


def subtract(lhs: TT, rhs: TT) -> TT:
    """
    计算两个 TT 的差：lhs - rhs，并返回一个新的 TT。

    关键点（非常重要）：
    - 一般 TT 直接相减后，结果的 TT rank 会变大
    - 这里用的是经典的“block-diagonal 拼接”构造法：
        - 第一个 core：在右 rank 方向拼接 [lhs, -rhs]
        - 中间 core：做 2x2 的块对角拼接（lhs 在左上，rhs 在右下）
        - 最后一个 core：在左 rank 方向拼接 [lhs; rhs]

    这样保证：
      TT(full) = TT(lhs) - TT(rhs)

    代价：
      ranks 会变成原来的大约“相加”（更准确：中间 rank 变成 r1+r2）。
    """
    assert lhs.n_dims == rhs.n_dims

    # 只有 1 个维度时，TT 就是一个 (1, dim, 1) 的 core，直接相减即可
    if lhs.n_dims == 1:
        return TT([lhs.cores[0] - rhs.cores[0]])

    # 第一个 core：沿着右 rank 维（axis=-1）拼接
    # lhs: (1, d1, r1)  rhs: (1, d1, r1')
    # -> (1, d1, r1+r1')
    first = jnp.concatenate([lhs.cores[0], -rhs.cores[0]], axis=-1)

    # 最后一个 core：沿着左 rank 维（axis=0）拼接
    # lhs: (r_{n-1}, dn, 1)  rhs: (r'_{n-1}, dn, 1)
    # -> (r_{n-1}+r'_{n-1}, dn, 1)
    last = jnp.concatenate([lhs.cores[-1], rhs.cores[-1]], axis=0)

    # 中间 core：做块对角拼接（2x2 block）
    # 对每个位置 k：
    #   [ c1   0 ]
    #   [ 0   c2 ]
    inner = [
        jnp.concatenate(
            [
                # 上半块： [c1, 0]
                jnp.concatenate([c1, jnp.zeros((c1.shape[0], c1.shape[1], c2.shape[2]))], axis=-1),
                # 下半块： [0, c2]
                jnp.concatenate([jnp.zeros((c2.shape[0], c2.shape[1], c1.shape[2])), c2], axis=-1),
            ],
            axis=0,  # 沿左 rank 方向拼接成上下两块
        )
        for c1, c2 in zip(lhs.cores[1:-1], rhs.cores[1:-1])
    ]

    return TT([first] + inner + [last])


In [14]:
key = jax.random.PRNGKey(0)

dims = [4, 3, 5]   # 三个物理维度
rs   = [2, 3]      # 内部 ranks: r1=2, r2=3（长度 n-1）

tt1 = TT.generate_random(key, dims=dims, rs=rs)
tt2 = TT.generate_random(key, dims=dims, rs=rs)
tt3 = tt1-tt2

In [15]:
A1 = tt1.full_tensor
B1 = tt2.full_tensor
C1 = tt3.full_tensor
C1[1,1,1] - A1[1,1,1] + B1[1,1,1]

Array(4.7683716e-07, dtype=float32)

In [16]:
dims = [G.shape[1] for G in tt1.cores]
print(dims)

dims = [G.shape[1] for G in tt3.cores]
print(dims)

[4, 3, 5]
[4, 3, 5]


In [17]:
print(tt1.cores[0].shape)
print(tt3.cores[0].shape)


(1, 4, 2)
(1, 4, 4)


In [18]:
import jax
from flax import struct
from jax import numpy as jnp, ops, vmap

from ttde.tt.tensors import TT, TTOperator
from ttde.utils import cached_einsum


@struct.dataclass
class TTOpt:
    first: jnp.ndarray
    inner: jnp.ndarray
    last: jnp.ndarray

    @classmethod
    def zeros(cls, n_dims: int, dim: int, rank: int):
        return TTOpt(jnp.zeros([1, dim, rank]), jnp.zeros([n_dims - 2, rank, dim, rank]), jnp.zeros([rank, dim, 1]))

    @classmethod
    def from_tt(cls, tt: TT):
        return cls(tt.cores[0], jnp.stack(tt.cores[1:-1], axis=0), tt.cores[-1])

    @classmethod
    def rank_1_from_vectors(cls, vectors: jnp.ndarray):
        """
        vectors: [N_DIMS, DIM]
        """
        return cls(vectors[0, None, :, None], vectors[1:-1, None, :, None], vectors[-1, None, :, None])

    @classmethod
    def from_canonical(cls, vectors: jnp.ndarray):
        """
        vectors: [RANK, N_DIMS, DIM]
        """
        first = vectors[:, 0, :, None].T

        inner = jnp.zeros([vectors.shape[1] - 2, vectors.shape[0], vectors.shape[2], vectors.shape[0]])
        inner = inner.at[:, jnp.arange(vectors.shape[0]), :, jnp.arange(vectors.shape[0])].set(vectors[:, 1:-1, :])

        last = vectors[:, -1, :, None]

        return cls(first, inner, last)

    @property
    def n_dims(self) -> int:
        return 2 + self.inner.shape[0]

    def to_nonopt_tt(self):
        return TT([self.first, *self.inner, self.last])

    def abs(self) -> 'TTOpt':
        return TTOpt(jnp.abs(self.first), jnp.abs(self.inner), jnp.abs(self.last))


@struct.dataclass
class TTOperatorOpt:
    first: jnp.ndarray
    inner: jnp.ndarray
    last: jnp.ndarray

    @classmethod
    def from_tt_operator(cls, tt: TTOperator):
        return cls(tt.cores[0], jnp.stack(tt.cores[1:-1], axis=0), tt.cores[-1])

    @classmethod
    def rank_1_from_matrices(cls, matrices: jnp.ndarray):
        return cls(matrices[0, None, :, :, None], matrices[1:-1, None, :, :, None], matrices[-1, None, :, :, None])


@struct.dataclass
class NormalizedValue:
    value: jnp.ndarray
    log_norm: float

    @classmethod
    def from_value(cls, value):
        sqr_norm = (value ** 2).sum()
        norm_is_zero = sqr_norm == 0
        updated_sqr_norm = jnp.where(norm_is_zero, 1., sqr_norm)

        return cls(
            log_norm=jnp.where(norm_is_zero, -jnp.inf, .5 * jnp.log(updated_sqr_norm)),
            value=value / jnp.sqrt(updated_sqr_norm)
        )


def normalized_inner_product(tt1: TTOpt, tt2: TTOpt):
    def body(state, cores):
        G1, G2 = cores
        contracted = NormalizedValue.from_value(cached_einsum('ij,ikl,jkn->ln', state.value, G1, G2))
        return (
            NormalizedValue(
                value=contracted.value,
                log_norm=jnp.where(state.log_norm == -jnp.inf, -jnp.inf, state.log_norm + contracted.log_norm)
            ),
            None
        )

    state = NormalizedValue.from_value(cached_einsum('ikl,jkn->ln', tt1.first, tt2.first))
    state, _ = jax.lax.scan(body, state, (tt1.inner, tt2.inner))
    state, _ = body(state, (tt1.last, tt2.last))

    return state


def normalized_dot_operator(tt: TTOpt, tt_op: TTOperatorOpt):
    def body(x, A):
        c = jnp.einsum('rms,tmnu->rtnsu', x, A)
        return c.reshape(c.shape[0] * c.shape[1], c.shape[2], c.shape[3] * c.shape[4])

    return TTOpt(
        body(tt.first, tt_op.first),
        vmap(body)(tt.inner, tt_op.inner),
        body(tt.last, tt_op.last)
    )


In [19]:
"""
这个文件定义了两个“张量列车/张量链”相关的数据结构：

1) TT: Tensor Train（张量列车）——用一串三维 core 来表示一个高维张量
   - 每个 core 的形状是 (r_left, dim, r_right)
   - r_left, r_right 是“TT rank”（内部连接的秩）
   - dim 是这个维度的物理维度（例如每个变量离散取值个数）

2) TTOperator: TT 格式的线性算子（矩阵/算子）——用一串四维 core 来表示一个高维线性变换
   - 每个 core 的形状是 (r_left, dim_from, dim_to, r_right)
   - dim_from 是输入维度，dim_to 是输出维度

此外提供一些常用操作：
- 生成全 0 的 TT
- 随机生成 TT / TTOperator
- 把 TT / Operator “还原成完整张量/完整算子”（full_tensor / full_operator）
- TT 的 reverse / astype / 减法
- TT core 的转置（用于 reverse）
- 两个 TT 的减法（返回一个新的 TT，秩会增大）

注意：代码用 flax.struct.dataclass 让这个类是“不可变 pytree”，方便 JAX 的 jit/vmap/grad。
"""

from __future__ import annotations

from typing import Sequence, List, Optional

import jax
from jax import numpy as jnp
from flax import struct


@struct.dataclass
class TTNS:
    """Tensor Train Network State (tree-structured).

    Dimension convention for each core ``G_k``:
        G_k[alpha_parent, i_k, alpha_child1, alpha_child2, ...]

    ``alpha_parent`` is the virtual dimension to the parent (size 1 for root),
    ``i_k`` is the physical dimension, and the remaining axes correspond to the
    children in the order given by ``neighbors[k]`` with the parent filtered out.
    """

    cores: List[jnp.ndarray]
    neighbors: List[List[int]]
    root: Optional[int] = None
    parent: Optional[List[int]] = None

    @property
    def n_nodes(self) -> int:
        return len(self.cores)

    @property
    def n_dims(self) -> int:
        return self.n_nodes

    def _build_parent(self, root: int) -> List[int]:
        parent = [-1] * self.n_nodes
        parent[root] = root
        stack = [root]
        while stack:
            node = stack.pop()
            for nbr in self.neighbors[node]:
                if parent[nbr] != -1:
                    continue
                parent[nbr] = node
                stack.append(nbr)
        return parent

    def _resolve_parent(self) -> List[int]:
        if self.parent is not None:
            assert len(self.parent) == self.n_nodes
            return list(self.parent)
        root = 0 if self.root is None else self.root
        return self._build_parent(root)

    def _resolve_root(self, parent: List[int]) -> int:
        if self.root is not None:
            return self.root
        if -1 in parent:
            return parent.index(-1)
        for idx, value in enumerate(parent):
            if idx == value:
                return idx
        return 0

    def validate_tree(self) -> None:
        n_nodes = self.n_nodes
        assert len(self.neighbors) == n_nodes

        for node, nbrs in enumerate(self.neighbors):
            assert node not in nbrs
            assert len(set(nbrs)) == len(nbrs)
            for nbr in nbrs:
                assert 0 <= nbr < n_nodes
                assert node in self.neighbors[nbr]

        parent = self._resolve_parent()
        root = self._resolve_root(parent)
        assert 0 <= root < n_nodes

        visited = set()
        stack = [(root, -1)]
        while stack:
            node, prev = stack.pop()
            if node in visited:
                raise AssertionError("Graph contains a cycle")
            visited.add(node)
            for nbr in self.neighbors[node]:
                if nbr == prev:
                    continue
                stack.append((nbr, node))
        assert len(visited) == n_nodes

        for node, core in enumerate(self.cores):
            degree = len(self.neighbors[node])
            assert core.ndim == 2 + degree

        children_by_node = []
        for node in range(n_nodes):
            node_parent = parent[node]
            children = [nbr for nbr in self.neighbors[node] if nbr != node_parent]
            children_by_node.append(children)

        for node, core in enumerate(self.cores):
            if node == root:
                assert core.shape[0] == 1
                continue
            parent_node = parent[node]
            parent_children = children_by_node[parent_node]
            child_index = parent_children.index(node)
            parent_axis = 2 + child_index
            assert core.shape[0] == self.cores[parent_node].shape[parent_axis]

    def postorder(self) -> List[int]:
        parent = self._resolve_parent()
        root = self._resolve_root(parent)
        children_by_node = []
        for node in range(self.n_nodes):
            node_parent = parent[node]
            children = [nbr for nbr in self.neighbors[node] if nbr != node_parent]
            children_by_node.append(children)

        order = []
        stack = [(root, 0)]
        while stack:
            node, idx = stack.pop()
            children = children_by_node[node]
            if idx < len(children):
                stack.append((node, idx + 1))
                stack.append((children[idx], 0))
            else:
                order.append(node)
        return order


@struct.dataclass
class TT:
    """
    Tensor Train (TT) 表示法。

    一个 n 维张量 A[i1, i2, ..., in] 被表示成 n 个 core 的连乘：
      core_k 的形状是 (r_{k}, dim_k, r_{k+1})
    其中：
      - dim_k 是第 k 维的大小（物理维度）
      - r_k 是 TT rank（内部连接维度）
      - 约定 r_0 = r_{n} = 1，这样整条链最终收缩成标量/张量元素

    这里 cores 存放的是一个 list，每个元素是 jnp.ndarray（三维）
    """

    @classmethod
    def zeros(cls, dims: Sequence[int], rs: Sequence[int]) -> TT:
        """
        构造一个全 0 的 TT。

        参数：
          dims: 每个维度的大小 [dim1, dim2, ..., dim_n]
          rs:   TT ranks（不包含两端的 1）[r1, r2, ..., r_{n-1}]
                注意长度必须是 n-1

        返回：
          TT 对象，其中每个 core 都是全 0 数组。
        """
        # TT 的标准约束：n 个 dims 对应 n-1 个内部 rank
        assert len(dims) == len(rs) + 1

        # 两端 rank 固定为 1：r0=1, rn=1
        rs = [1] + list(rs) + [1]

        # 逐个维度创建 core：形状 (r_left, dim, r_right)
        cores = [jnp.zeros((rs[i], dim, rs[i + 1])) for i, dim in enumerate(dims)]

        return cls(cores)

    @classmethod
    def generate_random(cls, key: jnp.ndarray, dims: Sequence[int], rs: Sequence[int]) -> TT:
        """
        随机生成一个 TT（每个 core 元素 ~ N(0,1)）。

        参数：
          key:  JAX 随机数 key
          dims: 每个维度大小
          rs:   内部 ranks（长度 n-1）

        返回：
          TT 对象，cores 为随机正态。
        """
        assert len(dims) == len(rs) + 1

        rs = [1] + list(rs) + [1]

        # 为每个 core 分配一个子 key，避免随机数重复
        keys = jax.random.split(key, len(dims))

        # 对每个维度 dim 生成 (r_left, dim, r_right) 的随机 core
        cores = [
            jax.random.normal(key, (rs[i], dim, rs[i + 1]))
            for i, (dim, key) in enumerate(zip(dims, keys))
        ]

        return cls(cores)

    # TT 的核心数据：n 个 core，每个 core 是 (r_left, dim, r_right)
    cores: List[jnp.ndarray]

    @property
    def n_dims(self):
        """返回张量的维数 n（也就是 core 的个数）。"""
        return len(self.cores)

    @property
    def full_tensor(self) -> jnp.ndarray:
        """
        把 TT 还原成“完整的高维张量”。

        实现方式：
          从第一个 core 开始，依次与后续 core 做 einsum 收缩 TT rank 维度。
          每次收缩掉上一个结果的右 rank，与下一个 core 的左 rank 对齐。

        结果形状：
          (dim1, dim2, ..., dim_n)
        """
        res = self.cores[0]  # (1, dim1, r2)
        for core in self.cores[1:]:
            # res:  (..., r)   core: (r, i, R)
            # -> (..., i, R)  把 r 收缩掉，拼接出新的物理维度 i
            res = jnp.einsum('...r,riR->...iR', res, core)

        # TT 两端 rank 都是 1，所以第一维和最后一维可以 squeeze 掉
        return jnp.squeeze(res, (0, -1))

    def reverse(self) -> TT:
        """
        把 TT 的维度顺序反过来（核心顺序翻转）。

        注意：
          翻转 core 的顺序后，每个 core 的左右 rank 方向也反了，
          所以需要对 core 做 transpose_core 来交换左右 rank 轴。
        """
        return TT([transpose_core(core) for core in self.cores[::-1]])

    def astype(self, dtype: jnp.dtype) -> TT:
        """把 TT 的所有 core 转成指定 dtype（例如 float32 / float64）。"""
        return TT([core.astype(dtype) for core in self.cores])

    def __sub__(self, other: TT):
        """定义 TT 的减法：self - other。"""
        return subtract(self, other)

    def as_ttns(self) -> TTNS:
        n_dims = self.n_dims
        neighbors = [[] for _ in range(n_dims)]
        for idx in range(n_dims):
            if idx > 0:
                neighbors[idx].append(idx - 1)
            if idx < n_dims - 1:
                neighbors[idx].append(idx + 1)
        return TTNS(self.cores, neighbors, root=0)


@struct.dataclass
class TTOperator:
    """
    TT 格式的线性算子（你可以理解成高维矩阵/算子）。

    如果普通矩阵是 2D：A[out, in]
    那高维算子可以看成：A[i1..in, j1..jn]（输入 n 维 -> 输出 n 维）

    TT Operator 用 n 个 4D core 表示，每个 core 形状：
      (r_left, dim_from, dim_to, r_right)
    """

    @classmethod
    def generate_random(
        cls, key: jnp.ndarray, dims_from: Sequence[int], dims_to: Sequence[int], rs: Sequence[int]
    ) -> TTOperator:
        """
        随机生成一个 TT Operator（每个 core 元素 ~ N(0,1)）。

        参数：
          key:       JAX random key
          dims_from: 输入每维大小 [din1, din2, ..., din_n]
          dims_to:   输出每维大小 [dout1, dout2, ..., dout_n]
          rs:        内部 ranks（长度 n-1）

        返回：
          TTOperator 对象
        """
        n_dims = len(dims_from)

        # 基本一致性检查
        assert len(dims_from) == n_dims
        assert len(dims_to) == n_dims
        assert len(rs) + 1 == n_dims

        rs = [1] + list(rs) + [1]
        keys = jax.random.split(key, n_dims)

        # 每个 core 是 4D：(r_left, dim_from, dim_to, r_right)
        cores = [
            jax.random.normal(key, (rs[i], dim_from, dim_to, rs[i + 1]))
            for i, (dim_from, dim_to, key) in enumerate(zip(dims_from, dims_to, keys))
        ]

        return cls(cores)

    cores: List[jnp.ndarray]

    @property
    def full_operator(self) -> jnp.ndarray:
        """
        把 TT Operator 还原成完整算子（一个巨大的高维张量/矩阵）。

        逐 core einsum：
          res:  (..., r)
          core: (r, i, j, R)
          -> (..., i, j, R)

        最终 squeeze 掉两端 rank=1。
        结果形状：
          (din1, dout1, din2, dout2, ..., din_n, dout_n)
        （具体排列取决于 einsum 的写法，这里是按每个维度生成一对 (i,j)）
        """
        res = self.cores[0]
        for core in self.cores[1:]:
            res = jnp.einsum('...r,rijR->...ijR', res, core) # Einstein求和约定
        return jnp.squeeze(res, (0, -1)) 

    def reverse(self):
        """
        反转 operator 的 core 顺序。

        这里作者留了句注释：
          "idk, what should I do with axes 1 and 2."
        意思是：输入/输出物理轴 (dim_from, dim_to) 是否要交换、怎么交换，
        其实取决于你希望 reverse 后代表什么数学对象（是反转维度顺序？还是转置算子？）。

        当前实现：
          - core 顺序翻转
          - 把 rank 轴对调：把 axis 0 和 axis 3 互换
          - axis 1/2（输入/输出物理轴）保持不变
        """
        return TTOperator(
            [jnp.moveaxis(core, (0, 1, 2, 3), (3, 1, 2, 0)) for core in self.cores[::-1]]
        )
    def astype(self, dtype: jnp.dtype) -> "TTOperator":
        return TTOperator([core.astype(dtype) for core in self.cores])


def transpose_core(core: jnp.ndarray) -> jnp.ndarray:
    """
    对 TT core 做“左右 rank 交换”。

    输入 core 形状：(r_left, dim, r_right)
    输出形状：(r_right, dim, r_left)

    用途：
      TT.reverse() 翻转 core 顺序时，需要把连接方向也翻过来。
    """
    return jnp.moveaxis(core, (0, 1, 2), (2, 1, 0))


def subtract(lhs: TT, rhs: TT) -> TT:
    """
    计算两个 TT 的差：lhs - rhs，并返回一个新的 TT。

    关键点（非常重要）：
    - 一般 TT 直接相减后，结果的 TT rank 会变大
    - 这里用的是经典的“block-diagonal 拼接”构造法：
        - 第一个 core：在右 rank 方向拼接 [lhs, -rhs]
        - 中间 core：做 2x2 的块对角拼接（lhs 在左上，rhs 在右下）
        - 最后一个 core：在左 rank 方向拼接 [lhs; rhs]

    这样保证：
      TT(full) = TT(lhs) - TT(rhs)

    代价：
      ranks 会变成原来的大约“相加”（更准确：中间 rank 变成 r1+r2）。
    """
    assert lhs.n_dims == rhs.n_dims

    # 只有 1 个维度时，TT 就是一个 (1, dim, 1) 的 core，直接相减即可
    if lhs.n_dims == 1:
        return TT([lhs.cores[0] - rhs.cores[0]])

    # 第一个 core：沿着右 rank 维（axis=-1）拼接
    # lhs: (1, d1, r1)  rhs: (1, d1, r1')
    # -> (1, d1, r1+r1')
    first = jnp.concatenate([lhs.cores[0], -rhs.cores[0]], axis=-1)

    # 最后一个 core：沿着左 rank 维（axis=0）拼接
    # lhs: (r_{n-1}, dn, 1)  rhs: (r'_{n-1}, dn, 1)
    # -> (r_{n-1}+r'_{n-1}, dn, 1)
    last = jnp.concatenate([lhs.cores[-1], rhs.cores[-1]], axis=0)

    # 中间 core：做块对角拼接（2x2 block）
    # 对每个位置 k：
    #   [ c1   0 ]
    #   [ 0   c2 ]
    inner = [
        jnp.concatenate(
            [
                # 上半块： [c1, 0]
                jnp.concatenate([c1, jnp.zeros((c1.shape[0], c1.shape[1], c2.shape[2]))], axis=-1),
                # 下半块： [0, c2]
                jnp.concatenate([jnp.zeros((c2.shape[0], c2.shape[1], c1.shape[2])), c2], axis=-1),
            ],
            axis=0,  # 沿左 rank 方向拼接成上下两块
        )
        for c1, c2 in zip(lhs.cores[1:-1], rhs.cores[1:-1])
    ]

    return TT([first] + inner + [last])


+ 上面加载ttns的基本运算

In [20]:
import jax
import jax.numpy as jnp

def assert_allclose(a, b, rtol=1e-5, atol=1e-6, name=""):
    ok = jnp.allclose(a, b, rtol=rtol, atol=atol)
    if not bool(ok):
        max_err = jnp.max(jnp.abs(a - b))
        raise AssertionError(f"[{name}] not close, max|err|={float(max_err)}")

def tt_eval_by_definition(tt, idxs):
    # idxs: tuple(i1,...,id)
    res = tt.cores[0][:, idxs[0], :]          # shape (1, r1)
    for k in range(1, tt.n_dims):
        res = res @ tt.cores[k][:, idxs[k], :]  # (1, rk)
    return res[0, 0]

def test_tt_full_tensor(tt):
    dense = tt.full_tensor
    d = tt.n_dims
    dims = dense.shape
    # 全枚举（小规模才跑）
    for flat in range(int(jnp.prod(jnp.array(dims)))):
        idxs = jnp.unravel_index(flat, dims)
        val_def = tt_eval_by_definition(tt, tuple(int(x) for x in idxs))
        assert_allclose(val_def, dense[idxs], name="TT.full_tensor vs definition")

def op_eval_by_definition(op, ins, outs):
    # ins: (i1,...,id), outs: (j1,...,jd)
    # core shape: (rL, dim_from, dim_to, rR) => (rL, i, j, rR)
    res = op.cores[0][:, ins[0], outs[0], :]  # (1, r1)
    for k in range(1, len(op.cores)):
        res = res @ op.cores[k][:, ins[k], outs[k], :]
    return res[0, 0]

def test_op_full_operator(op):
    full = op.full_operator
    d = len(op.cores)
    din = [op.cores[k].shape[1] for k in range(d)]
    dout = [op.cores[k].shape[2] for k in range(d)]

    # 全枚举（小规模才跑）
    for flat_in in range(int(jnp.prod(jnp.array(din)))):
        ins = jnp.unravel_index(flat_in, din)
        ins = tuple(int(x) for x in ins)
        for flat_out in range(int(jnp.prod(jnp.array(dout)))):
            outs = jnp.unravel_index(flat_out, dout)
            outs = tuple(int(x) for x in outs)

            # full 的索引是 (i1,j1,i2,j2,...)
            idx = []
            for k in range(d):
                idx += [ins[k], outs[k]]
            idx = tuple(idx)

            val_def = op_eval_by_definition(op, ins, outs)
            assert_allclose(val_def, full[idx], name="TTOperator.full_operator vs definition")


# 假设你已经有 normalized_dot_operator, TTOpt, TTOperatorOpt
def test_dot_operator(tt, op):
    # dense X
    X = tt.full_tensor                         # shape (m1,...,md)
    m = X.shape
    d = len(m)

    # dense A (交错轴: i1,j1,i2,j2,...)
    A = op.full_operator                       # shape (m1,n1,m2,n2,...)
    n = tuple(op.cores[k].shape[2] for k in range(d))

    # 把 A 转成矩阵 [J, I]
    # 先把交错轴分组为 (i1,i2,...,id, j1,j2,...,jd) 或 (j..., i...) 都行，统一即可
    # 这里把 A 重新排列到 (j1,...,jd, i1,...,id)
    perm = []
    # 交错轴位置：i_k 在 2k，j_k 在 2k+1（0-based）
    for k in range(d):
        perm.append(2*k+1)   # j_k
    for k in range(d):
        perm.append(2*k)     # i_k
    A_mat = jnp.transpose(A, perm)             # shape (n1..nd, m1..md)
    A_mat = A_mat.reshape(int(jnp.prod(jnp.array(n))), int(jnp.prod(jnp.array(m))))

    X_vec = X.reshape(-1)
    Y_true = (A_mat @ X_vec).reshape(n)

    # TT 侧计算
    tt_opt = TTOpt.from_tt(tt)
    op_opt = TTOperatorOpt.from_tt_operator(op)
    y_opt = normalized_dot_operator(tt_opt, op_opt)
    Y = y_opt.to_nonopt_tt().full_tensor       # dense output

    assert_allclose(Y, Y_true, name="normalized_dot_operator correctness")


def recover_scalar(nv):
    # nv.value 可能是 (1,1)；统一 squeeze
    return jnp.exp(nv.log_norm) * jnp.squeeze(nv.value)

def test_normalized_inner_product(tt1, tt2):
    X = tt1.full_tensor.reshape(-1)
    Y = tt2.full_tensor.reshape(-1)
    true_ip = jnp.vdot(X, Y)  # 实数时就是 sum(X*Y)

    nv = normalized_inner_product(TTOpt.from_tt(tt1), TTOpt.from_tt(tt2))
    ip = recover_scalar(nv)

    assert_allclose(ip, true_ip, name="normalized_inner_product correctness")

def test_subtract(lhs, rhs):
    out = (lhs - rhs).full_tensor
    true = lhs.full_tensor - rhs.full_tensor
    assert_allclose(out, true, name="TT subtract correctness")

def test_reverse(tt):
    X = tt.full_tensor
    Y = tt.reverse().full_tensor
    true = jnp.transpose(X, axes=list(range(tt.n_dims))[::-1])
    assert_allclose(Y, true, name="TT reverse correctness")





In [21]:
def run_all_tests():
    key = jax.random.PRNGKey(0)

    # 小规模
    dims = [3, 2, 4, 3]   # d=4
    rs   = [2, 3, 2]      # 长度 d-1

    # 生成 TT
    from ttde.tt.tensors import TT, TTOperator  # 你项目里真实 import
    tt1 = TT.generate_random(key, dims, rs).astype(jnp.float64)
    tt2 = TT.generate_random(jax.random.PRNGKey(1), dims, rs).astype(jnp.float64)

    # 生成算子（这里用方阵：dims_from=dims_to）
    op = TTOperator.generate_random(jax.random.PRNGKey(2), dims, dims, rs).astype(jnp.float64)

    test_tt_full_tensor(tt1)
    test_tt_full_tensor(tt2)

    test_op_full_operator(op)

    test_normalized_inner_product(tt1, tt2)

    test_dot_operator(tt1, op)

    test_subtract(tt1, tt2)
    test_reverse(tt1)

    print("ALL TESTS PASSED")
run_all_tests()


AttributeError: 'TTOperator' object has no attribute 'astype'