# 钢铁缺陷检测分析

## 项目概述

这是一个专注于钢铁缺陷检测的计算机视觉项目，基于深度学习技术对钢铁表面缺陷进行自动识别和分类。

### 项目目标
- 实现钢铁表面缺陷的自动检测和分类
- 提高缺陷检测的准确率和召回率
- 优化模型推理速度，满足实时检测需求
- 开发可部署的缺陷检测系统原型

### 技术栈
- **深度学习框架**: PyTorch
- **图像处理**: OpenCV, PIL
- **数据处理**: Pandas, NumPy
- **可视化**: Matplotlib, Seaborn

## 1. 环境设置和数据加载

In [None]:
# 导入必要的库
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import warnings
warnings.filterwarnings('ignore')

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 设置matplotlib中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans', 'Arial', 'sans-serif']
plt.rcParams['axes.unicode_minus'] = False

print("环境设置完成!")

环境设置完成!


## 2. 数据探索性分析 (EDA)

In [8]:
# 数据路径设置
data_dir = "/root/CASE-钢铁缺陷检测-P1/data/"
train_dir = os.path.join(data_dir, "train", "IMAGES")
test_dir = os.path.join(data_dir, "test", "IMAGES")
val_dir = os.path.join(data_dir, "validation", "IMAGES")

# 检查数据目录结构
def check_data_structure():
    print("数据目录结构:")
    for dir_path in [train_dir, test_dir, val_dir]:
        if os.path.exists(dir_path):
            num_images = len([f for f in os.listdir(dir_path) if f.endswith(('.jpg', '.png', '.jpeg'))])
            print(f"  {dir_path}: {num_images} 张图像")
        else:
            print(f"  {dir_path}: 目录不存在")

check_data_structure()

数据目录结构:
  /root/CASE-钢铁缺陷检测-P1/data/train/IMAGES: 1400 张图像
  /root/CASE-钢铁缺陷检测-P1/data/test/IMAGES: 400 张图像
  /root/CASE-钢铁缺陷检测-P1/data/validation/IMAGES: 目录不存在


## 3. 数据预处理和增强

In [9]:
# 定义数据增强和预处理
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

print("数据预处理管道定义完成!")

数据预处理管道定义完成!


## 4. 自定义数据集类

In [10]:
class SteelDefectDataset(Dataset):
    """钢铁缺陷检测数据集类"""
    
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        
        # 假设数据目录结构: data_dir/class_name/image.jpg
        if os.path.exists(data_dir):
            for class_name in os.listdir(data_dir):
                class_dir = os.path.join(data_dir, class_name)
                if os.path.isdir(class_dir):
                    for img_name in os.listdir(class_dir):
                        if img_name.endswith(('.jpg', '.png', '.jpeg')):
                            self.image_paths.append(os.path.join(class_dir, img_name))
                            self.labels.append(class_name)
        
        # 创建标签映射
        self.classes = sorted(list(set(self.labels)))
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        self.idx_to_class = {idx: cls for cls, idx in self.class_to_idx.items()}
        
        # 将标签转换为索引
        self.label_indices = [self.class_to_idx[label] for label in self.labels]
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.label_indices[idx]
        
        # 加载图像
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, label
    
    def get_class_distribution(self):
        """获取类别分布"""
        from collections import Counter
        return Counter(self.labels)

print("数据集类定义完成!")

数据集类定义完成!


## 5. CNN基础模型

In [11]:
class CNNModel(nn.Module):
    """自定义CNN模型"""
    
    def __init__(self, num_classes=6):
        super(CNNModel, self).__init__()
        
        # 卷积层
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        
        # 池化层
        self.pool = nn.MaxPool2d(2, 2)
        
        # 全连接层
        self.fc1 = nn.Linear(128 * 28 * 28, 512)  # 224/2/2/2 = 28
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        
        # 激活函数和Dropout
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, x):
        # 卷积层1
        x = self.pool(self.relu(self.conv1(x)))
        
        # 卷积层2
        x = self.pool(self.relu(self.conv2(x)))
        
        # 卷积层3
        x = self.pool(self.relu(self.conv3(x)))
        
        # 展平
        x = x.view(-1, 128 * 28 * 28)
        
        # 全连接层
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        
        return x

# 测试模型
model = CNNModel(num_classes=6)
print(f"模型参数量: {sum(p.numel() for p in model.parameters()):,}")

模型参数量: 51,606,854


## 6. 训练函数

In [12]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, device='cuda'):
    """训练模型"""
    
    model = model.to(device)
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []
    
    for epoch in range(num_epochs):
        # 训练阶段
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
            
            if batch_idx % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx}/{len(train_loader)}], Loss: {loss.item():.4f}')
        
        train_loss = running_loss / len(train_loader)
        train_acc = 100. * correct / total
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        
        # 验证阶段
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()
        
        val_loss = val_loss / len(val_loader)
        val_acc = 100. * correct / total
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
    
    return {
        'train_losses': train_losses,
        'val_losses': val_losses,
        'train_accs': train_accs,
        'val_accs': val_accs
    }

print("训练函数定义完成!")

训练函数定义完成!


## 7. 模型评估

In [13]:
def evaluate_model(model, test_loader, device='cuda'):
    """评估模型性能"""
    
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # 计算评估指标
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
    
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    cm = confusion_matrix(all_labels, all_preds)
    
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'confusion_matrix': cm
    }

print("评估函数定义完成!")

评估函数定义完成!


## 8. 可视化工具

In [14]:
def plot_training_history(history):
    """绘制训练历史"""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    
    # 损失曲线
    axes[0].plot(history['train_losses'], label='训练损失')
    axes[0].plot(history['val_losses'], label='验证损失')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('训练和验证损失曲线')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 准确率曲线
    axes[1].plot(history['train_accs'], label='训练准确率')
    axes[1].plot(history['val_accs'], label='验证准确率')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy (%)')
    axes[1].set_title('训练和验证准确率曲线')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('user_data/training_curves.png', dpi=300, bbox_inches='tight')
    plt.show()

def plot_confusion_matrix(cm, class_names):
    """绘制混淆矩阵"""
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('预测标签')
    plt.ylabel('真实标签')
    plt.title('混淆矩阵')
    plt.tight_layout()
    plt.savefig('prediction_result/confusion_matrix.png', dpi=300, bbox_inches='tight')
    plt.show()

print("可视化工具定义完成!")

可视化工具定义完成!


## 9. 主训练流程

In [15]:
def main():
    """主训练流程"""
    
    # 设置设备
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")
    
    # 创建数据集（这里需要实际数据）
    # train_dataset = SteelDefectDataset(train_dir, transform=train_transform)
    # val_dataset = SteelDefectDataset(val_dir, transform=val_transform)
    # test_dataset = SteelDefectDataset(test_dir, transform=val_transform)
    
    # 创建数据加载器
    # train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
    # val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
    # test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)
    
    # 创建模型
    model = CNNModel(num_classes=6)
    
    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    print("模型和优化器设置完成!")
    print("注意: 需要实际数据才能运行训练流程")
    
    return model, criterion, optimizer

# 运行主流程
model, criterion, optimizer = main()

使用设备: cuda
模型和优化器设置完成!
注意: 需要实际数据才能运行训练流程


## 10. 保存和加载模型

In [16]:
def save_model(model, path='model/cnn_v1_model.pth'):
    """保存模型"""
    torch.save({
        'model_state_dict': model.state_dict(),
        'model_architecture': model.__class__.__name__
    }, path)
    print(f"模型已保存到: {path}")

def load_model(path='model/cnn_v1_model.pth', model_class=CNNModel, num_classes=6):
    """加载模型"""
    checkpoint = torch.load(path, map_location='cpu')
    model = model_class(num_classes=num_classes)
    model.load_state_dict(checkpoint['model_state_dict'])
    print(f"模型已从 {path} 加载")
    return model

print("模型保存和加载函数定义完成!")

模型保存和加载函数定义完成!


## 11. 预测和结果保存

In [17]:
def save_predictions(predictions, labels, image_paths, class_names, save_path='prediction_result/test_predictions.csv'):
    """保存预测结果"""
    results = []
    for i, (pred, true_label, img_path) in enumerate(zip(predictions, labels, image_paths)):
        results.append({
            'image_id': i,
            'image_path': img_path,
            'true_label': class_names[true_label],
            'predicted_label': class_names[pred],
            'is_correct': pred == true_label
        })
    
    df = pd.DataFrame(results)
    df.to_csv(save_path, index=False)
    print(f"预测结果已保存到: {save_path}")
    return df

def save_metrics(metrics, save_path='prediction_result/performance_metrics.csv'):
    """保存性能指标"""
    metrics_df = pd.DataFrame([{
        'accuracy': metrics['accuracy'],
        'precision': metrics['precision'],
        'recall': metrics['recall'],
        'f1_score': metrics['f1_score']
    }])
    metrics_df.to_csv(save_path, index=False)
    print(f"性能指标已保存到: {save_path}")

print("结果保存函数定义完成!")

结果保存函数定义完成!


## 12. 总结和下一步计划

In [18]:
print("钢铁缺陷检测项目框架搭建完成!")
print("\n下一步计划:")
print("1. 准备钢铁缺陷数据集")
print("2. 运行数据预处理和增强")
print("3. 训练CNN基础模型")
print("4. 评估模型性能")
print("5. 尝试ResNet和EfficientNet等先进模型")
print("6. 优化模型参数和超参数")
print("7. 部署模型进行实时检测")

钢铁缺陷检测项目框架搭建完成!

下一步计划:
1. 准备钢铁缺陷数据集
2. 运行数据预处理和增强
3. 训练CNN基础模型
4. 评估模型性能
5. 尝试ResNet和EfficientNet等先进模型
6. 优化模型参数和超参数
7. 部署模型进行实时检测
