In [106]:
!pip install -U datasets torch imbalanced-learn transformers

Collecting transformers
  Obtaining dependency information for transformers from https://files.pythonhosted.org/packages/51/51/b87caa939fedf307496e4dbf412f4b909af3d9ca8b189fc3b65c1faa456f/transformers-4.46.3-py3-none-any.whl.metadata
  Downloading transformers-4.46.3-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.1/44.1 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers<0.21,>=0.20 (from transformers)
  Obtaining dependency information for tokenizers<0.21,>=0.20 from https://files.pythonhosted.org/packages/50/f6/2841de926bc4118af996eaf0bdf0ea5b012245044766ffc0347e6c968e63/tokenizers-0.20.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading tokenizers-0.20.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting safetensors>=0.4.1 (from transformers)
  Obtaining dependency information for safetensors>=0.4.1 from https://files.pythonhosted.org/packages/e6

In [73]:
from datasets import load_dataset

dataset = load_dataset("takala/financial_phrasebank", "sentences_50agree", trust_remote_code = True)


# preprocess data

In [74]:
import torch
from torch.nn.utils.rnn import pad_sequence

def tokenize(data):
    tokens = [sentence.lower().split() for sentence in data]
    return tokens

def build_vocab(data):
    tokens = tokenize(data)

    unique_tokens = set(token for tokens in tokens for token in tokens)
    vocab = {token: idx for idx, token in enumerate(unique_tokens, start=2)}
    vocab["<unk>"] = 0  # Unknown tokens
    vocab["<pad>"] = 1  # Padding token
    
    return vocab

In [75]:
sentence_data = dataset['train']['sentence']
vocab = build_vocab(sentence_data)

In [76]:
# processes the data
def process_data(dataset, vocab):
    # tokenize the data
    tokenized_data = tokenize(dataset['sentence'])

    # convert the data into numerical data 
    processed_data = [torch.tensor([vocab.get(token, vocab["<unk>"]) for token in tokens]) for tokens in tokenized_data]

    # pad the data
    padded_data = pad_sequence(processed_data, batch_first=True, padding_value=vocab["<pad>"])

    # attention mask
    attention_mask = (padded_data != vocab["<pad>"]).int()
    return padded_data.float(), attention_mask

In [77]:
dataset = dataset['train']

In [78]:
vocab = build_vocab(dataset['sentence'])
padded_data, attention_mask = process_data(dataset, vocab)

output = torch.tensor([torch.tensor(label) for label in dataset['label']])

In [79]:
print(output.shape, padded_data.shape)


torch.Size([4846]) torch.Size([4846, 81])


In [80]:
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE


# Convert to numpy (for sklearn's train_test_split)
padded_data_np = padded_data.numpy()
output_np = output.numpy()

# generate synthetic data
smote = SMOTE()
padded_data_np, output_np = smote.fit_resample(padded_data_np, output_np)

# Use train_test_split from sklearn
train_inputs, test_inputs, train_labels, test_labels = train_test_split(
    padded_data_np, output_np, test_size=0.2, random_state=42, stratify=output_np
)

# Convert back to torch tensors
train_inputs = torch.tensor(train_inputs)
test_inputs = torch.tensor(test_inputs)
train_labels = torch.tensor(train_labels)
test_labels = torch.tensor(test_labels)

In [81]:
# in train labels print the occurence of every value
print(train_labels.unique(return_counts=True))
print(test_labels.unique(return_counts=True))

(tensor([0, 1, 2]), tensor([2303, 2303, 2303]))
(tensor([0, 1, 2]), tensor([576, 576, 576]))


In [82]:
print(train_inputs.shape, test_inputs.shape, train_labels.shape, test_labels.shape)

torch.Size([6909, 81]) torch.Size([1728, 81]) torch.Size([6909]) torch.Size([1728])


## Preprocess using Pretrained tokenizer


In [74]:
from transformers import AutoTokenizer

# Replace 'bert-base-uncased' with your specific model's name
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')



In [4]:
def tokenize_function(examples):
    return tokenizer(
        examples['sentence'],   # Adjust key based on your dataset
        padding="max_length",  # Pad to max_length (helps batching)
        truncation=True,       # Truncate sequences longer than max_length
        max_length=128         # Set a suitable max_length
        
    )

# Tokenize the dataset
tokenized_dataset = dataset.map(tokenize_function, batched=True)

Map:   0%|          | 0/4846 [00:00<?, ? examples/s]

In [36]:
print(train_data.column_names)


['sentence', 'label', 'input_ids', 'token_type_ids', 'attention_mask']


In [38]:
split_dataset = tokenized_dataset['train'].train_test_split(test_size=0.2)

train_data = split_dataset['train']
test_data = split_dataset['test']



# Set the format for PyTorch tensors
train_data.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
test_data.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])



In [49]:
train_data

Dataset({
    features: ['sentence', 'label', 'input_ids', 'token_type_ids', 'attention_mask'],
    num_rows: 3876
})

# models

In [50]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Define the RNN model
import torch
import torch.nn as nn

class RNNModel(nn.Module):
    def __init__(self, 
                 input_size: int, 
                 hidden_size: int = 128,
                 num_layers: int = 2,
                 num_classes: int = 2,
                 dropout_rate: float = 0.3):
        
        super(RNNModel, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # RNN Layer
        self.rnn = nn.RNN(input_size, 
                          hidden_size, 
                          num_layers, 
                          batch_first=True, 
                          dropout=dropout_rate if num_layers > 1 else 0.0)
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, 
                         x.size(0), 
                         self.hidden_size).to(x.device)
        
        # Forward propagate through RNN
        out, _ = self.rnn(x, h0)  # out: [batch_size, seq_length, hidden_size]
        
        # Get the output from the last time step
        out = out[:, -1, :]  # [batch_size, hidden_size]
        
        # Pass through the fully connected layer
        out = self.fc(out)  # [batch_size, num_classes]
        return out


class GRUModel(nn.Module):
    def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 2,
                 num_classes: int = 2, dropout_rate: float = 0.3):
        """
        GRU Model for sequence classification
        """
        super(GRUModel, self).__init__()
        
        # Determine number of directions
        
        # GRU Layer
        self.gru = nn.GRU(
            input_size=input_size, 
            hidden_size=hidden_size, 
            num_layers=num_layers, 
            batch_first=True,
            dropout=dropout_rate if num_layers > 1 else 0,
        )
        
        # Dropout and normalization
        self.dropout = nn.Dropout(dropout_rate)
        
        # Fully connected layers
        fc_input_size = hidden_size
        self.fc1 = nn.Linear(fc_input_size, fc_input_size // 2)
        self.fc2 = nn.Linear(fc_input_size // 2, num_classes)
        
        # Layer normalization
        self.layer_norm = nn.LayerNorm(fc_input_size // 2)
    
    def forward(self, x):
        """
        Forward pass of the GRU model
        """
        # Initialize hidden state
        batch_size = x.size(0)
        h0 = torch.zeros(self.gru.num_layers, batch_size, 
            self.gru.hidden_size, device=x.device
        
        )
        
        # GRU processing
        out, hidden = self.gru(x, h0)
        
        # Extract the last hidden state
     
        hidden = hidden[-1]
        
        # Apply dropout and fully connected layers
        x = self.dropout(hidden)
        x = F.relu(self.fc1(x))
        x = self.layer_norm(x)
        x = self.dropout(x)
        logits = self.fc2(x)
        
        return logits

In [51]:
class LSTMModel(nn.Module):
    def __init__(self, input_size: int, hidden_size: int = 128,num_layers: int = 3,
                 num_classes: int = 3,dropout_rate: float = 0.3):
        
        super(LSTMModel, self).__init__()
        
        # LSTM Layer
        self.lstm = nn.LSTM(
            input_size=input_size, 
            hidden_size=hidden_size, 
            num_layers=num_layers, 
            batch_first=True,
            dropout=dropout_rate,
        )
        
        # Dropout and normalization
        self.dropout = nn.Dropout(dropout_rate)
        
        # Fully connected layers
        fc_input_size = hidden_size
        self.fc1 = nn.Linear(fc_input_size, fc_input_size // 2)
        self.fc = nn.Linear(fc_input_size // 2, fc_input_size // 2)
        self.fc2 = nn.Linear(fc_input_size // 2, num_classes)
        
        # Layer normalization
        self.layer_norm = nn.LayerNorm(fc_input_size // 2)
    
    def forward(self, x):
        """
        Forward pass of the LSTM model
        
        """
        # Initialize hidden and cell states
        batch_size = x.size(0)
        h0 = torch.zeros(self.lstm.num_layers, batch_size, 
            self.lstm.hidden_size, device=x.device)
        c0 = torch.zeros(self.lstm.num_layers, batch_size, 
            self.lstm.hidden_size, device=x.device)
        
        # LSTM processing
        _, (hidden, _) = self.lstm(x, (h0, c0))
        
        hidden = hidden[-1]
        
        # Apply dropout and fully connected layers
        x = self.dropout(hidden)
        x = F.relu(self.fc1(x))
        x = self.layer_norm(x)
        x = self.dropout(x)
        # x = F.relu(self.fc(x))
        # x = self.layer_norm(x)
        # x = self.dropout(x)
        logits = self.fc2(x)
        
        return logits

In [52]:
train_data['input_ids'].shape

torch.Size([3876, 128])

In [63]:
# parameters
parameters = {
    'input_size': train_data['input_ids'].shape[1],
    'hidden_size': 128,
    'num_layers': 5,
    'num_classes': 3,
    'dropout_rate': 0.3,
    
}
num_epochs = 5
learning_rate = 0.001


In [67]:
# Initialize model, loss, optimizer
# model = RNNModel(**parameters)
model = LSTMModel(**parameters)
#model = GRUModel(**parameters)

# trying to balance out the classes

criterion = nn.CrossEntropyLoss()

optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [68]:
print(train_inputs.shape)

torch.Size([3876, 128])


In [69]:
# Training loop

model.train()  # Set model to training mode

for epoch in range(num_epochs):
    
    train_inputs = train_data['input_ids'].float()
    train_labels = train_data['label']

    # Forward pass
    outputs = model(train_inputs.unsqueeze(1))  # Shape: [batch_size, seq_length, input_size]
    
    # Calculate the loss
    loss = criterion(outputs, train_labels)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Calculate accuracy
    _, predicted = torch.max(outputs, dim=1)
    correct = (predicted == train_labels).sum().item()
    accuracy = correct / train_labels.size(0)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Accuracy: {accuracy:.4f}')



Epoch [1/5], Loss: 1.0624, Accuracy: 0.4600
Epoch [2/5], Loss: 1.0002, Accuracy: 0.5459
Epoch [3/5], Loss: 0.9771, Accuracy: 0.5655
Epoch [4/5], Loss: 0.9687, Accuracy: 0.5725
Epoch [5/5], Loss: 0.9620, Accuracy: 0.5810


In [70]:
train_inputs.unsqueeze(-1).shape

torch.Size([3876, 128, 1])

In [71]:
print(model)

d = {}
# Evaluation
model.eval()  # Set model to evaluation mode
with torch.no_grad():
    test_inputs = test_data['input_ids'].float()
    test_labels = test_data['label']
    
    test_outputs = model(test_inputs.unsqueeze(1))
    print(test_outputs)
    _, predicted = torch.max(test_outputs, 1)
    
    
    accuracy = (predicted == test_labels).sum().item() / test_labels.size(0)
    print(f'Test Accuracy: {accuracy:.4f}')



print(predicted)


LSTMModel(
  (lstm): LSTM(128, 128, num_layers=5, batch_first=True, dropout=0.3)
  (dropout): Dropout(p=0.3, inplace=False)
  (fc1): Linear(in_features=128, out_features=64, bias=True)
  (fc): Linear(in_features=64, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=3, bias=True)
  (layer_norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
)
tensor([[-1.4999,  0.6492, -0.1939],
        [-1.4971,  0.6520, -0.1993],
        [-1.4985,  0.6512, -0.1973],
        ...,
        [-1.4969,  0.6504, -0.1966],
        [-1.5016,  0.6498, -0.1951],
        [-1.4991,  0.6495, -0.1964]])
Test Accuracy: 0.5979
tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1

In [72]:
# save the model
torch.save(model.state_dict(), 'GRU.pth')