## 下载预训练权重

In [None]:
!pip install download

In [None]:
# Download the pretrained MobileNetV2 checkpoint archive into the current directory.
# NOTE(review): with kind="zip" the `download` package presumably unpacks the archive
# into the target path itself — confirm against the package documentation.
from download import download
url = "https://ascend-professional-construction-dataset.obs.cn-north-4.myhuaweicloud.com:443/ComputerVision/mobilenetV2-200_1067.zip"
path = download(url, "./", kind="zip", replace=True)

In [None]:
import zipfile
import os

# Name of the checkpoint archive expected in the current directory.
zip_file = "mobilenetV2-200_1067.zip"
# The original cell defined `zip_files` but then read `zip_file` (NameError).
# Both names are kept so any later cell referring to either one still works.
zip_files = zip_file

# Extract the archive into the current directory when it is actually present.
# NOTE(review): the download cell passes kind="zip", which presumably unpacks the
# archive already and may leave no .zip file behind — in that case there is
# nothing to do here, so the missing file is reported instead of raising.
if os.path.isfile(zip_file):
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall(os.getcwd())
    print(f"Extracted {zip_file} to {os.getcwd()}")
else:
    print(f"{zip_file} not found in {os.getcwd()}; skipping extraction")

## 导入库

In [None]:
import math
import numpy as np
import os
import random

from matplotlib import pyplot as plt
from easydict import EasyDict
from PIL import Image
import numpy as np
import mindspore.nn as nn
from mindspore import ops as P
from mindspore.ops import add
from mindspore import Tensor
import mindspore.common.dtype as mstype
import mindspore.dataset as de
import mindspore.dataset.vision as C
import mindspore.dataset.transforms as C2
import mindspore as ms
from mindspore import set_context, nn, Tensor, load_checkpoint, save_checkpoint, export
from mindspore.train import Model
from mindspore.train import Callback, LossMonitor, ModelCheckpoint, CheckpointConfig

# Log level includes 3(ERROR), 2(WARNING), 1(INFO), 0(DEBUG).
# NOTE(review): GLOG_v is presumably read when mindspore is first imported, so
# setting it after the imports above may have no effect — confirm.
os.environ['GLOG_v'] = '3'  # Set logging level
set_context(mode=ms.GRAPH_MODE, device_target="Ascend", device_id=0)  # Run in graph mode on Ascend device 0


In [None]:
# Garbage-classification dataset labels, and the dictionaries used for label mapping.
# NOTE(review): the active class lists below only contain 'NS' and 'S'; the
# `garbage_classes` table looks like a leftover from the garbage-classification
# example this notebook was adapted from — confirm whether it is still needed.
garbage_classes = {
    '第一部分': ['NS', 'S'],
    '第二部分': ['塑料瓶盖', '饮料瓶', '玻璃瓶', '纸杯', '塑料袋', '垃圾袋']
}

# Index -> display name tables (indexed by the integer label) and name -> index map.
class_cn = ['NS', 'S']
class_en = ['NS', 'S']
index_en = {'NS': 0, 'S': 1}

# Configuration parameters shared by the dataset, model, training and inference cells.
config = EasyDict({
    "num_classes": 2,
    "image_height": 224,
    "image_width": 224,
    "data_split": (0.9, 0.1),
    "backbone_out_channels": 1280,
    "batch_size": 32,
    "eval_batch_size": 8,
    "epochs": 20,
    "lr_max": 0.0005,
    "momentum": 0.9,
    "weight_decay": 1e-4,
    "save_ckpt_epochs": 1,
    "save_ckpt_path": "./ckpt2", # "./" means the current directory
    # "dataset_path": "./data_en",
    "dataset_path": "./model_generator",  # Updated to new dataset path
    "class_index": index_en,
    "pretrained_ckpt": "./mobilenetV2-200_1067.ckpt"
})

## 数据集定义

In [None]:
def create_dataset(dataset_path, config, training=True, buffer_size=1000):
    """Build the batched training or evaluation dataset.

    Images are read from ``<dataset_path>/train`` when ``training`` is true and
    from ``<dataset_path>/test`` otherwise, using ``config.class_index`` to map
    folder names to integer labels.

    Args:
        dataset_path (str): Root directory holding the ``train`` / ``test`` folders.
        config: Object exposing ``class_index``, ``image_height``, ``image_width``,
            ``batch_size`` and ``eval_batch_size``.
        training (bool): Select the augmented training pipeline (True) or the
            deterministic evaluation pipeline (False).
        buffer_size (int): Shuffle buffer size; only used when ``training`` is true.

    Returns:
        The batched dataset (a single object, not a train/val pair). Incomplete
        final batches are dropped in both modes.
    """
    data_path = os.path.join(dataset_path, 'train' if training else 'test')
    ds = de.ImageFolderDataset(data_path, num_parallel_workers=4, class_indexing=config.class_index)
    resize_height = config.image_height
    resize_width = config.image_width

    # Mean/std are expressed on the 0-255 pixel scale.
    normalize_op = C.Normalize(mean=[0.485*255, 0.456*255, 0.406*255], std=[0.229*255, 0.224*255, 0.225*255])
    change_swap_op = C.HWC2CHW()
    type_cast_op = C2.TypeCast(mstype.int32)

    if training:
        # Pass (height, width) explicitly so non-square targets are honoured.
        crop_decode_resize = C.RandomCropDecodeResize((resize_height, resize_width),
                                                      scale=(0.08, 1.0), ratio=(0.75, 1.333))
        horizontal_flip_op = C.RandomHorizontalFlip(prob=0.5)

        train_trans = [crop_decode_resize, horizontal_flip_op, normalize_op, change_swap_op]
        train_ds = ds.map(input_columns="image", operations=train_trans, num_parallel_workers=4)
        train_ds = train_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=4)

        train_ds = train_ds.shuffle(buffer_size=buffer_size)
        ds = train_ds.batch(config.batch_size, drop_remainder=True)

    else:
        decode_op = C.Decode()
        # Resize so that the centre 87.5% of the image equals the target size, then
        # crop it out. The previous code resized *down* to 87.5% of the target and
        # then cropped to the (larger) target, which zero-pads the borders. Sizes
        # are given as (height, width).
        resize_op = C.Resize((int(resize_height / 0.875), int(resize_width / 0.875)))
        center_crop = C.CenterCrop((resize_height, resize_width))

        eval_trans = [decode_op, resize_op, center_crop, normalize_op, change_swap_op]
        eval_ds = ds.map(input_columns="image", operations=eval_trans, num_parallel_workers=4)
        eval_ds = eval_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=4)
        ds = eval_ds.batch(config.eval_batch_size, drop_remainder=True)

    return ds

In [None]:
# Show the first 4 preprocessed images of one training batch.
ds = create_dataset(dataset_path=config.dataset_path, config=config, training=True)
print(ds.get_dataset_size())
# Pull a single batch through the public iterator protocol instead of the
# private `_get_next()` helper.
data = next(ds.create_dict_iterator(output_numpy=True))
images = data['image']
labels = data['label']

# Indices 0..3 are the first four samples; subplot positions are 1-based.
for i in range(4):
    plt.subplot(2, 2, i + 1)
    # Images are CHW after preprocessing; imshow expects HWC.
    plt.imshow(np.transpose(images[i], (1, 2, 0)))
    plt.title(f'标签: {class_en[labels[i]]}')
    plt.xticks([])
    plt.yticks([])

plt.show()

## MobileNetV2模型搭建

In [None]:
import math
from mindspore import nn, Tensor, ops as P
import numpy as np

# Names this cell declares as its public API. MobileNetV2Combine and
# cosine_lr_schedule are also defined in this cell but are not listed here.
__all__ = ['MobileNetV2', 'MobileNetV2Backbone', 'MobileNetV2Head', 'mobilenet_v2']

def _make_divisible(v, divisor, min_value=None):
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

class GlobalAvgPooling(nn.Cell):
    """Cell that averages its input over axes 2 and 3.

    NOTE(review): presumably the input is NCHW, making this a spatial (H, W)
    average — confirm against the callers.
    """

    def __init__(self):
        super().__init__()

    def construct(self, x):
        """Return the mean of *x* over axes (2, 3); the reduced axes are dropped."""
        return P.ReduceMean()(x, (2, 3))

class ConvBNReLU(nn.Cell):
    """Conv2d followed by BatchNorm2d and ReLU6.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels (also the BatchNorm width).
        kernel_size: Convolution kernel size; padding is ``(kernel_size - 1) // 2``.
        stride: Convolution stride.
        groups: ``1`` builds a regular convolution. Any other value builds a
            convolution with ``group=in_planes`` whose output width is
            ``in_planes`` (``out_planes`` is then only used for BatchNorm).
    """

    def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
        super().__init__()
        pad = (kernel_size - 1) // 2
        if groups == 1:
            conv = nn.Conv2d(in_planes, out_planes, kernel_size, stride, pad_mode='pad', padding=pad)
        else:
            # One group per input channel; output channel count follows the input.
            conv = nn.Conv2d(in_planes, in_planes, kernel_size, stride, pad_mode='pad',
                             padding=pad, group=in_planes)

        self.features = nn.SequentialCell([conv, nn.BatchNorm2d(out_planes), nn.ReLU6()])

    def construct(self, x):
        """Apply conv -> batch norm -> ReLU6 to *x*."""
        return self.features(x)

class InvertedResidual(nn.Cell):
    """Expand -> grouped 3x3 conv -> 1x1 projection block with an optional skip connection.

    Args:
        inp: Input channel count.
        oup: Output channel count.
        stride: Stride of the grouped convolution; must be 1 or 2.
        expand_ratio: Multiplier giving the hidden width; the 1x1 expansion
            layer is omitted when this equals 1.
    """

    def __init__(self, inp, oup, stride, expand_ratio):
        super().__init__()
        assert stride in [1, 2]

        # The identity shortcut is only valid when the output shape matches the input.
        self.use_res_connect = stride == 1 and inp == oup
        hidden_dim = int(round(inp * expand_ratio))

        stages = [ConvBNReLU(inp, hidden_dim, kernel_size=1)] if expand_ratio != 1 else []
        stages += [
            ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim),
            # Projection back to `oup` channels: no bias and no activation after it.
            nn.Conv2d(hidden_dim, oup, kernel_size=1, stride=1, has_bias=False),
            nn.BatchNorm2d(oup),
        ]
        self.conv = nn.SequentialCell(stages)
        self.cast = P.Cast()

    def construct(self, x):
        """Run the block, adding the input back when the shortcut is enabled."""
        out = self.conv(x)
        if self.use_res_connect:
            out = P.Add()(x, out)
        return out

class MobileNetV2Backbone(nn.Cell):
    """MobileNetV2 feature extractor (everything before the classifier head).

    Args:
        width_mult: Multiplier applied to every channel count.
        inverted_residual_setting: Optional list of ``[t, c, n, s]`` rows
            (expand ratio, channels, repeats, first stride); a built-in table
            is used when omitted or empty.
        round_nearest: Channel counts are rounded with ``_make_divisible`` to
            this divisor.
        input_channel: Channel count of the stem convolution before scaling.
        last_channel: Channel count of the final 1x1 convolution before scaling.

    Attributes set here: ``cfgs``, ``out_channels`` and ``features``.
    """

    def __init__(self, width_mult=1., inverted_residual_setting=None, round_nearest=8,
                 input_channel=32, last_channel=1280):
        super().__init__()
        # t (expand ratio), c (channels), n (repeats), s (stride of the first repeat)
        default_setting = [
            [1, 16, 1, 1],
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]
        self.cfgs = inverted_residual_setting or default_setting

        in_ch = _make_divisible(input_channel * width_mult, round_nearest)
        # The final width is never scaled below `last_channel`.
        self.out_channels = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)

        stages = [ConvBNReLU(3, in_ch, stride=2)]
        for expand, channels, repeats, first_stride in self.cfgs:
            out_ch = _make_divisible(channels * width_mult, round_nearest)
            for idx in range(repeats):
                # Only the first block of a group uses the configured stride.
                block_stride = first_stride if idx == 0 else 1
                stages.append(InvertedResidual(in_ch, out_ch, block_stride, expand_ratio=expand))
                in_ch = out_ch
        stages.append(ConvBNReLU(in_ch, self.out_channels, kernel_size=1))

        self.features = nn.SequentialCell(stages)
        self._initialize_weights()

    def construct(self, x):
        """Return the feature map produced by the stacked layers."""
        return self.features(x)

    def _initialize_weights(self):
        """Re-initialise Conv2d and BatchNorm2d parameters of every sub-cell.

        Conv weights are drawn from N(0, sqrt(2 / (kh * kw * out_channels))),
        conv biases are zeroed, BatchNorm gamma is set to 1 and beta to 0.
        """
        self.init_parameters_data()
        for _, cell in self.cells_and_names():
            if isinstance(cell, nn.Conv2d):
                receptive = cell.kernel_size[0] * cell.kernel_size[1] * cell.out_channels
                sigma = np.sqrt(2. / receptive)
                weights = np.random.normal(0, sigma, cell.weight.data.shape).astype("float32")
                cell.weight.set_data(Tensor(weights))
                if cell.bias is not None:
                    cell.bias.set_data(Tensor(np.zeros(cell.bias.data.shape, dtype="float32")))
            elif isinstance(cell, nn.BatchNorm2d):
                cell.gamma.set_data(Tensor(np.ones(cell.gamma.data.shape, dtype="float32")))
                cell.beta.set_data(Tensor(np.zeros(cell.beta.data.shape, dtype="float32")))

    @property
    def get_features(self):
        """The underlying ``SequentialCell`` of feature layers."""
        return self.features

class MobileNetV2Head(nn.Cell):
    """Classifier head: global average pooling, optional dropout, then a Dense layer.

    Args:
        input_channel: Width of the incoming feature map.
        num_classes: Number of output logits.
        has_dropout: Insert ``nn.Dropout(0.2)`` between pooling and Dense.
        activation: ``"Sigmoid"`` or ``"Softmax"`` appends that activation;
            any other value (including the default string ``"None"``) leaves
            the raw Dense output.
    """

    def __init__(self, input_channel=1280, num_classes=1000, has_dropout=False, activation="None"):
        super().__init__()
        layers = [GlobalAvgPooling()]
        if has_dropout:
            layers.append(nn.Dropout(0.2))
        layers.append(nn.Dense(input_channel, num_classes, has_bias=True))
        self.head = nn.SequentialCell(layers)

        # `self.activation` only exists when an activation was requested.
        self.need_activation = activation in ("Sigmoid", "Softmax")
        if self.need_activation:
            self.activation = nn.Sigmoid() if activation == "Sigmoid" else nn.Softmax()
        self._initialize_weights()

    def construct(self, x):
        """Return class scores for the feature map *x*."""
        out = self.head(x)
        if self.need_activation:
            out = self.activation(out)
        return out

    def _initialize_weights(self):
        """Draw Dense weights from N(0, 0.01) and zero the Dense biases."""
        self.init_parameters_data()
        for _, cell in self.cells_and_names():
            if not isinstance(cell, nn.Dense):
                continue
            weights = np.random.normal(0, 0.01, cell.weight.data.shape).astype("float32")
            cell.weight.set_data(Tensor(weights))
            if cell.bias is not None:
                cell.bias.set_data(Tensor(np.zeros(cell.bias.data.shape, dtype="float32")))

    @property
    def get_head(self):
        """The underlying ``SequentialCell`` of head layers."""
        return self.head

class MobileNetV2(nn.Cell):
    """Complete MobileNetV2 classifier: backbone feature layers plus head layers.

    Args:
        num_classes: Number of output classes.
        width_mult: Channel width multiplier forwarded to the backbone.
        has_dropout: Whether the head includes a dropout layer.
        inverted_residual_setting: Optional backbone block table.
        round_nearest: Channel rounding divisor forwarded to the backbone.
        input_channel: Stem channel count forwarded to the backbone.
        last_channel: Final backbone channel count before scaling.
    """

    def __init__(self, num_classes=1000, width_mult=1., has_dropout=False, inverted_residual_setting=None,
                 round_nearest=8, input_channel=32, last_channel=1280):
        super(MobileNetV2, self).__init__()
        # Keep a handle on the backbone object: `out_channels` lives on it, not on
        # the SequentialCell returned by `get_features` (reading it from
        # `self.backbone` raised AttributeError).
        backbone = MobileNetV2Backbone(width_mult=width_mult, inverted_residual_setting=inverted_residual_setting,
                                       round_nearest=round_nearest, input_channel=input_channel,
                                       last_channel=last_channel)
        self.backbone = backbone.get_features
        self.head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=num_classes,
                                    has_dropout=has_dropout).get_head

    def construct(self, x):
        """Return the head output for input *x*."""
        x = self.backbone(x)
        x = self.head(x)
        return x

class MobileNetV2Combine(nn.Cell):
    """Chain an already-built backbone and head into one network.

    The cell is created with ``auto_prefix=False``.

    Args:
        backbone: Cell producing the feature map.
        head: Cell turning the feature map into class scores.
    """

    def __init__(self, backbone, head):
        super().__init__(auto_prefix=False)
        self.backbone = backbone
        self.head = head

    def construct(self, x):
        """Return ``head(backbone(x))``."""
        features = self.backbone(x)
        return self.head(features)

def mobilenet_v2(backbone, head):
    """Return a ``MobileNetV2Combine`` network wrapping *backbone* and *head*."""
    combined = MobileNetV2Combine(backbone, head)
    return combined

def cosine_lr_schedule(total_steps, lr_init, lr_end, lr_max, warmup_steps):
    """Build a per-step learning-rate list: linear warmup, then cosine decay.

    Args:
        total_steps (int): Number of entries to generate.
        lr_init (float): Learning rate the warmup ramps up from.
        lr_end (float): Learning rate the cosine decay heads towards.
        lr_max (float): Learning rate reached at the end of warmup.
        warmup_steps (int): Number of leading steps that use the linear ramp.

    Returns:
        list: ``total_steps`` learning rates, one per training step.
    """
    lr_init = float(lr_init)
    lr_end = float(lr_end)
    lr_max = float(lr_max)
    decay_steps = total_steps - warmup_steps
    warmup_slope = (lr_max - lr_init) / warmup_steps if warmup_steps else 0

    def lr_at(step):
        # Warmup: step 0 already sits one increment above lr_init.
        if step < warmup_steps:
            return lr_init + warmup_slope * (step + 1)
        cosine_decay = 0.5 * (1 + math.cos(math.pi * (step - warmup_steps) / decay_steps))
        return (lr_max - lr_end) * cosine_decay + lr_end

    return [lr_at(step) for step in range(total_steps)]

## 添加检查点 Checkpoint

In [None]:
def switch_precision(net, data_type):
    """Cast *net* to *data_type* on Ascend while keeping Dense layers in float32.

    Does nothing when the configured device target is not ``"Ascend"``.

    Args:
        net: Network cell to convert in place.
        data_type: Target MindSpore dtype for the network.
    """
    if ms.get_context('device_target') != "Ascend":
        return
    net.to_float(data_type)
    # Dense cells are switched back to float32 after the blanket cast.
    dense_cells = (cell for _, cell in net.cells_and_names() if isinstance(cell, nn.Dense))
    for dense in dense_cells:
        dense.to_float(ms.float32)

## 模型训练与测试

In [None]:
from mindspore.amp import FixedLossScaleManager
from mindspore import save_checkpoint
import os
import mindspore as ms

# Use an absolute path to avoid path-resolution problems.
CKPT_PATH = os.path.abspath("./ckpt")
os.makedirs(CKPT_PATH, exist_ok=True)

LOSS_SCALE = 1024

# Load the datasets. The evaluation set must use the non-augmented pipeline and
# the `test` split; previously both calls used the default training=True, so the
# "evaluation" ran on randomly augmented training data.
train_dataset = create_dataset(dataset_path=config.dataset_path, config=config, training=True)
eval_dataset = create_dataset(dataset_path=config.dataset_path, config=config, training=False)
step_size = train_dataset.get_dataset_size()

# Build the backbone.
backbone = MobileNetV2Backbone()

# Freeze the backbone parameters so only the head is trained.
for param in backbone.get_parameters():
    param.requires_grad = False

# Load the pretrained weights into the backbone.
load_checkpoint(config.pretrained_ckpt, backbone)

head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)
network = mobilenet_v2(backbone, head)

# Loss function, learning-rate schedule and optimizer.
loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
# Kept for cells that train through `Model(..., loss_scale_manager=loss_scale)`;
# the hand-written train_loop in this notebook does not use it.
loss_scale = FixedLossScaleManager(LOSS_SCALE, drop_overflow_update=False)
lrs = cosine_lr_schedule(config.epochs * step_size, lr_init=0.0, lr_end=1e-5, lr_max=config.lr_max, warmup_steps=5)
# Do not pass loss_scale to the optimizer here: the optimizer divides every
# gradient by that value, which is only correct when the loss itself was
# multiplied by the same factor. The custom training loop never scales the loss,
# so passing LOSS_SCALE shrank all updates by a factor of 1024.
opt = nn.Momentum(network.trainable_params(), learning_rate=lrs, momentum=config.momentum,
                  weight_decay=config.weight_decay)

# Training loop
def train_loop(model, dataset, loss_fn, optimizer):
    """Run one pass over *dataset*, updating *model* through *optimizer*.

    Args:
        model: Network cell to train; switched to training mode here.
        dataset: Dataset yielding ``(data, label)`` tuples.
        loss_fn: Callable mapping ``(logits, label)`` to a loss.
        optimizer: Optimizer cell; gradients are taken w.r.t. ``optimizer.parameters``.

    The loss is printed every 10th batch, starting with batch 0.
    """
    def compute_loss(inputs, targets):
        return loss_fn(model(inputs), targets)

    loss_and_grads = ms.value_and_grad(compute_loss, None, optimizer.parameters)

    def run_step(inputs, targets):
        step_loss, gradients = loss_and_grads(inputs, targets)
        optimizer(gradients)
        return step_loss

    num_batches = dataset.get_dataset_size()
    model.set_train()
    for step, (inputs, targets) in enumerate(dataset.create_tuple_iterator()):
        step_loss = run_step(inputs, targets)
        if step % 10 != 0:
            continue
        loss_value = step_loss.asnumpy()
        print(f"loss: {loss_value:>7f}  [{step:>3d}/{num_batches:>3d}]")

# Evaluation loop
def test_loop(model, dataset, loss_fn):
    """Evaluate *model* on *dataset* and print accuracy and average loss.

    Args:
        model: Network cell to evaluate; switched to inference mode here.
        dataset: Dataset yielding ``(data, label)`` tuples.
        loss_fn: Callable mapping ``(pred, label)`` to a loss.

    Accuracy is the fraction of samples whose arg-max prediction equals the
    label; the loss is averaged over batches. When the dataset yields no
    samples a notice is printed instead of dividing by zero.
    """
    num_batches = dataset.get_dataset_size()
    model.set_train(False)
    total, test_loss, correct = 0, 0, 0
    for data, label in dataset.create_tuple_iterator():
        pred = model(data)
        total += data.shape[0]
        test_loss += loss_fn(pred, label).asnumpy()
        correct += (pred.argmax(axis=1) == label).asnumpy().sum()

    # An empty dataset (e.g. fewer images than one full batch when incomplete
    # batches are dropped) would otherwise raise ZeroDivisionError below.
    if num_batches == 0 or total == 0:
        print("Test: \n no samples to evaluate (dataset is empty) \n")
        return

    test_loss /= num_batches
    correct /= total
    print(f"Test: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# Training and evaluation: one train pass, one checkpoint save and one
# evaluation pass per epoch.
print("============== Starting Training ==============")
epochs = config.epochs
for t in range(epochs):
    print(f"Epoch {t + 1}\n-------------------------------")
    train_loop(network, train_dataset, loss, opt)
    # The same file name is used every epoch, so each save overwrites the previous one.
    save_checkpoint(network, os.path.join(CKPT_PATH, "save_mobilenetV2_model.ckpt"))
    test_loop(network, eval_dataset, loss)
print("Done!")

## 模型预测

In [None]:
import numpy as np
import os
from PIL import Image
from mindspore import Tensor, load_checkpoint
from sklearn.metrics import confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt


def image_process(image):
    """Normalise one image and wrap it as a single-item float32 batch tensor.

    Args:
        image: Image data of shape (H, W, C); anything ``np.array`` accepts.

    Returns:
        Tensor of shape (1, C, H, W) holding ``(pixel - mean) / std`` per channel,
        with mean/std expressed on the 0-255 scale.
    """
    channel_mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
    channel_std = [0.229 * 255, 0.224 * 255, 0.225 * 255]

    pixels = np.array(image)
    normalised = (pixels - channel_mean) / channel_std
    chw = normalised.transpose((2, 0, 1))  # (H, W, C) -> (C, H, W)
    batch = np.array([chw], np.float32)  # leading batch axis of size 1
    return Tensor(batch)

def infer_one(network, image_path):
    """Classify a single image file and return the predicted class index.

    Args:
        network: Callable network returning logits of shape (1, num_classes).
        image_path: Path of the image file to classify.

    Returns:
        The arg-max class index of the network output.
    """
    # Open inside a context manager so the file handle is released promptly.
    with Image.open(image_path) as img:
        # Force 3 channels: grayscale or RGBA files would otherwise break the
        # per-channel normalisation in image_process. PIL's resize expects
        # (width, height).
        image = img.convert("RGB").resize((config.image_width, config.image_height))
    logits = network(image_process(image))
    pred = np.argmax(logits.asnumpy(), axis=1)[0]
    return pred

def infer_folder(network, folder_path, label_map):
    """Classify every file found in the per-class sub-folders of *folder_path*.

    Args:
        network: Network passed through to ``infer_one``.
        folder_path: Directory containing one sub-folder per class name.
        label_map: Mapping of class (sub-folder) name to integer label.

    Returns:
        ``(true_labels, pred_labels)``: two equally long lists, one entry per
        classified file. Class folders that do not exist are skipped.
    """
    true_labels = []
    pred_labels = []

    for class_name, label in label_map.items():
        class_folder = os.path.join(folder_path, class_name)
        if not os.path.exists(class_folder):
            continue

        # Sort for a reproducible order (os.listdir order is arbitrary).
        for image_name in sorted(os.listdir(class_folder)):
            image_path = os.path.join(class_folder, image_name)
            # Skip nested directories (e.g. Jupyter's .ipynb_checkpoints), which
            # would otherwise be handed to the image loader and crash the run.
            if not os.path.isfile(image_path):
                continue
            true_labels.append(label)
            pred_labels.append(infer_one(network, image_path))

    return true_labels, pred_labels

def plot_metrics(true_labels, pred_labels, label_map):
    """Print and plot the confusion matrix, and plot a per-class ROC curve.

    Args:
        true_labels: Sequence of ground-truth integer labels.
        pred_labels: Sequence of predicted integer labels (same length).
        label_map: Mapping of class name to integer label; its order defines
            the row/column order of the confusion matrix and the tick labels.

    NOTE(review): the ROC curves are built from one-hot encoded hard
    predictions rather than real probabilities, and the one-hot encoding
    assumes the labels are 0..len(label_map)-1.
    """
    class_names = list(label_map.keys())
    class_ids = list(label_map.values())

    # Confusion Matrix. Passing `labels` pins the matrix to one row/column per
    # entry of label_map, so it stays aligned with the tick labels even when a
    # class never occurs in the data.
    cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)
    print("Confusion Matrix:\n", cm)

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.colorbar()
    # Tick labels are passed as a list rather than a dict view.
    plt.xticks(ticks=np.arange(len(class_names)), labels=class_names, rotation=45)
    plt.yticks(ticks=np.arange(len(class_names)), labels=class_names)
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")

    # ROC Curve
    n_classes = len(label_map)
    y_true = np.eye(n_classes)[true_labels]  # One-hot encode true labels
    y_pred_proba = np.eye(n_classes)[pred_labels]  # Simulated probabilities for simplicity

    plt.subplot(1, 2, 2)
    for i, label in enumerate(class_names):
        fpr, tpr, _ = roc_curve(y_true[:, i], y_pred_proba[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f"Class {label} (AUC = {roc_auc:.2f})")

    plt.plot([0, 1], [0, 1], "k--")  # chance-level diagonal
    plt.title("ROC Curve")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.show()

def infer_sleep(folder_path):
    """Classify the images under *folder_path* as NS/S and report the sleep time.

    The network is rebuilt and its weights are loaded from
    ``config.save_ckpt_path`` / ``CKPT`` (both module-level settings) on every call.

    Args:
        folder_path: Directory containing ``NS`` and ``S`` sub-folders of images.

    Returns:
        Half the number of images predicted as class 1 ("S"), printed as minutes.
        NOTE(review): presumably each image covers 30 seconds — confirm.
    """
    # Rebuild the architecture, then restore the trained weights into it.
    feature_extractor = MobileNetV2Backbone(last_channel=config.backbone_out_channels)
    classifier = MobileNetV2Head(input_channel=feature_extractor.out_channels,
                                 num_classes=config.num_classes)
    net = mobilenet_v2(feature_extractor, classifier)
    ckpt_file = os.path.join(config.save_ckpt_path, CKPT)
    load_checkpoint(ckpt_file, net)

    # Sub-folder name -> class index.
    class_to_index = {"NS": 0, "S": 1}

    # Only the predictions are needed; the true labels are ignored here.
    _, predictions = infer_folder(net, folder_path, class_to_index)
    sleep_minutes = sum(predictions) / 2
    print("Sleep_time: %.2f min" % sleep_minutes)
    return sleep_minutes

def infer_sas(folder_path):
    """Classify the images under *folder_path* as N/A and count the class-1 ("A") hits.

    The network is rebuilt and its weights are loaded from
    ``config.save_ckpt_path`` / ``CKPT`` (both module-level settings) on every call.

    Args:
        folder_path: Directory containing ``N`` and ``A`` sub-folders of images.

    Returns:
        The number of images predicted as class 1 ("A").
    """
    # Rebuild the architecture, then restore the trained weights into it.
    feature_extractor = MobileNetV2Backbone(last_channel=config.backbone_out_channels)
    classifier = MobileNetV2Head(input_channel=feature_extractor.out_channels,
                                 num_classes=config.num_classes)
    net = mobilenet_v2(feature_extractor, classifier)
    ckpt_file = os.path.join(config.save_ckpt_path, CKPT)
    load_checkpoint(ckpt_file, net)

    # Sub-folder name -> class index.
    class_to_index = {"N": 0, "A": 1}

    # Only the predictions are needed; the true labels are ignored here.
    _, predictions = infer_folder(net, folder_path, class_to_index)
    event_count = sum(predictions)
    print("A_num: %d" % event_count)
    return event_count
    
# Run inference
# Run inference with the sleep checkpoint over every entry of the test folder.
mother_name = './0-AP'
folder_path = mother_name + "/For_test_s"
# infer_sleep reads these two module-level settings to locate the checkpoint.
config.save_ckpt_path = "./ckpt2"
CKPT = "save_mobilenetV2_SLEEP_model.ckpt"

# List the entry names under folder_path (os.listdir returns a list).
fileNames = os.listdir(folder_path) 
for file in fileNames:
    # Join the entry name onto folder_path to get its full path.
    newDir = os.path.join(folder_path,file)
    time_60s = infer_sleep(newDir)
    print("%s 睡眠时长= %.2f min" %(file, time_60s))




# folder_path = mother_name + "./For_test/" +file_name
# config.save_ckpt_path = "./ckpt"
# CKPT = "save_mobilenetV2_model.ckpt"
# sas_num = infer_sas(folder_path)

# print("AHI = %.2f" %(sas_num*120/time_30s))