# Bert

In [None]:
import pandas as pd
import numpy as np
import random
import re
import torch
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import precision_score, recall_score, f1_score

# Dataset wrapper: pairs tokenizer encodings with their labels
class NewsDataset(Dataset):
    """Map-style dataset that pairs tokenizer encodings with labels.

    ``encodings`` is a mapping from field name to a per-sample sequence
    (indexed by sample position). ``labels`` is an indexable sequence with
    one entry per sample.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        """Return the number of samples, taken from the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors.

        Every field of ``encodings`` is converted to a tensor, and the
        label is added under the ``'labels'`` key after being cast to int.
        """
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(int(self.labels[idx]))
        return sample

def flat_accuracy(preds,labels):
    """Return the fraction of rows whose arg-max class equals the label.

    ``preds`` holds one row of class scores per sample; the predicted class
    is the index of the largest score along axis 1. ``labels`` is a NumPy
    array that is flattened to one dimension before the comparison.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    true_classes = labels.flatten()
    num_correct = np.sum(predicted_classes == true_classes)
    return num_correct / len(true_classes)

# Training function (runs one epoch)
def train():
    """Run one training pass over ``train_loader``.

    Uses the module-level ``model``, ``optim``, ``scheduler``, ``device``,
    ``train_loader`` and ``epoch``. Progress is printed every 100 batches
    and the average batch loss is printed at the end.
    """
    model.train()
    num_batches = len(train_loader)
    running_loss = 0
    for step, batch in enumerate(train_loader, start=1):
        # Forward pass
        optim.zero_grad()
        outputs = model(
            batch['input_ids'].to(device),
            attention_mask=batch['attention_mask'].to(device),
            labels=batch['labels'].to(device),
        )
        loss = outputs[0]
        batch_loss = loss.item()
        running_loss += batch_loss

        # Backward pass; gradients are clipped to a max norm of 1.0
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        # Update the parameters, then advance the learning-rate schedule
        optim.step()
        scheduler.step()

        if step % 100 == 0:
            print("epoch: %d, iter_num: %d, loss: %.4f, %.2f%%" % (epoch, step, batch_loss, step/num_batches*100))

    print("Epoch: %d, Average training loss: %.4f"%(epoch, running_loss/num_batches))

def validation(print_metrics=False):
    """Evaluate the module-level ``model`` on ``test_dataloader`` and print results.

    Uses the module-level ``model``, ``test_dataloader`` and ``device``.

    Args:
        print_metrics: When true, precision, recall and F1 (computed over
            all evaluated samples) are printed in addition to accuracy and
            the average loss.

    Accuracy is computed over every evaluated sample rather than as the mean
    of per-batch accuracies, which would over-weight a short final batch and
    disagree with the sample-level precision/recall/F1. The reported loss is
    the mean of the per-batch losses.
    """
    model.eval()
    total_eval_loss = 0
    all_predictions = []
    all_labels = []

    # No gradients are needed anywhere in the evaluation loop.
    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

            loss = outputs[0]
            logits = outputs[1]
            total_eval_loss += loss.item()

            batch_predictions = np.argmax(logits.detach().cpu().numpy(), axis=1)
            all_predictions.extend(batch_predictions.flatten().tolist())
            all_labels.extend(labels.to('cpu').numpy().flatten().tolist())

    avg_val_accuracy = np.mean(np.asarray(all_predictions) == np.asarray(all_labels))
    avg_val_loss = total_eval_loss / len(test_dataloader)

    if print_metrics:
        # Compute everything before printing so a metric failure prints nothing.
        precision = precision_score(all_labels, all_predictions)
        recall = recall_score(all_labels, all_predictions)
        f1 = f1_score(all_labels, all_predictions)

    print(f"Accuracy: {avg_val_accuracy:.4f}")
    print(f"Average testing loss: {avg_val_loss:.4f}")
    if print_metrics:
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1 Score: {f1:.4f}")
    print("-------------------------------")

# Read the second sheet (index 1) of the workbook and make sure the text
# column is a string for every row.
df = pd.read_excel("social support_coding scheme_0313.xlsx",sheet_name=1)
df['content']=df['content'].astype(str)

labels_columns = ['transp_e', 'emp_e', 'symp_e', 'symptom_i', 'experience_i']

# Number of passes over the training data for each label.
NUM_EPOCHS = 10

# The device and the tokenizer do not depend on the label, so create them once.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = BertTokenizer.from_pretrained('./bert_base_chinese')

# One independent binary classifier is fine-tuned per label column.
for label in labels_columns:
    print(f"Processing label: {label}")

    x = df['content'].tolist()
    y = df[label].tolist()
    x_train, x_test, train_labels, test_labels = train_test_split(x, y, test_size=0.2, random_state=22)

    train_encoding = tokenizer(x_train, truncation=True, padding=True, max_length=20)
    test_encoding = tokenizer(x_test, truncation=True, padding=True, max_length=20)

    train_dataset = NewsDataset(train_encoding, train_labels)
    test_dataset = NewsDataset(test_encoding, test_labels)

    # A fresh model per label so that labels do not share fine-tuned weights.
    model = BertForSequenceClassification.from_pretrained('./bert_base_chinese', num_labels=2)
    model.to(device)

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    # Evaluation does not need shuffling; a fixed order keeps it deterministic.
    test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

    optim = AdamW(model.parameters(), lr=2e-5)
    # The linear schedule decays the learning rate to zero at
    # ``num_training_steps``. It must therefore span every epoch; sizing it
    # for a single epoch would leave all later epochs training with lr == 0.
    total_steps = len(train_loader) * NUM_EPOCHS
    scheduler = get_linear_schedule_with_warmup(optim,
                                                num_warmup_steps=0,
                                                num_training_steps=total_steps)

    for epoch in range(NUM_EPOCHS):
        print(f"------------Epoch: {epoch+1} ----------------")
        train()
        # Full metrics are printed only after the final epoch.
        validation(print_metrics=(epoch == NUM_EPOCHS - 1))

# RoBERTa

In [None]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, RobertaTokenizer, RobertaForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score
from torch.utils.data import Dataset, DataLoader

class NewsDataset(Dataset):
    """Dataset exposing tokenizer encodings and labels as tensors.

    ``encodings`` maps a field name to a sequence with one entry per sample;
    ``labels`` is an indexable sequence with one entry per sample.
    """

    def __init__(self, encodings, labels):
        self.encodings, self.labels = encodings, labels

    def __len__(self):
        """Return the sample count, taken from the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return a dict of tensors for sample ``idx``, with an int ``'labels'`` entry."""
        tensors = dict(
            (name, torch.tensor(column[idx]))
            for name, column in self.encodings.items()
        )
        tensors['labels'] = torch.tensor(int(self.labels[idx]))
        return tensors

def flat_accuracy(preds, labels):
    """Return the share of samples whose arg-max prediction matches the label.

    The predicted class for each row of ``preds`` is the index of its
    largest value along axis 1; ``labels`` is flattened before comparing.
    """
    flat_labels = labels.flatten()
    hits = np.argmax(preds, axis=1).flatten() == flat_labels
    return np.sum(hits) / len(flat_labels)

def train():
    """Train the module-level ``model`` for one pass over ``train_loader``.

    Depends on the module-level ``model``, ``optim``, ``scheduler``,
    ``device``, ``train_loader`` and ``epoch``. Prints progress every 100
    batches and the average batch loss once the pass is complete.
    """
    model.train()
    total_iter = len(train_loader)
    total_train_loss = 0

    for iter_num, batch in enumerate(train_loader, start=1):
        optim.zero_grad()

        # Move the three tensors the model consumes onto the target device.
        inputs = {
            key: batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'labels')
        }
        outputs = model(
            inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            labels=inputs['labels'],
        )
        loss = outputs[0]
        total_train_loss += loss.item()

        # Backpropagate, clip the gradient norm to 1.0, then update the
        # parameters and advance the learning-rate schedule.
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optim.step()
        scheduler.step()

        if iter_num % 100 == 0:
            print(f"epoch: {epoch}, iter_num: {iter_num}, loss: {loss.item():.4f}, {iter_num/total_iter*100:.2f}%")

    print(f"Epoch: {epoch}, Average training loss: {total_train_loss/len(train_loader):.4f}")

def validation(print_metrics=False):
    """Evaluate the module-level ``model`` on ``test_dataloader`` and report results.

    Uses the module-level ``model``, ``test_dataloader`` and ``device``.

    Args:
        print_metrics: When true, precision, recall and F1 (computed over
            all evaluated samples) are printed as well, and accuracy,
            precision, recall and F1 are appended to ``evaluate.csv``.

    Accuracy is computed over every evaluated sample rather than as the mean
    of per-batch accuracies, which would over-weight a short final batch and
    disagree with the sample-level precision/recall/F1. The reported loss is
    the mean of the per-batch losses.
    """
    model.eval()
    total_eval_loss = 0
    all_predictions = []
    all_labels = []

    # No gradients are needed anywhere in the evaluation loop.
    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

            loss = outputs[0]
            logits = outputs[1]
            total_eval_loss += loss.item()

            batch_predictions = np.argmax(logits.detach().cpu().numpy(), axis=1)
            all_predictions.extend(batch_predictions.flatten().tolist())
            all_labels.extend(labels.to('cpu').numpy().flatten().tolist())

    avg_val_accuracy = np.mean(np.asarray(all_predictions) == np.asarray(all_labels))
    avg_val_loss = total_eval_loss / len(test_dataloader)

    if print_metrics:
        # Compute everything before printing so a metric failure prints nothing.
        precision = precision_score(all_labels, all_predictions)
        recall = recall_score(all_labels, all_predictions)
        f1 = f1_score(all_labels, all_predictions)

    print(f"Accuracy: {avg_val_accuracy:.4f}")
    print(f"Average testing loss: {avg_val_loss:.4f}")
    if print_metrics:
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1 Score: {f1:.4f}")
    print("-------------------------------")

    if print_metrics:
        # Append the final metrics below the model/label header that the
        # training script writes to the same file.
        with open('evaluate.csv', "a") as file:
            file.write(
                "accuracy: " + str(avg_val_accuracy) + '\n'
                + "precision:" + str(precision) + '\n'
                + "recall:" + str(recall) + '\n'
                + "f1score:" + str(f1) + '\n'
            )

# Read the second sheet (index 1) of the workbook and make sure the text
# column is a string for every row.
df = pd.read_excel("social support_coding scheme_0313.xlsx", sheet_name=1)
df['content'] = df['content'].astype(str)

labels_to_process = ['transp_e', 'emp_e', 'symp_e', 'symptom_i', 'experience_i', 'objective_i']
pretrained_models = ['chinese-roberta-wwm-ext', 'chinese-roberta-wwm-ext-large']
# Alternative checkpoint sets that were tried previously:
# pretrained_models = ['RoBERTa_zh_L12_PyTorch', 'RoBERTa_zh_Large_PyTorch']
# pretrained_models = ['roberta-base-chinese-extractive-qa','chinese_roberta_L-4_H-512', 'roberta-base-finetuned-dianping-chinese', 'roberta-base-finetuned-jd-binary-chinese']

# Number of passes over the training data for each (model, label) pair.
NUM_EPOCHS = 10

# The device does not depend on the model or the label, so select it once.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for pretrained in pretrained_models:
    # The tokenizer depends only on the checkpoint, so load it once per model.
    # See https://huggingface.co/docs/transformers/v4.43.4/en/autoclass_tutorial#autotokenizer
    # NOTE: the original author recorded that RobertaTokenizer.from_pretrained
    # raises an error for these checkpoints, hence the Auto* classes.
    tokenizer = AutoTokenizer.from_pretrained('./'+pretrained)

    for label in labels_to_process:
        print(f"Processing label: {label}")

        x = df['content'].tolist()
        y = df[label].tolist()
        x_train, x_test, train_labels, test_labels = train_test_split(x, y, test_size=0.2, random_state=22)

        # A fresh model per label so that labels do not share fine-tuned weights.
        model = AutoModelForSequenceClassification.from_pretrained('./'+pretrained, num_labels=2)

        # Header for the metrics that validation() appends after the last epoch.
        with open('evaluate.csv', "a") as file:
            file.write(pretrained)  # name of the pretrained model in use
            file.write('\n')
            file.write("label: " + label)
            file.write('\n')

        model.to(device)

        train_encoding = tokenizer(x_train, truncation=True, padding=True, max_length=20)
        test_encoding = tokenizer(x_test, truncation=True, padding=True, max_length=20)

        train_dataset = NewsDataset(train_encoding, train_labels)
        test_dataset = NewsDataset(test_encoding, test_labels)

        train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
        # Evaluation does not need shuffling; a fixed order keeps it deterministic.
        test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

        optim = AdamW(model.parameters(), lr=2e-5)
        # The linear schedule decays the learning rate to zero at
        # ``num_training_steps``. It must therefore span every epoch; sizing
        # it for a single epoch would leave all later epochs training with
        # lr == 0.
        total_steps = len(train_loader) * NUM_EPOCHS
        scheduler = get_linear_schedule_with_warmup(optim,
                                                    num_warmup_steps=0,
                                                    num_training_steps=total_steps)

        for epoch in range(NUM_EPOCHS):
            print(f"------------Epoch: {epoch+1} ----------------")
            train()
            # Full metrics are printed and logged only after the final epoch.
            validation(print_metrics=(epoch == NUM_EPOCHS - 1))