In [None]:
from collections import defaultdict

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from nltk import pos_tag
from nltk.corpus import sentiwordnet as swn
from nltk.tokenize import word_tokenize
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModel
from transformers import get_linear_schedule_with_warmup

In [None]:
# Hugging Face checkpoint shared by the tokenizer and the encoder.
_CHECKPOINT = "cardiffnlp/twitter-roberta-base-sentiment"

tokenizer = AutoTokenizer.from_pretrained(_CHECKPOINT)
# NOTE(review): `model` is rebound to a Sentiment() instance in a later cell, which
# loads this checkpoint again - this bare encoder looks redundant; confirm and drop.
model = AutoModel.from_pretrained(_CHECKPOINT)

In [None]:
from torch.utils.data import Dataset, DataLoader

class Data_set(Dataset):
    """Map-style dataset over a tab-separated file with `text_a` and `label` columns."""

    def __init__(self, file):
        """Load the whole TSV into memory.

        Args:
            file: path or file-like object accepted by `pandas.read_csv`.
        """
        self.dataset = pd.read_csv(file, sep='\t')

    def __len__(self):
        """Return the number of rows in the loaded frame."""
        return len(self.dataset)

    def __getitem__(self, i):
        """Return the `(text, label)` pair stored at position `i`.

        Args:
            i: integer position; negative values count from the end.

        Returns:
            tuple: the `text_a` value and the `label` value of that row.
        """
        # Positional access on purpose: plain `series[i]` is a *label* lookup, which
        # only lines up with positions while the frame still has its default
        # 0..n-1 index and which rejects negative positions.
        text = self.dataset['text_a'].iloc[i]
        label = self.dataset['label'].iloc[i]

        return text, label
    
# One Data_set per split, built in train / dev / test order.
# NOTE(review): absolute machine-specific paths - consider a configurable data directory.
train_data, dev_data, test_data = (
    Data_set(tsv_path)
    for tsv_path in (
        'D:\\jupyter\\Chinese\\train.tsv',
        'D:\\jupyter\\Chinese\\dev.tsv',
        'D:\\jupyter\\Chinese\\test.tsv',
    )
)

In [None]:
def ss_data(data):
    """Collate a list of `(text, label)` samples into padded tensors.

    Args:
        data: sequence of two-item samples; item 0 is the text, item 1 the label.

    Returns:
        tuple: `(input_ids, attention_mask, labels)` where the first two come from
        the module-level `tokenizer` and `labels` is a `torch.LongTensor`.
    """
    texts = []
    targets = []
    for sample in data:
        texts.append(sample[0])
        targets.append(sample[1])

    # Every batch is truncated / padded to exactly 512 tokens.
    encoded = tokenizer.batch_encode_plus(
        batch_text_or_text_pairs=texts,
        truncation=True,
        padding='max_length',
        max_length=512,
        return_tensors='pt',
        return_length=True,
    )

    # input_ids: the token ids produced by encoding.
    # attention_mask: 0 at padded positions, 1 everywhere else.
    # token_type_ids are not read here (the original note says RoBERTa does not need them).
    return encoded['input_ids'], encoded['attention_mask'], torch.LongTensor(targets)


# Only the training loader shuffles. The evaluation loaders keep file order so that
# batch composition - and therefore per-batch loss - is identical from run to run.
train_data_loader = DataLoader(train_data, batch_size=16, collate_fn=ss_data, shuffle=True, drop_last=False)
dev_data_loader = DataLoader(dev_data, batch_size=16, collate_fn=ss_data, shuffle=False, drop_last=False)
test_data_loader = DataLoader(test_data, batch_size=16, collate_fn=ss_data, shuffle=False, drop_last=False)

In [None]:
class Sentiment(nn.Module):
    """Pretrained RoBERTa encoder followed by a linear 3-class head."""

    def __init__(self):
        super().__init__()
        self.bert = AutoModel.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
        # Size the head from the encoder's own config rather than hard-coding 768.
        self.out = nn.Linear(self.bert.config.hidden_size, 3)

    def forward(self, input_ids, attention_mask):
        """Return unnormalised class scores (logits), one row of 3 per input sequence.

        The scores are deliberately NOT passed through softmax:
        `nn.CrossEntropyLoss` applies log-softmax itself, so feeding it
        probabilities would normalise twice and flatten the gradients.
        `argmax` over the logits gives the same prediction as over probabilities;
        call `.softmax(dim=1)` on the result when probabilities are needed.

        Args:
            input_ids: token id tensor produced by the tokenizer.
            attention_mask: mask tensor matching `input_ids`.

        Returns:
            torch.Tensor: logits with 3 entries along the last dimension.
        """
        output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Index 1 of the encoder output is its pooled sequence representation.
        return self.out(output[1])


In [None]:
# `device` must exist before the model can be moved: use the GPU when one is
# visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = Sentiment()
model.to(device)

In [None]:
# ---- Fine-tuning configuration ----
history = defaultdict(list)   # per-epoch metrics, stored as plain floats
best_accuracy = 0
epochs = 1
validation = True

# Send every batch to wherever the model's weights currently live.
device = next(model.parameters()).device

# PyTorch's own AdamW is used: no AdamW is imported in this notebook, and the
# transformers class that accepted `correct_bias` is deprecated in favour of it.
# torch.optim.AdamW has no `correct_bias` switch (bias correction is always on).
# weight_decay is pinned to 0.0 so no decay is added that the original call did
# not ask for.
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5, weight_decay=0.0)
total_steps = len(train_data_loader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)
# CrossEntropyLoss applies log-softmax internally.
loss_fn = nn.CrossEntropyLoss()

for epoch in range(epochs):
    # ---- Training pass ----
    # Re-enter training mode at the start of EVERY epoch: the validation pass
    # below switches the model to eval mode.
    model.train()
    losses = []
    correct_predictions = 0

    for batch in tqdm(train_data_loader):
        input_ids, attention_mask, target = (tensor.to(device) for tensor in batch)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        preds = outputs.argmax(dim=1)
        loss = loss_fn(outputs, target)

        # .item() keeps the running totals as Python numbers, so nothing holds on
        # to device tensors and an empty loader cannot break the maths below.
        correct_predictions += (preds == target).sum().item()
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

    # Epoch-level metrics, computed once after the last batch.
    train_acc = correct_predictions / max(len(train_data), 1)
    train_loss = float(np.mean(losses)) if losses else float('nan')
    history['train_acc'].append(train_acc)
    history['train_loss'].append(train_loss)

    if not validation:
        # No dev metrics this epoch, hence nothing to record or checkpoint.
        continue

    # ---- Validation pass ----
    model.eval()
    losses = []
    correct_predictions = 0
    with torch.no_grad():
        for batch in tqdm(dev_data_loader):
            input_ids, attention_mask, target = (tensor.to(device) for tensor in batch)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            preds = outputs.argmax(dim=1)
            loss = loss_fn(outputs, target)
            correct_predictions += (preds == target).sum().item()
            losses.append(loss.item())

    dev_acc = correct_predictions / max(len(dev_data), 1)
    dev_loss = float(np.mean(losses)) if losses else float('nan')
    history['val_acc'].append(dev_acc)
    history['val_loss'].append(dev_loss)

    # Checkpoint only when the dev accuracy improves on the best seen so far.
    if dev_acc > best_accuracy:
        print('Save Model')
        torch.save(model.state_dict(), 'best_model_state.bin')
        best_accuracy = dev_acc
        print(dev_acc, train_acc)

In [None]:
# Example data structure.
# The DataFrame has the columns: ['text_id', 'original_text', 'sentiment_polarity', 'opinion_words']
data = {
    "text_id": [1, 2],
    "original_text": [
        "This product is absolutely amazing!",
        "The service is terrible and disappointing."
    ],
    # Exactly one polarity label per text: pandas requires every column to have
    # the same length.
    "sentiment_polarity": ["positive", "negative"],
    "opinion_words": [
        ["amazing", "absolutely"],
        ["terrible", "disappointing"]
    ]
}

df = pd.DataFrame(data)

# Map a part-of-speech tag onto the SentiWordNet POS codes.
def get_swn_pos(tag):
    """Translate a POS tag into a SentiWordNet POS letter.

    Args:
        tag: POS tag string; only its leading letter is inspected.

    Returns:
        'a' (adjective), 'n' (noun), 'r' (adverb) or 'v' (verb) for tags starting
        with J, N, R or V respectively; None for any other tag.
    """
    prefix_to_pos = (
        ('J', 'a'),  # adjective
        ('N', 'n'),  # noun
        ('R', 'r'),  # adverb
        ('V', 'v'),  # verb
    )
    for prefix, swn_pos in prefix_to_pos:
        if tag.startswith(prefix):
            return swn_pos
    return None

# Compute the sentiment intensity of a word.
def get_sentiment_intensity(word, pos):
    """Return the mean (positive - negative) SentiWordNet score of a word.

    Args:
        word: the word to look up.
        pos: SentiWordNet POS code, as produced by `get_swn_pos`.

    Returns:
        float: average of `pos_score() - neg_score()` over every synset returned
        for `(word, pos)`; 0.0 when there is no synset or the lookup key is
        rejected.
    """
    try:
        synsets = list(swn.senti_synsets(word, pos))
    except (KeyError, ValueError):
        # The word / POS key is not known to SentiWordNet: no sentiment signal.
        # Anything else (such as a corpus that was never downloaded) propagates
        # instead of silently scoring every word as 0.
        return 0.0

    if not synsets:
        return 0.0

    swn_scores = [synset.pos_score() - synset.neg_score() for synset in synsets]
    return sum(swn_scores) / len(swn_scores)  # mean over all senses

# Bucket a sentiment intensity into one of 5 categories.
def classify_intensity(intensity):
    """Name the bucket that an intensity score falls into.

    Each bucket includes its upper bound: (-inf, -0.6] is "very negative",
    (-0.6, -0.2] "negative", (-0.2, 0.2] "neutral", (0.2, 0.6] "positive";
    everything that matches none of these is "very positive".

    Args:
        intensity: numeric score to classify.

    Returns:
        str: the category label.
    """
    upper_bounds = (
        (-0.6, "very negative"),
        (-0.2, "negative"),
        (0.2, "neutral"),
        (0.6, "positive"),
    )
    # Bounds are scanned in ascending order, so the first one that holds wins.
    for upper, label in upper_bounds:
        if intensity <= upper:
            return label
    return "very positive"

# Build the sentiment lexicon and score the intensity of each opinion word.
def analyze_sentiment_intensity(df):
    """Score every opinion word of every row and collect the results per word.

    Args:
        df: frame with the columns 'opinion_words', 'original_text' and
            'sentiment_polarity'.

    Returns:
        defaultdict: maps each scored word to a dict with the keys 'intensity',
        'intensity_category' and 'polarity'. A word seen in several rows keeps
        the values from the last row that scored it.
    """
    lexicon = defaultdict(dict)

    for _, record in df.iterrows():
        opinion_words = record['opinion_words']
        tagged_tokens = pos_tag(word_tokenize(record['original_text']))

        for word in opinion_words:
            # POS of the first token that matches the word, ignoring case.
            swn_pos = None
            for token, tag in tagged_tokens:
                if token.lower() == word.lower():
                    swn_pos = get_swn_pos(tag)
                    break

            # Skip words that are absent from the text or whose tag has no
            # SentiWordNet equivalent.
            if not swn_pos:
                continue

            score = get_sentiment_intensity(word, swn_pos)
            category = classify_intensity(score)
            entry = lexicon[word]
            entry['intensity'] = score
            entry['intensity_category'] = category
            entry['polarity'] = record['sentiment_polarity']

    return lexicon

# Run the analysis.
sentiment_intensity_dict = analyze_sentiment_intensity(df)

# Print each scored word and collect the same fields as one record per word.
results = []
for word, details in sentiment_intensity_dict.items():
    print(f"Word: {word}, Intensity: {details['intensity']:.2f}, Intensity Category: {details['intensity_category']}, Polarity: {details['polarity']}")
    results.append({
        "word": word,
        "intensity": details['intensity'],
        "intensity_category": details['intensity_category'],
        "polarity": details['polarity'],
    })

# Turn the records into a DataFrame.
results_df = pd.DataFrame(results)

# Save to CSV.
results_df.to_csv("sentiment_intensity_results.csv", index=False)
