In [30]:
# imports
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
import re

In [31]:
# load data set
df = pd.read_csv("news.csv", encoding='latin-1', header=None) 
df.columns = ['label', 'text']
print(df.head())
print(df.columns)

      label                                               text
0   neutral  According to Gran , the company has no plans t...
1   neutral  Technopolis plans to develop in stages an area...
2  negative  The international electronic industry company ...
3  positive  With the new production plant the company woul...
4  positive  According to the company 's updated strategy f...
Index(['label', 'text'], dtype='object')


In [32]:
# clean text
def clean_text(text):
    text = text.lower() # convert string to lowercase
    text = re.sub(r"http\S+|www\S+|https\S+", '', text, flags=re.MULTILINE)  # remove links
    text = re.sub(r'\@w+|\#','', text)  # remove mentions and hashtags
    text = re.sub(r'[^a-zA-Z\s]', '', text)  # remove numbers and punctuation
    return text
df['text'] = df['text'].astype(str).apply(clean_text) # updates text column to string and cleans

# encode labels
encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['label'])  

print("Label classes:", encoder.classes_)

Label classes: ['negative' 'neutral' 'positive']


In [33]:
# train/tests split
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df['text'], df['label'], test_size=0.2, random_state=42
)
# here were training it so it can predict what label each text goes under

In [34]:
# create data set class
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
from collections import Counter


# splits text at any whitespace
def tokenizer(text):
    return text.split()

# goes through all of the text and yeilds the tokens
def yield_tokens(data_iter):
    for text in data_iter:
        yield tokenizer(text)


def build_vocab(texts, min_freq=1):
    counter = Counter()
    for text in texts:
        counter.update(tokenizer(text))
    # start indices at 2 so we can reserve 0 for PAD and 1 for UNK
    vocab = {word: i+2 for i, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab["<pad>"] = 0
    vocab["<unk>"] = 1
    return vocab

vocab_dict = build_vocab(train_texts, min_freq=2)  
print("Vocab size:", len(vocab_dict))

def numericalize(text, vocab):
    tokens = tokenizer(text)  # split into words
    return [vocab.get(token, vocab["<unk>"]) for token in tokens]



Vocab size: 4039


In [35]:

class NewsDataset(Dataset):
    def __init__(self, texts, labels, vocab):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab

    # number of samples in dataset
    def __len__(self):
        return len(self.texts)

    # call when we want only one sample
    def __getitem__(self, idx):
        text = self.texts.iloc[idx]
        label = self.labels.iloc[idx]
        tokens = torch.tensor(numericalize(text, self.vocab), dtype=torch.long) #self.vocab is for mapping words to numbers
        return tokens, torch.tensor(label, dtype=torch.long) #returns tokens and labels
    
# make data into dataset object
train_dataset = NewsDataset(train_texts, train_labels, vocab_dict)
test_dataset = NewsDataset(test_texts, test_labels, vocab_dict)

def collate_batch(batch):
    texts, labels = zip(*batch)
    texts = pad_sequence(texts, batch_first=True, padding_value=0) #makes all lists same length by adding zeros at end
    labels = torch.tensor(labels, dtype=torch.long)
    return texts, labels

# train 32 healines per batch
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_batch) # reshuffle every epoch
test_loader = DataLoader(test_dataset, batch_size=32, collate_fn=collate_batch)

print("Train batches:", len(train_loader))
print("Test batches:", len(test_loader))





Train batches: 122
Test batches: 31


In [36]:
import torch.nn as nn

class TextClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes):
        super(TextClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim) # turns vocab id into dense vector
        self.fc = nn.Linear(embed_dim, num_classes) # input is average embedding of a sentence and output is num_classes(2)cwhich is buy or sell
    
    def forward(self, x):
        # x = (batch size, seq len)
        embedded = self.embedding(x) # gives us original output and embed_dim
        pooling = embedded.mean(dim = 1) # average embedding across all words in teh sentence, used to get one vec per sentence
        output = self.fc(pooling) # passes vector into classifier
        return output

In [37]:
#initialize model, loss, optimizer
vocab_size = max(vocab_dict.values()) + 1 # amount of unique words
embed_dim = 50 # size of each word vector, the larger the number the more expressive the word is
num_classes = len(set(train_labels)) # number of classes -> 2(buy/sell)
model = TextClassifier(vocab_size, embed_dim, num_classes)
criterion = nn.CrossEntropyLoss() # diff between prediction and target
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001) # optimizer that updates weights using gradients


In [38]:
# model training loop
def train_model(self, train_loader, criterion, optimizer, epochs = 5):
    model.train() # put in training mode
    for epoch in range(epochs):
        total_loss = 0
        for texts, label in train_loader:
            optimizer.zero_grad() # reset gradients
            outputs = model(texts) # forward pass
            loss = criterion(outputs, label) # calculate error
            loss.backward() # back propagation
            optimizer.step() # update weights
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss:.4f}")

In [39]:
def evaluate_model(self, test_loader):
    model.eval() # put in evaluation mode
    correct, total = 0, 0
    with torch.no_grad(): # gradients not needed in eval mode
        for texts, labels in test_loader:
            outputs = model(texts) # forward pass
            _, predicted = torch.max(outputs, 1) # get prediction index 0 = sell 1 = buy
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f"Test Accuracy: {100 * correct / total:.2f}%")

In [42]:
# Train the model
train_model(model, train_loader, criterion, optimizer, epochs=10)

# Evaluate on test set
evaluate_model(model, test_loader)


Epoch 1/10, Loss: 74.0861
Epoch 2/10, Loss: 71.8376
Epoch 3/10, Loss: 68.4318
Epoch 4/10, Loss: 66.1710
Epoch 5/10, Loss: 62.4513
Epoch 6/10, Loss: 60.1393
Epoch 7/10, Loss: 57.3580
Epoch 8/10, Loss: 55.3067
Epoch 9/10, Loss: 52.5003
Epoch 10/10, Loss: 50.0892
Test Accuracy: 75.46%


In [None]:
# use model
def prediction(text, model, vocab, max_len = 50):
    model.eval()
    with torch.no_grad():
        # tokenize
        tokens = text.lower().split()
        token_ids = [vocab_dict.get(word, vocab["<unk"]) for word in tokens]

        # pad
        if len(tokens) < max_len:
            token_ids += [vocab["<pad>"]] * (max_len - len(token_ids))
        else:
            token_ids = token_ids[:max_len]

        input_tensor = torch.tensor([token_ids])
        output = model(input_tensor)

        probs = F.softmax(output, dim=1) # rescales input and has it sum to 1, probability distribution
        predicted_class = torch.argmax(probs, dim=1).item() # picks class with highest probability
        return predicted_class, probs.numpy()