In [1]:
import numpy as np
import pandas as pd
import nltk
import re
from nltk.corpus import stopwords
from nltk.stem import LancasterStemmer
from nltk.stem.snowball import SnowballStemmer
from string import punctuation
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import torch
import torch.nn as nn

In [2]:
# Dataset from: https://github.com/JerryWeiAI/NewB/blob/master/README.md
# Each non-blank line of the dataset is "<integer label>\t<text>".
X = []
y = []
# The context manager closes the file even if parsing a line raises.
with open('political_affiliation_dataset.txt', 'r') as f:
    for line in f:
        # A blank (e.g. trailing) line has no tab-separated fields to parse.
        if not line.strip():
            continue
        # Split once and reuse the pieces for both the label and the text.
        fields = line.split('\t')
        label, text = int(fields[0]), fields[1]
        X.append(text)
        y.append(label)
X = np.array(X)
y = np.array(y)

In [3]:
# import numpy as np

# selected_samples = []
# selected_labels = []

# samples_per_label = 1000
# for label in range(11):
#     label_indices = np.where(y == label)[0]
#     np.random.shuffle(label_indices)
#     selected_indices = label_indices[:samples_per_label]
#     selected_samples.extend(X[selected_indices])
#     selected_labels.extend([label] * samples_per_label)
# X_sample = np.array(selected_samples)
# y_sample = np.array(selected_labels)

# 0 is liberal, 1 is neutral, 2 is conservative
def classify(x):
    """Collapse a fine-grained label into one of three classes.

    Returns 0 (liberal) when ``x <= 4``, 2 (conservative) when ``x >= 6``
    and 1 (neutral) for every other value.
    """
    if x >= 6:
        return 2
    if x <= 4:
        return 0
    return 1
# Re-encode every raw label through classify(); the result replaces y.
# NOTE: because y is overwritten, running this cell a second time would map
# every label to 0 (the values 0, 1 and 2 are all <= 4).
y = np.array([classify(raw_label) for raw_label in y])

In [4]:
# Normalise every document: keep ASCII letters only, lower-case, drop stop
# words, and stem the remaining tokens. X is replaced by the cleaned text.
train_df = pd.DataFrame({'text': X, 'label': y})
stemmer = LancasterStemmer()  # retained from the original cell; not used in this cell
stopwords_punc = list(stopwords.words('english'))+list(punctuation)
# Set membership is O(1); the list above would be scanned once per token.
stopword_lookup = frozenset(stopwords_punc)
# Build the stemmer once instead of once per document.
lem = SnowballStemmer("english")


def _clean_text(raw_text):
    """Return ``raw_text`` as a space-joined string of stemmed, non-stop-word tokens."""
    # Replacing every non-letter with a space already removes digits,
    # punctuation and markup characters, and split() collapses the resulting
    # whitespace runs, so no further regex passes are needed afterwards.
    letters_only = re.sub('[^a-zA-Z]', ' ', str(raw_text)).lower()
    return " ".join(
        lem.stem(token)
        for token in letters_only.split()
        if token not in stopword_lookup
    )


joined_text = [_clean_text(document) for document in train_df['text']]
X = np.array(joined_text)

In [5]:
num_samples_per_label = 2000

# A dedicated, seeded generator makes the balanced subsample reproducible
# across kernel restarts.
sampling_rng = np.random.default_rng(42)


def _sample_class_indices(labels, label, count, rng):
    """Pick ``count`` distinct positions of ``labels`` whose value equals ``label``.

    Raises:
        ValueError: if fewer than ``count`` entries carry ``label``.
    """
    candidates = np.where(labels == label)[0]
    if candidates.size < count:
        raise ValueError(
            f"label {label} has only {candidates.size} samples; "
            f"cannot draw {count} without replacement"
        )
    return rng.choice(candidates, count, replace=False)


# Same number of documents from each class, concatenated in class order 0, 1, 2.
selected_indices = np.concatenate([
    _sample_class_indices(y, label, num_samples_per_label, sampling_rng)
    for label in (0, 1, 2)
])

X_sample = X[selected_indices]
y_sample = y[selected_indices]

In [6]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [7]:
from sklearn.feature_extraction.text import CountVectorizer

# Count features over word-level bigrams and trigrams.
ngram_range = (2, 3)
vectorizer = CountVectorizer(ngram_range=ngram_range, analyzer='word')
# NOTE: the vocabulary is learned from all of X_sample, i.e. before the
# train/test split that is applied to X_ngrams afterwards.
X_ngrams = vectorizer.fit_transform(X_sample)

In [8]:
from sklearn.model_selection import train_test_split

# 70/30 train/test partition; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X_ngrams,
    y_sample,
    random_state=42,
    test_size=0.3,
)

In [9]:
# Convert the training split to tensors on the target device.
# The guard keeps this cell re-runnable: once X_train is a tensor it no longer
# has .toarray(), so converting it a second time would raise.
if not torch.is_tensor(X_train):
    # Cast the sparse counts to float32 *before* densifying so the dense
    # matrix is built directly in the dtype the model consumes, instead of
    # as a larger integer array that is cast to float on every batch.
    X_train = torch.from_numpy(X_train.astype(np.float32).toarray()).to(device)
# as_tensor accepts both the NumPy labels and an already-converted tensor.
y_train = torch.as_tensor(y_train, dtype=torch.long).to(device)

In [10]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Pair each feature row with its label and serve them in shuffled
# mini-batches of 32.
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)

In [12]:
class SimpleRNNClassifier(nn.Module):
    """Single linear layer mapping feature vectors to class scores.

    ``hidden_size`` is stored on the instance but is not used by the forward
    pass; no recurrent layer is created.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.fc = nn.Linear(in_features=input_size, out_features=num_classes)

    def forward(self, x):
        """Return the un-normalised class scores produced by ``self.fc`` for ``x``."""
        return self.fc(x)


# Seed PyTorch before building the model so that weight initialisation and
# the DataLoader's shuffling order are repeatable from a fresh kernel.
torch.manual_seed(42)

input_size = X_train.shape[1]
hidden_size = 64
num_classes = 3
model = SimpleRNNClassifier(input_size, hidden_size, num_classes)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
loss_fn = torch.nn.CrossEntropyLoss()

num_epochs = 50
# Mean training loss of each epoch, kept so convergence can be inspected
# after the loop finishes.
epoch_losses = []

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    samples_seen = 0
    for inputs, labels in train_loader:
        # Cast the inputs to the correct data type (float)
        inputs = inputs.float().to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        # CrossEntropyLoss averages over the batch, so weight by batch size
        # to get a correct per-sample mean when the last batch is smaller.
        batch_size = labels.size(0)
        running_loss += loss.item() * batch_size
        samples_seen += batch_size
    epoch_losses.append(running_loss / max(samples_seen, 1))


# Move test data to the target device as float32.
# The guard keeps this cell re-runnable: once X_test is a tensor it no longer
# has .toarray(), so converting it a second time would raise.
if not torch.is_tensor(X_test):
    # Cast the sparse counts to float32 before densifying to avoid building
    # a larger integer dense matrix first.
    X_test = torch.from_numpy(X_test.astype(np.float32).toarray())
X_test = X_test.float().to(device)
# as_tensor accepts both the NumPy labels and an already-converted tensor.
y_test = torch.as_tensor(y_test, dtype=torch.long).to(device)

# Evaluate the model on the test set
eval_batch_size = 32
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for start in range(0, X_test.shape[0], eval_batch_size):
        inputs = X_test[start:start + eval_batch_size]
        labels = y_test[start:start + eval_batch_size]

        # The predicted class is the index of the largest score in each row.
        predicted = model(inputs).argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f"Test accuracy: {accuracy}")

Test accuracy: 0.39444444444444443
