In [1]:
# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: 
# Also add the following code, 
# so that every time the environment (kernel) starts, 
# just run the following code: 
import sys 
sys.path.append('external-libraries')

In [2]:



import matplotlib
import os
import random
import paddle
import zipfile
from paddle.io import Dataset
from PIL import Image

## 来源：百度飞桨平台的公开数据集中的交通指示牌分类数据集。
### **包含了生活中常见的58种交通信号标志**




 

![](https://ai-studio-static-online.cdn.bcebos.com/9e8295382c08452a95ae9e1ee03a8a69a7e072c35fbe4244818cec25979f2416)
![](https://ai-studio-static-online.cdn.bcebos.com/38ee23446e344cda85b221a4b2850950aedd29b7b68f4389a7dad7c3f0b3dcbc)


In [3]:
import matplotlib
matplotlib.use('Agg') 
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

In [None]:

from paddle.vision import transforms
train_transforms = transforms.Compose([
    transforms.Resize(256),  # 首先调整到固定大小
    transforms.RandomCrop(224),  # 然后随机裁剪
    transforms.RandomHorizontalFlip(prob=0.5),  # 水平翻转
    transforms.RandomRotation(15),  # 随机旋转，增强模型鲁棒性
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),  # 色彩扰动
    transforms.RandomErasing(prob=0.2, scale=(0.02, 0.2)),  # 随机擦除
    transforms.Transpose(),  # CHW格式转换
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 标准化
])

eval_transforms = transforms.Compose([
    transforms.Resize(256),  # 首先调整到固定大小
    transforms.CenterCrop(224),  # 然后中心裁剪
    transforms.Transpose(),  # CHW格式转换
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 标准化
])


In [None]:
import os
from PIL import Image
import paddle
from paddle.vision import transforms


# 定义用于ImageNet格式数据的自定义数据集
class ImageNetDataset(paddle.io.Dataset):
    def __init__(self, data_dir, file_list, label_list, transforms=None, shuffle=False):
        super(ImageNetDataset, self).__init__()
        self.data_dir = data_dir
        self.transforms = transforms
        
        # 读取文件列表
        self.file_paths = []
        self.labels = []
        with open(file_list, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) >= 2:  # 确保行格式正确
                    self.file_paths.append(parts[0])
                    self.labels.append(int(parts[1]))
                    
        # 读取标签映射
        self.label_dict = {}
        try:
            with open(label_list, 'r') as f:
                for i, line in enumerate(f):
                    self.label_dict[i] = line.strip()
        except Exception as e:
            print(f"警告: 无法读取标签文件: {e}")
            # 如果无法读取标签文件，创建默认标签
            for i in set(self.labels):
                self.label_dict[i] = str(i)
                
        # 验证标签范围
        max_label = max(self.labels) if self.labels else 0
        if max_label >= len(self.label_dict):
            print(f"警告: 最大标签值 {max_label} 超出标签字典大小 {len(self.label_dict)}")
            # 扩展标签字典以包含所有标签
            for i in range(len(self.label_dict), max_label + 1):
                self.label_dict[i] = str(i)
                
        if shuffle:
            import random
            combined = list(zip(self.file_paths, self.labels))
            random.shuffle(combined)
            self.file_paths, self.labels = zip(*combined)
            self.file_paths = list(self.file_paths)  # 转回列表
            self.labels = list(self.labels)
    
    def __getitem__(self, index):
        img_path = os.path.join(self.data_dir, self.file_paths[index])
        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as e:
            print(f"加载图像出错 {img_path}: {e}")
            # 返回一个黑色图像作为替代
            img = Image.new('RGB', (64, 64), color='black')
            
        if self.transforms:
            img = self.transforms(img)
            
        return img, self.labels[index]
    
    def __len__(self):
        return len(self.file_paths)

# 数据目录和文件路径
data_dir = './dataset/traffic_Data/DATA'
train_list = os.path.join(data_dir, 'train_list.txt')
val_list = os.path.join(data_dir, 'val_list.txt')
label_list = os.path.join(data_dir, 'labels.txt')
test_list = os.path.join(data_dir,'test_list.txt')

# 定义训练数据集
train_dataset = ImageNetDataset(
    data_dir=data_dir,
    file_list=train_list,
    label_list=label_list,
    transforms=train_transforms,
    shuffle=True)

# 定义评估数据集
eval_dataset = ImageNetDataset(
    data_dir=data_dir,
    file_list=val_list,
    label_list=label_list,
    transforms=eval_transforms)

test_dataset = ImageNetDataset(
    data_dir=data_dir,
    file_list=test_list,
    label_list=label_list,
    transforms=eval_transforms)

# 将数据集整合到字典中便于访问
datasets = {
    'train': train_dataset,
    'eval': eval_dataset,
    'test':test_dataset
}

# 创建数据加载器
batch_size = 64
loaders = {
    k: paddle.io.DataLoader(
        v, 
        batch_size=batch_size, 
        shuffle=(k=='train')
    ) for k, v in datasets.items()
}

# 打印数据集信息
print(f"训练集样本数: {len(train_dataset)}")
print(f"验证集样本数: {len(eval_dataset)}")
print(f"测试集数量:{len(test_dataset)}")
print(f"类别数: {len(train_dataset.label_dict)}")

训练集样本数: 2965
验证集样本数: 811
测试集数量:394
类别数: 58


使用本数据集在V100上训练，模型的训练过程预估为5分钟左右；如无GPU，则预估为30分钟左右。

In [6]:
import os, sys, time, random, math
import paddle
import numpy as np
import matplotlib.pyplot as plt
from paddle.vision import transforms
from paddle.vision.models import ShuffleNetV2
from PIL import Image
from IPython.display import clear_output

# 设置环境
sys.path.append('external-libraries')
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
matplotlib.use('Agg')

# 创建保存目录
save_dir = 'work/mymodel'
os.makedirs(save_dir, exist_ok=True)

In [7]:
# 数据目录
data_dir = './dataset/traffic_Data/DATA'

# 注意: 此处缺少训练集和验证集的具体创建代码
# 假设train_dataset和eval_dataset已经在其他地方定义
# 如果没有，需要添加数据加载和预处理代码

# 创建数据加载器
batch_size = 64
loaders = {
    k: paddle.io.DataLoader(
        v, 
        batch_size=batch_size, 
        shuffle=(k=='train')
    ) for k, v in datasets.items()
}

In [8]:
class WarmupCosineDecay:
    def __init__(self, base_lr, warmup_steps, warmup_start_lr, total_steps):
        self.base_lr = base_lr
        self.warmup_steps = warmup_steps
        self.warmup_start_lr = warmup_start_lr
        self.total_steps = total_steps
        self.current_step = 0
        
    def step(self):
        self.current_step += 1
        if self.current_step < self.warmup_steps:
            return self.warmup_start_lr + (self.base_lr - self.warmup_start_lr) * \
                   (self.current_step / self.warmup_steps)
        else:
            progress = (self.current_step - self.warmup_steps) / (self.total_steps - self.warmup_steps)
            return self.base_lr * 0.5 * (1 + math.cos(math.pi * progress))

# 计算总步数和训练配置
num_epochs = 100
steps_per_epoch = math.ceil(len(datasets['train']) / batch_size)
total_steps = steps_per_epoch * num_epochs

# 学习率调度参数
base_lr = 0.004
warmup_start_lr = 0.001
warmup_steps = int(total_steps * 0.1)
lr_scheduler = WarmupCosineDecay(base_lr, warmup_steps, warmup_start_lr, total_steps)

# 配置优化器和损失函数
loss_fn = paddle.nn.CrossEntropyLoss(label_smoothing=0.1)

In [9]:
from paddle.vision import mobilenet_v3_small

# 直接使用预训练的MobileNetV3模型
num_classes = len(datasets['train'].label_dict)
model = mobilenet_v3_small(num_classes=num_classes, pretrained=True)

# 创建优化器
from paddle.optimizer.lr import LambdaDecay

# 包装自定义学习率调度器
def lr_lambda(epoch):
    # 这里假设每调用一次返回当前学习率
    return lr_scheduler.step() / base_lr

scheduler = LambdaDecay(learning_rate=base_lr, lr_lambda=lr_lambda)

optimizer = paddle.optimizer.Adam(
    learning_rate=scheduler,
    parameters=model.parameters()
)



In [10]:
# 初始化训练记录
records = {'epochs':[], 'train_losses':[], 'val_accs':[], 'lrs':[]}

# 绘图配置
def setup_plots():
    plt.ion()  # 交互模式
    fig, (ax1, ax3) = plt.subplots(2, 1, figsize=(12, 10))
    fig.tight_layout(pad=5)
    ax2 = ax1.twinx()
    
    # 配置上半部分图表
    ax1.set_title('loss&acc', fontsize=15)
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss', color='blue')
    ax2.set_ylabel('Accuracy', color='red')
    ax2.set_ylim([0, 1.0])
    
    # 配置下半部分图表
    ax3.set_title('lr', fontsize=15)
    ax3.set_xlabel('Epochs')
    ax3.set_ylabel('Learning Rate')
    
    # 创建线条对象
    lines = {
        'loss': ax1.plot([], [], 'b-', label='Training Loss')[0],
        'acc': ax2.plot([], [], 'r-', label='Validation Accuracy')[0],
        'lr': ax3.plot([], [], 'g-', label='Learning Rate')[0]
    }
    
    # 添加图例和网格
    ax1.legend(loc='upper left')
    ax2.legend(loc='upper right')
    ax3.legend(loc='upper right')
    ax1.grid(True, alpha=0.3)
    ax3.grid(True, alpha=0.3)
    
    return fig, ax1, ax2, ax3, lines

# 更新图表函数
def update_plots(records, lines, ax1, ax2, ax3):
    for k, line in lines.items():
        data_key = 'train_losses' if k == 'loss' else ('val_accs' if k == 'acc' else 'lrs')
        line.set_data(records['epochs'], records[data_key])
    
    if records['epochs']:
        ax1.set_xlim(0, max(records['epochs']) + 1)
        ax1.set_ylim(0, max(records['train_losses']) * 1.1)
        ax3.set_xlim(0, max(records['epochs']) + 1)
        ax3.set_ylim(0, max(records['lrs']) * 1.1)
    
    fig = ax1.figure
    fig.canvas.draw()
    fig.canvas.flush_events()
    plt.savefig(f'{save_dir}/training_curves_latest.png')

# 初始化图表
fig, ax1, ax2, ax3, lines = setup_plots()

In [11]:
# 训练设置
best_acc = 0.0
patience_counter = 0
patience = 40  # 早停参数

# 训练循环
step_count = 0
for epoch in range(num_epochs):
    start_time = time.time()
    
    # 训练阶段
    model.train()
    total_loss = 0.0
    batch_count = 0
    
    for x_data, y_data in loaders['train']():
        # 更新学习率
        current_lr = lr_scheduler.step()
        # 前向传播和反向传播
        logits = model(x_data)
        loss = loss_fn(logits, y_data)
        loss.backward()
        
        # 应用梯度更新模型参数
        optimizer.step()        # 添加这行：执行优化步骤
        optimizer.clear_grad()  # 添加这行：清除梯度
        
        # 累计损失
        total_loss += loss.numpy()
        batch_count += 1
        step_count += 1
        
        # 打印进度
        if step_count % 50 == 0:
            print(f'Step: {step_count}, LR: {current_lr:.6f}')

    # 验证阶段
    model.eval()
    correct = 0
    total = 0
    val_loss = 0.0
    val_batch_count = 0
    
    with paddle.no_grad():
        for x_data, y_data in loaders['eval']():
            # 前向传播
            logits = model(x_data)
            loss = loss_fn(logits, y_data)
            
            # 计算准确率
            pred = paddle.argmax(logits, axis=1)
            correct += (pred == y_data.flatten()).numpy().sum()
            total += y_data.shape[0]
            
            # 累计验证损失
            val_loss += loss.numpy()
            val_batch_count += 1
            
    # 计算指标
    avg_loss = total_loss / batch_count
    avg_val_loss = val_loss / val_batch_count
    acc = correct / total
    
    # 输出当前epoch信息
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, '
          f'Val Loss: {avg_val_loss:.4f}, Acc: {acc:.4f}, '
          f'LR: {current_lr:.6f}, Time: {time.time()-start_time:.1f}s')

    # 保存最优模型
    if acc > best_acc:
        best_acc = acc
        paddle.save(model.state_dict(), f'{save_dir}/best_model.pdparams')
        
            
        print(f'Best model saved: {best_acc:.4f}')
        patience_counter = 0
    else:
        patience_counter += 1
        print(f'No improvement. Patience: {patience_counter}/{patience}')
        if patience_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break
    
    # 更新记录和图表
    records['epochs'].append(epoch + 1)
    records['train_losses'].append(float(avg_loss))
    records['val_accs'].append(float(acc))
    records['lrs'].append(float(current_lr))
    clear_output(wait=True)
    update_plots(records, lines, ax1, ax2, ax3)
    time.sleep(0.1)

print('Training completed!')

Step: 3100, LR: 0.001252
Epoch 66/100, Loss: 0.7453, Val Loss: 0.7611, Acc: 0.9889, LR: 0.001249, Time: 14.4s
No improvement. Patience: 40/40
Early stopping at epoch 66
Training completed!


In [12]:
# 绘制最终结果图
plt.ioff()
plt.figure(figsize=(15, 10))

# 上半部分：损失和准确率
plt.subplot(2, 1, 1)
plt.plot(records['epochs'], records['train_losses'], 'b-', label='Loss')
plt.title('loss&acc', fontsize=15)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.grid(True)
plt.legend(loc='upper left')

ax2 = plt.twinx()
ax2.plot(records['epochs'], records['val_accs'], 'r-', label='Accuracy')
ax2.set_ylabel('Accuracy')
ax2.legend(loc='upper right')
ax2.set_ylim([0, 1.0])

# 下半部分：学习率
plt.subplot(2, 1, 2)
plt.plot(records['epochs'], records['lrs'], 'g-')
plt.title('lr', fontsize=15)
plt.xlabel('Epochs')
plt.ylabel('Learning Rate')
plt.grid(True)

plt.tight_layout()
plt.savefig(f'{save_dir}/training_curves_final.png')
plt.show()

# 清理显存
paddle.device.cuda.empty_cache()

  plt.show()


#
    使用生成的模型进行预测
#

In [15]:
import paddle
from paddle.vision import mobilenet_v3_small
model_dict = paddle.load("./work/mymodel/best_model.pdparams") #加载模型参数   
model = mobilenet_v3_small(num_classes=num_classes, pretrained=True)
model.load_dict(model_dict) #加载模型参数
model.eval() 
correct = 0
total = 0

with paddle.no_grad():  
    for x_data, y_data in loaders['test']():
        # 前向传播
        logits = model(x_data)
        
        # 计算准确率
        pred = paddle.argmax(logits, axis=1)
        correct += (pred == y_data.flatten()).numpy().sum()
        total += y_data.shape[0]

# 4. 计算最终准确率
test_acc = correct / total

# 5. 输出结果
print(f"测试集样本数: {total}")
print(f"正确预测数: {correct}")
print(f"测试集准确率: {test_acc:.4f} ({correct}/{total})")


测试集样本数: 394
正确预测数: 390
测试集准确率: 0.9898 (390/394)
