In [2]:
# 自动计算cell的计算时间
%load_ext autotime

%matplotlib inline
%config InlineBackend.figure_format='svg' #矢量图设置，让绘图更清晰

time: 703 ms (started: 2021-08-28 14:19:18 +08:00)


In [3]:
%%bash

# 增加更新
git add *.ipynb *.md

git remote -v

git commit -m '更新 #1 Aug 28, 2021'

#git push origin master
git push

origin	git@github.com:ustchope/pytorch_lightning_study.git (fetch)
origin	git@github.com:ustchope/pytorch_lightning_study.git (push)
[main 944aae4] 更新 #1 Aug 28, 2021
 1 file changed, 6 insertions(+)
 create mode 100644 "PL\345\210\206\344\270\244\346\255\245.ipynb"


To git@github.com:ustchope/pytorch_lightning_study.git
   9704207..944aae4  main -> main


time: 3.73 s (started: 2021-08-28 14:19:31 +08:00)


使用 PyTorch Lightning 组织您的代码使您的代码：
* 保持所有的灵活性（这都是纯 PyTorch），但删除了大量的样板
* 通过将研究代码与工程分离，更具可读性
* 更容易重现
* 通过自动化大部分训练循环和棘手的工程来减少出错的可能性
* 无需更改模型即可扩展到任何硬件

# 导入包

In [4]:
import os
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, random_split
import pytorch_lightning as pl

time: 4.88 s (started: 2021-08-28 14:21:27 +08:00)


# 第 1 步：定义 LightningModule

In [11]:
class LitAutoEncoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3))
        self.decoder = nn.Sequential(nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28))

    def forward(self, x):
        # 在 Lightning 中，forward 定义了预测/推理动作
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        # training_step 定义了训练循环。
        # 独立于forward
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        # 默认登录到 TensorBoard
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

time: 1.12 ms (started: 2021-08-28 14:42:04 +08:00)


## 系统与模型

lightning module 模块定义的是系统而不是模型。

![](https://tva1.sinaimg.cn/large/008i3skNgy1gtwhk8xdjgj615y0p4jsp02.jpg)

系统示例如下：
* Autoencoder
* BERT
* DQN
* GAN
* Image classifier
* Seq2seq
* SimCLR
* VAE
* and a lot more

在幕后，LightningModule 仍然只是一个 torch.nn.Module，它将所有研究代码分组到一个文件中以使其独立：
* 训练循环
* 验证循环
* 测试循环
* 模型或模型系统
* 优化器

您可以通过覆盖可用回调钩子中的 20 多个钩子中的任何一个来自定义训练的任何部分（例如向后传递）

In [7]:
class LitAutoEncoder(pl.LightningModule):
    def backward(self, loss, optimizer, optimizer_idx):
        loss.backward()

time: 1.01 ms (started: 2021-08-28 14:33:51 +08:00)


## FORWARD vs TRAINING_STEP

在 Lightning 中，我们将训练与推理分开。 `training_step` 定义了完整的训练循环。 我们鼓励用户使用`forward`来定义推理操作。

例如，在这种情况下，我们可以定义自动编码器作为嵌入提取器：

In [None]:
def forward(self, x):
    embeddings = self.encoder(x)
    return embeddings

当然，没有什么能阻止您在 training_step 中使用 forward。

In [None]:
def training_step(self, batch, batch_idx):
    ...
    z = self(x)

这真的归结为您的应用程序。 但是，我们建议您将两个意图分开。
* 使用`forward`进行推理（预测）。
* 使用 `training_step` 进行训练。

# 第 2 步：使用 Lightning Trainer

首先，根据需要定义数据。 Lightning 只需要一个 DataLoader 用于训练/验证/测试拆分。

In [10]:
dataset = MNIST(os.getcwd(), download=True, transform=transforms.ToTensor())
train_loader = DataLoader(dataset)

time: 40.1 ms (started: 2021-08-28 14:39:53 +08:00)


接下来，初始化PL模块和 PyTorch Lightning训练器，然后使用数据和模型调用 fit。

In [None]:
# init model
autoencoder = LitAutoEncoder()

# most basic trainer, uses good defaults (auto-tensorboard, checkpoints, logs, and more)
# trainer = pl.Trainer(gpus=8) (if you have GPUs)
trainer = pl.Trainer(gpus=2)
trainer.fit(autoencoder, train_loader)

`Trainer`自动化：
* Epoch 和批量迭代
* 调用 optimizer.step()，backward()，zero_grad()
* 调用 .eval()，启用/禁用 grads
* 权重装载
* Tensorboard（见记录器选项）
* 多 GPU 支持
* TPU
* 16 位精度 AMP 支持

就是这样！

这些是您在 Lightning 中需要了解的主要 2 个概念。 PL的所有其他特性要么是 Trainer 的特性，要么是 LightningModule 的特性。

**基本功能**

---

# 手动与自动优化

## 自动优化

使用 Lightning，您无需担心何时启用/禁用 grads、执行反向传递或更新优化器，只要您从 training_step 返回带有附加图的损失，Lightning 将自动执行优化。

In [None]:
def training_step(self, batch, batch_idx):
    loss = self.encoder(batch)
    return loss

## 手动优化

但是，对于某些研究，例如 GAN、强化学习或具有多个优化器或内部循环的研究，您可以关闭自动优化并完全控制自己的训练循环。

关闭自动优化，您可以控制训练循环！

In [None]:
def __init__(self):
    self.automatic_optimization = False


def training_step(self, batch, batch_idx):
    # 使用 use_pl_optimizer=False 访问您的优化器。 默认为真
    opt_a, opt_b = self.optimizers(use_pl_optimizer=True)

    loss_a = self.generator(batch)
    opt_a.zero_grad()
    # 使用 `manual_backward()` 而不是 `loss.backward` 来自动化半精度等......
    self.manual_backward(loss_a)
    opt_a.step()

    loss_b = self.discriminator(batch)
    opt_b.zero_grad()
    self.manual_backward(loss_b)
    opt_b.step()

# 预测或部署

完成训练后，您有 3 个选项可以使用 LightningModule 进行预测。

## 选项 1：子模型

拉出系统内的任何模型进行预测。

In [None]:
# ----------------------------------
# 用作嵌入提取器
# ----------------------------------
autoencoder = LitAutoEncoder.load_from_checkpoint("path/to/checkpoint_file.ckpt")
encoder_model = autoencoder.encoder
encoder_model.eval()

# ----------------------------------
# 用作图像生成器
# ----------------------------------
decoder_model = autoencoder.decoder
decoder_model.eval()

## 选项 2：`forward`

您还可以添加`forward`方法来根据需要进行预测。

In [None]:
# ----------------------------------
# 使用 AE 提取嵌入
# ----------------------------------
class LitAutoEncoder(LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential()

    def forward(self, x):
        embedding = self.encoder(x)
        return embedding


autoencoder = LitAutoEncoder()
autoencoder = autoencoder(torch.rand(1, 28 * 28))

In [None]:
# ----------------------------------
# 或使用AE生成图像
# ----------------------------------
class LitAutoEncoder(LightningModule):
    def __init__(self):
        super().__init__()
        self.decoder = nn.Sequential()

    def forward(self):
        z = torch.rand(1, 3)
        image = self.decoder(z)
        image = image.view(1, 1, 28, 28)
        return image


autoencoder = LitAutoEncoder()
image_sample = autoencoder()

## 选项 3：生产

对于生产系统，onnx 或 torchscript 要快得多。 确保您已添加转发方法或仅跟踪您需要的子模型。

In [None]:
# ----------------------------------
# torchscript
# ----------------------------------
autoencoder = LitAutoEncoder()
torch.jit.save(autoencoder.to_torchscript(), "model.pt")
os.path.isfile("model.pt")

In [None]:
# ----------------------------------
# onnx
# ----------------------------------
with tempfile.NamedTemporaryFile(suffix=".onnx", delete=False) as tmpfile:
    autoencoder = LitAutoEncoder()
    input_sample = torch.randn((1, 28 * 28))
    autoencoder.to_onnx(tmpfile.name, input_sample, export_params=True)
    os.path.isfile(tmpfile.name)

# 使用 CPU/GPU/TPU

在 Lightning 中使用 CPU、GPU 或 TPU 是微不足道的。 无需更改您的代码，只需更改 Trainer 选项即可。

In [None]:
# train on CPU
trainer = Trainer()

In [None]:
# train on 8 CPUs
trainer = Trainer(num_processes=8)

In [None]:
# train on 1024 CPUs across 128 machines
trainer = pl.Trainer(num_processes=8, num_nodes=128)

In [None]:
# train on 1 GPU
trainer = pl.Trainer(gpus=1)

In [None]:
# train on multiple GPUs across nodes (32 gpus here)
trainer = pl.Trainer(gpus=4, num_nodes=8)

In [None]:
# train on gpu 1, 3, 5 (3 gpus total)
trainer = pl.Trainer(gpus=[1, 3, 5])

In [None]:
# Multi GPU with mixed precision
trainer = pl.Trainer(gpus=2, precision=16)

In [None]:
# Train on TPUs
trainer = pl.Trainer(tpu_cores=8)

无需更改一行代码，您现在可以使用上述代码执行以下操作：

In [None]:
# 使用 16 位精度在 TPU 上训练
# 只使用一半的训练数据并在训练周期的每个epoch检查验证
trainer = pl.Trainer(tpu_cores=8, precision=16, limit_train_batches=0.5, val_check_interval=0.25)

# Checkpoints

Lightning 会自动保存您的模型。 训练完成后，您可以按如下方式加载检查点：

In [None]:
model = LitModel.load_from_checkpoint(path)

上面的检查点包含初始化模型和设置状态字典所需的所有参数。 如果你更喜欢手动完成，这里是等效的

In [None]:
# load the ckpt
ckpt = torch.load("path/to/checkpoint.ckpt")

# equivalent to the above
model = LitModel()
model.load_state_dict(ckpt["state_dict"])

# 数据流

每个循环（训练、验证、测试）都有三个可以实现的钩子：
* x_step
* x_step_end
* x_epoch_end

为了说明数据如何流动，我们将使用训练循环（即：x=training）

In [None]:
outs = []
for batch in data:
    out = training_step(batch)
    outs.append(out)
training_epoch_end(outs)

Lightning 中的等效项是：

In [None]:
def training_step(self, batch, batch_idx):
    prediction = ...
    return prediction


def training_epoch_end(self, training_step_outputs):
    for prediction in predictions:
        ...

如果您使用 DP 或 DDP2 分布式模式（即：跨 GPU 拆分批次），请使用 x_step_end 手动聚合（或不实施它以让 Lightning 自动聚合）。

In [None]:
for batch in data:
    model_copies = copy_model_per_gpu(model, num_gpus)
    batch_split = split_batch_per_gpu(batch, num_gpus)

    gpu_outs = []
    for model, batch_part in zip(model_copies, batch_split):
        # LightningModule hook
        gpu_out = model.training_step(batch_part)
        gpu_outs.append(gpu_out)

    # LightningModule hook
    out = training_step_end(gpu_outs)

Lightning 中的等效项是：

In [None]:
def training_step(self, batch, batch_idx):
    loss = ...
    return loss


def training_step_end(self, losses):
    gpu_0_loss = losses[0]
    gpu_1_loss = losses[1]
    return (gpu_0_loss + gpu_1_loss) * 1 / 2

# 日志记录

要登录 Tensorboard、您最喜欢的记录器和/或进度条，请使用 `log()` 方法，该方法可以从 `LightningModule`中的任何方法调用。

In [None]:
def training_step(self, batch, batch_idx):
    self.log("my_metric", x)

`log()` 方法有几个选项：
* `on_step`（记录训练中该步骤的指标）
* `on_epoch`（在`epoch`结束时自动累积和记录）
* `prog_bar`（记录到进度条）
* `logger`（像 `Tensorboard` 一样记录到记录器）

根据调用日志的位置，Lightning 会自动为您确定正确的模式。 但是当然您可以通过手动设置标志来覆盖默认行为

> 设置 on_epoch=True 将在整个训练时期累积您的记录值。

In [None]:
def training_step(self, batch, batch_idx):
    self.log("my_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)

> 进度条中显示的损失值在最后一个值上进行了平滑（平均），因此它不同于训练/验证步骤中返回的实际损失。

您还可以直接使用记录器的任何方法：

In [None]:
def training_step(self, batch, batch_idx):
    tensorboard = self.logger.experiment
    tensorboard.any_summary_writer_method_you_want()

训练开始后，您可以使用自己喜欢的记录器或启动 Tensorboard 日志来查看日志：

In [None]:
tensorboard --logdir ./lightning_logs

> Lightning 会在进度条中自动显示从 `training_step` 返回的损失值。 所以，不需要像 `self.log('loss', loss, prog_bar=True)` 那样显式地记录日志。

# 可选扩展

## 回调

回调是一个任意的自包含程序，可以在训练循环的任意部分执行。

这是一个添加一个不太花哨的学习率衰减规则的示例：

In [None]:
from pytorch_lightning.callbacks import Callback


class DecayLearningRate(Callback):
    def __init__(self):
        self.old_lrs = []

    def on_train_start(self, trainer, pl_module):
        # 跟踪初始学习率
        for opt_idx, optimizer in enumerate(trainer.optimizers):
            group = [param_group["lr"] for param_group in optimizer.param_groups]
            self.old_lrs.append(group)

    def on_train_epoch_end(self, trainer, pl_module):
        for opt_idx, optimizer in enumerate(trainer.optimizers):
            old_lr_group = self.old_lrs[opt_idx]
            new_lr_group = []
            for p_idx, param_group in enumerate(optimizer.param_groups):
                old_lr = old_lr_group[p_idx]
                new_lr = old_lr * 0.98
                new_lr_group.append(new_lr)
                param_group["lr"] = new_lr
            self.old_lrs[opt_idx] = new_lr_group


# And pass the callback to the Trainer
decay_callback = DecayLearningRate()
trainer = Trainer(callbacks=[decay_callback])

你可以用回调做的事情：
* 在训练的某个时候发送电子邮件
* 培养模型
* 更新学习率
* 可视化渐变
* …
* 你只受限于你的想象力

# LightningDataModules

DataLoaders 和数据处理代码往往会散落在各处。 通过将数据代码组织到 LightningDataModule 中，使其可重用。

In [None]:
class MNISTDataModule(LightningDataModule):
    def __init__(self, batch_size=32):
        super().__init__()
        self.batch_size = batch_size

    # 在进行分布式训练时，Datamodules 有两个可选参数用于对下载/准备/拆分数据进行粒度控制：

    # 可选，仅在 1 个 GPU/机器上调用
    def prepare_data(self):
        MNIST(os.getcwd(), train=True, download=True)
        MNIST(os.getcwd(), train=False, download=True)

    # 可选，为每个 GPU/机器调用（分配状态正常）
    def setup(self, stage: Optional[str] = None):
        # 变换
        transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
        # 分割数据集
        if stage in (None, "fit"):
            mnist_train = MNIST(os.getcwd(), train=True, transform=transform)
            self.mnist_train, self.mnist_val = random_split(mnist_train, [55000, 5000])
        if stage == (None, "test"):
            self.mnist_test = MNIST(os.getcwd(), train=False, transform=transform)

    # 返回每个拆分的数据加载器
    def train_dataloader(self):
        mnist_train = DataLoader(self.mnist_train, batch_size=self.batch_size)
        return mnist_train

    def val_dataloader(self):
        mnist_val = DataLoader(self.mnist_val, batch_size=self.batch_size)
        return mnist_val

    def test_dataloader(self):
        mnist_test = DataLoader(self.mnist_test, batch_size=self.batch_size)
        return mnist_test

LightningDataModule 旨在实现跨项目共享和重用数据拆分和转换。 它封装了处理数据所需的所有步骤：下载、标记化、处理等。

现在你可以简单地将你的 LightningDataModule 传递给训练器：

In [None]:
# init model
model = LitModel()

# init data
dm = MNISTDataModule()

# train
trainer = pl.Trainer()
trainer.fit(model, dm)

# test
trainer.test(datamodule=dm)

DataModules 对于基于数据构建模型特别有用。 阅读有关数据模块的更多信息。

# 调试

Lightning 有很多调试工具。 以下是其中几个的示例：

In [None]:
# 仅使用 10 个训练批次和 3 个 val 批次
trainer = Trainer(limit_train_batches=10, limit_val_batches=3)

In [None]:
# 自动过度拟合模型的理智批次进行健全性测试
trainer = Trainer(overfit_batches=1)

In [None]:
# 对所有代码进行单元测试 - 对代码的每一行执行一次以查看是否存在错误，而不是等待数小时在验证时崩溃
trainer = Trainer(fast_dev_run=True)

In [None]:
# 仅训练 20% 的 epoch
trainer = Trainer(limit_train_batches=0.2)

In [None]:
# 每 25% 的训练epoch运行一次验证
trainer = Trainer(val_check_interval=0.25)

In [None]:
# 分析您的代码以查找速度/内存瓶颈
Trainer(profiler="simple")

# 其他很酷的功能

定义和训练第一个 Lightning 模型后，您可能想尝试其他很酷的功能，例如
* 自动提前停止
* 自动截断反向传播时间
* 自动调整批量大小
* 自动找到一个好的学习率
* 直接从 S3 加载检查点
* 扩展到大规模计算集群
* 每个训练/验证/测试循环使用多个数据加载器
* 使用多个优化器进行强化学习甚至 GAN