In [2]:
import mindspore
import mindspore.nn as nn
import mindspore.dataset as ds

# 定义超参数
num_classes = 10 # 分类的类别数
num_features = 784 # 输入的特征数（28x28的图片）
hidden_size = 256 # 隐藏层的神经元数
batch_size = 64 # 批次大小
learning_rate = 0.01 # 学习率
num_epochs = 10 # 训练的轮数

# 创建数据集，这里使用MNIST数据集作为示例
train_dataset = ds.MnistDataset("mnist/train")
test_dataset = ds.MnistDataset("mnist/test")

# 定义数据转换和增强操作
transform = [
ds.ToTensor(), # 将图片转换为张量
ds.Normalize((0.1307,), (0.3081,)) # 将图片归一化
]

# 应用数据转换和增强操作
train_dataset = train_dataset.map(operations=transform, input_columns="image")
test_dataset = test_dataset.map(operations=transform, input_columns="image")

# 创建数据加载器
train_loader = ds.GeneratorDataset(train_dataset, ["image", "label"], batch_size=batch_size, shuffle=True)
test_loader = ds.GeneratorDataset(test_dataset, ["image", "label"], batch_size=batch_size)

# 定义全连接神经网络模型
class FCN(nn.Cell):
    def __init__(self, num_features, hidden_size, num_classes):
        super(FCN, self).__init__()
        self.fc1 = nn.Dense(num_features, hidden_size) # 第一层全连接层，输入特征数为num_features，输出特征数为hidden_size
        self.relu = nn.ReLU() # 激活函数，使用ReLU函数
        self.fc2 = nn.Dense(hidden_size, num_classes) # 第二层全连接层，输入特征数为hidden_size，输出特征数为num_classes

    def construct(self, x):
        x = self.fc1(x) # 将输入x通过第一层全连接层得到输出x
        x = self.relu(x) # 将输出x通过激活函数得到输出x
        x = self.fc2(x) # 将输出x通过第二层全连接层得到输出x
        return x

# 创建模型实例
model = FCN(num_features, hidden_size, num_classes)

# 定义优化器，使用随机梯度下降（SGD）算法
optimizer = nn.SGD(model.trainable_params(), learning_rate=learning_rate)

# 定义损失函数，使用交叉熵损失函数
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True)

# 定义评估指标，使用准确率（accuracy）
metric = mindspore.nn.Accuracy()

# 定义训练循环函数
def train(model, train_loader, loss_fn, optimizer, metric):
    model.train() # 将模型设置为训练模式
    for images, labels in train_loader: # 遍历训练数据集中的每个批次
        images = images.reshape(-1, num_features) # 将图片张量展平为一维向量，方便输入全连接层
        output = model(images) # 将输入图片通过模型得到输出预测值
        loss = loss_fn(output, labels) # 计算预测值和真实标签之间的损失值
        optimizer.clear_grad() # 清除梯度缓存
        loss.backward() # 反向传播计算梯度值
        optimizer.step() # 更新模型参数
        metric.update(output, labels) # 更新评估指标的值

# 定义测试循环函数
def test(model, test_loader, metric):
    model.eval() # 将模型设置为评估模式
    for images, labels in test_loader: # 遍历测试数据集中的每个批次
        images = images.reshape(-1, num_features) # 将图片张量展平为一维向量，方便输入全连接层
        output = model(images) # 将输入图片通过模型得到输出预测值
        metric.update(output, labels) # 更新评估指标的值

# 开始训练和测试模型
for epoch in range(num_epochs): # 遍历每个训练轮数
    train(model, train_loader, loss_fn, optimizer, metric) # 调用训练循环函数进行训练
    train_acc = metric.eval() # 计算训练集上的准确率值
    metric.clear() # 清除评估指标的缓存值
    test(model, test_loader, metric) # 调用测试循环函数进行测试
    test_acc = metric.eval() # 计算测试集上的准确率值
    metric.clear() # 清除评估指标的缓存值
    print(f"Epoch {epoch+1}, Train Accuracy: {train_acc}, Test Accuracy: {test_acc}") # 打印每个轮数的训练集和测试集上的准确率值

In [3]:
import mindspore
import mindspore.nn as nn
import mindspore.ops as ops
import numpy as np

# 定义超参数
scale = 0.01 # L1正则化因子
learning_rate = 0.1 # 学习率
epochs = 100 # 迭代次数

# 定义数据集
x = np.array([[0.2, 0.7], [0.3, 0.9], [0.4, 0.8], [0.6, 0.4], [0.7, 0.5], [0.9, 0.3]])
y = np.array([[2.4], [3.2], [3.6], [2.8], [3.4], [3.8]])
x = mindspore.Tensor(x, mindspore.float32)
y = mindspore.Tensor(y, mindspore.float32)

# 定义模型
class LinearRegression(nn.Cell):
    def __init__(self):
        super(LinearRegression, self).__init__()
        self.w = mindspore.Parameter(mindspore.Tensor(np.random.randn(2, 1), mindspore.float32), name="w")
        self.b = mindspore.Parameter(mindspore.Tensor(np.random.randn(1), mindspore.float32), name="b")
        self.matmul = ops.MatMul()
        self.add = ops.Add()
        self.l1_regularizer = nn.L1Regularizer(scale) # L1正则化函数

    def construct(self, x):
        y_pred = self.add(self.matmul(x, self.w), self.b) # 预测值
        reg_loss = self.l1_regularizer(self.w) # 正则化损失
        return y_pred, reg_loss

# 定义损失函数
def mse_loss(y_true, y_pred):
    return ops.ReduceMean()(ops.Square()(y_true - y_pred)) # 均方误差损失


# 定义模型
net = LinearRegression()

# 定义优化器
optimizer = nn.SGD(params=net.trainable_params(), learning_rate=learning_rate)

# 定义训练过程
def train(net, x, y):
    for epoch in range(epochs):
        y_pred, reg_loss = net(x) # 前向传播
        loss = mse_loss(y, y_pred) + reg_loss # 总损失
        grads = mindspore.grad_all_with_sens(net)(x, loss) # 计算梯度
        optimizer(grads) # 更新参数
        print(f"Epoch {epoch}, Loss {loss.asnumpy()}")
        return net

# 训练模型
net = train(net, x, y)

# 打印模型参数
print("w:", net.w.asnumpy())
print("b:", net.b.asnumpy())

AttributeError: module 'mindspore' has no attribute 'grad_all_with_sens'

In [None]:
import mindspore
import mindspore.nn as nn
from mindspore.train.callback import Callback

# 定义一个自定义的回调函数，继承自Callback类
class LossAccCallback(Callback):
    def __init__(self, model, train_dataset, test_dataset):
        super(LossAccCallback, self).__init__()
        self.model = model # 模型实例
        self.train_dataset = train_dataset # 训练数据集
        self.test_dataset = test_dataset # 测试数据集
        self.train_loss = [] # 记录每个epoch的训练集上的loss
        self.train_acc = [] # 记录每个epoch的训练集上的accuracy
        self.test_loss = [] # 记录每个epoch的测试集上的loss
        self.test_acc = [] # 记录每个epoch的测试集上的accuracy

    def epoch_end(self, run_context):
    # 在每个epoch结束时，调用该方法
        cb_params = run_context.original_args() # 获取回调参数
        epoch_num = cb_params.cur_epoch_num # 获取当前的epoch数
        train_loss = self.model.eval(self.train_dataset, dataset_sink_mode=False)["loss"] # 获取训练集上的loss
        train_acc = self.model.eval(self.train_dataset, dataset_sink_mode=False)["acc"] # 获取训练集上的accuracy
        test_loss = self.model.eval(self.test_dataset, dataset_sink_mode=False)["loss"] # 获取测试集上的loss
        test_acc = self.model.eval(self.test_dataset, dataset_sink_mode=False)["acc"] # 获取测试集上的accuracy
        self.train_loss.append(train_loss) # 将训练集上的loss添加到列表中
        self.train_acc.append(train_acc) # 将训练集上的accuracy添加到列表中
        self.test_loss.append(test_loss) # 将测试集上的loss添加到列表中
        self.test_acc.append(test_acc) # 将测试集上的accuracy添加到列表中
        print(f"Epoch {epoch_num}, train loss: {train_loss}, train acc: {train_acc}, test loss: {test_loss}, test acc: {test_acc}") # 打印每个epoch的结果

# 创建模型实例，这里使用一个全连接神经网络作为示例
model = nn.Dense(784, 10)

# 创建损失函数实例，这里使用交叉熵损失函数作为示例
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True)

# 创建优化器实例，这里使用Adam优化器作为示例
optimizer = nn.Adam(model.trainable_params())

# 创建训练数据集和测试数据集，这里使用MNIST数据集作为示例
train_dataset = mindspore.dataset.MnistDataset("mnist_data", usage="train")
test_dataset = mindspore.dataset.MnistDataset("mnist_data", usage="test")

# 创建自定义的回调函数实例，传入模型，训练数据集和测试数据集参数
callback = LossAccCallback(model, train_dataset, test_dataset)

# 调用model.train方法，开始训练模型，传入callback参数为自定义的回调函数实例
model.train(epoch=10, train_dataset=train_dataset, callbacks=callback)