In [1]:
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

# Step 1: Load the dataset
df = pd.read_csv('train.csv', encoding='latin-1')
df = df[['text', 'sentiment']]
df = df[df['sentiment'] != 'neutral']
df['sentiment'] = df['sentiment'].map({'negative': 0, 'positive': 1})
df = df.dropna(subset=['text'])

# Step 2: Clean the data
def clean_text(text):
    text = str(text)
    text = text.lower()
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    text = re.sub(r'@\w+', '', text)
    text = re.sub(r'[^a-z\s]', '', text)
    tokens = word_tokenize(text)
    stop_words = set(stopwords.words('english'))
    tokens = [word for word in tokens if word not in stop_words]
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(word) for word in tokens]
    return ' '.join(tokens)

df['cleaned_text'] = df['text'].apply(clean_text)

# Step 3: Preprocess the data using TensorFlow
max_vocab_size = 10000
max_sequence_length = 50

tokenizer = Tokenizer(num_words=max_vocab_size, oov_token='<OOV>')
tokenizer.fit_on_texts(df['cleaned_text'])
word_index = tokenizer.word_index

sequences = tokenizer.texts_to_sequences(df['cleaned_text'])
padded_sequences = pad_sequences(sequences, maxlen=max_sequence_length, padding='post', truncating='post')
labels = np.array(df['sentiment'])

X_train, X_test, y_train, y_test = train_test_split(padded_sequences, labels, test_size=0.2, random_state=42)

# Step 4: Convert to PyTorch tensors and create DataLoader
X_train_tensor = torch.tensor(X_train, dtype=torch.long)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Step 5: Define the RNN model with Vanilla RNN
class SentimentRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(SentimentRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)  # Using Vanilla RNN
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        embedded = self.embedding(x)
        rnn_out, hidden = self.rnn(embedded) 
        out = self.fc(rnn_out[:, -1, :])
        out = self.sigmoid(out)
        return out

# Hyperparameters
vocab_size = min(max_vocab_size, len(word_index) + 1)
embedding_dim = 100
hidden_dim = 128
output_dim = 1

# Initialize model
model = SentimentRNN(vocab_size, embedding_dim, hidden_dim, output_dim)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training
num_epochs = 5
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X).squeeze()
        loss = criterion(outputs, batch_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}')

# Evaluation
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X).squeeze()
        predictions = (outputs >= 0.5).float()
        total += batch_y.size(0)
        correct += (predictions == batch_y).sum().item()
    accuracy = correct / total
    print(f'Test Accuracy: {accuracy:.4f}')

# Step 6: Test on a new example
def predict_sentiment(text, tokenizer, model, max_sequence_length, device):
    cleaned_text = clean_text(text)
    sequence = tokenizer.texts_to_sequences([cleaned_text])
    padded = pad_sequences(sequence, maxlen=max_sequence_length, padding='post', truncating='post')
    tensor = torch.tensor(padded, dtype=torch.long).to(device)
    model.eval()
    with torch.no_grad():
        output = model(tensor).squeeze()
        prediction = (output >= 0.5).float().item()
    return 'Positive' if prediction == 1 else 'Negative'

new_tweet = "I love this beautiful day!"
print(f'Tweet: {new_tweet}')
print(f'Sentiment: {predict_sentiment(new_tweet, tokenizer, model, max_sequence_length, device)}')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\sarae\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\sarae\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\sarae\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\sarae\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Epoch [1/5], Loss: 0.6945
Epoch [2/5], Loss: 0.6928
Epoch [3/5], Loss: 0.6928
Epoch [4/5], Loss: 0.6929
Epoch [5/5], Loss: 0.6948
Test Accuracy: 0.5115
Tweet: I love this beautiful day!
Sentiment: Positive
