In [1]:
import json
import torch
import random

from torch import nn
from torch.utils.data import Dataset, DataLoader

from transformers import AutoConfig
from transformers import BertPreTrainedModel, BertModel
from transformers import BertTokenizer
from transformers import Trainer, TrainingArguments

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEVICE

device(type='cuda')

In [2]:
checkpoint = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(checkpoint)

In [3]:
def process_data(file_path):
    with open(file_path, "r") as file:
        datas = json.load(file)

    dataset = []
    for data in datas:
        user_info = []
        for _, value in data["user_info"].items():
            if value:
                user_info.append(str(value))
        
        parameter = []
        for _, value in data["parameter"].items():
            parameter.append(value)
        dataset.append({"user_info": user_info, "parameter": parameter})

    return dataset

In [4]:
train_path = "../train_data/type_1/train.json"
valid_path = "../train_data/type_1/valid.json"

train_dataset = process_data(train_path)
valid_dataset = process_data(valid_path)

In [5]:
class HearingAidDataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        user_info = self.dataset[idx]["user_info"]
        parameter = self.dataset[idx]["parameter"]
        return user_info, parameter

In [6]:
def collate_fn(batch):
    user_info, parameter = zip(*batch)
    user_info = [" ".join(item) for item in user_info]
    encoded_user_info = tokenizer(user_info, return_tensors="pt", padding=True, truncation=True)
    parameter = torch.tensor(list(parameter), dtype=torch.float32)
    return encoded_user_info, parameter

training_dataset = HearingAidDataset(train_dataset)
valid_dataset = HearingAidDataset(valid_dataset)

train_loader = DataLoader(training_dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)

In [7]:
batch_X, batch_y = next(iter(train_loader))
print('batch_X shape:', {k: v.shape for k, v in batch_X.items()})
print('batch_y shape:', batch_y.shape)

batch_X shape: {'input_ids': torch.Size([64, 27]), 'token_type_ids': torch.Size([64, 27]), 'attention_mask': torch.Size([64, 27])}
batch_y shape: torch.Size([64, 119])


In [8]:
# Bert For Regression
# class BertForRegression(BertPreTrainedModel):
#     def __init__(self, config):
#         super().__init__(config)
#         self.num_labels = config.num_labels
#         self.bert = BertModel(config, add_pooling_layer=False)
#         self.dropout = nn.Dropout(config.hidden_dropout_prob)
#         self.multioutput = nn.Linear(config.hidden_size, self.config.num_labels)
#         self.post_init()

#     def forward(self, x):
#         x = self.bert(**x)
#         x = self.dropout(x.last_hidden_state[:, 0])
#         x = self.multioutput(x)
#         return x

In [9]:
# config = AutoConfig.from_pretrained(checkpoint, num_labels=119)
# model = BertForRegression.from_pretrained(checkpoint, config=config).to(DEVICE)
# model

In [10]:
# Bert For Regression with additional optimization
class BertForRegressionOptimized(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config, add_pooling_layer=False)
        self.dropout = nn.Dropout(0.3)  # 可根据需要调整 Dropout 概率
        self.hidden_layer = nn.Linear(config.hidden_size, config.hidden_size // 2)
        self.output_layer = nn.Linear(config.hidden_size // 2, config.num_labels)
        self.activation = nn.ReLU()
        self.post_init()

    def forward(self, x):
        # 提取 Bert 输出特征
        bert_outputs = self.bert(**x)
        pooled_output = bert_outputs.last_hidden_state[:, 0]  # 获取 [CLS] token 的特征

        # Dropout & 隐藏层
        pooled_output = self.dropout(pooled_output)
        hidden_output = self.activation(self.hidden_layer(pooled_output))
        
        # 输出层
        output = self.output_layer(hidden_output)
        return output

In [11]:
config = AutoConfig.from_pretrained(checkpoint, num_labels=119)
model = BertForRegressionOptimized.from_pretrained(checkpoint, config=config).to(DEVICE)

model

Some weights of BertForRegressionOptimized were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['hidden_layer.bias', 'hidden_layer.weight', 'output_layer.bias', 'output_layer.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BertForRegressionOptimized(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, el

In [12]:
outputs = model(batch_X.to(DEVICE))
print(outputs)

tensor([[-0.0307,  0.0020,  0.0319,  ..., -0.0054,  0.1356,  0.0161],
        [-0.0256,  0.0019,  0.0177,  ..., -0.0046,  0.1351,  0.0839],
        [-0.0142,  0.0239, -0.0127,  ...,  0.0214,  0.1423,  0.0408],
        ...,
        [-0.0519,  0.0320,  0.0206,  ..., -0.0326,  0.1124,  0.0807],
        [-0.0381,  0.0232, -0.0090,  ..., -0.0388,  0.1141,  0.0817],
        [-0.0351, -0.0041, -0.0429,  ..., -0.0410,  0.1014,  0.0428]],
       device='cuda:0', grad_fn=<AddmmBackward0>)


In [13]:
learning_rate = 1e-5
loss_fn = nn.SmoothL1Loss()
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [14]:
from transformers import get_scheduler

epochs = 50
num_training_steps = epochs * len(train_loader)
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)
print(num_training_steps)

4800


In [15]:
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch_X, batch_y in train_loader:
        batch_X = {k: v.to(DEVICE) for k, v in batch_X.items()}
        batch_y = batch_y.to(DEVICE)
        outputs = model(batch_X)
        loss = loss_fn(outputs, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        lr_scheduler.step()

        total_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {(total_loss / len(train_loader)):.3f}")


Epoch 1, Loss: 34.070
Epoch 2, Loss: 33.466


KeyboardInterrupt: 