# DiscoSeqSampler 多模态数据处理示例

本 notebook 演示如何使用 DiscoSeqSampler 和 Lhotse 处理音频、图像和视频数据，创建用于训练的 cuts 文件。

## 第一部分：音频数据处理

音频 cuts 是包含音频片段及其元数据的基本单位。每个 cut 包含：
- 音频文件路径和基本信息（采样率、时长等）
- 监督信息（转录文本、说话人信息等）
- 自定义字段（语言、情感标签等）

### 方法 1：使用 Lhotse CLI（推荐）

对于标准数据集（如 LibriTTS），推荐使用 Lhotse 的命令行工具：

```bash
# 下载并准备 LibriTTS 数据集
lhotse download libritts -p dev-clean tests/data
lhotse prepare libritts -p dev-clean tests/data/LibriTTS tests/data/manifests/libritts

# 创建 cuts 文件
lhotse cut simple --force-eager \
    -r tests/data/manifests/libritts/libritts_recordings_dev-clean.jsonl.gz \
    -s tests/data/manifests/libritts/libritts_supervisions_dev-clean.jsonl.gz \
    examples/audio_cuts.jsonl.gz
```

### 方法 2：使用 Python 代码（自定义数据）

如果您有自定义的音频数据，可以使用以下代码创建 cuts：

In [None]:
from lhotse import CutSet, MonoCut, Recording, SupervisionSegment
from lhotse.audio.source import audioInfo
from lhotse.audio import AudioSource, AudioLoadingError
from pathlib import Path
from pprint import pprint

def make_audio_cuts_file(audio_dir):
    """
    从音频文件目录创建 cuts 文件。
    
    Args:
        audio_dir (str): 包含音频文件的目录路径
        
    Returns:
        None: 生成的 cuts 保存为 'audio_cuts.jsonl.gz'
    """
    cuts = []
    
    # 遍历目录中的所有音频文件（这里以 .mp4 为例，可根据需要修改扩展名）
    for audio_file in sorted(Path(audio_dir).glob("*/*.mp4")):
        # 从音频文件创建 Recording 对象，自动提取音频元数据
        recording = Recording.from_file(audio_file)

        # 将 Recording 转换为 Cut 对象
        cut = recording.to_cut()
        
        # 创建监督信息（包含转录文本、时间戳等）
        supervision = SupervisionSegment(
            id=recording.id,           # 监督段的唯一标识符
            recording_id=recording.id, # 对应的录音 ID
            start=0,                   # 开始时间（秒）
            duration=recording.duration, # 持续时间（秒）
            channel=0,                 # 音频通道（0 表示第一个通道）
            text="audio Text",         # 转录文本（实际应用中应替换为真实文本）
        )
        
        # 将监督信息附加到 cut 对象
        cut.supervisions.append(supervision)
        cuts.append(cut)
        
        # 每处理 10 个文件输出一次进度
        if len(cuts) % 10 == 0:
            print(f"\n已处理 {len(cuts)} 个音频文件...")
            pprint(cut.to_dict())
        
    # 创建 CutSet 并保存
    cutset = CutSet.from_cuts(cuts)
    cutset.to_jsonl("audio_cuts.jsonl.gz")
    print(f"已创建包含 {len(cuts)} 个音频 cuts 的文件：'audio_cuts.jsonl.gz'")

# 示例用法：修改为您的音频目录路径
audio_dir = "/Users/feiteng/Downloads/testdata/audios"
make_audio_cuts_file(audio_dir)

# 第二部分：图像数据处理

图像 cuts 用于处理图像数据，特别适用于视觉-语言模型训练。每个图像 cut 包含：
- 图像文件的路径和基本信息（宽度、高度等）
- 图像的文本描述或标注
- 自定义元数据（类别、标签等）

## 特性说明

- **灵活的图像附加方式**：支持文件路径、numpy 数组、字节数据等多种形式
- **自动元数据提取**：自动获取图像尺寸等信息
- **内存优化**：支持延迟加载，避免内存溢出
- **兼容性处理**：包含新旧版本 Lhotse 的兼容代码

## 使用场景

- 图像分类数据集处理
- 图像-文本配对数据准备
- 多模态模型训练数据创建

In [1]:
from lhotse.cut import MonoCut, CutSet
from pathlib import Path
from pprint import pprint
from typing import Union
import numpy as np


def attach_image(cut, key: str, path_or_object: Union[str, np.ndarray, bytes]):
    """
    将图像附加到 cut 对象的兼容性函数。
    
    适用于不同版本的 Lhotse，支持多种图像数据格式。
    
    Args:
        cut: Cut 对象
        key (str): 图像数据的键名
        path_or_object: 图像文件路径、numpy 数组或字节数据
        
    Returns:
        Cut: 附加了图像数据的 cut 对象
    """
    from lhotse.image.image import Image
    from lhotse.image.io import PillowInMemoryWriter

    # 处理不同类型的输入数据
    if isinstance(path_or_object, (str, Path)):
        # 文件路径：直接引用文件而不复制
        import PIL.Image as PILImage

        # 获取图像尺寸
        with PILImage.open(path_or_object) as img:
            width, height = img.size

        # 创建图像清单，指向原始文件
        path = Path(path_or_object)
        storage_key = str(path.name)      # 文件名
        storage_path = str(path.parent)   # 父目录路径

        image_manifest = Image(
            storage_type="pillow_files",
            storage_path=storage_path,
            storage_key=storage_key,
            width=width,
            height=height,
        )
    else:
        # numpy 数组或字节数据：使用内存写入器
        writer = PillowInMemoryWriter()
        with writer:
            image_manifest = writer.store_image(key, path_or_object)

    # 将图像清单附加到 cut 的自定义字段
    cut.custom[key] = image_manifest
    return cut

def make_image_cuts_file(image_dir):
    """
    从图像文件目录创建 cuts 文件。
    
    Args:
        image_dir (str): 包含图像文件的目录路径
        
    Returns:
        None: 生成的 cuts 保存为 'image_cuts.jsonl.gz'
    """
    cuts = []
    
    # 遍历目录中的所有图像文件
    for image_file in sorted(Path(image_dir).glob("*/*.jpg")):
        # 创建基础的 MonoCut 对象
        cut = MonoCut(
            id=image_file.stem,                        # 使用文件名（不含扩展名）作为 ID
            start=0.0,                                 # 开始时间
            duration=1.0,                              # 假设每张图像代表 1 秒的片段
            channel=0,                                 # 通道号
            supervisions=[],                           # 监督信息列表（可为空）
            custom={"image_text": "text prompt for image"},  # 自定义字段：图像描述文本
        )
        
        # 附加图像数据到 cut 对象
        try:
            # 尝试使用新版 Lhotse 的方法
            cut = cut.attach_image(key="image", path_or_object=image_file)
        except Exception as e:
            # 如果失败，使用兼容性函数
            cut = attach_image(
                cut=cut,
                key="image",
                path_or_object=image_file,
            )

        cuts.append(cut)

        # 每处理 100 个文件输出一次进度
        if len(cuts) % 100 == 0:
            print(f"\n已处理 {len(cuts)} 张图像...")
            pprint(cut.to_dict())
        
    # 创建 CutSet 并保存
    cutset = CutSet.from_cuts(cuts)
    cutset.to_jsonl("image_cuts.jsonl.gz")
    print(f"已创建包含 {len(cuts)} 个图像 cuts 的文件：'image_cuts.jsonl.gz'")

# 示例用法：修改为您的图像目录路径
image_dir = "/Users/feiteng/Downloads/testdata/images"
make_image_cuts_file(image_dir)


已处理 100 张图像...
{'channel': 0,
 'custom': {'image': {'height': 2560,
                      'storage_key': 'model-url-1.jpg',
                      'storage_path': '/Users/feiteng/Downloads/testdata/images/035',
                      'storage_type': 'pillow_files',
                      'width': 1920},
            'image_text': 'text prompt for image'},
 'duration': 1.0,
 'id': 'model-url-1',
 'start': 0.0,
 'supervisions': [],
 'type': 'MonoCut'}

已处理 200 张图像...
{'channel': 0,
 'custom': {'image': {'height': 2000,
                      'storage_key': 'model-url-1.jpg',
                      'storage_path': '/Users/feiteng/Downloads/testdata/images/069',
                      'storage_type': 'pillow_files',
                      'width': 1500},
            'image_text': 'text prompt for image'},
 'duration': 1.0,
 'id': 'model-url-1',
 'start': 0.0,
 'supervisions': [],
 'type': 'MonoCut'}

已处理 300 张图像...
{'channel': 0,
 'custom': {'image': {'height': 2462,
                      'storag

# 第三部分：视频数据处理

视频 cuts 是最复杂的数据类型，因为它们可能包含音频轨道、视频帧，以及相关的文本描述。

## 处理场景

### 1. 带音频轨道的视频
- 使用 `Recording.from_file()` 直接创建
- 自动提取音频信息（采样率、时长等）
- 保留视频元数据

### 2. 无音频轨道的视频
- 使用 `torchcodec` 或 `ffmpeg` 提取视频信息
- 手动创建 Recording 对象
- 设置空的音频通道

## 依赖要求

```bash
conda install ffmpeg          # 视频处理后端
pip install torchcodec       # 视频解码库（推荐）
```

## 视频元数据

每个视频 cut 包含：
- **基本信息**：时长、帧率、分辨率
- **音频信息**：采样率、通道数（如果有音频）
- **自定义字段**：文本描述、类别标签等

## 应用场景

- 视频分类数据集
- 视频-文本配对数据
- 音视频同步训练
- 多模态序列建模

In [2]:
# 安装视频处理依赖：conda install ffmpeg
from torchcodec.decoders import VideoDecoder
from lhotse import CutSet, MonoCut, Recording, SupervisionSegment
from lhotse.audio.source import VideoInfo
from lhotse.audio import AudioSource, AudioLoadingError
from pathlib import Path
from pprint import pprint

def make_video_cuts_file(video_dir):
    """
    从视频文件目录创建 cuts 文件。
    
    处理两种类型的视频：
    1. 带音频轨道的视频：直接使用 Recording.from_file()
    2. 无音频轨道的视频：手动创建 Recording 对象
    
    Args:
        video_dir (str): 包含视频文件的目录路径
        
    Returns:
        None: 生成的 cuts 保存为 'video_cuts.jsonl.gz'
    """
    cuts = []
    
    # 遍历目录中的所有视频文件
    for video_file in sorted(Path(video_dir).glob("*/*.mp4")):
        try:
            # 方法 1：尝试直接从视频文件创建 Recording（适用于带音频的视频）
            recording = Recording.from_file(video_file)
            
        except (AudioLoadingError, RuntimeError, TypeError) as e:
            # 方法 2：视频文件无音频轨道，手动创建 Recording
            print(f"视频文件 {video_file} 无音频轨道，手动创建 recording...")
            
            device = "cpu"  # 或使用 "cuda" 进行 GPU 加速
            decoder = VideoDecoder(str(video_file), device=device)

            # 提取视频元数据
            video_info = VideoInfo(
                fps=decoder.metadata.average_fps,        # 帧率
                num_frames=decoder.metadata.num_frames,  # 总帧数
                width=decoder.metadata.width,            # 视频宽度
                height=decoder.metadata.height,          # 视频高度
            )
                    
            # 手动创建 Recording 对象
            recording = Recording(
                id=video_file.stem,                       # 使用文件名作为 ID
                sampling_rate=None,                       # 无音频，采样率为 None
                num_samples=None,                         # 无音频，样本数为 None
                duration=decoder.metadata.duration_seconds,  # 视频时长
                channel_ids=[],                           # 空音频通道（避免初始化错误）
                sources=[
                    AudioSource(
                        type="file",
                        channels=None,                     # 无音频通道
                        source=str(video_file),           # 视频文件路径
                        video=video_info,                 # 视频元数据
                    )
                ],
            )

        # 将 Recording 转换为 Cut 对象
        cut = recording.to_cut()
        
        # 创建监督信息
        supervision = SupervisionSegment(
            id=recording.id,
            recording_id=recording.id,
            start=0,
            duration=recording.duration,
            channel=0,
            text="Video Text Prompt",  # 视频描述文本（实际应用中应替换为真实描述）
        )
        
        # 将监督信息附加到 cut
        cut.supervisions.append(supervision)
        cuts.append(cut)
        
        # 每处理 10 个文件输出一次进度
        if len(cuts) % 10 == 0:
            print(f"\n已处理 {len(cuts)} 个视频文件...")
            pprint(cut.to_dict())
        
    # 创建 CutSet 并保存
    cutset = CutSet.from_cuts(cuts)
    cutset.to_jsonl("video_cuts.jsonl.gz")
    print(f"已创建包含 {len(cuts)} 个视频 cuts 的文件：'video_cuts.jsonl.gz'")

# 示例用法：修改为您的视频目录路径
video_dir = "/Users/feiteng/Downloads/testdata/videos"
make_video_cuts_file(video_dir)


已处理 10 个视频文件...
{'channel': [],
 'duration': 8.5085,
 'id': 'video',
 'recording': {'channel_ids': [],
               'duration': 8.5085,
               'id': 'video',
               'num_samples': 0,
               'sampling_rate': 0,
               'sources': [{'channels': [],
                            'source': '/Users/feiteng/Downloads/testdata/videos/010/video.mp4',
                            'type': 'file',
                            'video': {'fps': 23.976023976023978,
                                      'height': 800,
                                      'num_frames': 204,
                                      'width': 640}}]},
 'start': 0.0,
 'supervisions': [{'channel': 0,
                   'duration': 8.5085,
                   'id': 'video',
                   'recording_id': 'video',
                   'start': 0,
                   'text': 'Video Text Prompt'}],
 'type': 'MultiCut'}
视频文件 /Users/feiteng/Downloads/testdata/videos/038/video.mp4 无音频轨道，手动创建 recording.

# 总结与下一步

## 本示例完成的任务

通过本 notebook，我们学习了如何：

1. **音频数据处理**
   - 使用 Lhotse CLI 和 Python API 创建音频 cuts
   - 添加转录文本和监督信息
   - 处理不同格式的音频文件

2. **图像数据处理**
   - 创建图像 cuts 并附加图像数据
   - 处理图像元数据（尺寸、路径等）
   - 兼容不同版本的 Lhotse API

3. **视频数据处理**
   - 处理带音频和无音频的视频文件
   - 提取视频元数据（帧率、分辨率等）
   - 使用 torchcodec 进行视频解码

## 生成的文件

运行完整个 notebook 后，您将得到三个 cuts 文件：
- `audio_cuts.jsonl.gz` - 音频数据的 cuts
- `image_cuts.jsonl.gz` - 图像数据的 cuts  
- `video_cuts.jsonl.gz` - 视频数据的 cuts

## 下一步：使用 DiscoSeqSampler

有了 cuts 文件后，您可以使用 DiscoSeqSampler 的各种采样器：

```python
from discoss.sampler import SimpleCutSampler, BucketingCutSampler, DynamicBucketingCutSampler
from discoss.constraint import TokenConstraint
from lhotse import CutSet

# 加载 cuts
cuts = CutSet.from_file("audio_cuts.jsonl.gz")

# 创建约束条件（例如：最大 token 数）
constraint = TokenConstraint(max_tokens=1000)

# 使用简单采样器
sampler = SimpleCutSampler(cuts, constraint=constraint, shuffle=True)

# 使用分桶采样器（按序列长度分组）
sampler = BucketingCutSampler(cuts, constraint=constraint, shuffle=True)

# 使用动态分桶采样器（自适应分组）
# cuts = CutSet.from_file("audio_cuts.jsonl.gz")
cuts = CutSet.from_jsonl_lazy("audio_cuts.jsonl.gz")  # streaming
sampler = DynamicBucketingCutSampler(cuts, constraint=constraint, shuffle=True)
```

## 进阶应用

- **分布式训练**：在多 GPU 环境中使用 DiscoSeqSampler 实现高效的数据并行
- **多模态融合**：结合音频、图像、视频 cuts 进行多模态模型训练
- **序列长度优化**：使用智能采样减少训练时的填充和计算浪费
- **自定义约束**：根据具体需求实现自定义的采样约束条件

## 参考资源

- [DiscoSeqSampler 项目主页](https://github.com/lifeiteng/DiscoSeqSampler)
- [Lhotse 文档](https://lhotse.readthedocs.io/)
- [测试用例](../tests/) - 查看更多使用示例