In [None]:
!pip install transformers[torch]

!pip install datasets

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import transformers

# Set the transformers library's logging verbosity to DEBUG (expect chatty output).
transformers.utils.logging.set_verbosity_debug()

# Dataset


In [None]:
!wget --no-check-certificate 'https://drive.google.com/uc?export=download&id=1gDuwzgSk8rxUnQKR0Hyn70A5SMt1l4_9' -O data.zip

!unzip data.zip


directory = './data/investopedia'
dataframes = []

for filename in os.listdir(directory):
    if filename.endswith('.csv'):
        filepath = os.path.join(directory, filename)
        df = pd.read_csv(filepath)
        dataframes.append(df)

df = pd.concat(dataframes, ignore_index=True)


df = df.loc[df['Title'] != 'No Title Found']
text = pd.Series(df['Title'] + df['Summary'], dtype=str).reset_index(drop=True)
# Calculate the lengths of the strings
lengths = text.str.len()

# Plot the histogram
plt.hist(lengths, bins=30, edgecolor='black')
plt.xlabel('Length of Text')
plt.ylabel('Frequency')
plt.title('Histogram of Text Lengths')
plt.show()



### Training

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM, Trainer, TrainingArguments, DataCollatorForLanguageModeling
from sklearn.model_selection import train_test_split
import pandas as pd
from datasets import Dataset
import transformers
import json

# Set the transformers library's logging verbosity to DEBUG
transformers.utils.logging.set_verbosity_debug()

# `text` is the pandas Series of strings built in the dataset cell (not a
# DataFrame). NOTE: this rebinds the name `df` to that Series.
df = text

# Tokenizer and masked-LM model initialization from the pretrained checkpoint
model_name = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForMaskedLM.from_pretrained(model_name)

# Dataset Preprocessing
def tokenize_function(examples):
    """Tokenize a batch of raw texts for the masked-LM model.

    Args:
        examples: Mapping with a "text" entry holding the strings to encode.

    Returns:
        The output of the module-level ``tokenizer`` for those strings, padded
        and truncated to exactly 128 tokens each.
    """
    batch_texts = examples["text"]
    encode_options = {"padding": "max_length", "truncation": True, "max_length": 128}
    return tokenizer(batch_texts, **encode_options)

# Split dataset into train and test sets (90/10, fixed seed)
train_texts, test_texts = train_test_split(df.tolist(), test_size=0.1, random_state=42)
train_dataset = Dataset.from_dict({"text": train_texts})
test_dataset = Dataset.from_dict({"text": test_texts})

# Tokenize both splits; the raw "text" column is dropped once encoded.
train_dataset = train_dataset.map(tokenize_function, batched=True, num_proc=1, remove_columns=["text"])
test_dataset = test_dataset.map(tokenize_function, batched=True, num_proc=1, remove_columns=["text"])

# Define Data Collator for Masked Language Modeling (15% masking probability)
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)

# The evaluation-schedule argument was renamed from `evaluation_strategy` to
# `eval_strategy` in newer transformers releases and the old name is deprecated.
# The install is unpinned, so pick whichever name this version accepts.
import inspect

_eval_strategy_key = (
    "eval_strategy"
    if "eval_strategy" in inspect.signature(TrainingArguments.__init__).parameters
    else "evaluation_strategy"
)

# Define Training Arguments
training_args = TrainingArguments(
    output_dir="./results",
    overwrite_output_dir=True,
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    save_strategy="epoch",
    logging_dir="./logs",
    learning_rate=3e-5,
    weight_decay=0.01,
    **{_eval_strategy_key: "epoch"},
)

# Define the compute_metrics function to calculate perplexity
def compute_metrics(eval_pred):
    """Compute masked-language-model perplexity for an evaluation pass.

    In masked LM the prediction at position ``i`` is scored against the label
    at the same position ``i``. The previous version shifted logits and labels
    by one, which is the causal-LM convention, so it compared every prediction
    with the wrong target.

    Args:
        eval_pred: A ``(logits, labels)`` pair. ``logits`` has the vocabulary
            on its last axis and ``labels`` has the matching leading shape.
            Positions labelled ``-100`` are ignored.

    Returns:
        dict: ``{"perplexity": float}``, the exponential of the mean
        cross-entropy over the non-ignored positions (NaN if there are none).
    """
    logits, labels = eval_pred
    # Accept NumPy arrays (what Trainer passes) as well as tensors or lists.
    logits = torch.as_tensor(logits)
    labels = torch.as_tensor(labels).long()

    vocab_size = logits.size(-1)
    # -100 is CrossEntropyLoss's default ignore_index; spelled out because the
    # metric relies on it to skip the positions that were not masked.
    loss_fct = torch.nn.CrossEntropyLoss(ignore_index=-100)
    # Cast to float32 so half-precision logits still work on CPU.
    loss = loss_fct(logits.reshape(-1, vocab_size).float(), labels.reshape(-1))

    perplexity = torch.exp(loss)
    return {"perplexity": perplexity.item()}

# Wire the model, data, collator and metric into a Trainer
trainer = Trainer(
    args=training_args,
    model=model,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# Run fine-tuning, then write the model and tokenizer to the same directory
trainer.train()
trainer.save_model("./finbert_finetuned")
tokenizer.save_pretrained("./finbert_finetuned")

# Write the trainer state as JSON next to the model
trainer.state.save_to_json("./finbert_finetuned/trainer_state.json")

# Write the training arguments as JSON next to the model
with open("./finbert_finetuned/training_args.json", "w") as args_file:
    args_file.write(json.dumps(training_args.to_dict()))


### Testing

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM
import random
from nltk.corpus import stopwords
from nltk import word_tokenize, pos_tag

# Download the NLTK resources used by this cell if not already available.
# Recent NLTK releases look up the tagger and sentence tokenizer under the
# new ids ('averaged_perceptron_tagger_eng', 'punkt_tab'); the legacy ids are
# kept so older releases keep working.
import nltk
for _resource in (
    'stopwords',
    'averaged_perceptron_tagger',
    'averaged_perceptron_tagger_eng',
    'punkt',
    'punkt_tab',
):
    nltk.download(_resource)

# Load the fine-tuned model and tokenizer from the directory the training cell
# saved them to. NOTE: this rebinds `model_name`, `tokenizer` and `model`.
model_name = "./finbert_finetuned"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForMaskedLM.from_pretrained(model_name)

# Set of English stopwords; mask_word_tokens never selects these for masking.
stop_words = set(stopwords.words('english'))

def mask_word_tokens(text, tokenizer, mask_probability=0.10, excluded_words=None):
    """Randomly replace a fraction of the eligible tokens in ``text`` with the mask token.

    A token is eligible when it is longer than one character, purely
    alphanumeric, and its lowercase form is not in ``excluded_words``. The
    alphanumeric test also excludes punctuation and any piece containing
    symbols (e.g. sub-word pieces written with a ``##`` prefix).

    Args:
        text: Raw input string.
        tokenizer: Object providing ``tokenize``, ``convert_tokens_to_string``
            and a ``mask_token`` attribute.
        mask_probability: Fraction of eligible tokens to mask. At least one
            token is masked whenever any token is eligible.
        excluded_words: Lowercase words that must never be masked. Defaults to
            the module-level ``stop_words`` set.

    Returns:
        The text rebuilt from the tokens with the chosen ones masked. If no
        token is eligible, the text is rebuilt unmasked (the previous version
        raised ``ValueError`` from ``random.sample`` in that case).
    """
    if excluded_words is None:
        excluded_words = stop_words

    tokens = tokenizer.tokenize(text)

    # Exclude stopwords, punctuation, and one-character words
    eligible_indices = [
        i for i, token in enumerate(tokens)
        if token.lower() not in excluded_words and len(token) > 1 and token.isalnum()
    ]

    # Nothing to mask: sampling from an empty population would raise.
    if not eligible_indices:
        return tokenizer.convert_tokens_to_string(tokens)

    num_tokens_to_mask = max(1, int(len(eligible_indices) * mask_probability))
    # Clamp so mask_probability > 1 cannot ask for more tokens than exist.
    num_tokens_to_mask = min(num_tokens_to_mask, len(eligible_indices))
    mask_indices = random.sample(eligible_indices, num_tokens_to_mask)

    masked_tokens = list(tokens)
    for idx in mask_indices:
        masked_tokens[idx] = tokenizer.mask_token

    return tokenizer.convert_tokens_to_string(masked_tokens)

# Function to predict masked tokens
def predict_masked_tokens(test_text, model, tokenizer):
    """Mask some tokens of ``test_text``, let the model fill them in, and print the result.

    Prints the original text, the masked text and the text with each mask slot
    replaced by the model's highest-scoring token.

    Args:
        test_text: Raw input string.
        model: Masked-LM model; called as ``model(**inputs)`` and expected to
            expose ``.logits`` on its output.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        None. Results are printed.
    """
    masked_text = mask_word_tokens(test_text, tokenizer, mask_probability=0.10)
    print(f"Original text ---- {test_text}")
    print(f"Masked text ---- {masked_text}")

    # Truncate so long inputs cannot exceed the model's maximum sequence length.
    inputs = tokenizer(masked_text, return_tensors="pt", truncation=True)

    with torch.no_grad():
        logits = model(**inputs).logits

    # Column indices of the mask tokens in the (single-row) encoded batch
    mask_token_indices = torch.where(inputs.input_ids == tokenizer.mask_token_id)[1]

    # Highest-scoring vocabulary entry for each mask position, decoded to text
    predicted_tokens = [
        tokenizer.decode(int(logits[0, index].argmax(dim=-1)))
        for index in mask_token_indices
    ]

    # Fill the mask slots left to right. Splitting on the tokenizer's own mask
    # token (rather than a hard-coded '[MASK]') and interleaving keeps each
    # prediction in its slot even if a prediction itself contains the mask token.
    mask_token = tokenizer.mask_token
    segments = masked_text.split(mask_token)
    pieces = [segments[0]]
    for slot, segment in enumerate(segments[1:]):
        # Slots beyond the predictions (e.g. cut off by truncation) stay masked.
        pieces.append(predicted_tokens[slot] if slot < len(predicted_tokens) else mask_token)
        pieces.append(segment)
    output_text = "".join(pieces)

    print(f"Predicted text: {output_text}")

# Sample passage for the fill-mask demo, assembled from three fragments
test_text = "".join(
    [
        "Unsecured Debt Definition: Unsecured debts are loans that are not collateralized.",
        " They generally require higher interest rates because they offer the lender limited protection against default.",
        " Lenders can mitigate this risk by reporting defaults to credit rating agencies.",
    ]
)

# Mask the passage, predict the masked tokens, and print the outcome
predict_masked_tokens(test_text, model, tokenizer)
