In [1]:
import numpy as np
import cv2
import os
import random
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import shutil


In [2]:
# 数据增强参数配置
AUGMENTATION_CONFIG = {
    'shift_pixels': [1000, 500, 1200, 1500, 250, 750],  # 左右平移像素数
    'stretch_factors': [0.7, 0.8, 0.9,  1.1, 1.2, 1.3, 1.4, 1.5, 1.6],  # 水平拉伸/收缩因子
    'enable_shift': True,
    'enable_stretch': True,
    'random_samples': 150  # 每个标签随机抽取的样本数
}

# 输入和输出文件夹路径
INPUT_DIR = 'images'
OUTPUT_DIR = 'images_augmented_random'

print(f"输入文件夹: {INPUT_DIR}")
print(f"输出文件夹: {OUTPUT_DIR}")
print(f"增强配置: {AUGMENTATION_CONFIG}")
print(f"每个标签随机抽取: {AUGMENTATION_CONFIG['random_samples']} 次")


输入文件夹: images
输出文件夹: images_augmented_random
增强配置: {'shift_pixels': [1000, 500, 1200, 1500, 250, 750], 'stretch_factors': [0.7, 0.8, 0.9, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6], 'enable_shift': True, 'enable_stretch': True, 'random_samples': 150}
每个标签随机抽取: 150 次


In [3]:
def horizontal_shift(image, shift_pixels):
    """
    左右平移图像
    
    Args:
        image: 输入图像 (numpy array)
        shift_pixels: 平移像素数，正数向右，负数向左
    
    Returns:
        shifted_image: 平移后的图像
    """
    height, width = image.shape[:2]
    
    # 创建平移矩阵
    M = np.float32([[1, 0, shift_pixels], [0, 1, 0]])
    
    # 执行平移变换
    shifted_image = cv2.warpAffine(image, M, (width, height), borderMode=cv2.BORDER_WRAP)
    
    return shifted_image

def horizontal_stretch(image, stretch_factor):
    """
    水平拉伸或收缩图像
    
    Args:
        image: 输入图像 (numpy array)
        stretch_factor: 拉伸因子，>1为拉伸，<1为收缩
    
    Returns:
        stretched_image: 拉伸后的图像
    """
    height, width = image.shape[:2]
    
    # 计算新的宽度
    new_width = int(width * stretch_factor)
    
    # 调整图像大小
    resized_image = cv2.resize(image, (new_width, height))
    
    # 如果拉伸后宽度大于原宽度，裁剪中心部分
    if new_width > width:
        start_x = (new_width - width) // 2
        stretched_image = resized_image[:, start_x:start_x + width]
    # 如果收缩后宽度小于原宽度，填充边缘
    elif new_width < width:
        pad_width = width - new_width
        pad_left = pad_width // 2
        pad_right = pad_width - pad_left
        
        if len(image.shape) == 3:  # RGB图像
            stretched_image = np.pad(resized_image, 
                                   ((0, 0), (pad_left, pad_right), (0, 0)), 
                                   mode='edge')
        else:  # 灰度图像
            stretched_image = np.pad(resized_image, 
                                   ((0, 0), (pad_left, pad_right)), 
                                   mode='edge')
    else:
        stretched_image = resized_image
    
    return stretched_image

def apply_augmentation(image, aug_type, param):
    """
    应用数据增强
    
    Args:
        image: 输入图像
        aug_type: 增强类型 ('shift' 或 'stretch')
        param: 增强参数
    
    Returns:
        augmented_image: 增强后的图像
    """
    if aug_type == 'shift':
        return horizontal_shift(image, param)
    elif aug_type == 'stretch':
        return horizontal_stretch(image, param)
    else:
        raise ValueError(f"不支持的增强类型: {aug_type}")

# 测试函数
print("数据增强函数定义完成！")


数据增强函数定义完成！


In [4]:
# 创建输出文件夹结构
def create_output_directories():
    """
    创建输出文件夹结构
    """
    if os.path.exists(OUTPUT_DIR):
        print(f"删除已存在的输出文件夹: {OUTPUT_DIR}")
        shutil.rmtree(OUTPUT_DIR)
    
    os.makedirs(OUTPUT_DIR)
    print(f"创建输出文件夹: {OUTPUT_DIR}")
    
    # 创建各个标签的子文件夹
    for label_dir in os.listdir(INPUT_DIR):
        if os.path.isdir(os.path.join(INPUT_DIR, label_dir)):
            output_label_dir = os.path.join(OUTPUT_DIR, label_dir)
            os.makedirs(output_label_dir)
            print(f"创建标签文件夹: {output_label_dir}")

create_output_directories()


创建输出文件夹: images_augmented_random
创建标签文件夹: images_augmented_random\label_0
创建标签文件夹: images_augmented_random\label_1
创建标签文件夹: images_augmented_random\label_2
创建标签文件夹: images_augmented_random\label_3
创建标签文件夹: images_augmented_random\label_4
创建标签文件夹: images_augmented_random\label_5
创建标签文件夹: images_augmented_random\label_6
创建标签文件夹: images_augmented_random\label_7
创建标签文件夹: images_augmented_random\label_8
创建标签文件夹: images_augmented_random\label_9


In [5]:
# 统计原始数据量
def count_original_images():
    """
    统计原始图像数量
    """
    total_images = 0
    label_counts = {}
    
    for label_dir in os.listdir(INPUT_DIR):
        label_path = os.path.join(INPUT_DIR, label_dir)
        if os.path.isdir(label_path):
            image_files = [f for f in os.listdir(label_path) if f.endswith('.png')]
            count = len(image_files)
            label_counts[label_dir] = count
            total_images += count
    
    print("原始数据统计:")
    for label, count in label_counts.items():
        print(f"  {label}: {count} 张图像")
    print(f"  总计: {total_images} 张图像")
    
    return total_images, label_counts

original_total, original_counts = count_original_images()


原始数据统计:
  label_0: 56 张图像
  label_1: 55 张图像
  label_2: 55 张图像
  label_3: 55 张图像
  label_4: 55 张图像
  label_5: 55 张图像
  label_6: 55 张图像
  label_7: 55 张图像
  label_8: 55 张图像
  label_9: 55 张图像
  总计: 551 张图像


In [6]:
# 执行数据增强
def perform_random_data_augmentation():
    """
    执行随机数据增强处理，每个标签随机抽取指定次数
    """
    augmented_count = 0
    random.seed(42)  # 设置随机种子以确保结果可重现
    
    # 遍历所有标签文件夹
    for label_dir in os.listdir(INPUT_DIR):
        label_path = os.path.join(INPUT_DIR, label_dir)
        if not os.path.isdir(label_path):
            continue
            
        print(f"\n处理标签: {label_dir}")
        output_label_path = os.path.join(OUTPUT_DIR, label_dir)
        
        # 获取该标签下的所有图像文件
        image_files = [f for f in os.listdir(label_path) if f.endswith('.png')]
        
        if not image_files:
            print(f"警告: 标签 {label_dir} 下没有图像文件")
            continue
            
        # 随机抽样次数
        num_samples = AUGMENTATION_CONFIG['random_samples']
        
        # 使用进度条显示处理进度
        for i in tqdm(range(num_samples), desc=f"处理 {label_dir}"):
            # 随机选择一张图像
            image_file = random.choice(image_files)
            image_path = os.path.join(label_path, image_file)
            
            # 读取原始图像
            original_image = cv2.imread(image_path)
            if original_image is None:
                print(f"警告: 无法读取图像 {image_path}")
                continue
            
            # 生成唯一的输出文件名（基于原始文件名、随机索引和时间戳）
            base_name = os.path.splitext(image_file)[0]
            unique_id = f"{base_name}_sample_{i:03d}"
            
            # 随机选择一种增强方法
            aug_methods = []
            if AUGMENTATION_CONFIG['enable_shift']:
                aug_methods.append('shift')
            if AUGMENTATION_CONFIG['enable_stretch']:
                aug_methods.append('stretch')
                
            if not aug_methods:
                print("警告: 没有启用任何增强方法")
                continue
                
            aug_method = random.choice(aug_methods)
            
            # 应用随机选择的增强方法
            if aug_method == 'shift':
                # 随机选择平移像素数
                shift_pixels = random.choice(AUGMENTATION_CONFIG['shift_pixels'])
                # 随机决定左移还是右移
                direction = random.choice([-1, 1])
                shift_amount = shift_pixels * direction
                
                # 应用平移
                augmented_image = apply_augmentation(original_image, 'shift', shift_amount)
                
                # 确定方向名称
                direction_name = "right" if direction > 0 else "left"
                output_path = os.path.join(output_label_path, f"{unique_id}_shift_{direction_name}_{abs(shift_amount)}.png")
                
            elif aug_method == 'stretch':
                # 随机选择拉伸因子
                stretch_factor = random.choice(AUGMENTATION_CONFIG['stretch_factors'])
                
                # 应用拉伸/收缩
                augmented_image = apply_augmentation(original_image, 'stretch', stretch_factor)
                
                # 确定拉伸类型名称
                type_name = "stretch" if stretch_factor > 1 else "compress"
                output_path = os.path.join(output_label_path, f"{unique_id}_{type_name}_{stretch_factor:.1f}.png")
            
            # 保存增强后的图像
            cv2.imwrite(output_path, augmented_image)
            augmented_count += 1
    
    return augmented_count

# 复制原始图像到增强文件夹
def copy_original_images():
    """
    将原始图像也复制到增强后的文件夹中
    """
    copied_count = 0
    
    print("\n开始复制原始图像...")
    
    # 遍历所有标签文件夹
    for label_dir in os.listdir(INPUT_DIR):
        label_path = os.path.join(INPUT_DIR, label_dir)
        if not os.path.isdir(label_path):
            continue
            
        output_label_path = os.path.join(OUTPUT_DIR, label_dir)
        
        # 获取该标签下的所有图像文件
        image_files = [f for f in os.listdir(label_path) if f.endswith('.png')]
        
        # 使用进度条显示处理进度
        for image_file in tqdm(image_files, desc=f"复制 {label_dir} 原始图像"):
            # 源文件和目标文件路径
            source_path = os.path.join(label_path, image_file)
            dest_path = os.path.join(output_label_path, image_file)
            
            # 复制文件
            shutil.copy2(source_path, dest_path)
            copied_count += 1
    
    return copied_count

print("开始执行随机数据增强...")
total_augmented = perform_random_data_augmentation()
print(f"\n数据增强完成！")
print(f"总共生成 {total_augmented} 张增强图像")

# 复制原始图像
total_copied = copy_original_images()
print(f"\n原始图像复制完成！")
print(f"总共复制了 {total_copied} 张原始图像")
print(f"最终数据集总量: {total_augmented + total_copied} 张图像")


开始执行随机数据增强...

处理标签: label_0


处理 label_0: 100%|██████████| 150/150 [00:00<00:00, 168.18it/s]



处理标签: label_1


处理 label_1: 100%|██████████| 150/150 [00:00<00:00, 179.78it/s]



处理标签: label_2


处理 label_2: 100%|██████████| 150/150 [00:00<00:00, 198.59it/s]



处理标签: label_3


处理 label_3: 100%|██████████| 150/150 [00:00<00:00, 193.96it/s]



处理标签: label_4


处理 label_4: 100%|██████████| 150/150 [00:00<00:00, 198.99it/s]



处理标签: label_5


处理 label_5: 100%|██████████| 150/150 [00:00<00:00, 197.93it/s]



处理标签: label_6


处理 label_6: 100%|██████████| 150/150 [00:00<00:00, 185.32it/s]



处理标签: label_7


处理 label_7: 100%|██████████| 150/150 [00:00<00:00, 188.95it/s]



处理标签: label_8


处理 label_8: 100%|██████████| 150/150 [00:00<00:00, 185.35it/s]



处理标签: label_9


处理 label_9: 100%|██████████| 150/150 [00:00<00:00, 169.87it/s]



数据增强完成！
总共生成 1500 张增强图像

开始复制原始图像...


复制 label_0 原始图像: 100%|██████████| 56/56 [00:00<00:00, 1604.27it/s]
复制 label_1 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1622.00it/s]
复制 label_2 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1621.87it/s]
复制 label_3 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1594.42it/s]
复制 label_4 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1575.60it/s]
复制 label_5 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1621.88it/s]
复制 label_6 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1671.21it/s]
复制 label_7 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1414.08it/s]
复制 label_8 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1549.90it/s]
复制 label_9 原始图像: 100%|██████████| 55/55 [00:00<00:00, 1671.11it/s]


原始图像复制完成！
总共复制了 551 张原始图像
最终数据集总量: 2051 张图像





In [7]:
# 统计增强后的数据量
def count_augmented_images():
    """
    统计增强后的图像数量
    """
    total_images = 0
    label_counts = {}
    original_in_output = 0
    augmented_in_output = 0
    
    for label_dir in os.listdir(OUTPUT_DIR):
        label_path = os.path.join(OUTPUT_DIR, label_dir)
        if os.path.isdir(label_path):
            # 获取所有图像文件
            image_files = [f for f in os.listdir(label_path) if f.endswith('.png')]
            count = len(image_files)
            label_counts[label_dir] = count
            total_images += count
            
            # 区分原始图像和增强图像
            original_files = [f for f in image_files if not ('_shift_' in f or '_stretch_' in f or '_compress_' in f)]
            augmented_files = [f for f in image_files if ('_shift_' in f or '_stretch_' in f or '_compress_' in f)]
            
            original_in_output += len(original_files)
            augmented_in_output += len(augmented_files)
    
    print("\n增强后数据统计:")
    for label, count in sorted(label_counts.items()):
        print(f"  {label}: {count} 张图像")
    print(f"  总计: {total_images} 张图像")
    print(f"  其中原始图像: {original_in_output} 张")
    print(f"  其中增强图像: {augmented_in_output} 张")
    
    return total_images, label_counts, original_in_output, augmented_in_output

augmented_total, augmented_counts, original_in_output, augmented_in_output = count_augmented_images()

# 计算增强倍数
print(f"\n数据增强效果:")
print(f"原始数据量: {original_total} 张")
print(f"复制的原始图像: {original_in_output} 张")
print(f"生成的增强图像: {augmented_in_output} 张")
print(f"增强后总数据量: {augmented_total} 张")
print(f"数据增长倍数: {augmented_total / original_total:.2f}x")



增强后数据统计:
  label_0: 206 张图像
  label_1: 205 张图像
  label_2: 205 张图像
  label_3: 205 张图像
  label_4: 205 张图像
  label_5: 205 张图像
  label_6: 205 张图像
  label_7: 205 张图像
  label_8: 205 张图像
  label_9: 205 张图像
  总计: 2051 张图像
  其中原始图像: 551 张
  其中增强图像: 1500 张

数据增强效果:
原始数据量: 551 张
复制的原始图像: 551 张
生成的增强图像: 1500 张
增强后总数据量: 2051 张
数据增长倍数: 3.72x


In [None]:
# 重新处理切片（保持颜色信息）
print("\n" + "="*60)
print("🎨 重新处理切片 - 保持颜色信息")
print("="*60)

# 删除旧的灰度切片文件夹
if os.path.exists("sliced_images"):
    print("删除旧的灰度切片文件夹...")
    shutil.rmtree("sliced_images")

# 重新创建文件夹
os.makedirs("sliced_images", exist_ok=True)
for label_dir in os.listdir(OUTPUT_DIR):
    if os.path.isdir(os.path.join(OUTPUT_DIR, label_dir)):
        label_output_dir = os.path.join("sliced_images", label_dir)
        os.makedirs(label_output_dir, exist_ok=True)
        print(f"创建标签切片文件夹: {label_output_dir}")

# 重新执行切片处理（彩色版本）
def slice_images_color():
    """
    对增强后的图像进行切片处理，保持RGB颜色信息
    """
    total_slices = 0
    
    print("\n开始处理彩色图像切片...")
    
    # 遍历所有标签文件夹
    for label_dir in os.listdir(OUTPUT_DIR):
        label_path = os.path.join(OUTPUT_DIR, label_dir)
        if not os.path.isdir(label_path):
            continue
            
        print(f"\n处理标签: {label_dir}")
        label_output_path = os.path.join("sliced_images", label_dir)
        
        # 获取该标签下的所有图像文件
        image_files = [f for f in os.listdir(label_path) if f.endswith('.png')]
        
        label_slice_count = 0
        
        # 使用进度条显示处理进度
        for image_file in tqdm(image_files, desc=f"彩色切片处理 {label_dir}"):
            image_path = os.path.join(label_path, image_file)
            
            # 读取图像（保持彩色）
            data = cv2.imread(image_path, cv2.IMREAD_COLOR)
            if data is None:
                print(f"警告: 无法读取图像 {image_path}")
                continue
            
            # OpenCV读取的是BGR格式，转换为RGB格式
            data_rgb = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
            
            # 获取原始文件名（不含扩展名）
            base_name = os.path.splitext(image_file)[0]
            
            # 对图像进行切片
            slice_count = 0
            for start in range(0, data_rgb.shape[1] - window_size + 1, step_size):
                # 切片 (16, window_size, 3) - 保持3个颜色通道
                sub_img = data_rgb[:, start:start + window_size, :]
                
                # 转为 PIL Image（RGB格式）
                pil_img = Image.fromarray(sub_img.astype('uint8'))
                
                # resize 成 (64, 64) - 保持RGB格式
                resized_img = pil_img.resize(output_size, Image.BILINEAR)
                
                # 保存为RGB格式
                save_path = os.path.join(label_output_path, f"{base_name}_slice_{slice_count:03d}.png")
                resized_img.save(save_path)
                
                slice_count += 1
                label_slice_count += 1
                total_slices += 1
        
        print(f"  {label_dir}: 从 {len(image_files)} 张图像生成了 {label_slice_count} 张彩色切片")
    
    return total_slices

# 执行彩色切片处理
total_color_slices = slice_images_color()
print(f"\n✅ 彩色切片处理完成！")
print(f"总共生成了 {total_color_slices} 张彩色切片图像到 sliced_images 文件夹！")


In [None]:
# 执行图像切片处理
def slice_images():
    """
    对增强后的图像进行切片处理，将大图像切成小片段并压缩到64x64
    """
    total_slices = 0
    
    print("\n开始处理图像切片...")
    
    # 遍历所有标签文件夹
    for label_dir in os.listdir(OUTPUT_DIR):
        label_path = os.path.join(OUTPUT_DIR, label_dir)
        if not os.path.isdir(label_path):
            continue
            
        print(f"\n处理标签: {label_dir}")
        label_output_path = os.path.join("sliced_images", label_dir)
        
        # 获取该标签下的所有图像文件
        image_files = [f for f in os.listdir(label_path) if f.endswith('.png')]
        
        label_slice_count = 0
        
        # 使用进度条显示处理进度
        for image_file in tqdm(image_files, desc=f"切片处理 {label_dir}"):
            image_path = os.path.join(label_path, image_file)
            
            # 读取图像（保持彩色）
            data = cv2.imread(image_path, cv2.IMREAD_COLOR)
            if data is None:
                print(f"警告: 无法读取图像 {image_path}")
                continue
            
            # OpenCV读取的是BGR格式，转换为RGB格式
            data_rgb = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
            
            # 获取原始文件名（不含扩展名）
            base_name = os.path.splitext(image_file)[0]
            
            # 对图像进行切片
            slice_count = 0
            for start in range(0, data_rgb.shape[1] - window_size + 1, step_size):
                # 切片 (16, window_size, 3) - 保持3个颜色通道
                sub_img = data_rgb[:, start:start + window_size, :]
                
                # 转为 PIL Image（RGB格式）
                pil_img = Image.fromarray(sub_img)
                
                # resize 成 (64, 64) - 保持RGB格式
                resized_img = pil_img.resize(output_size, Image.BILINEAR)
                
                # 保存
                save_path = os.path.join(label_output_path, f"{base_name}_slice_{slice_count:03d}.png")
                resized_img.save(save_path)
                
                slice_count += 1
                label_slice_count += 1
                total_slices += 1
        
        print(f"  {label_dir}: 从 {len(image_files)} 张图像生成了 {label_slice_count} 张切片")
    
    return total_slices

window_size = 2000    # 每个小片宽度
step_size = 1000      # 滑动步长（控制重叠程度）
output_size = (64, 64)  # 最终 resize 大小
os.makedirs("sliced_images", exist_ok=True)

# 为每个标签创建子文件夹
for label_dir in os.listdir(OUTPUT_DIR):
    if os.path.isdir(os.path.join(OUTPUT_DIR, label_dir)):
        label_output_dir = os.path.join("sliced_images", label_dir)
        os.makedirs(label_output_dir, exist_ok=True)
        print(f"创建标签切片文件夹: {label_output_dir}")
# 执行切片处理
total_generated_slices = slice_images()
print(f"\n✅ 切片处理完成！")
print(f"总共生成了 {total_generated_slices} 张切片图像到 sliced_images 文件夹！")


In [8]:
# 统计切片后的数据量
def count_sliced_images():
    """
    统计切片后的图像数量
    """
    total_slices = 0
    label_slice_counts = {}
    
    sliced_dir = "sliced_images"
    
    for label_dir in os.listdir(sliced_dir):
        label_path = os.path.join(sliced_dir, label_dir)
        if os.path.isdir(label_path):
            # 获取所有切片图像文件
            slice_files = [f for f in os.listdir(label_path) if f.endswith('.png')]
            count = len(slice_files)
            label_slice_counts[label_dir] = count
            total_slices += count
    
    print("\n切片后数据统计:")
    for label, count in sorted(label_slice_counts.items()):
        print(f"  {label}: {count} 张切片")
    print(f"  总计: {total_slices} 张切片图像")
    
    return total_slices, label_slice_counts

# 统计切片数据
total_slices, slice_counts = count_sliced_images()




切片后数据统计:
  label_0: 824 张切片
  label_1: 820 张切片
  label_2: 820 张切片
  label_3: 820 张切片
  label_4: 820 张切片
  label_5: 820 张切片
  label_6: 816 张切片
  label_7: 820 张切片
  label_8: 820 张切片
  label_9: 820 张切片
  总计: 8200 张切片图像
