# Python 知识

# 代码分析

## 类继承关系
```mermaid
classDiagram
    %% 类层次结构
    class typing.Protocol
    class SplitterT
    class BaseSplitter
    class RangeSplitter
    class RollingSplitter
    class ExpandingSplitter
    
    %% 继承关系
    typing.Protocol <|-- SplitterT
    BaseSplitter <|-- RangeSplitter
    BaseSplitter <|-- RollingSplitter
    BaseSplitter <|-- ExpandingSplitter
```

## def split_ranges_into_sets

### 例子

In [None]:
from vectorbt.generic.splitters import split_ranges_into_sets

In [None]:
# Example 1: simple two-way split - 50% training, 50% testing
start_idxs, end_idxs = [0, 50], [99, 149]
set_lens = (0.5,)  # the first set takes 50% of each range
splits = split_ranges_into_sets(start_idxs, end_idxs, set_lens)
for train_idx, test_idx in splits:
    print(f"训练集: {train_idx}, 测试集: {test_idx}")

In [None]:
# Example 2: three-way split - 50% training, 25% validation, 25% testing
start_idxs, end_idxs = [0, 50], [99, 149]
set_lens = (0.5, 0.25)  # 50% training, 25% validation, the remainder is the test set
splits = split_ranges_into_sets(start_idxs, end_idxs, set_lens)
for train_idx, valid_idx, test_idx in splits:
    print(f"训练集: {train_idx}, 验证集: {valid_idx}, 测试集: {test_idx}")

In [None]:
# Example 3: split by absolute sample counts
start_idxs, end_idxs = [0, 50], [99, 149]
set_lens = (50, 30)  # 50 samples for training, 30 for validation, the remainder for testing
splits = split_ranges_into_sets(start_idxs, end_idxs, set_lens)
for train_idx, valid_idx, test_idx in splits:
    print(f"训练集: {train_idx}, 验证集: {valid_idx}, 测试集: {test_idx}")

In [None]:
# Example 4: right-to-left split. With left_to_right=False the fixed-length
# sets (50 and 30 samples) keep their order at the end of each range, and the
# variable-length remainder becomes the FIRST set instead of the last one.
start_idxs = [0, 50]
end_idxs = [99, 149]
set_lens = (50, 30)
left_to_right = False
# Passed by keyword so the meaning of the boolean is visible at the call site.
for train_idx, valid_idx, test_idx in split_ranges_into_sets(
        start_idxs, end_idxs, set_lens, left_to_right=left_to_right):
    print(f"训练集: {train_idx}, 验证集: {valid_idx}, 测试集: {test_idx}")

### 源码

```python
def split_ranges_into_sets(start_idxs: tp.ArrayLike,
                           end_idxs: tp.ArrayLike,
                           set_lens: tp.MaybeSequence[tp.Sequence[float]] = (),
                           left_to_right: tp.MaybeSequence[bool] = True) -> RangesT:
    """Generate, for each range, a tuple of index arrays (one array per set).

    Each range spans `start_idxs[i]` to `end_idxs[i]`, both inclusive.

    Args:
        start_idxs: First index of each range.
        end_idxs: Last (inclusive) index of each range; must have the same
            length as `start_idxs`.
        set_lens: Lengths of all sets but one. If empty, each range is yielded
            as a single array. If its first element is itself a sequence (as
            judged by `checks.is_sequence`), `set_lens[i]` is used for range
            `i`; otherwise the same lengths are used for every range. A value
            strictly between 0 and 1 is a fraction of the range length
            (rounded down); any other value is used as given.
        left_to_right: If true, the set that receives the remaining length is
            placed last; otherwise it is placed first. May be a sequence with
            one flag per range.

    Yields:
        A tuple of index arrays for each range, in range order.

    Raises:
        ValueError: If a set length resolves to 0, or if the requested sets
            leave no room for the remaining set.
    """
    start_idxs = np.asarray(start_idxs)
    end_idxs = np.asarray(end_idxs)
    checks.assert_len_equal(start_idxs, end_idxs)

    for i, (start_idx, end_idx) in enumerate(zip(start_idxs, end_idxs)):
        range_len = end_idx - start_idx + 1

        # Nothing to split: the whole range is the only set.
        if len(set_lens) == 0:
            yield (np.arange(start_idx, end_idx + 1),)
            continue

        # Resolve per-range settings when they were given as sequences.
        _set_lens = set_lens[i] if checks.is_sequence(set_lens[0]) else set_lens
        remainder_last = left_to_right[i] if checks.is_sequence(left_to_right) else left_to_right

        # Turn fractional lengths into absolute ones.
        resolved_lens = []
        for j, requested in enumerate(_set_lens):
            resolved = math.floor(requested * range_len) if 0 < requested < 1 else requested
            if resolved == 0:
                raise ValueError(f"Set {j} in the range {i} is empty")
            resolved_lens.append(resolved)

        # At least one element must be left over for the remaining set.
        if not sum(resolved_lens) < range_len:
            raise ValueError(f"Range of length {range_len} too short to split into {len(_set_lens) + 1} sets")
        remainder = [range_len - sum(resolved_lens)]
        resolved_lens = resolved_lens + remainder if remainder_last else remainder + resolved_lens

        # Cumulative offsets relative to the start of the range.
        boundaries = [0]
        for set_len in resolved_lens:
            boundaries.append(boundaries[-1] + set_len)

        yield tuple(
            np.arange(start_idx + lower, start_idx + upper)
            for lower, upper in zip(boundaries[:-1], boundaries[1:])
        )

## class SplitterT(tp.Protocol)
分割器协议类型，任何实现了 `def split(self, X: tp.ArrayLike, **kwargs) -> RangesT` 的类型都是 `SplitterT` 类型。

```python
class SplitterT(tp.Protocol):
    """Protocol satisfied by any object that exposes a compatible `split` method."""

    def split(self, X: tp.ArrayLike, **kwargs) -> RangesT:
        """Split `X`; implementers return a `RangesT`."""
        ...
```

## class BaseSplitter
分割器基类，具体类型的分割器须继承该类后实现 `def split(self, X: tp.ArrayLike, **kwargs) -> RangesT` 方法。

```python
class BaseSplitter:
    """Base class for splitters; subclasses are expected to override `split`."""

    def split(self, X: tp.ArrayLike, **kwargs) -> RangesT:
        """Split `X` into ranges.

        Raises:
            NotImplementedError: Always; the base class provides no implementation.
        """
        raise NotImplementedError
```

## class RangeSplitter(BaseSplitter)
将时间序列数据分割成指定数量或长度的连续区间。

`split` 方法的参数：
- `X` (array-like): 要分割的数据，可以是pandas Series/DataFrame或numpy数组
- `n` (int, optional): 分割的区间数量
  - 如果range_len未指定，区间长度取 `len(X) // n`，再从所有可能的区间中均匀选择n个
  - 如果同时指定，将从可能的区间中均匀选择n个
- `range_len` (float, optional): 每个区间的长度
  - 如果是0-1之间的小数，表示占总长度的比例
  - 如果是>=1的整数，表示绝对长度
  - 如果n未指定，将生成尽可能多的区间
- `min_len` (int): 区间的最小长度，短于此长度的区间将被过滤
- `start_idxs` (array-like, optional): 自定义起始索引数组
  - 可以是numpy数组（绝对位置）或pandas Index（标签）
- `end_idxs` (array-like, optional): 自定义结束索引数组
  - 可以是numpy数组（绝对位置）或pandas Index（标签）
  - 结束索引是包含的（inclusive）
- `**kwargs`: 传递给split_ranges_into_sets的额外参数

返回：
    RangesT: 分割结果生成器，每次迭代返回该分割的所有数据集索引

In [None]:
import numpy as np
import pandas as pd
from vectorbt.generic.splitters import RangeSplitter

# Sample data: 100 daily observations of a random walk
dates = pd.date_range(start='2020-01-01', periods=100, freq='D')
prices = pd.Series(np.cumsum(np.random.randn(100)), index=dates)

splitter = RangeSplitter()

# Example 1: pick 3 ranges of equal length
three_ranges = splitter.split(prices, n=3)
for ranges in three_ranges:
    print(ranges)

In [None]:
# Example 2: every range is 30 days long
thirty_day_ranges = splitter.split(prices, range_len=30)
for ranges in thirty_day_ranges:
    print(ranges)

In [None]:
# Example 3: custom start and end dates, given as index labels
month_bounds = [
    ('2020-01-01', '2020-01-31'),
    ('2020-02-01', '2020-02-29'),
    ('2020-03-01', '2020-03-31'),
]
start_dates = pd.Index([first_day for first_day, _ in month_bounds])
end_dates = pd.Index([last_day for _, last_day in month_bounds])
custom_ranges = splitter.split(prices, start_idxs=start_dates, end_idxs=end_dates)
for ranges in custom_ranges:
    print(ranges)

```python
class RangeSplitter(BaseSplitter):
    """Splitter that produces ranges over the index of the input.

    Ranges are either generated from `n` and/or `range_len`, or supplied
    explicitly through `start_idxs` and `end_idxs`.
    """

    def split(self,
              X: tp.ArrayLike,
              n: tp.Optional[int] = None,
              range_len: tp.Optional[float] = None,
              min_len: int = 1,
              start_idxs: tp.Optional[tp.ArrayLike] = None,
              end_idxs: tp.Optional[tp.ArrayLike] = None, **kwargs) -> RangesT:
        """Split `X` into ranges and pass them to `split_ranges_into_sets`.

        Args:
            X: Data to split. The index of a pandas Series/DataFrame is used;
                otherwise positions along the first axis.
            n: Number of ranges to keep, picked evenly spaced among the
                candidate ranges.
            range_len: Length of each generated range. A value strictly
                between 0 and 1 is a fraction of the index length (rounded
                down). Defaults to `len(index) // n` when only `n` is given.
            min_len: Ranges shorter than this are dropped.
            start_idxs: Explicit range starts. A pandas Index is treated as
                labels and mapped with `find_first_occurrence`; anything
                else is converted with `np.asarray`.
            end_idxs: Explicit (inclusive) range ends, handled like `start_idxs`.
            **kwargs: Forwarded to `split_ranges_into_sets`.

        Raises:
            ValueError: If neither `n`, `range_len` nor explicit indices are
                given, if only one of `start_idxs`/`end_idxs` is given, if no
                range satisfies `min_len`, or if `n` exceeds the number of
                remaining ranges.
        """
        X = to_any_array(X)
        index = X.index if isinstance(X, (pd.Series, pd.DataFrame)) else pd.Index(np.arange(X.shape[0]))

        def _as_positions(idxs):
            # A pandas Index carries labels that must be looked up in `index`.
            if isinstance(idxs, pd.Index):
                return np.asarray([find_first_occurrence(idx, index) for idx in idxs])
            return np.asarray(idxs)

        start_missing = start_idxs is None
        end_missing = end_idxs is None
        if start_missing != end_missing:
            raise ValueError("Both start_idxs and end_idxs must be set")

        if start_missing:
            # No explicit bounds: generate every range of `range_len` elements.
            if range_len is None and n is None:
                raise ValueError("At least n, range_len, or start_idxs and end_idxs must be set")
            total = len(index)
            if range_len is None:
                range_len = total // n
            if 0 < range_len < 1:
                range_len = math.floor(range_len * total)
            start_idxs = np.arange(total - range_len + 1)
            end_idxs = np.arange(range_len - 1, total)
        else:
            start_idxs = _as_positions(start_idxs)
            end_idxs = _as_positions(end_idxs)

        # Drop ranges that are too short.
        start_idxs, end_idxs = np.broadcast_arrays(start_idxs, end_idxs)
        long_enough = (end_idxs - start_idxs + 1) >= min_len
        if not np.any(long_enough):
            raise ValueError(f"There are no ranges that meet range_len>={min_len}")
        start_idxs, end_idxs = start_idxs[long_enough], end_idxs[long_enough]

        # Keep `n` evenly spaced ranges out of the candidates.
        if n is not None:
            if n > len(start_idxs):
                raise ValueError(f"n cannot be bigger than the maximum number of ranges {len(start_idxs)}")
            picked = np.round(np.linspace(0, len(start_idxs) - 1, n)).astype(int)
            start_idxs, end_idxs = start_idxs[picked], end_idxs[picked]

        return split_ranges_into_sets(start_idxs, end_idxs, **kwargs)
```

## class RollingSplitter(BaseSplitter)
滚动分割器：固定大小的时间窗口在时间序列上滑动。

`split` 方法的参数：
- `X` (array-like): 要分割的时间序列数据
- `n` (int, optional): 要生成的窗口数量
  - 如果指定，将从所有可能的窗口中均匀选择n个
  - 如果未指定，将生成所有可能的窗口
- `window_len` (float, optional): 窗口长度
  - 如果是0-1之间的小数，表示占总长度的比例
  - 如果是>=1的整数，表示绝对长度
  - 如果未指定，窗口长度取 `len(X) // n`
- `min_len` (int): 窗口的最小长度，短于此长度的窗口将被过滤
- `**kwargs`: 传递给split_ranges_into_sets的额外参数，如：
  - `set_lens`: 指定各数据集（如训练集、验证集）的长度；0-1之间的小数表示占窗口长度的比例，其他数值表示绝对长度，剩余部分自动成为另一个数据集
  - `left_to_right`: 指定分割方向；为True时剩余部分的数据集排在最后，为False时排在最前

返回：
    RangesT: 分割结果生成器，每次迭代返回该窗口的所有数据集索引

In [None]:
import pandas as pd
import numpy as np
from vectorbt.generic.splitters import RollingSplitter

# Sample data: one year of daily observations (random walk)
dates = pd.date_range('2020-01-01', periods=365, freq='D')
prices = pd.Series(np.random.randn(365).cumsum(), index=dates)

splitter = RollingSplitter()

# Example 1: 30-day window, generate every possible window
windows = list(splitter.split(prices, window_len=30))
print(f"生成了 {len(windows)} 个窗口")
# Reuse the materialized list and print only the first 3 windows.
for ranges in windows[:3]:
    print(ranges)

In [None]:
# Example 2: pick 10 windows of 60 days each
windows = list(splitter.split(prices, n=10, window_len=60))
print(f"选择了 {len(windows)} 个窗口")
# Iterate the list built above instead of running the split a second time.
for ranges in windows:
    print(ranges)

In [None]:
# Example 3: split every rolling window into a training and a test set
# 80% of each window for training, the remaining 20% for testing
windows = list(splitter.split(prices, window_len=50, set_lens=(0.8,)))
length = len(windows)
print(f"窗口数量: {length}")
# The loop variable is named `ranges` so the builtin `range` is not shadowed.
for ranges in windows:
    print(ranges)

```python
class RollingSplitter(BaseSplitter):
    """Splitter that slides a fixed-length window over the index of the input."""

    def split(self,
              X: tp.ArrayLike,
              n: tp.Optional[int] = None,
              window_len: tp.Optional[float] = None,
              min_len: int = 1,
              **kwargs) -> RangesT:
        """Generate rolling windows over `X` and pass them to `split_ranges_into_sets`.

        Args:
            X: Data to split. The index of a pandas Series/DataFrame is used;
                otherwise positions along the first axis.
            n: Number of windows to keep, picked evenly spaced among all
                candidate windows.
            window_len: Length of each window. A value strictly between 0 and
                1 is a fraction of the index length (rounded down). Defaults
                to `len(index) // n` when only `n` is given.
            min_len: Windows shorter than this are dropped.
            **kwargs: Forwarded to `split_ranges_into_sets`.

        Raises:
            ValueError: If neither `n` nor `window_len` is given, if no window
                satisfies `min_len`, or if `n` exceeds the number of windows.
        """
        X = to_any_array(X)
        index = X.index if isinstance(X, (pd.Series, pd.DataFrame)) else pd.Index(np.arange(X.shape[0]))
        total = len(index)

        if window_len is None:
            if n is None:
                raise ValueError("At least n or window_len must be set")
            window_len = total // n
        if 0 < window_len < 1:
            window_len = math.floor(window_len * total)

        # One window per possible start position.
        start_idxs = np.arange(total - window_len + 1)
        end_idxs = np.arange(window_len - 1, total)

        # Drop windows that are too short.
        long_enough = (end_idxs - start_idxs + 1) >= min_len
        if not np.any(long_enough):
            raise ValueError(f"There are no ranges that meet window_len>={min_len}")
        start_idxs, end_idxs = start_idxs[long_enough], end_idxs[long_enough]

        # Keep `n` evenly spaced windows out of the candidates.
        if n is not None:
            if n > len(start_idxs):
                raise ValueError(f"n cannot be bigger than the maximum number of windows {len(start_idxs)}")
            picked = np.round(np.linspace(0, len(start_idxs) - 1, n)).astype(int)
            start_idxs, end_idxs = start_idxs[picked], end_idxs[picked]

        return split_ranges_into_sets(start_idxs, end_idxs, **kwargs)
```

## class ExpandingSplitter(BaseSplitter)
扩展分割器：窗口起始位置固定，窗口大小逐步增加。

`split` 方法的参数：
- `X` (array-like): 要分割的时间序列数据
- `n` (int, optional): 要生成的窗口数量
  - 如果指定，将从所有可能的窗口中均匀选择n个
  - 如果未指定，将生成所有可能的窗口
- `min_len` (int): 窗口的最小长度，短于此长度的窗口将被过滤
- `**kwargs`: 传递给split_ranges_into_sets的额外参数，如：
  - `set_lens`: 指定各数据集（如训练集、验证集）的长度；0-1之间的小数表示占窗口长度的比例，其他数值表示绝对长度，剩余部分自动成为另一个数据集
  - `left_to_right`: 指定分割方向；为True时剩余部分的数据集排在最后，为False时排在最前

返回：
    RangesT: 分割结果生成器，每次迭代返回该窗口的所有数据集索引

In [None]:
import pandas as pd
import numpy as np
from vectorbt.generic.splitters import ExpandingSplitter

# Sample data: one year of daily observations (random walk)
dates = pd.date_range('2020-01-01', periods=365, freq='D')
prices = pd.Series(np.random.randn(365).cumsum(), index=dates)

splitter = ExpandingSplitter()

# Example 1: generate every possible expanding window (at least 30 elements)
windows = list(splitter.split(prices, min_len=30))
print(f"生成了 {len(windows)} 个窗口")
# Reuse the materialized list and print only the first 3 windows.
for ranges in windows[:3]:
    print(ranges)

In [None]:
# Example 2: pick 10 expanding windows
windows = list(splitter.split(prices, n=10, min_len=30))
print(f"选择了 {len(windows)} 个窗口")
# Iterate the list built above instead of running the split a second time.
for ranges in windows:
    print(ranges)

In [None]:
# Example 3: split every expanding window into a training and a test set
# The leading 80% of each window is for training, the rest for testing
train_test_windows = splitter.split(prices, n=5, set_lens=(0.8,), min_len=50)
for ranges in train_test_windows:
    print(ranges)

```python
class ExpandingSplitter(BaseSplitter):
    """Splitter whose windows all start at position 0 and grow one element at a time."""

    def split(self,
              X: tp.ArrayLike,
              n: tp.Optional[int] = None,
              min_len: int = 1,
              **kwargs) -> RangesT:
        """Generate expanding windows over `X` and pass them to `split_ranges_into_sets`.

        Args:
            X: Data to split. The index of a pandas Series/DataFrame is used;
                otherwise positions along the first axis.
            n: Number of windows to keep, picked evenly spaced among all
                candidate windows.
            min_len: Windows shorter than this are dropped.
            **kwargs: Forwarded to `split_ranges_into_sets`.

        Raises:
            ValueError: If no window satisfies `min_len`, or if `n` exceeds
                the number of remaining windows.
        """
        X = to_any_array(X)
        index = X.index if isinstance(X, (pd.Series, pd.DataFrame)) else pd.Index(np.arange(X.shape[0]))
        total = len(index)

        # Every window starts at 0; the k-th window ends at position k.
        start_idxs = np.full(total, 0)
        end_idxs = np.arange(total)

        # Drop windows that are too short.
        long_enough = (end_idxs - start_idxs + 1) >= min_len
        if not np.any(long_enough):
            raise ValueError(f"There are no ranges that meet window_len>={min_len}")
        start_idxs, end_idxs = start_idxs[long_enough], end_idxs[long_enough]

        # Keep `n` evenly spaced windows out of the candidates.
        if n is not None:
            if n > len(start_idxs):
                raise ValueError(f"n cannot be bigger than the maximum number of windows {len(start_idxs)}")
            picked = np.round(np.linspace(0, len(start_idxs) - 1, n)).astype(int)
            start_idxs, end_idxs = start_idxs[picked], end_idxs[picked]

        return split_ranges_into_sets(start_idxs, end_idxs, **kwargs)
```