<a href="https://colab.research.google.com/github/slxslxslx/BiShe/blob/main/DDP_BERT_sentiment1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install datasets transformers

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m28.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl 

In [None]:
# 尝试-这个分布式可以顺利运行
# ds = load_dataset("contemmcm/sentiment140")
# 百万级别1600000，二分类
# 训练集数据量: DatasetDict({
#     complete: Dataset({
#         features: ['text', 'label'],
#         num_rows: 1600000
#     })
# })
# RTX 2080 Ti * 2
# Single GPU training time: 212.33 seconds
#    accuracy                           0.88
# 多GPU,rank=1 end_timeMulti GPU training time: 149.43 seconds
#     accuracy                           0.88
import os

# os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
import os
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"

import time
from datetime import datetime, timedelta, timezone
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, random_split, DistributedSampler
from transformers import BertTokenizer, BertForSequenceClassification
from datasets import load_dataset
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

from sklearn.metrics import classification_report
import numpy as np
import pickle

# 获取北京时区对象
beijing_tz = timezone(timedelta(hours=8))


# 获取当前的UTC时间并转换为北京时间
def get_beijing_time_now():
    return datetime.now(beijing_tz).strftime("%Y-%m-%d %H:%M:%S")


# 加载IMDB数据集和BERT分词器，并进行一些初始化操作
def load_and_preprocess_data():
    print('load_and_preprocess_data开始执行')
    dataset = load_dataset("contemmcm/sentiment140", cache_dir="./dataset/sentiment140")
    print("dataset['complete'][:5]:", dataset['complete'][:5])
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    # 文本编码函数
    def encode_batch(batch):
      encoded = tokenizer(batch['text'], padding=True, truncation=True, max_length=128)
      print(f"Encoded input_ids lengths: {[len(ids) for ids in encoded['input_ids']]}")
      return encoded

    # 编码数据集
    print('开始编码数据集',get_beijing_time_now())
    encoded_dataset = dataset.map(encode_batch, batched=True)
    encoded_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    print("encoded_dataset['complete'][:5]:", encoded_dataset['complete'][:5])
    print('结束编码数据集',get_beijing_time_now())
    # 转换为普通字典
    encoded_dataset_dict = {
        split: {
            "input_ids": encoded_dataset[split]["input_ids"],
            "attention_mask": encoded_dataset[split]["attention_mask"],
            "label": encoded_dataset[split]["label"],
        }
        for split in encoded_dataset
    }

    # 保存encoded_dataset
    with open('encoded_dataset-sentiment140.pkl','wb') as f:
        pickle.dump(encoded_dataset_dict, f)
        print('保存encoded_dataset-sentiment140.pkl')

    # with open('encoded_dataset-sentiment140.pkl', 'rb') as f:
    #     encoded_dataset = pickle.load(f)

    # 划分训练集和测试集
    # 假设encoded_dataset['complete']是你的数据集，这里获取其长度
    total_length = len(encoded_dataset['complete'])
    # 计算训练集的大小（这里按80%比例计算，可根据实际需求修改比例）
    train_size = int(total_length * 0.8)
    # 计算测试集的大小
    test_size = total_length - train_size
    print('total_length', total_length)
    print('train_size', train_size)
    print('test_size', test_size)
    # train_dataset, test_dataset = random_split(encoded_dataset['complete'], [20000, 5000])
    train_dataset, test_dataset = random_split(encoded_dataset['complete'], [train_size, test_size])

    # 使用数据集中自带的train部分作为训练集
    # train_dataset = encoded_dataset['train']
    # test_dataset = encoded_dataset['test']

    test_loader = DataLoader(test_dataset, batch_size=16)
    print('load_and_preprocess_data执行完毕')
    return train_dataset, test_dataset, test_loader


# 单GPU训练
def train_single_gpu(model, train_dataset, epochs=1):
    print('单GPU训练开始,epochs=', epochs)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    optimizer = optim.Adam(model.parameters(), lr=1e-5)  # 优化器用于根据计算得到的损失值来更新模型的参数
    # 以使得模型能够朝着损失值不断减小的方向进行优化，进而提高模型对数据的拟合能力和预测准确性。
    # 自适应矩估计（Adaptive Moment Estimation）优化算法
    # model.parameters() 获取模型中所有需要被优化的参数

    start_time = time.time()
    print('单GPU start_time', get_beijing_time_now())
    model.train()
    for epoch in range(epochs):
        print('epoch', epoch)
        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            optimizer.step()

    end_time = time.time()
    print('单GPU end_time', get_beijing_time_now())
    print(f"Single GPU training time: {end_time - start_time:.2f} seconds")

    # 评估模型
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():  # 不进行梯度计算
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask)
            preds = torch.argmax(outputs.logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    print('单GPU评估:\n', classification_report(all_labels, all_preds))


# 2GPU训练
def train_multi_gpu(rank, world_size, model, train_dataset, test_loader, epochs=1):
    # print('test_loader', test_loader)
    print(f"多GPU分布式训练进程 {rank} 初始化")

    # 设置分布式环境变量
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size, timeout=timedelta(seconds=180))

    # 设定设备
    device = torch.device(f"cuda:{rank}")
    model = model.to(device)
    model = DDP(model, device_ids=[rank])
    # device_ids=[rank] 这个参数就是明确告诉 DDP 模块，当前这个模型副本应该放置在哪个具体的 GPU 设备上进行后续的训练操作。保证每个进程所负责的模型副本和相应的 GPU 紧密关联起来

    # 使用 DistributedSampler 确保每个进程处理不同的数据
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=False, sampler=train_sampler)
    optimizer = optim.Adam(model.parameters(), lr=1e-5)
    criterion = nn.CrossEntropyLoss()

    start_time = time.time()
    print(f'多GPU,rank={rank} start_time', get_beijing_time_now())
    if rank == 0:
        print(f"多GPU训练开始时间: {get_beijing_time_now()}")
    model.train()

    for epoch in range(epochs):
        if rank == 0:
            print(f"Epoch {epoch} 开始")
        train_sampler.set_epoch(epoch)
        for batch in train_loader:
            # print(rank, 'batch')
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            optimizer.step()

    end_time = time.time()
    print(f'多GPU,rank={rank} end_time', get_beijing_time_now())
    if rank == 0:
        print(f"Multi GPU training time: {end_time - start_time:.2f} seconds")

    # 评估模型
    if rank == 0:
        model.eval()
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch in test_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)
                outputs = model(input_ids, attention_mask=attention_mask)
                preds = torch.argmax(outputs.logits, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        print('多GPU评估:\n', classification_report(all_labels, all_preds))

    dist.destroy_process_group()


# 设置进程启动，通过mp.spawn启动多进程分布式训练
def spawn_train(world_size, model, train_dataset, test_loader):
    mp.spawn(train_multi_gpu, args=(world_size, model, train_dataset, test_loader), nprocs=world_size,
             join=True)  # join=True时mp.spawn 函数会阻塞主进程，直到所有启动的子进程都执行完毕。


# 主函数入口
if __name__ == '__main__':
    train_dataset, test_dataset, test_loader = load_and_preprocess_data()
    model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
    criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数

    # 单GPU训练
    train_single_gpu(model, train_dataset)

    # 多GPU分布式训练!
    # spawn_train(world_size=2, model=model, train_dataset=train_dataset, test_loader=test_loader)

    print('main')


load_and_preprocess_data开始执行


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Generating complete split: 0 examples [00:00, ? examples/s]

dataset['complete'][:5]: {'text': ["Just back home from a little gathering with some old friends.. It was really fun, they're still the same. ", 'Hey @ricebunny i need a web cam!!!   (RiceBunny live > http://ustre.am/ZbT)', 'only a couple more days before i have to go to faggot summer school ', 'Must admit Zac handled it well!!! ', "It's warm and sticky but there's no sun  *Sneeze*"], 'label': [1, 0, 0, 0, 0]}
开始编码数据集 2024-11-21 15:06:28


Map:   0%|          | 0/1600000 [00:00<?, ? examples/s]

encoded_dataset['complete'][:5]: {'label': tensor([1, 0, 0, 0, 0]), 'input_ids': tensor([[  101,  2074,  2067,  2188,  2013,  1037,  2210,  7215,  2007,  2070,
          2214,  2814,  1012,  1012,  2009,  2001,  2428,  4569,  1010,  2027,
          1005,  2128,  2145,  1996,  2168,  1012,   102,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0],
        [  101,  4931,  1030,  5785,  8569, 10695,  2100,  1045,  2342,  1037,
          4773, 11503,   999,   999,   999,  1006,  5785,  8569, 10695,  2100,
          2444,  1028,  8299,  1024,  1013,  1013,  2149,  7913,  1012,  2572,
          1013,  1062, 19279,  1007,   102,     0,     0,     0,     0,     0,
            

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


单GPU训练开始,epochs= 1
单GPU start_time 2024-11-21 15:27:35
epoch 0


RuntimeError: stack expects each tensor to be equal size, but got [51] at entry 0 and [86] at entry 1

In [None]:
from google.colab import files

# 要下载的文件名
file_name = '/content/encoded_dataset-sentiment140.pkl'

# 下载文件
files.download(file_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
import os

# 要查看大小的文件名
file_name = '/content/encoded_dataset-sentiment140.pkl'

# 获取文件大小（单位是字节）
file_size = os.path.getsize(file_name)
print(f"文件 {file_name} 的大小为 {file_size} 字节。")

文件 /content/encoded_dataset-sentiment140.pkl 的大小为 2755836784 字节。


In [None]:
import torch
import pickle
from collections import Counter

# 加载已经保存的encoded_dataset
with open('encoded_dataset-sentiment140.pkl', 'rb') as f:
    encoded_dataset = pickle.load(f)

# 定义分割区间
length_bins = [64, 128, 256, 512, 1024]
length_bins_labels = [f"<= {length}" for length in length_bins] + [f"> {length_bins[-1]}"]

# 初始化统计计数器
length_counts = Counter()

# 遍历数据集计算长度分布
for sample in encoded_dataset['complete']:
    input_ids_length = len(sample['input_ids'])
    for i, max_length in enumerate(length_bins):
        if input_ids_length <= max_length:
            length_counts[length_bins_labels[i]] += 1
            break
    else:
        # 如果大于最大长度，归类到 "> 1024"
        length_counts[length_bins_labels[-1]] += 1

# 打印统计结果
print("文本长度分布情况：")
total_samples = len(encoded_dataset['complete'])
for label in length_bins_labels:
    count = length_counts[label]
    percentage = (count / total_samples) * 100
    print(f"{label}: {count} ({percentage:.2f}%)")


TypeError: string indices must be integers

沾满了RAM,所以运行失败了

In [None]:
import torch
import pickle
from collections import Counter

# 加载已经保存的encoded_dataset
with open('encoded_dataset-sentiment140.pkl', 'rb') as f:
    encoded_dataset = pickle.load(f)

# 定义分割区间
length_bins = [64, 128, 256, 512, 1024]
length_bins_labels = [f"<= {length}" for length in length_bins] + [f"> {length_bins[-1]}"]

# 初始化统计计数器
length_counts = Counter()

# 遍历数据集计算长度分布
for sample in encoded_dataset['complete']:
    input_ids = sample['input_ids']  # 获取 input_ids
    if isinstance(input_ids, torch.Tensor):  # 检查是否为张量
        input_ids_length = len(input_ids)  # 直接获取张量的长度
    elif isinstance(input_ids, list):  # 如果是列表，也获取长度
        input_ids_length = len(input_ids)
    else:
        continue  # 如果 input_ids 类型不符合，跳过
    print('input_ids_length', input_ids_length)

    # 将长度归入对应的区间
    print('将长度归入对应的区间')
    for i, max_length in enumerate(length_bins):
        if input_ids_length <= max_length:
            length_counts[length_bins_labels[i]] += 1
            break
    else:
        # 如果大于最大长度，归类到 "> 1024"
        length_counts[length_bins_labels[-1]] += 1

# 打印统计结果
print("文本长度分布情况：")
total_samples = sum(length_counts.values())  # 统计总样本数
for label in length_bins_labels:
    count = length_counts[label]
    percentage = (count / total_samples) * 100
    print(f"{label}: {count} ({percentage:.2f}%)")


在试一下，优化了

In [None]:
import torch
import pickle
from collections import Counter

batch_size = 100000  # 每次处理10万条数据
length_bins = [64, 128, 256, 512, 1024]
length_bins_labels = [f"<= {length}" for length in length_bins] + [f"> {length_bins[-1]}"]
length_counts = Counter()

def process_batch(batch):
    batch_counts = Counter()
    for sample in batch:
        input_ids = sample['input_ids']
        input_ids_length = len(input_ids) if isinstance(input_ids, (torch.Tensor, list)) else 0
        for i, max_length in enumerate(length_bins):
            if input_ids_length <= max_length:
                batch_counts[length_bins_labels[i]] += 1
                break
        else:
            batch_counts[length_bins_labels[-1]] += 1
    return batch_counts

# 分批处理
with open('encoded_dataset-sentiment140.pkl', 'rb') as f:
    while True:
        try:
            batch = pickle.load(f, batch_size=batch_size)  # 假设实现了分块加载
            batch_counts = process_batch(batch)
            length_counts.update(batch_counts)
        except EOFError:
            break

# 统计结果
print("文本长度分布情况：")
total_samples = sum(length_counts.values())
for label in length_bins_labels:
    count = length_counts[label]
    percentage = (count / total_samples) * 100
    print(f"{label}: {count} ({percentage:.2f}%)")


TypeError: 'batch_size' is an invalid keyword argument for load()

编码可以正常运行，可以进行epoch1

In [None]:
# 尝试-这个分布式可以顺利运行
# ds = load_dataset("contemmcm/sentiment140")
# 百万级别1600000，二分类
# 训练集数据量: DatasetDict({
#     complete: Dataset({
#         features: ['text', 'label'],
#         num_rows: 1600000
#     })
# })

import os

os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
# import os
#
# os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"

import time
from datetime import datetime, timedelta, timezone
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, random_split, DistributedSampler
from transformers import BertTokenizer, BertForSequenceClassification
from datasets import load_dataset
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

from sklearn.metrics import classification_report
import numpy as np
import pickle

# 获取北京时区对象
beijing_tz = timezone(timedelta(hours=8))


# 获取当前的UTC时间并转换为北京时间
def get_beijing_time_now():
    return datetime.now(beijing_tz).strftime("%Y-%m-%d %H:%M:%S")


# 加载IMDB数据集和BERT分词器，并进行一些初始化操作
def load_and_preprocess_data():
    print('load_and_preprocess_data开始执行')
    dataset = load_dataset("contemmcm/sentiment140", cache_dir="./dataset/sentiment140")
    print("dataset['complete'][:5]:", dataset['complete'][:5])
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    # 文本编码函数
    def encode_batch(batch):
        # encoded = tokenizer(batch['text'], padding=True, truncation=True, max_length=128)
        encoded = tokenizer(
            batch['text'],
            padding='max_length',  # 使用 max_length 填充
            truncation=True,
            max_length=128,
            return_tensors='pt'  # 返回 PyTorch 张量
        )
        print(f"Encoded input_ids lengths: {[len(ids) for ids in encoded['input_ids']]}")
        return encoded

    # 编码数据集
    print('开始编码数据集', get_beijing_time_now())
    encoded_dataset = dataset.map(encode_batch, batched=True)
    encoded_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    print("encoded_dataset['complete'][:5]:", encoded_dataset['complete'][:5])
    # 12min
    print('结束编码数据集', get_beijing_time_now())

    # 这个还是不要了，改了后面都要改
    # 转换为普通字典
    # encoded_dataset_dict = {
    #     split: {
    #         "input_ids": encoded_dataset[split]["input_ids"],
    #         "attention_mask": encoded_dataset[split]["attention_mask"],
    #         "label": encoded_dataset[split]["label"],
    #     }
    #     for split in encoded_dataset
    # }

    # 保存encoded_dataset
    with open('encoded_dataset-sentiment140-original.pkl', 'wb') as f:
        pickle.dump(encoded_dataset, f)
        print('保存encoded_dataset-sentiment140-original.pkl')

    # with open('encoded_dataset-sentiment140.pkl', 'rb') as f:
    #     encoded_dataset = pickle.load(f)

    # print(encoded_dataset)

    # 划分训练集和测试集
    # 假设encoded_dataset['complete']是你的数据集，这里获取其长度
    total_length = len(encoded_dataset['complete'])
    # 计算训练集的大小（这里按80%比例计算，可根据实际需求修改比例）
    train_size = int(total_length * 0.8)
    # 计算测试集的大小
    test_size = total_length - train_size
    print('total_length', total_length)
    print('train_size', train_size)
    print('test_size', test_size)
    # train_dataset, test_dataset = random_split(encoded_dataset['complete'], [20000, 5000])
    train_dataset, test_dataset = random_split(encoded_dataset['complete'], [train_size, test_size])

    # 使用数据集中自带的train部分作为训练集
    # train_dataset = encoded_dataset['train']
    # test_dataset = encoded_dataset['test']

    test_loader = DataLoader(test_dataset, batch_size=64)
    print('load_and_preprocess_data执行完毕')
    return train_dataset, test_dataset, test_loader


# 单GPU训练
def train_single_gpu(model, train_dataset, epochs=1):
    print('单GPU训练开始,epochs=', epochs)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)  # 优化器用于根据计算得到的损失值来更新模型的参数
    # 以使得模型能够朝着损失值不断减小的方向进行优化，进而提高模型对数据的拟合能力和预测准确性。
    # 自适应矩估计（Adaptive Moment Estimation）优化算法
    # model.parameters() 获取模型中所有需要被优化的参数

    start_time = time.time()
    print('单GPU start_time', get_beijing_time_now())
    model.train()
    for epoch in range(epochs):
        print('epoch', epoch)
        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            optimizer.step()

    end_time = time.time()
    print('单GPU end_time', get_beijing_time_now())
    print(f"Single GPU training time: {end_time - start_time:.2f} seconds")

    # 评估模型
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():  # 不进行梯度计算
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask)
            preds = torch.argmax(outputs.logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    print('单GPU评估:\n', classification_report(all_labels, all_preds))


# 2GPU训练
def train_multi_gpu(rank, world_size, model, train_dataset, test_loader, epochs=1):
    # print('test_loader', test_loader)
    print(f"多GPU分布式训练进程 {rank} 初始化")

    # 设置分布式环境变量
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size, timeout=timedelta(seconds=180))

    # 设定设备
    device = torch.device(f"cuda:{rank}")
    model = model.to(device)
    model = DDP(model, device_ids=[rank])
    # device_ids=[rank] 这个参数就是明确告诉 DDP 模块，当前这个模型副本应该放置在哪个具体的 GPU 设备上进行后续的训练操作。保证每个进程所负责的模型副本和相应的 GPU 紧密关联起来

    # 使用 DistributedSampler 确保每个进程处理不同的数据
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False, sampler=train_sampler)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    start_time = time.time()
    print(f'多GPU,rank={rank} start_time', get_beijing_time_now())
    if rank == 0:
        print(f"多GPU训练开始时间: {get_beijing_time_now()}")
    model.train()

    for epoch in range(epochs):
        if rank == 0:
            print(f"Epoch {epoch} 开始")
        train_sampler.set_epoch(epoch)
        for batch in train_loader:
            # print(rank, 'batch')
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            optimizer.step()

    end_time = time.time()
    print(f'多GPU,rank={rank} end_time', get_beijing_time_now())
    if rank == 0:
        print(f"Multi GPU training time: {end_time - start_time:.2f} seconds")

    # 评估模型
    if rank == 0:
        model.eval()
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch in test_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)
                outputs = model(input_ids, attention_mask=attention_mask)
                preds = torch.argmax(outputs.logits, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        print('多GPU评估:\n', classification_report(all_labels, all_preds))

    dist.destroy_process_group()


# 设置进程启动，通过mp.spawn启动多进程分布式训练
def spawn_train(world_size, model, train_dataset, test_loader):
    mp.spawn(train_multi_gpu, args=(world_size, model, train_dataset, test_loader), nprocs=world_size,
             join=True)  # join=True时mp.spawn 函数会阻塞主进程，直到所有启动的子进程都执行完毕。


# 主函数入口
if __name__ == '__main__':
    train_dataset, test_dataset, test_loader = load_and_preprocess_data()
    model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
    criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数

    # 单GPU训练
    train_single_gpu(model, train_dataset)

    # 多GPU分布式训练!
    # spawn_train(world_size=2, model=model, train_dataset=train_dataset, test_loader=test_loader)

    print('main')


load_and_preprocess_data开始执行


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/361 [00:00<?, ?B/s]

polarity.csv:   0%|          | 0.00/124M [00:00<?, ?B/s]

Generating complete split: 0 examples [00:00, ? examples/s]

dataset['complete'][:5]: {'text': ["Just back home from a little gathering with some old friends.. It was really fun, they're still the same. ", 'Hey @ricebunny i need a web cam!!!   (RiceBunny live > http://ustre.am/ZbT)', 'only a couple more days before i have to go to faggot summer school ', 'Must admit Zac handled it well!!! ', "It's warm and sticky but there's no sun  *Sneeze*"], 'label': [1, 0, 0, 0, 0]}


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/334 [00:00<?, ?B/s]

开始编码数据集 2024-11-21 17:37:18


Map:   0%|          | 0/1600000 [00:00<?, ? examples/s]

Encoded input_ids lengths: [128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 12

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


单GPU训练开始,epochs= 1
单GPU start_time 2024-11-21 17:53:46
epoch 0


KeyboardInterrupt: 

In [None]:
pip install tqdm



In [None]:
# 尝试-这个分布式可以顺利运行
# ds = load_dataset("contemmcm/sentiment140")
# 百万级别1600000，二分类
# 训练集数据量: DatasetDict({
#     complete: Dataset({
#         features: ['text', 'label'],
#         num_rows: 1600000
#     })
# })
# RTX 2080 Ti * 2
# Single GPU training time: 212.33 seconds
#    accuracy                           0.88
# 多GPU,rank=1 end_timeMulti GPU training time: 149.43 seconds
#     accuracy                           0.88
import os

os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
# import os
#
# os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"

import time
from datetime import datetime, timedelta, timezone
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, random_split, DistributedSampler
from transformers import BertTokenizer, BertForSequenceClassification
from datasets import load_dataset
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

from sklearn.metrics import classification_report
import numpy as np
import pickle
from tqdm import tqdm  # 引入 tqdm 用于显示进度条

# 获取北京时区对象
beijing_tz = timezone(timedelta(hours=8))


# 获取当前的UTC时间并转换为北京时间
def get_beijing_time_now():
    return datetime.now(beijing_tz).strftime("%Y-%m-%d %H:%M:%S")


# 加载IMDB数据集和BERT分词器，并进行一些初始化操作
def load_and_preprocess_data():
    print('load_and_preprocess_data开始执行')
    dataset = load_dataset("contemmcm/sentiment140", cache_dir="./dataset/sentiment140")
    print("dataset['complete'][:5]:", dataset['complete'][:5])
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    # 文本编码函数
    def encode_batch(batch):
        # encoded = tokenizer(batch['text'], padding=True, truncation=True, max_length=128)
        encoded = tokenizer(
            batch['text'],
            padding='max_length',  # 使用 max_length 填充
            truncation=True,
            max_length=128,
            return_tensors='pt'  # 返回 PyTorch 张量
        )
        print(f"Encoded input_ids lengths: {[len(ids) for ids in encoded['input_ids']]}")
        return encoded

    # 编码数据集
    print('开始编码数据集', get_beijing_time_now())
    encoded_dataset = dataset.map(encode_batch, batched=True)
    encoded_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    # print("encoded_dataset['complete'][:5]:", encoded_dataset['complete'][:5])
    # 12min
    print('结束编码数据集', get_beijing_time_now())

    # 这个还是不要了，改了后面都要改
    # 转换为普通字典
    # encoded_dataset_dict = {
    #     split: {
    #         "input_ids": encoded_dataset[split]["input_ids"],
    #         "attention_mask": encoded_dataset[split]["attention_mask"],
    #         "label": encoded_dataset[split]["label"],
    #     }
    #     for split in encoded_dataset
    # }

    # 保存encoded_dataset
    with open('encoded_dataset-sentiment140-original.pkl', 'wb') as f:
        pickle.dump(encoded_dataset, f)
        print('保存encoded_dataset-sentiment140-original.pkl')

    # with open('encoded_dataset-sentiment140.pkl', 'rb') as f:
    #     encoded_dataset = pickle.load(f)

    # print(encoded_dataset)

    # 划分训练集和测试集
    # 假设encoded_dataset['complete']是你的数据集，这里获取其长度
    total_length = len(encoded_dataset['complete'])
    # 计算训练集的大小（这里按80%比例计算，可根据实际需求修改比例）
    train_size = int(total_length * 0.8)
    # 计算测试集的大小
    test_size = total_length - train_size
    print('total_length', total_length)
    print('train_size', train_size)
    print('test_size', test_size)
    # train_dataset, test_dataset = random_split(encoded_dataset['complete'], [20000, 5000])
    train_dataset, test_dataset = random_split(encoded_dataset['complete'], [train_size, test_size])

    # 使用数据集中自带的train部分作为训练集
    # train_dataset = encoded_dataset['train']
    # test_dataset = encoded_dataset['test']

    test_loader = DataLoader(test_dataset, batch_size=64)
    print('load_and_preprocess_data执行完毕')
    return train_dataset, test_dataset, test_loader


# 单GPU训练
def train_single_gpu(model, train_dataset, epochs=1):
    print('单GPU训练开始,epochs=', epochs)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)  # 优化器用于根据计算得到的损失值来更新模型的参数
    # 以使得模型能够朝着损失值不断减小的方向进行优化，进而提高模型对数据的拟合能力和预测准确性。
    # 自适应矩估计（Adaptive Moment Estimation）优化算法
    # model.parameters() 获取模型中所有需要被优化的参数

    start_time = time.time()
    print('单GPU start_time', get_beijing_time_now())
    model.train()
    # for epoch in range(epochs):
    #     print('epoch', epoch)
    #     for batch in train_loader:
    #         input_ids = batch['input_ids'].to(device)
    #         attention_mask = batch['attention_mask'].to(device)
    #         labels = batch['label'].to(device)

    #         optimizer.zero_grad()
    #         outputs = model(input_ids, attention_mask=attention_mask)
    #         loss = criterion(outputs.logits, labels)
    #         loss.backward()
    #         optimizer.step()


    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs} 开始...")
        # 使用 tqdm 显示进度条
        with tqdm(total=len(train_loader), desc=f"训练进度 (Epoch {epoch + 1}/{epochs})") as pbar:
            for batch_idx, batch in enumerate(train_loader):
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)

                optimizer.zero_grad()
                outputs = model(input_ids, attention_mask=attention_mask)
                loss = criterion(outputs.logits, labels)
                loss.backward()
                optimizer.step()

                # 更新 tqdm 进度条
                pbar.update(1)
                pbar.set_postfix({"Loss": f"{loss.item():.4f}"})  # 动态显示当前损失值

    end_time = time.time()
    print('单GPU end_time', get_beijing_time_now())
    print(f"Single GPU training time: {end_time - start_time:.2f} seconds")

    # 评估模型
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():  # 不进行梯度计算
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask)
            preds = torch.argmax(outputs.logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    print('单GPU评估:\n', classification_report(all_labels, all_preds))


# 2GPU训练
def train_multi_gpu(rank, world_size, model, train_dataset, test_loader, epochs=1):
    # print('test_loader', test_loader)
    print(f"多GPU分布式训练进程 {rank} 初始化")

    # 设置分布式环境变量
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size, timeout=timedelta(seconds=180))

    # 设定设备
    device = torch.device(f"cuda:{rank}")
    model = model.to(device)
    model = DDP(model, device_ids=[rank])
    # device_ids=[rank] 这个参数就是明确告诉 DDP 模块，当前这个模型副本应该放置在哪个具体的 GPU 设备上进行后续的训练操作。保证每个进程所负责的模型副本和相应的 GPU 紧密关联起来

    # 使用 DistributedSampler 确保每个进程处理不同的数据
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False, sampler=train_sampler)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    start_time = time.time()
    print(f'多GPU,rank={rank} start_time', get_beijing_time_now())
    if rank == 0:
        print(f"多GPU训练开始时间: {get_beijing_time_now()}")
    model.train()

    for epoch in range(epochs):
        if rank == 0:
            print(f"Epoch {epoch} 开始")
        train_sampler.set_epoch(epoch)
        for batch in train_loader:
            # print(rank, 'batch')
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            optimizer.step()

    end_time = time.time()
    print(f'多GPU,rank={rank} end_time', get_beijing_time_now())
    if rank == 0:
        print(f"Multi GPU training time: {end_time - start_time:.2f} seconds")

    # 评估模型
    if rank == 0:
        model.eval()
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch in test_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)
                outputs = model(input_ids, attention_mask=attention_mask)
                preds = torch.argmax(outputs.logits, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        print('多GPU评估:\n', classification_report(all_labels, all_preds))

    dist.destroy_process_group()


# 设置进程启动，通过mp.spawn启动多进程分布式训练
def spawn_train(world_size, model, train_dataset, test_loader):
    mp.spawn(train_multi_gpu, args=(world_size, model, train_dataset, test_loader), nprocs=world_size,
             join=True)  # join=True时mp.spawn 函数会阻塞主进程，直到所有启动的子进程都执行完毕。


# 主函数入口
if __name__ == '__main__':
    train_dataset, test_dataset, test_loader = load_and_preprocess_data()
    model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
    criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数

    # 单GPU训练
    train_single_gpu(model, train_dataset)

    # 多GPU分布式训练!
    # spawn_train(world_size=2, model=model, train_dataset=train_dataset, test_loader=test_loader)

    print('main')


load_and_preprocess_data开始执行
dataset['complete'][:5]: {'text': ["Just back home from a little gathering with some old friends.. It was really fun, they're still the same. ", 'Hey @ricebunny i need a web cam!!!   (RiceBunny live > http://ustre.am/ZbT)', 'only a couple more days before i have to go to faggot summer school ', 'Must admit Zac handled it well!!! ', "It's warm and sticky but there's no sun  *Sneeze*"], 'label': [1, 0, 0, 0, 0]}
开始编码数据集 2024-11-21 18:50:08
encoded_dataset['complete'][:5]: {'label': tensor([1, 0, 0, 0, 0]), 'input_ids': tensor([[  101,  2074,  2067,  2188,  2013,  1037,  2210,  7215,  2007,  2070,
          2214,  2814,  1012,  1012,  2009,  2001,  2428,  4569,  1010,  2027,
          1005,  2128,  2145,  1996,  2168,  1012,   102,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,   

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


单GPU训练开始,epochs= 1
单GPU start_time 2024-11-21 18:50:09
Epoch 1/1 开始...


训练进度 (Epoch 1/1):  19%|█▉        | 3827/20000 [1:16:09<5:23:31,  1.20s/it, Loss=0.4321]