In [3]:
import os
from torchvision import transforms
import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader, Dataset
from glob import glob
from matplotlib import pyplot as plt
import numpy as np
import cv2
import random

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [4]:
tfms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])
normalize = transforms.Normalize(mean=[0.485,0.456,0.406],std=[0.229,0.224,0.225])
denormalize = transforms.Normalize(
    mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225],
    std=[1/0.229, 1/0.224, 1/0.255]
)
#预处理函数 preprocess image
def preprocess_image(img):
    img = torch.tensor(img).permute(2,0,1)[None].float
    img = normalize(img)
    return img.to(device)
#数据读取路径 read path
def stems(split):
    # split 是 'train' 或 'test'
    img_dir = f"D:/wenbo/dataset1/dataset1/images_prepped_{split}/"
    # 找到所有图片文件
    items = glob.glob(os.path.join(img_dir, "*.png"))
    # 只取文件名，不要路径和扩展名
    stems = [os.path.splitext(os.path.basename(p))[0] for p in items]
    return stems
#目标： 把原来 2D 的 img (224×224) 变成 seg_labels (224×224×n_classes)
def get_segmentation_arr(img, n_classes):
    seg_labels = np.zeros((224, 224, n_classes))
    for c in range(n_classes):
        seg_labels[:, :, c] = (img == c).astype(int)
    return seg_labels


In [5]:
class SegData(Dataset):
    def __init__(self, split):
        self.items = stems(split)
        # print(self.items)
        self.split = split
    def __len__(self):
        return len(self.items)
    def __getitem__(self, ix):
        image = cv2.imread(f'dataset1/images_prepped_{self.split}/{self.items[ix]}.jpg')
        image = cv2.resize(image, (224,224))
        mask  = cv2.imread(f'dataset1/annotations_prepped_{self.split}/{self.items[ix]}.png')
        mask  = cv2.resize(mask, (224,224))
        return image, mask
    def choose(self):#随机选一张图和标签
        return self[random.randint(len(self))]
    def collate_fn(self, batch):#拼接加预处理
        ims, ce_masks = [], []
        for item in batch:
            img, mask = item
            img = preprocess_image(img)
            ims.append(img)
            ce_masks.append(torch.tensor(mask)[None].long())
        images = torch.cat(ims).to(device)
        ce_masks = torch.cat(ce_masks).to(device)
        return images, ce_masks


In [6]:
#定义训练集和验证数据集 数据加载器
trn_ds = SegData('train')
val_ds = SegData('val')
trn_dl = DataLoader(trn_ds, batch_size=4, shuffle=True, collate_fn=trn_ds.collate_fn)
val_dl = DataLoader(val_ds, batch_size=1, shuffle=True, collate_fn=val_ds.collate_fn)

AttributeError: 'function' object has no attribute 'glob'

In [12]:
#定义卷积构建块
def conv(in_channels, out_channels):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels,kernel_size=3,stride=1,padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    )
def up_conv(in_channels, out_channels):
    return nn.Sequential(
        nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
        nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
    )

In [13]:
from torchvision.models import vgg16_bn
class UNet(nn.Module):
    def __init__(self, pretrained=True, out_channels=12):
        super().__init__()
        self.encoder = vgg16_bn(pretrained=pretrained).features

        self.block1 = nn.Sequential(*self.encoder[:6])
        self.block2 = nn.Sequential(*self.encoder[6:13])
        self.block3 = nn.Sequential(*self.encoder[13:20])
        self.block4 = nn.Sequential(*self.encoder[20:27])
        self.block5 = nn.Sequential(*self.encoder[27:34])
        #decoder
        self.up_conv6 = up_conv(1024, 512)
        self.conv6 = conv(512 + 512, 512)
        self.up_conv7 = up_conv(512, 256)
        self.conv7 = conv(256 + 512, 256)
        self.up_conv8 = up_conv(256, 128)
        self.conv8 = conv(128 + 256, 128)
        self.up_conv9 = up_conv(128, 64)
        self.conv9 = conv(64 + 128, 64)
        self.up_conv10 = up_conv(64, 32)
        self.conv10 = conv(32 + 64, 32)
        self.conv11 = nn.Conv2d(32, out_channels, kernel_size=1)

In [14]:
#前向传播
def forward(self, x):
    block1 = self.block1(x)
    block2 = self.block2(block1)
    block3 = self.block3(block2)
    block4 = self.block4(block3)
    block5 = self.block5(block4)

    bottleneck = self.bottleneck(block5)
    x = self.conv_bottleneck(bottleneck)

    x = self.up_conv6(x)
    x = torch.cat([x, block5], dim=1)
    x = self.conv6(x)

    x = self.up_conv7(x)
    x = torch.cat([x, block4], dim=1)
    x = self.conv7(x)

    x = self.up_conv8(x)
    x = torch.cat([x, block3], dim=1)
    x = self.conv8(x)

    x = self.up_conv9(x)
    x = torch.cat([x, block2], dim=1)
    x = self.conv9(x)

    x = self.up_conv10(x)
    x = torch.cat([x, block1], dim=1)
    x = self.conv10(x)

    x = self.conv11(x)
    return x


In [15]:
#定义函数UNetLoss计算损失和准确率值
ce = nn.CrossEntropyLoss()

def UNetLoss(preds, targets):
    ce_loss = ce(preds, targets)
    acc = (torch.max(preds, 1)[1] == targets).float().mean()
    return ce_loss, acc
def train_batch(model, data, optimizer, criterion):
    model.train()                          # 打开训练模式（启用BN/Dropout等）
    ims, ce_masks = data                   # 解包 dataloader 给的一批数据

    _masks = model(ims)                    # 前向，得到 logits (B, C, H, W)
    optimizer.zero_grad()                  # 清空历史梯度

    loss, acc = criterion(_masks, ce_masks)# 计算交叉熵与像素准确率
    loss.backward()                        # 反向传播
    optimizer.step()                       # 更新参数（如 Adam/SGD）

    return loss.item(), acc.item()
@torch.no_grad()
def validate_batch(model, data, criterion):
    model.eval()                           # 评估模式（冻结BN/关闭Dropout）
    ims, masks = data

    _masks = model(ims)                    # 前向
    loss, acc = criterion(_masks, masks)   # 只算指标，不反传
    return loss.item(), acc.item()
model = UNet().to(device)
criterion = UNetLoss
optimizer = optim.Adam(model.parameters(), lr=1e-3)
n_epochs = 30


In [16]:
#训练模型
train_loss_epochs = []
val_loss_epochs = []
for epoch in range(n_epochs):
    N = len(trn_dl)
    trn_loss = []
    val_loss = []
for ix, data in enumerate(trn_dl):
    loss, acc = train_batch(model, data, optimizer, criterion)
    pos = (epoch + (ix+1)/N)
    trn_loss.append(loss)
train_loss_epochs.append(np.average(trn_loss))
N = len(val_dl)
for bx, data in enumerate(val_dl):
    loss, acc = validate_batch(model, data, criterion)
    pos = (epoch + (ix+1)/N)
    val_loss.append(loss)
val_loss_epochs.append(np.average(val_loss))
epochs = np.arange(n_epochs)+1
plt.plot(epochs, train_loss_epochs, 'bo-', label='Training loss')
plt.plot(epochs, val_loss_epochs, 'r-', label='Test loss')
plt.title('Training and Test loss over increasing epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(False)
plt.show()


NameError: name 'trn_dl' is not defined

In [None]:
im,mask = next(iter(val_dl))
_mask = model(im)
_,_mask = torch.max(_mask,dim=1)