# **Sentiment Analysis with Logistic Regression in Pytorch**

Motivated from Coursera NLP specialization Course 1 Week 1 Assignment "Sentiment analysis with logistic regression", only using nltk, numpy, pandas, etc., **which was not implemented in pytorch or tensorflow**.

- Credits:
  - Coursera Natural Language Processing Specialization by Deeplearning.ai Course 1 Week 1 Assignment (https://www.coursera.org/learn/classification-vector-spaces-in-nlp/) 
  - Introduction to Deep learning in Pytorch (Korean) (https://wikidocs.net/60037)


In [1]:
import nltk
nltk.download('twitter_samples')
nltk.download('stopwords')

[nltk_data] Downloading package twitter_samples to /root/nltk_data...
[nltk_data]   Package twitter_samples is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [2]:
from nltk.corpus import twitter_samples
# Select the set of positive and negative tweets
all_positive_tweets = twitter_samples.strings('positive_tweets.json')
all_negative_tweets = twitter_samples.strings('negative_tweets.json')

In [3]:
import torch

In [4]:
# Split the edata into two pieces, one for training and one for testing (validation set)
train_size = int(0.8 * len(all_positive_tweets))
test_size = len(all_positive_tweets) - train_size
train_pos, test_pos = torch.utils.data.random_split(all_positive_tweets, [train_size, test_size])

In [5]:
train_neg, test_neg = torch.utils.data.random_split(all_negative_tweets, [train_size, test_size])

In [6]:
train_x = train_pos + train_neg
test_x = test_pos + test_neg

In [7]:
# Combine positive and negative labels
train_y = torch.cat([torch.ones(len(train_pos), 1), torch.zeros(len(train_neg), 1)], dim=0)
test_y = torch.cat([torch.ones(len(test_pos), 1), torch.zeros(len(test_neg), 1)], dim=0)

In [8]:
# Print the shape train and test sets
print("train_y.shape = ", train_y.size())
print("test_y.shape = ", test_y.size())

train_y.shape =  torch.Size([8000, 1])
test_y.shape =  torch.Size([2000, 1])


In [9]:
import re
import string
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import TweetTokenizer

def process_tweet(tweet):
  """Process tweet function.
  Input:
    tweet: a string containing a tweet
  Output:
    tweets_clean: a list of words containing the processed tweet
  """
  stemmer = PorterStemmer()
  stopwords_english = stopwords.words('english')

  # remove stock market stickers like $GE
  tweet = re.sub(r'\$\w*', '', tweet)

  # remove RT
  tweet = re.sub(r'^RT[\s]+', '', tweet)

  # remove hyperlinks
  tweet = re.sub(r'https?:\/\/.*[\r\n]*', '', tweet)

  # remove hashtags
  # only removing the hash # sign from the word
  tweet = re.sub(r'#', '', tweet)

  #tokenize tweets
  tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)
  tweet_tokens = tokenizer.tokenize(tweet)

  tweets_clean = []

  for word in tweet_tokens:
    if (word not in stopwords_english and word not in string.punctuation):
      stem_word = stemmer.stem(word)
      tweets_clean.append(stem_word)

  return tweets_clean


def build_freqs(tweets, labels):
  """Build frequencies.
  Input: 
    tweets: a list of tweets
    labels: an m x 1 array with the sentiment label of each tweet (0 or 1)
  Output:
    freqs: a dictionary mapping each (word, sentiment) pair to its frequency
  """

  labelList = torch.squeeze(labels).tolist()

  freqs = {}
  for label, tweet in zip(labelList, tweets):
    for word in process_tweet(tweet):
      pair = (word, label)
      if pair in freqs:
        freqs[pair] += 1
      else:
        freqs[pair] = 1
  return freqs

In [10]:
# create frequency dictionary
freqs = build_freqs(train_x, train_y)

# check the output
print("type(freqs) = " + str(type(freqs)))
print("len(freqs) = " + str(len(freqs.keys())))

type(freqs) = <class 'dict'>
len(freqs) = 11256


In [11]:
# test the function below
print('This is an example of a positive tweet: \n', train_x[0])
print('\nThis is an example of the processed version of the tweet: \n', process_tweet(train_x[0]))

This is an example of a positive tweet: 
 @JYGClub thanks admin :)

This is an example of the processed version of the tweet: 
 ['thank', 'admin', ':)']


In [12]:
def extract_features(tweet, freqs):
  """Extracting the features.
  Input:
    tweet: a list of words for one tweet
    freqs: a dictionary corresponding to the frequencies of each tuple(word, label)
  Output:
    x: a feature vector of dimension (1,3)
  """

  # process_tweet tokenizes, stems, and removes stopwords
  word_l = process_tweet(tweet)

  # 3 elements in the form of a 1 x 3 vector
  x = torch.zeros((1,3))

  # bias term is set to 1
  x[0,0] = 1

  for word in word_l:
    # increment the word count for positive label 1
    x[0,1] += freqs.get((word, 1), 0)
    # increment the word count for negative label 0
    x[0,2] += freqs.get((word, 0), 0)
  
  assert(x.size()== (1, 3))
  return x

In [13]:
# Check your function

# test 1
# test on training data
tmp1 = extract_features(train_x[0], freqs)
print(tmp1)

tensor([[1.0000e+00, 3.3400e+03, 8.7000e+01]])


In [14]:
# test 2:
# check for when the words are not in the freqs dictionary
tmp2 = extract_features('blorb bleeeeb bloooob', freqs)
print(tmp2)

tensor([[1., 0., 0.]])


In [15]:
# Training the model

# collect the features 'x' and stack them into a matrix 'X'
X = torch.zeros((len(train_x), 3))
for i in range(len(train_x)):
  X[i, :] = extract_features(train_x[i], freqs)

# training labels corresponding to X
Y = train_y

In [16]:
# source code below from -
# source code URL #1: https://wikidocs.net/60037

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

torch.manual_seed(1)

X = torch.FloatTensor(X)
Y = torch.FloatTensor(Y)

class LogisticRegression(nn.Module):
  def __init__(self, input_dim, output_dim):
    super().__init__()
    self.linear = nn.Linear(input_dim, output_dim)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    return self.sigmoid(self.linear(X))

model = LogisticRegression(3, 1)
optimizer = optim.SGD(model.parameters(), lr=1e-3)
num_iters = 1500

for epoch in range(num_iters + 1):
  
  model.train()
  hypothesis = model(X)
  cost = F.binary_cross_entropy(hypothesis, Y)

  optimizer.zero_grad()
  cost.backward()
  optimizer.step()

  if epoch % 100 == 0:
    prediction = hypothesis >= torch.FloatTensor([0.5])
    correct_prediction = prediction.float() == Y
    accuracy = correct_prediction.sum().item() / len(correct_prediction)
    print('Epoch {:4d}/{} Cost: {:.6F} Accuracy {:2.2f}%'.format(
        epoch, num_iters, cost.item(), accuracy * 100))


Epoch    0/1500 Cost: 48.874947 Accuracy 50.20%
Epoch  100/1500 Cost: 48.653866 Accuracy 50.20%
Epoch  200/1500 Cost: 47.923008 Accuracy 50.20%
Epoch  300/1500 Cost: 49.895924 Accuracy 50.00%
Epoch  400/1500 Cost: 49.895882 Accuracy 50.00%
Epoch  500/1500 Cost: 49.895836 Accuracy 50.00%
Epoch  600/1500 Cost: 49.895798 Accuracy 50.00%
Epoch  700/1500 Cost: 49.895748 Accuracy 50.00%
Epoch  800/1500 Cost: 49.895702 Accuracy 50.00%
Epoch  900/1500 Cost: 49.895657 Accuracy 50.00%
Epoch 1000/1500 Cost: 5.486577 Accuracy 94.10%
Epoch 1100/1500 Cost: 2.042902 Accuracy 97.72%
Epoch 1200/1500 Cost: 1.263322 Accuracy 98.58%
Epoch 1300/1500 Cost: 1.082209 Accuracy 98.72%
Epoch 1400/1500 Cost: 0.915523 Accuracy 98.88%
Epoch 1500/1500 Cost: 0.681667 Accuracy 99.06%


In [17]:
# Test the model

X = torch.zeros((len(test_x), 3))
for i in range(len(test_x)):
  X[i, :] = extract_features(test_x[i], freqs)

# training labels corresponding to X
Y = test_y

model.eval()
hypothesis = model(X)

prediction = hypothesis >= torch.FloatTensor([0.5])
correct_prediction = prediction.float() == Y
accuracy = correct_prediction.sum().item() / len(correct_prediction)

In [18]:
 print("Logistic regression model's accuracy =", accuracy * 100, "%")

Logistic regression model's accuracy = 98.6 %


In [20]:
my_tweet = 'This is a ridiculously bright movie. The plot was terrible and I was sad until the ending!'
print(my_tweet)
X = torch.zeros((1, 3))
X[0, :] = extract_features(my_tweet, freqs)

model.eval()
hypothesis = model(X)

prediction = hypothesis >= torch.FloatTensor([0.5])
print(f"- prediction: {'Positive sentiment' if prediction else 'Negative sentiment'}")

This is a ridiculously bright movie. The plot was terrible and I was sad until the ending!
- prediction: Negative sentiment


In [21]:
my_tweet = 'I\'m happy :)'
print(my_tweet)
X = torch.zeros((1, 3))
X[0, :] = extract_features(my_tweet, freqs)

model.eval()
hypothesis = model(X)

prediction = hypothesis >= torch.FloatTensor([0.5])
print(f"- prediction: {'Positive sentiment' if prediction else 'Negative sentiment'}")

I'm happy :)
- prediction: Positive sentiment
