### DeBERTa sentiment analysis

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so that the
# Drive-hosted files referenced by the cells below become readable/writable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Directory on the mounted Drive that is used as the working directory.
# The relative paths used later ('./DeBERTa/...', './MDA_DataSet_...csv',
# './temp_chunks') are resolved against it.
path = '/content/drive/MyDrive/Kenkyu/Finance/2024/data/'
# IPython magics: change the working directory to `path`, then list its contents.
%cd $path
%ls

In [None]:
# IPython shell escape: install the third-party packages with pip.
!pip install transformers datasets pandas torch

In [None]:
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F

# Path (relative to the current working directory) of the saved
# sequence-classification checkpoint; the tokenizer and the model weights are
# both loaded from this same directory.
model_name = "./DeBERTa/best_model_f1"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

In [None]:
import re

def remove_prefix(text):
    """Strip bracketed 【...】 labels, plus the one character before each, from *text*.

    Every non-overlapping occurrence of "any single character (except a
    newline) followed by a 【...】 span" is deleted, wherever it appears in
    the string -- not only at the start. The bracket contents are matched
    non-greedily, so each match ends at the nearest closing 】. A 【...】 span
    sitting at the very beginning of the string has no preceding character
    and is therefore left untouched.

    Args:
        text: The string to clean.

    Returns:
        A new string with all matches removed.
    """
    label_with_leading_char = re.compile(r'.【.*?】')
    return label_with_leading_char.sub('', text)

In [None]:
# Switch the model to evaluation mode before running inference.
model.eval()

# Use the GPU when CUDA is available, otherwise fall back to the CPU, and move
# the model's parameters to that device. Input tensors must be moved to the
# same `device` before being passed to the model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
def process_chunk(chunk, text_column_index, select_columns=(0, 1, 2, 3, 4, 6, 7),
                  max_length=128, verbose=False):
    """Run the sentiment classifier over one DataFrame chunk.

    The text column is cleaned with ``remove_prefix``, tokenized as a single
    batch, and passed through the module-level ``model`` on ``device``.
    Softmax probabilities are attached to a copy of the selected columns.

    Args:
        chunk: pandas DataFrame holding the rows to score. All rows are fed
            to the model in one forward pass, so the chunk size is also the
            inference batch size.
        text_column_index: Positional index of the column containing the text.
        select_columns: Positional indices of the columns copied into the
            result. Defaults to the original hard-coded selection.
        max_length: Token length every text is padded/truncated to.
        verbose: When true, print the cleaned texts (debugging aid). Off by
            default because it dumps every document of every chunk.

    Returns:
        A new DataFrame with the selected columns plus
        ``positive_probability`` (softmax of logit index 1) and
        ``negative_probability`` (softmax of logit index 0).
        NOTE(review): the index-to-label mapping is taken from the original
        code -- confirm against the checkpoint's ``id2label``.
    """
    # pandas reads empty cells as float NaN, which makes the regex inside
    # remove_prefix raise TypeError. Score such rows as empty text instead of
    # aborting the whole run.
    raw_texts = chunk.iloc[:, text_column_index].tolist()
    texts = [remove_prefix('' if pd.isna(text) else str(text)) for text in raw_texts]
    if verbose:
        print(texts)

    inputs = tokenizer(texts, padding='max_length', truncation=True,
                       max_length=max_length, return_tensors="pt")

    with torch.no_grad():
        # The tensors must live on the same device as the model.
        inputs = {key: val.to(device) for key, val in inputs.items()}
        outputs = model(**inputs)
        probs = F.softmax(outputs.logits, dim=-1).cpu().numpy()

    # list(...) so that a tuple default is treated as a list of positions.
    result_chunk = chunk.iloc[:, list(select_columns)].copy()
    result_chunk['positive_probability'] = probs[:, 1]
    result_chunk['negative_probability'] = probs[:, 0]

    return result_chunk

In [None]:
import os
# Scratch directory (relative to the working directory) that holds one CSV per
# processed chunk until they are merged into the final output.
temp_dir = "./temp_chunks"
# exist_ok=True: do not fail if the directory is already there.
os.makedirs(temp_dir, exist_ok=True)

In [None]:
# Source CSV to score and destination CSV for the scored rows (both relative
# to the working directory).
input_csv = './MDA_DataSet_2014_2022_TSE1.csv'
output_csv = './MDA_DataSet_2014_2022_TSE1_deberta.csv'
# Number of rows read per pandas chunk. Each chunk is passed to the model as
# one batch, so this also bounds the memory used per forward pass.
chunk_size = 50

In [None]:
# Stage 1: stream the input CSV in chunks, score each chunk, and park the
# result in its own temporary CSV (chunk_0.csv, chunk_1.csv, ...).
chunk_paths = []
for chunk in pd.read_csv(input_csv, chunksize=chunk_size):
    scored = process_chunk(chunk, text_column_index=5)
    temp_chunk_file = os.path.join(temp_dir, f"chunk_{len(chunk_paths)}.csv")
    scored.to_csv(temp_chunk_file, index=False)
    chunk_paths.append(temp_chunk_file)
chunk_number = len(chunk_paths)

# Stage 2: concatenate the temporary CSVs in order. Only the first file
# contributes its header line; for every later file the header is skipped.
with open(output_csv, 'w', encoding='utf-8') as fout:
    for position, temp_chunk_file in enumerate(chunk_paths):
        with open(temp_chunk_file, 'r', encoding='utf-8') as fin:
            if position > 0:
                next(fin)
            fout.write(fin.read())

# Stage 3: delete the temporary files, then the (now empty) scratch directory.
for temp_chunk_file in chunk_paths:
    os.remove(temp_chunk_file)
os.rmdir(temp_dir)