In [23]:

import re

import nltk
import pandas as pd
import torch
import torch.nn as nn
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [25]:
# For Mac users: pick Apple's Metal (MPS) backend when PyTorch reports it as
# available, otherwise stay on the CPU.
# NOTE(review): in the cells visible here nothing moves the model or the
# batches onto `device`, so training still runs on the CPU — confirm intent.
backend_name = "mps" if torch.backends.mps.is_available() else "cpu"
device = torch.device(backend_name)
print(f"Using device: {device}")

Using device: mps


In [22]:
# Report whether PyTorch can see a CUDA GPU.
# `torch` is already imported in the first cell, so it is not re-imported here.
cuda_available = torch.cuda.is_available()  # query once, reuse for both branches

print(f"GPU available: {cuda_available}")
if cuda_available:
    print(f"GPU name: {torch.cuda.get_device_name(0)}")
else:
    print("GPU used: False")

GPU available: False
GPU used: False


# Prepare data

## Read input

In [2]:
input_file = './data.csv'
column_names = ['target','id','date','flag','user','text']
SAMPLE_SIZE = 100_000  # number of rows kept for the experiment
RANDOM_SEED = 2        # fixed seed so Restart & Run All draws the same rows

# The file is read without a header row (names are supplied explicitly).
# Only the three columns used later are parsed; the others are skipped.
raw_df = pd.read_csv(
    input_file,
    names=column_names,
    encoding='latin-1',
    usecols=['id', 'target', 'text'],
)

# Cap the sample size at the number of available rows so a smaller file
# does not make DataFrame.sample raise.
input_df = raw_df[['id', 'target', 'text']].sample(
    n=min(SAMPLE_SIZE, len(raw_df)),
    random_state=RANDOM_SEED,
)

## Download stopwords

In [3]:
# Fetch the NLTK stop-word corpus used by the stemming step.
# `nltk` itself is imported together with the other dependencies in the first cell.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/weronikaskiba/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

## Clean input

In [4]:
# Raw labels are 0 and 4; recode them to 0 and 1.
LABEL_MAP = {0: 0, 4: 1}

# Build the cleaned frame in a single chain so `input_df` is left untouched
# and no column is assigned into the result of dropna() afterwards.
transformed_df = (
    input_df
    .dropna()
    .assign(target=lambda df: df['target'].map(LABEL_MAP))  # Normalize target values
)

# Series.map turns every label that is not a key of LABEL_MAP into NaN.
# Fail loudly here instead of passing NaN targets on to training.
unmapped_rows = transformed_df['target'].isna()
if unmapped_rows.any():
    raise ValueError(
        f"{int(unmapped_rows.sum())} rows have a target outside {sorted(LABEL_MAP)}"
    )

## Stemming

In [5]:
def _english_stop_words():
    """Return NLTK's English stop words as a frozenset, building it only once."""
    cached = getattr(_english_stop_words, '_cached', None)
    if cached is None:
        cached = frozenset(stopwords.words('english'))
        _english_stop_words._cached = cached
    return cached


def steeming_transform(text, stemmer=None, stop_words=None):
    """Clean one piece of text and return its stemmed tokens joined by spaces.

    Steps: replace every character that is not an ASCII letter with a space,
    lower-case, split on whitespace, drop stop words, stem what is left.

    Args:
        text: The raw string to process.
        stemmer: Optional object with a ``stem(word)`` method. Defaults to a
            new ``PorterStemmer``.
        stop_words: Optional container of lower-case words to drop. Defaults
            to NLTK's English stop-word list.

    Returns:
        A single string with the surviving stemmed tokens separated by one
        space (empty string when nothing survives).
    """
    if stemmer is None:
        stemmer = PorterStemmer()
    if stop_words is None:
        # Previously the stop-word set was rebuilt for every single token;
        # it is now loaded once and reused across calls.
        stop_words = _english_stop_words()

    letters_only = re.sub('[^a-zA-Z]', ' ', text)
    tokens = letters_only.lower().split()
    return ' '.join(stemmer.stem(token) for token in tokens if token not in stop_words)

In [6]:
# Run the cleaning/stemming helper over every tweet and store the result
# next to the original text.
raw_text = transformed_df['text']
transformed_df['processed_text'] = raw_text.apply(steeming_transform)

## Feature extraction (TF-IDF) and train/test split

In [7]:
# Pull the model inputs (processed text) and the targets out of the frame
# as plain arrays, in that order.
X, Y = (transformed_df[column].values for column in ('processed_text', 'target'))

In [8]:
# Convert the textual data to numerical data (TF-IDF features).
# `TfidfVectorizer` is imported with the other dependencies in the first cell.
vectorizer = TfidfVectorizer()
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, stratify=Y, random_state=2)

# Fit the vocabulary / IDF weights on the training split only, then reuse
# them unchanged for the test split.
# The sparse matrices are cast to float32 *before* densifying: the batches are
# converted to float32 ahead of the forward pass anyway, so this keeps the same
# values while the dense arrays take half the memory of the float64 default.
X_train = vectorizer.fit_transform(X_train).astype('float32').toarray()
X_test = vectorizer.transform(X_test).astype('float32').toarray()

## Dataset

In [9]:
class TweetDataset(Dataset):
    """Pairs a sequence of feature rows with a sequence of labels.

    Item ``i`` is the tuple ``(x[i], y[i])``; the dataset length is ``len(x)``.

    Args:
        x: Indexable collection of features (one entry per sample).
        y: Indexable collection of labels, same length as ``x``.

    Raises:
        ValueError: If ``x`` and ``y`` do not have the same length.
    """

    def __init__(self,x,y):
        # A mismatch would otherwise surface later as an IndexError in the
        # middle of an epoch, or silently ignore surplus labels.
        if len(x) != len(y):
            raise ValueError(
                f"x and y must have the same length, got {len(x)} and {len(y)}"
            )
        self.x = x
        self.y = y

    def __len__(self):
        """Number of samples."""
        return len(self.x)

    def __getitem__(self,index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.x[index],self.y[index]

In [10]:
# Wrap each split's features and labels in a TweetDataset.
train_dataset = TweetDataset(x=X_train, y=Y_train)
test_dataset = TweetDataset(x=X_test, y=Y_test)

In [11]:
BATCH_SIZE = 16  # shared by both loaders

# Training batches are reshuffled on every pass over the data.
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Evaluation gains nothing from shuffling; a fixed order keeps the collected
# predictions aligned with the rows of the test split on every run.
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [12]:
class LSTM(nn.Module):
    """LSTM encoder followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        output_size: Number of outputs produced by the final linear layer.
        num_layers: Number of stacked LSTM layers (default 1).
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are laid out as (batch, seq_len, input_size).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the sequence through the LSTM and project its final step.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Zero initial hidden/cell state. new_zeros inherits x's device and
        # dtype, so the model keeps working after .to(device) or .double();
        # plain torch.zeros always produced a CPU float32 tensor.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)

        lstm_out, _ = self.lstm(x, (h0, c0))
        # Keep only the output of the last time step for classification.
        return self.fc(lstm_out[:, -1, :])


In [13]:
# Seed PyTorch's global RNG before the model is built so that weight
# initialisation (and the shuffling done by the training DataLoader, which
# draws from the same generator) is repeatable across kernel restarts.
torch.manual_seed(2)

input_size = X_train.shape[1]  # one input feature per TF-IDF column
hidden_size = 64
output_size = 2                # one logit per class
num_layers = 1

model = LSTM(input_size, hidden_size, output_size, num_layers)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

In [14]:
# List every parameter tensor of the model together with its requires_grad flag.
for param_name, tensor in model.named_parameters():
    print(f"{param_name}: {tensor.requires_grad}")

lstm.weight_ih_l0: True
lstm.weight_hh_l0: True
lstm.bias_ih_l0: True
lstm.bias_hh_l0: True
fc.weight: True
fc.bias: True


In [15]:
num_epochs = 10

for epoch in range(num_epochs):
    model.train()  # training mode for this pass
    epoch_loss = 0.0  # sum of the per-batch losses seen in this epoch

    for features, labels in train_dataloader:
        # Insert a length-1 axis at position 1 and cast to float32 before
        # handing the batch to the model.
        inputs = features.unsqueeze(1).float()

        logits = model(inputs)
        batch_loss = criterion(logits, labels)

        # Standard update: clear old gradients, back-propagate, step.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        epoch_loss += batch_loss.item()

    # Reported value is the mean of the per-batch losses.
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/len(train_dataloader):.4f}")

Epoch [1/10], Loss: 0.5222
Epoch [2/10], Loss: 0.3659
Epoch [3/10], Loss: 0.1984
Epoch [4/10], Loss: 0.0590
Epoch [5/10], Loss: 0.0223
Epoch [6/10], Loss: 0.0125
Epoch [7/10], Loss: 0.0105
Epoch [8/10], Loss: 0.0076
Epoch [9/10], Loss: 0.0069
Epoch [10/10], Loss: 0.0059


In [19]:
# Persist the model's state_dict (its parameters and buffers) to disk.
trained_weights = model.state_dict()
torch.save(trained_weights, 'model.pth')
print("Model saved to 'model.pth'")

Model saved to 'model.pth'


In [16]:
model.eval()
test_loss = 0.0   # running sum of per-sample losses
n_samples = 0

with torch.no_grad():  # no gradients needed for evaluation
    for batch_features, batch_labels in test_dataloader:
        batch_features = batch_features.unsqueeze(1).float()
        predictions = model(batch_features)
        loss = criterion(predictions, batch_labels)

        # The criterion returns the mean over the batch. Scale it back by the
        # batch size so that a shorter final batch is not over-weighted, which
        # is what averaging the batch means would do.
        batch_size = batch_labels.size(0)
        test_loss += loss.item() * batch_size
        n_samples += batch_size

print(f"Test Loss: {test_loss / n_samples:.4f}")


Test Loss: 2.4008


In [17]:
model.eval()
test_loss = 0.0  # running sum of per-sample losses
correct_predictions = 0
total_predictions = 0

with torch.no_grad():  # no gradients needed for evaluation
    for batch_features, batch_labels in test_dataloader:
        batch_features = batch_features.unsqueeze(1).float()

        # Forward pass
        predictions = model(batch_features)

        # The criterion returns the batch mean; weight it by the batch size so
        # the final figure is a true per-sample mean even when the last batch
        # is smaller than the others.
        loss = criterion(predictions, batch_labels)
        test_loss += loss.item() * batch_labels.size(0)

        # For classification, the predicted class is the index of the largest logit
        _, predicted_classes = torch.max(predictions, dim=1)

        # Count the number of correct predictions
        correct_predictions += (predicted_classes == batch_labels).sum().item()
        total_predictions += batch_labels.size(0)

# Per-sample average loss
avg_loss = test_loss / total_predictions

# Fraction of test samples classified correctly
accuracy = correct_predictions / total_predictions

print(f"Test Loss: {avg_loss:.4f}")
print(f"Test Accuracy: {accuracy * 100:.2f}%")


Test Loss: 2.4008
Test Accuracy: 71.34%


In [18]:
# precision_score / recall_score / f1_score are imported with the other
# dependencies in the first cell.
model.eval()
test_loss = 0.0  # running sum of per-sample losses
all_predictions = []
all_labels = []

with torch.no_grad():  # no gradients needed for evaluation
    for batch_features, batch_labels in test_dataloader:
        batch_features = batch_features.unsqueeze(1).float()

        # Forward pass
        predictions = model(batch_features)

        # The criterion returns the batch mean; weight it by the batch size so
        # the final figure is a true per-sample mean even when the last batch
        # is smaller than the others.
        loss = criterion(predictions, batch_labels)
        test_loss += loss.item() * batch_labels.size(0)

        # Predicted class = index of the largest logit
        _, predicted_classes = torch.max(predictions, dim=1)

        # Accumulate predictions and labels for the metric calculation below
        all_predictions.extend(predicted_classes.cpu().numpy())
        all_labels.extend(batch_labels.cpu().numpy())

# Derive the counts from the collected lists instead of keeping parallel counters.
total_predictions = len(all_labels)
correct_predictions = sum(
    int(predicted == actual) for predicted, actual in zip(all_predictions, all_labels)
)

# Per-sample average loss
avg_loss = test_loss / total_predictions

# Fraction of test samples classified correctly
accuracy = correct_predictions / total_predictions

# Precision, recall and F1 (sklearn), averaged over classes weighted by support.
# Note: support-weighted recall is always equal to plain accuracy, so those
# two printed values describe the same quantity.
precision = precision_score(all_labels, all_predictions, average='weighted')
recall = recall_score(all_labels, all_predictions, average='weighted')
f1 = f1_score(all_labels, all_predictions, average='weighted')

print(f"Test Loss: {avg_loss:.4f}")
print(f"Test Accuracy: {accuracy * 100:.2f}%")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1 Score: {f1:.4f}")


Test Loss: 2.4008
Test Accuracy: 71.34%
Test Precision: 0.7134
Test Recall: 0.7134
Test F1 Score: 0.7133
