In [43]:
import torch
from torch import nn 
import torch.nn.functional as F
from torch.utils.data import DataLoader,Dataset
from torch.utils.data.distributed import DistributedSampler


# 准备数据集

这里我们模拟一个线性的数据集。 (input_shape=20) -> (output_shape = 1)

In [44]:
import torch

from torch.utils.data import Dataset

class DDPTrainDataset(Dataset):
    
    def __init__(self, size: int):
        super().__init__()
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)] 
        
        
    def __len__(self):
        return self.size
    
    def __getitem__(self, index):
        
        return self.data[index]

# 定义数据集接口和准备数据加载

In [45]:
def get_train_dataset():
    
    train_dataset = DDPTrainDataset(size=2096)
    model = torch.nn.Linear(20, 1) 
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
    return train_dataset, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
    
    return DataLoader(
        dataset,
        batch_size = batch_size,
        pin_memory=True,
        shuffle = True
    )
    

In [46]:
train_dataset, model, optimizer = get_train_dataset()

d = prepare_dataloader(train_dataset, batch_size = 3)

print(len(d))
print(next(iter(d)))
print(len(next(iter(d))[0]))

for data in d:
    print(data)
    print(data[0].shape)
    break 

699
[tensor([[0.4656, 0.2774, 0.6411, 0.4094, 0.2635, 0.0147, 0.5033, 0.6247, 0.3505,
         0.4241, 0.1776, 0.9870, 0.8843, 0.2066, 0.5938, 0.5882, 0.4147, 0.4700,
         0.2333, 0.1819],
        [0.1698, 0.2924, 0.0185, 0.8407, 0.0124, 0.9079, 0.9204, 0.1655, 0.4217,
         0.7464, 0.0678, 0.2419, 0.0433, 0.2892, 0.0292, 0.1639, 0.6973, 0.3199,
         0.4906, 0.8398],
        [0.5716, 0.3419, 0.5777, 0.0492, 0.5661, 0.3120, 0.7183, 0.2138, 0.2442,
         0.6456, 0.5655, 0.1854, 0.9365, 0.3166, 0.9549, 0.5539, 0.3119, 0.2418,
         0.2596, 0.9206]]), tensor([[0.4416],
        [0.9753],
        [0.1021]])]
3
[tensor([[0.3404, 0.9200, 0.2126, 0.5422, 0.8707, 0.9628, 0.0770, 0.2595, 0.2012,
         0.6956, 0.4363, 0.3337, 0.1730, 0.2882, 0.7458, 0.2677, 0.2670, 0.3767,
         0.3765, 0.6355],
        [0.5410, 0.7625, 0.4727, 0.0571, 0.9136, 0.3922, 0.6478, 0.9339, 0.9876,
         0.5733, 0.1305, 0.4316, 0.7166, 0.4530, 0.3302, 0.9528, 0.6323, 0.2309,
         0.1198, 0.0

# 定义训练过程

In [47]:


class Trainer:
    
    def __init__(self, gpu_id, model,  train_data, optimizer, save_every: int):
        super().__init__()
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id) 
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
    
    def _run_batch(self, source, target):
        
        output = self.model(source)
        loss = F.cross_entropy(output, target)
        
        loss.backward()
        self.optimizer.step()
        self.optimizer.zero_grad()
        return loss
        
    
    def _run_epoch(self, epoch):
        
        bs = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {bs} | Steps: {len(self.train_data)}")

        input_size = 0
        
        for data in self.train_data:
            source, target = data
            source = source.to(self.gpu_id)
            target = target.to(self.gpu_id)
            loss = self._run_batch(source, target)
            
            input_size += source.shape[0]
        print(f"Epoch {epoch} completed, Handle_Input_Size = {input_size}, loss = {loss}")
    
    def _save_model(self, epoch):
        path = f"model_epoch_{epoch}.pth"
        torch.save(self.model.state_dict(), path)
    
    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            
            if epoch % self.save_every == 0:
                self._save_model(epoch)

# 定义入口

In [48]:
def main(device, total_epochs, save_every, batch_szie):
    train_dataset, model, optimizer = get_train_dataset()
    model = model
    train_data = prepare_dataloader(train_dataset, batch_size = batch_szie)
    trainer = Trainer(device, model, train_data, optimizer, save_every)
    trainer.train(total_epochs)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--total_epochs', type=int, help='total epochs to train')
    parser.add_argument('--save_every', type=int, help='save model every n epochs')
    parser.add_argument('--batch_size', type=int, help='batch size on each device', default=32)
    
    
    args = parser.parse_args(['--total_epochs', '32', '--save_every', '10', '--batch_size', '32'])
    
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    
    main(device, args.total_epochs, args.save_every, args.batch_size)

[GPUcuda:0] Epoch 0 | Batchsize: 32 | Steps: 66
Epoch 0 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 1 | Batchsize: 32 | Steps: 66
Epoch 1 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 2 | Batchsize: 32 | Steps: 66
Epoch 2 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 3 | Batchsize: 32 | Steps: 66
Epoch 3 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 4 | Batchsize: 32 | Steps: 66
Epoch 4 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 5 | Batchsize: 32 | Steps: 66
Epoch 5 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 6 | Batchsize: 32 | Steps: 66
Epoch 6 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 7 | Batchsize: 32 | Steps: 66
Epoch 7 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 8 | Batchsize: 32 | Steps: 66
Epoch 8 completed, Handle_Input_Size = 2096, loss = -0.0
[GPUcuda:0] Epoch 9 | Batchsize: 32 | Steps: 66
Epoch 9