In [1]:
import os
from dataclasses import dataclass
from typing import Dict, Sequence
import torch
from torch.utils.data import Dataset
from datasets import load_dataset, concatenate_datasets
from transformers import AutoProcessor
import glob
import transformers
from torch.nn.utils.rnn import pad_sequence

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
import io
from PIL import Image
import webdataset as wds
from tqdm import tqdm
import os
from braceexpand import braceexpand

# Scan every remote shard and record samples whose encoded image is suspiciously small.
#
# Results are left in module-level names used by later cells:
#   input_data_urls - list of fully expanded shard URLs
#   threshold_bytes - images smaller than this many bytes are flagged
#   bad_samples     - list of (shard_url, local_index_in_tar) tuples
#   failed_shards   - shard names whose scan raised (their results may be partial)
from urllib.parse import urlsplit

# Shard URL template (braceexpand syntax) WITHOUT credentials.
# The SAS query string is read from the environment instead of being stored in the
# notebook: the previously embedded signature granted broad container permissions
# and, per its own st/se parameters, was only valid for a one-week window.
SHARD_URL_TEMPLATE = (
    "https://facevcstandard.blob.core.windows.net/doch/data/"
    "laion2B-en-aesthetic/{00000..05247}.tar"
)
sas_token = os.environ.get("LAION_SAS_TOKEN", "").lstrip("?")
if not sas_token:
    # Fail fast rather than issuing thousands of unauthenticated requests.
    raise RuntimeError(
        "Set the LAION_SAS_TOKEN environment variable to the SAS query string "
        "(the part after '?') before running this cell."
    )
input_data_url_template = f"{SHARD_URL_TEMPLATE}?{sas_token}"


def _shard_name(shard_url):
    """Return the tar file name of a shard URL, dropping the query string."""
    return os.path.basename(urlsplit(shard_url).path)


# Expand the brace pattern into the full list of shard URLs.
input_data_urls = list(braceexpand(input_data_url_template))

threshold_bytes = 4000  # image size threshold, in bytes

# Positions of flagged samples: (shard_url, local_index_in_tar).
bad_samples = []
# Shards whose scan stopped on an error; anything recorded for them may be incomplete.
failed_shards = []

for url in input_data_urls:
    shard = _shard_name(url)
    try:
        dataset = wds.WebDataset(url)

        # local_index counts every sample in the shard, including skipped ones,
        # so it stays aligned with the sample's position inside the tar.
        for local_index, sample in enumerate(tqdm(dataset, desc=f"Scanning {shard}")):
            # Prefer the "jpg" field, fall back to "png". Membership is tested
            # explicitly so an empty payload is still treated as an image.
            image_data = next(
                (sample[ext] for ext in ("jpg", "png") if ext in sample), None
            )

            # Only samples that have both an image and a caption are checked.
            if image_data is None or sample.get("txt") is None:
                continue

            if len(image_data) < threshold_bytes:
                bad_samples.append((url, local_index))

    except Exception as e:
        # Best effort: keep scanning the remaining shards, but remember the failure
        # and keep the SAS token out of the printed message.
        failed_shards.append(shard)
        print(f"❌ Error processing {shard}: {str(e).replace(sas_token, '<SAS>')}")
        continue

# Report the position of every flagged sample.
for tar_file, idx in bad_samples:
    print(f"[{_shard_name(tar_file)}] Sample index: {idx}")

if failed_shards:
    print(f"{len(failed_shards)} shard(s) failed and may be only partially scanned: {failed_shards}")

Scanning 00000.tar?<SAS query redacted>: 4688it [00:12, 366.14it/s]
Scanning 00001.tar?<SAS query redacted>: 4557it [00:14, 309.45it/s]
Scanning 00002.tar?<SAS query redacted> [output truncated]

In [None]:
# Number of (shard URL, sample index) pairs collected in bad_samples.
len(bad_samples)

In [None]:
# Preview the first flagged samples only. Each entry is shown as
# (tar file name, sample index): the query string is cut off the URL so the
# access signature it carries is not written into the saved notebook output.
[(os.path.basename(shard_url.split("?", 1)[0]), sample_idx) for shard_url, sample_idx in bad_samples[:20]]

In [None]:
import os
import json
import webdataset as wds
from urllib.parse import urlparse
from tqdm import tqdm

# Write a cleaned copy of every local .tar shard, dropping the samples whose
# position is listed in bad_samples (produced by the scan cell).
import shutil

# Input/output directories. The environment variables are optional overrides;
# the defaults are the original locations.
input_dir = os.environ.get(
    "COT_INPUT_DIR", "/home/v-haodongli/mnt/v-haodongli-container/cot_output_test"
)
output_dir = os.environ.get(
    "COT_OUTPUT_DIR", "/home/v-haodongli/mnt/v-haodongli-container/cot_output_test_clean"
)

# Expected structure of bad_samples (URL query string elided):
# bad_samples = [
#     ("https://<account>.blob.core.windows.net/<container>/laion2B-en-aesthetic/00001.tar?<sas-query>", 493),
#     ("https://<account>.blob.core.windows.net/<container>/laion2B-en-aesthetic/00001.tar?<sas-query>", 612),
# ]

# Build the lookup {tar_filename: set(sample indices to drop)}.
bad_sample_map = {}
for url, index in bad_samples:
    shard_name = os.path.basename(urlparse(url).path)
    bad_sample_map.setdefault(shard_name, set()).add(index)

os.makedirs(output_dir, exist_ok=True)

# Process every .tar file in the input directory (sorted for a stable order).
for filename in tqdm(sorted(os.listdir(input_dir)), desc="Processing .tar files"):
    if not filename.endswith(".tar"):
        continue

    input_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)

    bad_indices = bad_sample_map.get(filename)
    if not bad_indices:
        print(f"{filename} has no bad samples. Copying directly.")
        # Copy in-process: no shell is involved, so unusual file names are safe
        # and a failed copy raises instead of being silently ignored.
        shutil.copyfile(input_path, output_path)
        continue

    print(f"Filtering {filename}, removing {len(bad_indices)} bad samples...")

    # Read the local shard directly; no "pipe:cat" shell command is needed.
    dataset = wds.WebDataset(input_path)

    removed = 0
    with wds.TarWriter(output_path) as sink:
        for idx, sample in enumerate(dataset):
            if idx in bad_indices:
                print(f"Skipping bad sample at index {idx} in {filename}")
                removed += 1
                continue
            sink.write(sample)

    # Indices were recorded against the scanned shard of the same name. If some
    # never matched, the local shard has fewer samples than expected and the
    # index-based filtering may have removed the wrong entries.
    if removed != len(bad_indices):
        print(
            f"Warning: {filename}: expected to remove {len(bad_indices)} samples "
            f"but only {removed} indices were present."
        )