# LSTM

In [None]:
# IMPORTS

import pandas as pd
import numpy as np
import model_utils 
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import torch.nn as nn
from collections import Counter

In [4]:
# NORMALIZE AND CONCAT ALL DATASETS

# Every source file goes through the model_utils normalizer of its dataset
# family; files are processed in the order listed here.
# NOTE(review): presumably each normalizer returns a DataFrame that has at
# least a "text" column -- confirm in model_utils.
normalized_dfs = [
    # Misinfo Dataset
    *map(
        model_utils.normalize_misinfo_dataset,
        (
            "datasets/misinfo-dataset/DataSet_Misinfo_FAKE.csv",
            "datasets/misinfo-dataset/DataSet_Misinfo_TRUE.csv",
            "datasets/misinfo-dataset/EXTRA_RussianPropagandaSubset.csv",
        ),
    ),
    # Fake News Net Dataset
    *map(
        model_utils.normalize_fakenewsnet_dataset,
        (
            "datasets/fakenewsnet-dataset/gossipcop_fake.csv",
            "datasets/fakenewsnet-dataset/gossipcop_real.csv",
            "datasets/fakenewsnet-dataset/politifact_fake.csv",
            "datasets/fakenewsnet-dataset/politifact_real.csv",
        ),
    ),
    # Liar Dataset
    model_utils.normalize_liar_dataset("datasets/liar-dataset/train.tsv"),
]

# Stack all frames under a fresh 0..n-1 index, drop rows whose "text" is
# missing, then renumber so the index is contiguous again.
df = (
    pd.concat(normalized_dfs, ignore_index=True)
    .dropna(subset=["text"])
    .reset_index(drop=True)
)

In [None]:
# Hold out 20% of the rows for testing; the fixed seed keeps the split
# identical from run to run.
train_df, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
)

def tokenize(text):
    """Lower-case *text* and split it on runs of whitespace.

    Punctuation is not stripped, so it stays attached to the neighbouring
    word. Empty or whitespace-only input yields an empty list.
    """
    lowered = text.lower()
    return lowered.split()

# Token frequencies are gathered from the training split only.
counter = Counter(
    token for text in train_df["text"] for token in tokenize(text)
)

vocab_size = 10000
# Ids 0 and 1 are reserved for <PAD> and <UNK>, so only the
# (vocab_size - 2) most frequent tokens get an id of their own, from 2 upward.
most_common = counter.most_common(vocab_size - 2)
word2idx = {
    "<PAD>": 0,
    "<UNK>": 1,
    **{word: idx for idx, (word, _) in enumerate(most_common, start=2)},
}

def encode(text, max_len=300):
    """Convert *text* into a list of vocabulary ids.

    The text is tokenized with ``tokenize``, cut to its first *max_len*
    tokens, and every token is looked up in ``word2idx``; tokens that are
    not in the vocabulary map to the ``<UNK>`` id.

    Args:
        text: Raw input string.
        max_len: Maximum number of tokens kept (default 300).

    Returns:
        A non-empty list of int ids. Text that produces no tokens (empty or
        whitespace-only) yields a single ``<UNK>`` id instead of an empty
        list, because ``pack_padded_sequence`` rejects zero-length
        sequences.
    """
    unk_idx = word2idx["<UNK>"]
    ids = [word2idx.get(token, unk_idx) for token in tokenize(text)[:max_len]]
    # Fall back to one <UNK> so downstream tensors never have length 0.
    return ids or [unk_idx]


#### Define Model Classes and Instantiate Training Objects

In [None]:
class LSTMClassifier(nn.Module):
    """Embedding -> LSTM -> linear head producing one logit per sequence."""

    def __init__(self, vocab_size, embedding_dim=100, hidden_dim=64, pad_idx=0):
        """Create the embedding table, the LSTM and the output layer.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Size of each token embedding.
            hidden_dim: Size of the LSTM hidden state.
            pad_idx: Token id the embedding layer treats as padding.
        """
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=pad_idx,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=1)

    def forward(self, x, lengths):
        """Return one logit per sequence; no sigmoid is applied here.

        Args:
            x: Padded batch of token ids, batch dimension first (the LSTM
                and the packing call both use ``batch_first=True``).
            lengths: Unpadded length of every row of ``x``.

        Returns:
            The linear layer's output with its size-1 dimension 1 removed.
        """
        token_vectors = self.embedding(x)
        # Packing lets the LSTM skip padded positions. The lengths are moved
        # to the CPU for this call, and enforce_sorted=False accepts batches
        # that are not ordered by length.
        packed_batch = nn.utils.rnn.pack_padded_sequence(
            token_vectors,
            lengths.cpu(),
            batch_first=True,
            enforce_sorted=False,
        )
        _, (final_hidden, _) = self.lstm(packed_batch)
        # final_hidden[-1] holds the final hidden state of the last layer.
        return self.fc(final_hidden[-1]).squeeze(1)

class Dataset(torch.utils.data.Dataset):
    """In-memory dataset of encoded texts paired with float labels.

    NOTE: the class keeps the name ``Dataset`` because other cells construct
    it under that name; this shadows the ``Dataset`` imported from
    ``torch.utils.data``. The base class is therefore spelled out in full,
    so re-running this cell does not make the class inherit from its own
    previous definition.
    """

    def __init__(self, texts, labels):
        """Encode every text once up front and store the labels as floats.

        Args:
            texts: Iterable of raw strings; each one is passed to ``encode``.
            labels: Sequence of numeric labels, one per text.
        """
        # dtype is pinned because torch.tensor([]) would otherwise come out
        # as a float tensor, which cannot be used as embedding indices.
        self.data = [torch.tensor(encode(t), dtype=torch.long) for t in texts]
        self.labels = torch.tensor(labels, dtype=torch.float32)

    def __len__(self):
        """Return the number of (text, label) pairs."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the encoded id tensor and the label stored at *idx*."""
        return self.data[idx], self.labels[idx]

def collate_fn(batch):
    """Merge (sequence, label) pairs into padded batch tensors.

    Args:
        batch: Non-empty list of ``(sequence_tensor, label)`` pairs.

    Returns:
        A tuple ``(padded, lengths, labels)``: the sequences right-padded
        with 0 to the longest one in the batch (batch dimension first), the
        original length of each sequence, and the labels as one tensor.
    """
    sequences, targets = zip(*batch)
    seq_lens = [len(sequence) for sequence in sequences]
    padded = nn.utils.rnn.pad_sequence(
        sequences,
        batch_first=True,
        padding_value=0,
    )
    return padded, torch.tensor(seq_lens), torch.tensor(targets)

# Encode both splits once, up front.
train_dataset = Dataset(
    texts=train_df["text"].tolist(),
    labels=train_df["label"].tolist(),
)
test_dataset = Dataset(
    texts=test_df["text"].tolist(),
    labels=test_df["label"].tolist(),
)

# Only the training loader shuffles; both pad per batch through collate_fn.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=32,
    collate_fn=collate_fn,
)

# Use the Apple MPS backend when it is available, otherwise the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# The embedding table is sized from the vocabulary that was actually built.
model = LSTMClassifier(vocab_size=len(word2idx), pad_idx=word2idx["<PAD>"])
model = model.to(device)

# The model emits raw logits, so the loss applies the sigmoid itself.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

print(f"Using device: {device}")

#### Training Loop

In [None]:
# Number of full passes over the training data.
num_epochs = 3

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
    # Steps are counted with enumerate rather than read from pbar.n: tqdm
    # only refreshes that attribute when it redraws the bar, so it can lag
    # behind the real step count and skew the running average.
    for step, (x, lengths, y) in enumerate(pbar, start=1):
        # lengths stays on the CPU: the model only uses it for sequence
        # packing, where it calls lengths.cpu() anyway.
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        # Predict logits
        logits = model(x, lengths)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        # Running mean of the per-batch loss seen so far in this epoch.
        pbar.set_postfix(loss=total_loss / step)
    print(f"Epoch {epoch+1}: Loss = {total_loss/len(train_loader):.4f}")

Using device: mps


Epoch 1/3: 100%|██████████| 2985/2985 [12:56<00:00,  3.84it/s, loss=0.494]


Epoch 1: Loss = 0.4942


Epoch 2/3: 100%|██████████| 2985/2985 [12:47<00:00,  3.89it/s, loss=0.372]


Epoch 2: Loss = 0.3722


Epoch 3/3: 100%|██████████| 2985/2985 [13:01<00:00,  3.82it/s, loss=0.309]

Epoch 3: Loss = 0.3092





#### Evaluate Model

In [17]:
# Switch to evaluation mode and collect predictions over the whole test set.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for x, lengths, y in tqdm(test_loader, desc="Evaluating"):
        # lengths stays on the CPU: the model only uses it for sequence
        # packing, where it calls lengths.cpu() anyway.
        x = x.to(device)
        logits = model(x, lengths)
        # The model already returns a 1-D tensor. A bare .squeeze() here
        # would turn a final batch of size 1 into a 0-d tensor, and the
        # extend() calls below would then fail on a non-iterable array.
        probs = torch.sigmoid(logits)
        preds = (probs > 0.5).int().cpu().numpy()
        labels = y.int().cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(labels)

# Compute and print results
# zero_division=0 reports 0 instead of warning when a metric is undefined.
acc = accuracy_score(all_labels, all_preds)
prec = precision_score(all_labels, all_preds, zero_division=0)
rec = recall_score(all_labels, all_preds, zero_division=0)
f1 = f1_score(all_labels, all_preds, zero_division=0)

print(f"\nTest Accuracy:  {acc:.4f}")
print(f"Test Precision: {prec:.4f}")
print(f"Test Recall:    {rec:.4f}")
print(f"Test F1 Score:  {f1:.4f}")


Evaluating: 100%|██████████| 747/747 [00:46<00:00, 15.93it/s]



Test Accuracy:  0.8844
Test Precision: 0.8587
Test Recall:    0.9107
Test F1 Score:  0.8839
